Merge from OSM SO master
[osm/SO.git] / rwlaunchpad / plugins / rwresmgr / rift / tasklets / rwresmgrtasklet / rwresmgr_events.py
index 5f87c66..360390b 100755 (executable)
@@ -48,6 +48,7 @@ class ResourceMgrEvent(object):
         self._dts = dts
         self._loop = loop
         self._parent = parent
+        self._project = parent._project
         self._vdu_reg = None
         self._link_reg = None
 
@@ -60,22 +61,34 @@ class ResourceMgrEvent(object):
         yield from asyncio.wait([self._link_reg_event.wait(), self._vdu_reg_event.wait()],
                                 timeout=timeout, loop=self._loop)
 
-    def create_record_dts(self, regh, xact, path, msg):
+    def _add_config_flag(self, xpath, config=False):
+        if xpath[0] == '/':
+            if config:
+                return 'C,' + xpath
+            else:
+                return 'D,' + xpath
+
+        return xpath
+
+    def create_record_dts(self, regh, xact, xpath, msg):
         """
         Create a record in DTS with path and message
         """
+        path = self._add_config_flag(self._project.add_project(xpath))
         self._log.debug("Creating Resource Record xact = %s, %s:%s",
                         xact, path, msg)
         regh.create_element(path, msg)
 
-    def delete_record_dts(self, regh, xact, path):
+    def delete_record_dts(self, regh, xact, xpath):
         """
         Delete a VNFR record in DTS with path and message
         """
+        path = self._add_config_flag(self._project.add_project(xpath))
         self._log.debug("Deleting Resource Record xact = %s, %s",
                         xact, path)
         regh.delete_element(path)
 
+
     @asyncio.coroutine
     def register(self):
         @asyncio.coroutine
@@ -161,12 +174,17 @@ class ResourceMgrEvent(object):
                 yield from self._parent.release_virtual_network(pathentry.key00.event_id)
                 self.delete_record_dts(self._link_reg, None, ks_path.to_xpath(RwResourceMgrYang.get_schema()))
             elif action == rwdts.QueryAction.READ:
-                response_info = yield from self._parent.read_virtual_network_info(pathentry.key00.event_id)
+                # TODO: Check why we are getting null event id request
+                if pathentry.key00.event_id:
+                    response_info = yield from self._parent.read_virtual_network_info(pathentry.key00.event_id)
+                else:
+                    xact_info.respond_xpath(rwdts.XactRspCode.NA)
+                    return
             else:
                 raise ValueError("Only read/create/delete actions available. Received action: %s" %(action))
 
-            self._log.debug("Responding with VirtualLinkInfo at xpath %s: %s.",
-                            response_xpath, response_info)
+            self._log.info("Responding with VirtualLinkInfo at xpath %s: %s.",
+                           response_xpath, response_info)
 
             xact_info.respond_xpath(rwdts.XactRspCode.ACK, response_xpath, response_info)
 
@@ -177,11 +195,13 @@ class ResourceMgrEvent(object):
             return rwdts.MemberRspCode.ACTION_OK
 
         def monitor_vdu_state(response_xpath, pathentry):
-            self._log.info("Initiating VDU state monitoring for xpath: %s ", response_xpath)
-            loop_cnt = 180
+            self._log.debug("Initiating VDU state monitoring for xpath: %s ", response_xpath)
+            time_to_wait = 300
+            sleep_time = 2
+            loop_cnt = int(time_to_wait/sleep_time)
             for i in range(loop_cnt):
-                self._log.debug("VDU state monitoring for xpath: %s. Sleeping for 1 second", response_xpath)
-                yield from asyncio.sleep(1, loop = self._loop)
+                self._log.debug("VDU state monitoring for xpath: %s. Sleeping for 2 second", response_xpath)
+                yield from asyncio.sleep(2, loop = self._loop)
                 try:
                     response_info = yield from self._parent.read_virtual_compute_info(pathentry.key00.event_id)
                 except Exception as e:
@@ -195,7 +215,8 @@ class ResourceMgrEvent(object):
                                                       response_info)
                 else:
                     if response_info.resource_state == 'active' or response_info.resource_state == 'failed':
-                        self._log.info("VDU state monitoring: VDU reached terminal state. Publishing VDU info: %s at path: %s",
+                        self._log.info("VDU state monitoring: VDU reached terminal state. " +
+                                       "Publishing VDU info: %s at path: %s",
                                        response_info, response_xpath)
                         yield from self._dts.query_update(response_xpath,
                                                           rwdts.XactFlag.ADVISE,
@@ -203,7 +224,9 @@ class ResourceMgrEvent(object):
                         return
             else:
                 ### End of loop. This is only possible if VDU did not reach active state
-                err_msg = "VDU state monitoring: VDU at xpath :{} did not reached active state in {} seconds. Aborting monitoring".format(response_xpath, loop_cnt)
+                err_msg = ("VDU state monitoring: VDU at xpath :{} did not reached active " +
+                           "state in {} seconds. Aborting monitoring".
+                           format(response_xpath, time_to_wait))
                 self._log.info(err_msg)
                 response_info = RwResourceMgrYang.VDUEventData_ResourceInfo()
                 response_info.resource_state = 'failed'
@@ -215,6 +238,7 @@ class ResourceMgrEvent(object):
 
         def allocate_vdu_task(ks_path, event_id, cloud_account, request_msg):
             response_xpath = ks_path.to_xpath(RwResourceMgrYang.get_schema()) + "/resource-info"
+            response_xpath = self._add_config_flag(response_xpath)
             schema = RwResourceMgrYang.VDUEventData().schema()
             pathentry = schema.keyspec_to_entry(ks_path)
             try:
@@ -231,21 +255,22 @@ class ResourceMgrEvent(object):
                                                   response_info)
             else:
                 if response_info.resource_state == 'failed' or response_info.resource_state == 'active' :
-                    self._log.info("Virtual compute create task completed. Publishing VDU info: %s at path: %s",
-                                   response_info, response_xpath)
+                    self._log.debug("Virtual compute create task completed. Publishing VDU info: %s at path: %s",
+                                    response_info, response_xpath)
                     yield from self._dts.query_update(response_xpath,
                                                       rwdts.XactFlag.ADVISE,
                                                       response_info)
                 else:
+                    self._log.debug("VDU create monitor at {}".format(response_xpath))
                     asyncio.ensure_future(monitor_vdu_state(response_xpath, pathentry),
                                           loop = self._loop)
 
-
         @asyncio.coroutine
         def on_vdu_request_prepare(xact_info, action, ks_path, request_msg):
             self._log.debug("Received vdu on_prepare callback (xact_info: %s, action: %s): %s",
                             xact_info, action, request_msg)
             response_xpath = ks_path.to_xpath(RwResourceMgrYang.get_schema()) + "/resource-info"
+            response_xpath = self._add_config_flag(response_xpath)
             schema = RwResourceMgrYang.VDUEventData().schema()
             pathentry = schema.keyspec_to_entry(ks_path)
 
@@ -267,7 +292,12 @@ class ResourceMgrEvent(object):
                 yield from self._parent.release_virtual_compute(pathentry.key00.event_id)
                 self.delete_record_dts(self._vdu_reg, None, ks_path.to_xpath(RwResourceMgrYang.get_schema()))
             elif action == rwdts.QueryAction.READ:
-                response_info = yield from self._parent.read_virtual_compute_info(pathentry.key00.event_id)
+                # TODO: Check why we are getting null event id request
+                if pathentry.key00.event_id:
+                    response_info = yield from self._parent.read_virtual_compute_info(pathentry.key00.event_id)
+                else:
+                    xact_info.respond_xpath(rwdts.XactRspCode.NA)
+                    return
             else:
                 raise ValueError("Only create/delete actions available. Received action: %s" %(action))
 
@@ -291,24 +321,37 @@ class ResourceMgrEvent(object):
 
         link_handlers = rift.tasklets.Group.Handler(on_event=onlink_event,)
         with self._dts.group_create(handler=link_handlers) as link_group:
-            self._log.debug("Registering for Link Resource Request using xpath: %s",
-                            ResourceMgrEvent.VLINK_REQUEST_XPATH)
+            xpath = self._project.add_project(ResourceMgrEvent.VLINK_REQUEST_XPATH)
+            self._log.debug("Registering for Link Resource Request using xpath: {}".
+                            format(xpath))
 
-            self._link_reg = link_group.register(xpath=ResourceMgrEvent.VLINK_REQUEST_XPATH,
-                                            handler=rift.tasklets.DTS.RegistrationHandler(on_ready=on_request_ready,
-                                                                                          on_commit=on_link_request_commit,
-                                                                                          on_prepare=on_link_request_prepare),
-                                            flags=rwdts.Flag.PUBLISHER | rwdts.Flag.DATASTORE,)
+            self._link_reg = link_group.register(xpath=xpath,
+                handler=rift.tasklets.DTS.RegistrationHandler(on_ready=on_request_ready,
+                                                              on_commit=on_link_request_commit,
+                                                              on_prepare=on_link_request_prepare),
+                                                 flags=rwdts.Flag.PUBLISHER | rwdts.Flag.DATASTORE,)
 
         vdu_handlers = rift.tasklets.Group.Handler(on_event=onvdu_event, )
         with self._dts.group_create(handler=vdu_handlers) as vdu_group:
 
-            self._log.debug("Registering for VDU Resource Request using xpath: %s",
-                            ResourceMgrEvent.VDU_REQUEST_XPATH)
+            xpath = self._project.add_project(ResourceMgrEvent.VDU_REQUEST_XPATH)
+            self._log.debug("Registering for VDU Resource Request using xpath: {}".
+                            format(xpath))
+
+            self._vdu_reg = vdu_group.register(xpath=xpath,
+                handler=rift.tasklets.DTS.RegistrationHandler(on_ready=on_request_ready,
+                                                              on_commit=on_vdu_request_commit,
+                                                              on_prepare=on_vdu_request_prepare),
+                                               flags=rwdts.Flag.PUBLISHER | rwdts.Flag.DATASTORE,)
+
+
+    def deregister(self):
+        self._log.debug("De-register for project {}".format(self._project.name))
 
-            self._vdu_reg = vdu_group.register(xpath=ResourceMgrEvent.VDU_REQUEST_XPATH,
-                                           handler=rift.tasklets.DTS.RegistrationHandler(on_ready=on_request_ready,
-                                                                                         on_commit=on_vdu_request_commit,
-                                                                                         on_prepare=on_vdu_request_prepare),
-                                           flags=rwdts.Flag.PUBLISHER | rwdts.Flag.DATASTORE,)
+        if self._vdu_reg:
+            self._vdu_reg.deregister()
+            self._vdu_reg = None
 
+        if self._link_reg:
+            self._link_reg.deregister()
+            self._link_reg = None