update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwlaunchpad / plugins / rwresmgr / rift / tasklets / rwresmgrtasklet / rwresmgr_events.py
index c80925c..d2f9709 100755 (executable)
@@ -16,9 +16,9 @@
 #
 
 import asyncio
+import gi
 import sys
 
-import gi
 gi.require_version('RwDts', '1.0')
 gi.require_version('RwYang', '1.0')
 gi.require_version('RwResourceMgrYang', '1.0')
@@ -31,6 +31,8 @@ from gi.repository import (
     RwLaunchpadYang,
     RwcalYang,
 )
+gi.require_version('RwKeyspec', '1.0')
+from gi.repository.RwKeyspec import quoted_key
 
 from gi.repository.RwTypes import RwStatus
 import rift.tasklets
@@ -48,6 +50,7 @@ class ResourceMgrEvent(object):
         self._dts = dts
         self._loop = loop
         self._parent = parent
+        self._project = parent._project
         self._vdu_reg = None
         self._link_reg = None
 
@@ -60,22 +63,34 @@ class ResourceMgrEvent(object):
         yield from asyncio.wait([self._link_reg_event.wait(), self._vdu_reg_event.wait()],
                                 timeout=timeout, loop=self._loop)
 
-    def create_record_dts(self, regh, xact, path, msg):
+    def _add_config_flag(self, xpath, config=False):
+        if xpath[0] == '/':
+            if config:
+                return 'C,' + xpath
+            else:
+                return 'D,' + xpath
+
+        return xpath
+
+    def create_record_dts(self, regh, xact, xpath, msg):
         """
         Create a record in DTS with path and message
         """
+        path = self._add_config_flag(self._project.add_project(xpath))
         self._log.debug("Creating Resource Record xact = %s, %s:%s",
                         xact, path, msg)
         regh.create_element(path, msg)
 
-    def delete_record_dts(self, regh, xact, path):
+    def delete_record_dts(self, regh, xact, xpath):
         """
         Delete a VNFR record in DTS with path and message
         """
+        path = self._add_config_flag(self._project.add_project(xpath))
         self._log.debug("Deleting Resource Record xact = %s, %s",
                         xact, path)
         regh.delete_element(path)
 
+
     @asyncio.coroutine
     def register(self):
         @asyncio.coroutine
@@ -90,15 +105,27 @@ class ResourceMgrEvent(object):
                 """
                 # wait for 3 seconds
                 yield from asyncio.sleep(3, loop=self._loop)
+                
+                try:
+                    response_info = yield from self._parent.reallocate_virtual_network(
+                          link.event_id,
+                          link.cloud_account,
+                          link.request_info, link.resource_info,
+                        )
+                except Exception as e:
+                  self._log.error("Encoutered exception in reallocate_virtual_network")
+                  self._log.exception(e)
+
 
-                response_info = yield from self._parent.reallocate_virtual_network(link.event_id,
-                                                                                 link.cloud_account,
-                                                                                 link.request_info, link.resource_info,
-                                                                                 )
             if (xact_event == rwdts.MemberEvent.INSTALL):
               link_cfg = self._link_reg.elements
+              self._log.debug("onlink_event INSTALL event: {}".format(link_cfg))
+
               for link in link_cfg:
                 self._loop.create_task(instantiate_realloc_vn(link))
+
+              self._log.debug("onlink_event INSTALL event complete")
+
             return rwdts.MemberRspCode.ACTION_OK
 
         @asyncio.coroutine
@@ -114,82 +141,135 @@ class ResourceMgrEvent(object):
                 # wait for 3 seconds
                 yield from asyncio.sleep(3, loop=self._loop)
 
-                response_info = yield from self._parent.allocate_virtual_compute(vdu.event_id,
-                                                                                 vdu.cloud_account,
-                                                                                 vdu.request_info
-                                                                                 )
+                try:
+                    response_info = yield from self._parent.allocate_virtual_compute(
+                        vdu.event_id,
+                        vdu.cloud_account,
+                        vdu.request_info
+                       )
+                except Exception as e:
+                    self._log.error("Encoutered exception in allocate_virtual_network")
+                    self._log.exception(e)
+                    raise e
+
+                response_xpath = "/rw-resource-mgr:resource-mgmt/rw-resource-mgr:vdu-event/rw-resource-mgr:vdu-event-data[rw-resource-mgr:event-id={}]/resource-info".format(
+                    quoted_key(vdu.event_id.strip()))
+
+                cloud_account = self._parent.get_cloud_account_detail(cloud_account)
+                asyncio.ensure_future(monitor_vdu_state(response_xpath, vdu.event_id, cloud_account.vdu_instance_timeout), loop=self._loop)
+
             if (xact_event == rwdts.MemberEvent.INSTALL):
-              vdu_cfg = self._vdu_reg.elements
-              for vdu in vdu_cfg:
-                self._loop.create_task(instantiate_realloc_vdu(vdu))
-            return rwdts.MemberRspCode.ACTION_OK
+                vdu_cfg = self._vdu_reg.elements
+                self._log.debug("onvdu_event INSTALL event: {}".format(vdu_cfg))
+
+                for vdu in vdu_cfg:
+                    self._loop.create_task(instantiate_realloc_vdu(vdu))
+
+                self._log.debug("onvdu_event INSTALL event complete")
 
-        def on_link_request_commit(xact_info):
-            """ The transaction has been committed """
-            self._log.debug("Received link request commit (xact_info: %s)", xact_info)
             return rwdts.MemberRspCode.ACTION_OK
 
+        @asyncio.coroutine
+        def allocate_vlink_task(ks_path, event_id, cloud_account, request_info):
+            response_xpath = ks_path.to_xpath(RwResourceMgrYang.get_schema()) + "/resource-info"
+            schema = RwResourceMgrYang.YangData_RwProject_Project_ResourceMgmt_VlinkEvent_VlinkEventData().schema()
+            pathentry = schema.keyspec_to_entry(ks_path)
+            try:
+                response_info = yield from self._parent.allocate_virtual_network(pathentry.key00.event_id,
+                                                                                 cloud_account,
+                                                                                 request_info)
+            except Exception as e:
+                self._log.error("Encountered exception: %s while creating virtual network", str(e))
+                self._log.exception(e)
+                response_info = RwResourceMgrYang.YangData_RwProject_Project_ResourceMgmt_VlinkEvent_VlinkEventData_ResourceInfo()
+                response_info.resource_state = 'failed'
+                response_info.resource_errors = str(e)
+                yield from self._dts.query_update(response_xpath,
+                                                  rwdts.XactFlag.ADVISE,
+                                                  response_info)
+            else:
+                yield from self._dts.query_update(response_xpath,
+                                                  rwdts.XactFlag.ADVISE,
+                                                  response_info)
+
+
         @asyncio.coroutine
         def on_link_request_prepare(xact_info, action, ks_path, request_msg):
-            self._log.debug("Received virtual-link on_prepare callback (xact_info: %s, action: %s): %s",
+            self._log.debug(
+                "Received virtual-link on_prepare callback (xact_info: %s, action: %s): %s",
                             xact_info, action, request_msg)
 
             response_info = None
             response_xpath = ks_path.to_xpath(RwResourceMgrYang.get_schema()) + "/resource-info"
 
-            schema = RwResourceMgrYang.VirtualLinkEventData().schema()
+            schema = RwResourceMgrYang.YangData_RwProject_Project_ResourceMgmt_VlinkEvent_VlinkEventData().schema()
             pathentry = schema.keyspec_to_entry(ks_path)
 
             if action == rwdts.QueryAction.CREATE:
                 try:
-                    response_info = yield from self._parent.allocate_virtual_network(pathentry.key00.event_id,
-                                                                                 request_msg.cloud_account,
-                                                                                 request_msg.request_info)
+                    response_info = RwResourceMgrYang.YangData_RwProject_Project_ResourceMgmt_VlinkEvent_VlinkEventData_ResourceInfo()
+                    response_info.resource_state = 'pending'
+                    request_msg.resource_info = response_info
+                    self.create_record_dts(self._link_reg,
+                                           None,
+                                           ks_path.to_xpath(RwResourceMgrYang.get_schema()),
+                                           request_msg)
+
+                    asyncio.ensure_future(allocate_vlink_task(ks_path,
+                                                              pathentry.key00.event_id,
+                                                              request_msg.cloud_account,
+                                                              request_msg.request_info),
+                                                              loop = self._loop)
                 except Exception as e:
-                    self._log.error("Encountered exception: %s while creating virtual network", str(e))
+                    self._log.error(
+                        "Encountered exception: %s while creating virtual network", str(e))
                     self._log.exception(e)
-                    response_info = RwResourceMgrYang.VirtualLinkEventData_ResourceInfo()
+                    response_info = RwResourceMgrYang.YangData_RwProject_Project_ResourceMgmt_VlinkEvent_VlinkEventData_ResourceInfo()
                     response_info.resource_state = 'failed'
                     response_info.resource_errors = str(e)
                     yield from self._dts.query_update(response_xpath,
                                                       rwdts.XactFlag.ADVISE,
                                                       response_info)
-                else:
-                    request_msg.resource_info = response_info
-                    self.create_record_dts(self._link_reg, None, ks_path.to_xpath(RwResourceMgrYang.get_schema()), request_msg)
             elif action == rwdts.QueryAction.DELETE:
                 yield from self._parent.release_virtual_network(pathentry.key00.event_id)
-                self.delete_record_dts(self._link_reg, None, ks_path.to_xpath(RwResourceMgrYang.get_schema()))
+                self.delete_record_dts(self._link_reg, None, 
+                    ks_path.to_xpath(RwResourceMgrYang.get_schema()))
+
             elif action == rwdts.QueryAction.READ:
-                response_info = yield from self._parent.read_virtual_network_info(pathentry.key00.event_id)
+                # TODO: Check why we are getting null event id request
+                if pathentry.key00.event_id:
+                    response_info = yield from self._parent.read_virtual_network_info(pathentry.key00.event_id)
+                else:
+                    xact_info.respond_xpath(rwdts.XactRspCode.NA)
+                    return
             else:
-                raise ValueError("Only read/create/delete actions available. Received action: %s" %(action))
+                raise ValueError(
+                    "Only read/create/delete actions available. Received action: %s" %(action))
 
-            self._log.debug("Responding with VirtualLinkInfo at xpath %s: %s.",
-                            response_xpath, response_info)
+            self._log.info("Responding with VirtualLinkInfo at xpath %s: %s.",
+                           response_xpath, response_info)
 
             xact_info.respond_xpath(rwdts.XactRspCode.ACK, response_xpath, response_info)
 
 
-        def on_vdu_request_commit(xact_info):
-            """ The transaction has been committed """
-            self._log.debug("Received vdu request commit (xact_info: %s)", xact_info)
-            return rwdts.MemberRspCode.ACTION_OK
 
-        def monitor_vdu_state(response_xpath, pathentry):
+        def monitor_vdu_state(response_xpath, event_id, vdu_timeout):
             self._log.info("Initiating VDU state monitoring for xpath: %s ", response_xpath)
-            time_to_wait = 300
             sleep_time = 2
-            loop_cnt = int(time_to_wait/sleep_time)
+            loop_cnt = int(vdu_timeout/sleep_time)
+
             for i in range(loop_cnt):
-                self._log.debug("VDU state monitoring for xpath: %s. Sleeping for 2 second", response_xpath)
+                self._log.debug(
+                    "VDU state monitoring for xpath: %s. Sleeping for 2 second", response_xpath)
                 yield from asyncio.sleep(2, loop = self._loop)
+
                 try:
-                    response_info = yield from self._parent.read_virtual_compute_info(pathentry.key00.event_id)
+                    response_info = yield from self._parent.read_virtual_compute_info(event_id)
                 except Exception as e:
-                    self._log.info("VDU state monitoring: Received exception %s in VDU state monitoring for %s. Aborting monitoring",
-                                   str(e),response_xpath)
-                    response_info = RwResourceMgrYang.VDUEventData_ResourceInfo()
+                    self._log.info(
+                        "VDU state monitoring: Received exception %s in VDU state monitoring for %s. Aborting monitoring", str(e),response_xpath)
+
+                    response_info = RwResourceMgrYang.YangData_RwProject_Project_ResourceMgmt_VduEvent_VduEventData_ResourceInfo()
                     response_info.resource_state = 'failed'
                     response_info.resource_errors = str(e)
                     yield from self._dts.query_update(response_xpath,
@@ -197,7 +277,8 @@ class ResourceMgrEvent(object):
                                                       response_info)
                 else:
                     if response_info.resource_state == 'active' or response_info.resource_state == 'failed':
-                        self._log.info("VDU state monitoring: VDU reached terminal state. Publishing VDU info: %s at path: %s",
+                        self._log.info("VDU state monitoring: VDU reached terminal state. " +
+                                       "Publishing VDU info: %s at path: %s",
                                        response_info, response_xpath)
                         yield from self._dts.query_update(response_xpath,
                                                           rwdts.XactFlag.ADVISE,
@@ -205,9 +286,11 @@ class ResourceMgrEvent(object):
                         return
             else:
                 ### End of loop. This is only possible if VDU did not reach active state
-                err_msg = "VDU state monitoring: VDU at xpath :{} did not reached active state in {} seconds. Aborting monitoring".format(response_xpath, time_to_wait)
+                err_msg = ("VDU state monitoring: VDU at xpath :{} did not reached active " +
+                           "state in {} seconds. Aborting monitoring".
+                           format(response_xpath, time_to_wait))
                 self._log.info(err_msg)
-                response_info = RwResourceMgrYang.VDUEventData_ResourceInfo()
+                response_info = RwResourceMgrYang.YangData_RwProject_Project_ResourceMgmt_VduEvent_VduEventData_ResourceInfo()
                 response_info.resource_state = 'failed'
                 response_info.resource_errors = err_msg
                 yield from self._dts.query_update(response_xpath,
@@ -217,7 +300,8 @@ class ResourceMgrEvent(object):
 
         def allocate_vdu_task(ks_path, event_id, cloud_account, request_msg):
             response_xpath = ks_path.to_xpath(RwResourceMgrYang.get_schema()) + "/resource-info"
-            schema = RwResourceMgrYang.VDUEventData().schema()
+            response_xpath = self._add_config_flag(response_xpath)
+            schema = RwResourceMgrYang.YangData_RwProject_Project_ResourceMgmt_VduEvent_VduEventData().schema()
             pathentry = schema.keyspec_to_entry(ks_path)
             try:
                 response_info = yield from self._parent.allocate_virtual_compute(event_id,
@@ -225,34 +309,42 @@ class ResourceMgrEvent(object):
                                                                                  request_msg,)
             except Exception as e:
                 self._log.error("Encountered exception : %s while creating virtual compute", str(e))
-                response_info = RwResourceMgrYang.VDUEventData_ResourceInfo()
+                response_info = RwResourceMgrYang.YangData_RwProject_Project_ResourceMgmt_VduEvent_VduEventData_ResourceInfo()
                 response_info.resource_state = 'failed'
                 response_info.resource_errors = str(e)
                 yield from self._dts.query_update(response_xpath,
                                                   rwdts.XactFlag.ADVISE,
                                                   response_info)
             else:
+                cloud_account = self._parent.get_cloud_account_detail(cloud_account)
+                #RIFT-17719 - Set the resource state to active if no floating ip pool specified and is waiting for public ip.
+                if response_info.resource_state == 'pending' and cloud_account.has_field('openstack') \
+                     and not (cloud_account.openstack.has_field('floating_ip_pool')) :
+                    if (request_msg.has_field('allocate_public_address')) and (request_msg.allocate_public_address == True):
+                        if not response_info.has_field('public_ip'):
+                            response_info.resource_state = 'active'
+
                 if response_info.resource_state == 'failed' or response_info.resource_state == 'active' :
-                    self._log.info("Virtual compute create task completed. Publishing VDU info: %s at path: %s",
-                                   response_info, response_xpath)
+                    self._log.debug("Virtual compute create task completed. Publishing VDU info: %s at path: %s",
+                                    response_info, response_xpath)
                     yield from self._dts.query_update(response_xpath,
                                                       rwdts.XactFlag.ADVISE,
                                                       response_info)
                 else:
-                    asyncio.ensure_future(monitor_vdu_state(response_xpath, pathentry),
+                    asyncio.ensure_future(monitor_vdu_state(response_xpath, pathentry.key00.event_id, cloud_account.vdu_instance_timeout),
                                           loop = self._loop)
 
-
         @asyncio.coroutine
         def on_vdu_request_prepare(xact_info, action, ks_path, request_msg):
             self._log.debug("Received vdu on_prepare callback (xact_info: %s, action: %s): %s",
                             xact_info, action, request_msg)
             response_xpath = ks_path.to_xpath(RwResourceMgrYang.get_schema()) + "/resource-info"
-            schema = RwResourceMgrYang.VDUEventData().schema()
+            response_xpath = self._add_config_flag(response_xpath)
+            schema = RwResourceMgrYang.YangData_RwProject_Project_ResourceMgmt_VduEvent_VduEventData().schema()
             pathentry = schema.keyspec_to_entry(ks_path)
 
             if action == rwdts.QueryAction.CREATE:
-                response_info = RwResourceMgrYang.VDUEventData_ResourceInfo()
+                response_info = RwResourceMgrYang.YangData_RwProject_Project_ResourceMgmt_VduEvent_VduEventData_ResourceInfo()
                 response_info.resource_state = 'pending'
                 request_msg.resource_info = response_info
                 self.create_record_dts(self._vdu_reg,
@@ -269,7 +361,12 @@ class ResourceMgrEvent(object):
                 yield from self._parent.release_virtual_compute(pathentry.key00.event_id)
                 self.delete_record_dts(self._vdu_reg, None, ks_path.to_xpath(RwResourceMgrYang.get_schema()))
             elif action == rwdts.QueryAction.READ:
-                response_info = yield from self._parent.read_virtual_compute_info(pathentry.key00.event_id)
+                # TODO: Check why we are getting null event id request
+                if pathentry.key00.event_id:
+                    response_info = yield from self._parent.read_virtual_compute_info(pathentry.key00.event_id)
+                else:
+                    xact_info.respond_xpath(rwdts.XactRspCode.NA)
+                    return
             else:
                 raise ValueError("Only create/delete actions available. Received action: %s" %(action))
 
@@ -293,24 +390,35 @@ class ResourceMgrEvent(object):
 
         link_handlers = rift.tasklets.Group.Handler(on_event=onlink_event,)
         with self._dts.group_create(handler=link_handlers) as link_group:
-            self._log.debug("Registering for Link Resource Request using xpath: %s",
-                            ResourceMgrEvent.VLINK_REQUEST_XPATH)
-
-            self._link_reg = link_group.register(xpath=ResourceMgrEvent.VLINK_REQUEST_XPATH,
-                                            handler=rift.tasklets.DTS.RegistrationHandler(on_ready=on_request_ready,
-                                                                                          on_commit=on_link_request_commit,
-                                                                                          on_prepare=on_link_request_prepare),
-                                            flags=rwdts.Flag.PUBLISHER | rwdts.Flag.DATASTORE,)
-
+            xpath = self._project.add_project(ResourceMgrEvent.VLINK_REQUEST_XPATH)
+            self._log.debug("Registering for Link Resource Request using xpath: {}".
+                            format(xpath))
+
+            self._link_reg = link_group.register(xpath=xpath,
+                                                 handler=rift.tasklets.DTS.RegistrationHandler(on_ready=on_request_ready,
+                                                                                               on_prepare=on_link_request_prepare),
+                                                 flags=rwdts.Flag.PUBLISHER | rwdts.Flag.DATASTORE,)
+            
         vdu_handlers = rift.tasklets.Group.Handler(on_event=onvdu_event, )
         with self._dts.group_create(handler=vdu_handlers) as vdu_group:
+                
+            xpath = self._project.add_project(ResourceMgrEvent.VDU_REQUEST_XPATH)
+            self._log.debug("Registering for VDU Resource Request using xpath: {}".
+                            format(xpath))
+
+            self._vdu_reg = vdu_group.register(xpath=xpath,
+                handler=rift.tasklets.DTS.RegistrationHandler(on_ready=on_request_ready,
+                                                              on_prepare=on_vdu_request_prepare),
+                                               flags=rwdts.Flag.PUBLISHER | rwdts.Flag.DATASTORE,)
+
 
-            self._log.debug("Registering for VDU Resource Request using xpath: %s",
-                            ResourceMgrEvent.VDU_REQUEST_XPATH)
+    def deregister(self):
+        self._log.debug("De-register for project {}".format(self._project.name))
 
-            self._vdu_reg = vdu_group.register(xpath=ResourceMgrEvent.VDU_REQUEST_XPATH,
-                                           handler=rift.tasklets.DTS.RegistrationHandler(on_ready=on_request_ready,
-                                                                                         on_commit=on_vdu_request_commit,
-                                                                                         on_prepare=on_vdu_request_prepare),
-                                           flags=rwdts.Flag.PUBLISHER | rwdts.Flag.DATASTORE,)
+        if self._vdu_reg:
+            self._vdu_reg.deregister()
+            self._vdu_reg = None
 
+        if self._link_reg:
+            self._link_reg.deregister()
+            self._link_reg = None