update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / common / python / rift / mano / dts / subscriber / core.py
index dd2513e..53c2e8c 100644 (file)
@@ -1,6 +1,6 @@
 """
-# 
-#   Copyright 2016 RIFT.IO Inc
+#
+#   Copyright 2016-2017 RIFT.IO Inc
 #
 #   Licensed under the Apache License, Version 2.0 (the "License");
 #   you may not use this file except in compliance with the License.
@@ -27,6 +27,9 @@ import asyncio
 
 from gi.repository import (RwDts as rwdts, ProtobufC)
 import rift.tasklets
+from rift.mano.utils.project import (
+    get_add_delete_update_cfgs,
+    )
 
 from ..core import DtsHandler
 
@@ -35,11 +38,11 @@ class SubscriberDtsHandler(DtsHandler):
     """A common class for all subscribers.
     """
     @classmethod
-    def from_tasklet(cls, tasklet, callback=None):
+    def from_project(cls, proj, callback=None):
         """Convenience method to build the object from tasklet
 
         Args:
-            tasklet (rift.tasklets.Tasklet): Tasklet
+            proj (rift.mano.utils.project.ManoProject): Project
             callback (None, optional): Callable, which will be invoked on
                     subscriber changes.
 
@@ -48,20 +51,41 @@ class SubscriberDtsHandler(DtsHandler):
                 msg: The Gi Object msg from DTS
                 action(rwdts.QueryAction): Action type
         """
-        return cls(tasklet.log, tasklet.dts, tasklet.loop, callback=callback)
+        return cls(proj.log, proj.dts, proj.loop, proj, callback=callback)
 
-    def __init__(self, log, dts, loop, callback=None):
-        super().__init__(log, dts, loop)
+    def __init__(self, log, dts, loop, project, callback=None):
+        super().__init__(log, dts, loop, project)
         self.callback = callback
 
+    @abc.abstractmethod
+    def get_xpath(self):
+        """
+        Returns:
+           str: xpath
+        """
+        pass
+
     def get_reg_flags(self):
         """Default set of REG flags, can be over-ridden by sub classes.
-        
+
         Returns:
             Set of rwdts.Flag types.
         """
         return rwdts.Flag.SUBSCRIBER|rwdts.Flag.DELTA_READY|rwdts.Flag.CACHE
 
+    @asyncio.coroutine
+    def data(self):
+        itr = yield from self.dts.query_read(
+                self.get_xpath())
+
+        values = []
+        for res in itr:
+            result = yield from res
+            result = result.result
+            values.append(result)
+
+        return values
+
 
 
 class AbstractOpdataSubscriber(SubscriberDtsHandler):
@@ -70,29 +94,32 @@ class AbstractOpdataSubscriber(SubscriberDtsHandler):
 
     Opdata subscriber can be created in one step by subclassing and implementing
     the MANDATORY get_xpath() method
-    
+
     """
-    @abc.abstractmethod
-    def get_xpath(self):
-        """
-        Returns:
-           str: xpath
-        """
-        pass
 
     @asyncio.coroutine
     def register(self):
         """Triggers the registration
         """
+
+        if self._reg:
+            self._log.warning("RPC already registered for project {}".
+                              format(self._project.name))
+            return
+
         xacts = {}
 
         def on_commit(xact_info):
-            xact_id = xact_info.handle.get_xact().id
-            if xact_id in xacts:
-                msg, action = xacts.pop(xact_id)
+            try:
+                xact_id = xact_info.handle.get_xact().id
+                if xact_id in xacts:
+                    msg, action = xacts.pop(xact_id)
 
-                if self.callback:
-                    self.callback(msg, action)
+                    if self.callback:
+                        self.callback(msg, action)
+            except Exception as e:
+                self.log.error("Exception when committing data for registration:{} exception:{}".format(self.get_xpath(), e))
+                self.log.exception(e)
 
             return rwdts.MemberRspCode.ACTION_OK
 
@@ -105,8 +132,11 @@ class AbstractOpdataSubscriber(SubscriberDtsHandler):
             except Exception as e:
                 self.log.exception(e)
 
-            finally:
+            try:
                 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
+            except rift.tasklets.dts.ResponseError as e:
+                self._log.warning("Reg handle is None during action {} for {}: {}".
+                                  format(action, self.__class__, e))
 
         reg_event = asyncio.Event(loop=self.loop)
 
@@ -120,17 +150,14 @@ class AbstractOpdataSubscriber(SubscriberDtsHandler):
                 on_commit=on_commit
                 )
 
-        self.reg = yield from self.dts.register(
-                xpath=self.get_xpath(),
+        self._reg = yield from self.dts.register(
+                xpath=self.project.add_project(self.get_xpath()),
                 flags=self.get_reg_flags(),
                 handler=handler)
 
         # yield from reg_event.wait()
 
-        assert self.reg is not None
-
-    def deregister(self):
-        self.reg.deregister()
+        assert self._reg is not None
 
 
 class AbstractConfigSubscriber(SubscriberDtsHandler):
@@ -139,7 +166,7 @@ class AbstractConfigSubscriber(SubscriberDtsHandler):
 
     Config subscriber can be created in one step by subclassing and implementing
     the MANDATORY get_xpath() method
-    
+
     """
     KEY = "msgs"
 
@@ -151,42 +178,31 @@ class AbstractConfigSubscriber(SubscriberDtsHandler):
     def key_name(self):
         pass
 
-    def get_add_delete_update_cfgs(self, dts_member_reg, xact, key_name):
-        # Unforunately, it is currently difficult to figure out what has exactly
-        # changed in this xact without Pbdelta support (RIFT-4916)
-        # As a workaround, we can fetch the pre and post xact elements and
-        # perform a comparison to figure out adds/deletes/updates
-        xact_cfgs = list(dts_member_reg.get_xact_elements(xact))
-        curr_cfgs = list(dts_member_reg.elements)
-
-        xact_key_map = {getattr(cfg, key_name): cfg for cfg in xact_cfgs}
-        curr_key_map = {getattr(cfg, key_name): cfg for cfg in curr_cfgs}
-
-        # Find Adds
-        added_keys = set(xact_key_map) - set(curr_key_map)
-        added_cfgs = [xact_key_map[key] for key in added_keys]
-
-        # Find Deletes
-        deleted_keys = set(curr_key_map) - set(xact_key_map)
-        deleted_cfgs = [curr_key_map[key] for key in deleted_keys]
-
-        # Find Updates
-        updated_keys = set(curr_key_map) & set(xact_key_map)
-        updated_cfgs = [xact_key_map[key] for key in updated_keys if xact_key_map[key] != curr_key_map[key]]
-
-        return added_cfgs, deleted_cfgs, updated_cfgs
-
     @asyncio.coroutine
     def register(self):
         """ Register for VNFD configuration"""
 
         def on_apply(dts, acg, xact, action, scratch):
             """Apply the  configuration"""
-            is_recovery = xact.xact is None and action == rwdts.AppconfAction.INSTALL
-
-
-            add_cfgs, delete_cfgs, update_cfgs = self.get_add_delete_update_cfgs(
-                    dts_member_reg=self.reg,
+            if xact.xact is None:
+                if action == rwdts.AppconfAction.INSTALL:
+                    try:
+                        if self._reg:
+                            for cfg in self._reg.elements:
+                                if self.callback:
+                                    self.callback(cfg, rwdts.QueryAction.CREATE)
+
+                        else:
+                            self._log.error("Reg handle is None during action {} for {}".
+                                            format(action, self.__class__))
+
+                    except Exception as e:
+                        self._log.exception("Adding config {} during restart failed: {}".
+                                            format(cfg, e))
+                return
+
+            add_cfgs, delete_cfgs, update_cfgs = get_add_delete_update_cfgs(
+                    dts_member_reg=self._reg,
                     xact=xact,
                     key_name=self.key_name())
 
@@ -202,14 +218,18 @@ class AbstractConfigSubscriber(SubscriberDtsHandler):
         @asyncio.coroutine
         def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
             """ on prepare callback """
-            xact_info.respond_xpath(rwdts.XactRspCode.ACK)
+            self._log.debug("Subscriber DTS prepare for project %s: %s",
+                            self.project, xact_info.query_action)
+            try:
+                xact_info.respond_xpath(rwdts.XactRspCode.ACK)
+            except rift.tasklets.dts.ResponseError as e:
+                self._log.warning(
+                    "Subscriber DTS prepare for project {}, action {} in class {} failed: {}".
+                    format(self.project, xact_info.query_action, self.__class__, e))
 
         acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
         with self.dts.appconf_group_create(handler=acg_hdl) as acg:
-            self.reg = acg.register(
-                xpath=self.get_xpath(),
+            self._reg = acg.register(
+                xpath=self.project.add_project(self.get_xpath()),
                 flags=self.get_reg_flags(),
                 on_prepare=on_prepare)
-
-    def deregister(self):
-        self.reg.deregister()