RIFT OSM R1 Initial Submission
[osm/SO.git] / rwlaunchpad / plugins / rwresmgr / rift / tasklets / rwresmgrtasklet / rwresmgr_events.py
diff --git a/rwlaunchpad/plugins/rwresmgr/rift/tasklets/rwresmgrtasklet/rwresmgr_events.py b/rwlaunchpad/plugins/rwresmgr/rift/tasklets/rwresmgrtasklet/rwresmgr_events.py
new file mode 100755 (executable)
index 0000000..5f87c66
--- /dev/null
@@ -0,0 +1,314 @@
+
+# 
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+
+import asyncio
+import sys
+
+import gi
+gi.require_version('RwDts', '1.0')
+gi.require_version('RwYang', '1.0')
+gi.require_version('RwResourceMgrYang', '1.0')
+gi.require_version('RwLaunchpadYang', '1.0')
+gi.require_version('RwcalYang', '1.0')
+from gi.repository import (
+    RwDts as rwdts,
+    RwYang,
+    RwResourceMgrYang,
+    RwLaunchpadYang,
+    RwcalYang,
+)
+
+from gi.repository.RwTypes import RwStatus
+import rift.tasklets
+
+if sys.version_info < (3, 4, 4):
+    asyncio.ensure_future = asyncio.async
+
+
+class ResourceMgrEvent(object):
+    VDU_REQUEST_XPATH = "D,/rw-resource-mgr:resource-mgmt/vdu-event/vdu-event-data"
+    VLINK_REQUEST_XPATH = "D,/rw-resource-mgr:resource-mgmt/vlink-event/vlink-event-data"
+
+    def __init__(self, dts, log, loop, parent):
+        self._log = log
+        self._dts = dts
+        self._loop = loop
+        self._parent = parent
+        self._vdu_reg = None
+        self._link_reg = None
+
+        self._vdu_reg_event = asyncio.Event(loop=self._loop)
+        self._link_reg_event = asyncio.Event(loop=self._loop)
+
+    @asyncio.coroutine
+    def wait_ready(self, timeout=5):
+        self._log.debug("Waiting for all request registrations to become ready.")
+        yield from asyncio.wait([self._link_reg_event.wait(), self._vdu_reg_event.wait()],
+                                timeout=timeout, loop=self._loop)
+
+    def create_record_dts(self, regh, xact, path, msg):
+        """
+        Create a record in DTS with path and message
+        """
+        self._log.debug("Creating Resource Record xact = %s, %s:%s",
+                        xact, path, msg)
+        regh.create_element(path, msg)
+
+    def delete_record_dts(self, regh, xact, path):
+        """
+        Delete a VNFR record in DTS with path and message
+        """
+        self._log.debug("Deleting Resource Record xact = %s, %s",
+                        xact, path)
+        regh.delete_element(path)
+
+    @asyncio.coroutine
+    def register(self):
+        @asyncio.coroutine
+        def onlink_event(dts, g_reg, xact, xact_event, scratch_data):
+            @asyncio.coroutine
+            def instantiate_realloc_vn(link):
+                """Re-populate the virtual link information after restart
+
+                Arguments:
+                    vlink 
+
+                """
+                # wait for 3 seconds
+                yield from asyncio.sleep(3, loop=self._loop)
+
+                response_info = yield from self._parent.reallocate_virtual_network(link.event_id,
+                                                                                 link.cloud_account,
+                                                                                 link.request_info, link.resource_info,
+                                                                                 )
+            if (xact_event == rwdts.MemberEvent.INSTALL):
+              link_cfg = self._link_reg.elements
+              for link in link_cfg:
+                self._loop.create_task(instantiate_realloc_vn(link))
+            return rwdts.MemberRspCode.ACTION_OK
+
+        @asyncio.coroutine
+        def onvdu_event(dts, g_reg, xact, xact_event, scratch_data):
+            @asyncio.coroutine
+            def instantiate_realloc_vdu(vdu):
+                """Re-populate the VDU information after restart
+
+                Arguments:
+                    vdu 
+
+                """
+                # wait for 3 seconds
+                yield from asyncio.sleep(3, loop=self._loop)
+
+                response_info = yield from self._parent.allocate_virtual_compute(vdu.event_id,
+                                                                                 vdu.cloud_account,
+                                                                                 vdu.request_info
+                                                                                 )
+            if (xact_event == rwdts.MemberEvent.INSTALL):
+              vdu_cfg = self._vdu_reg.elements
+              for vdu in vdu_cfg:
+                self._loop.create_task(instantiate_realloc_vdu(vdu))
+            return rwdts.MemberRspCode.ACTION_OK
+
+        def on_link_request_commit(xact_info):
+            """ The transaction has been committed """
+            self._log.debug("Received link request commit (xact_info: %s)", xact_info)
+            return rwdts.MemberRspCode.ACTION_OK
+
+        @asyncio.coroutine
+        def on_link_request_prepare(xact_info, action, ks_path, request_msg):
+            self._log.debug("Received virtual-link on_prepare callback (xact_info: %s, action: %s): %s",
+                            xact_info, action, request_msg)
+
+            response_info = None
+            response_xpath = ks_path.to_xpath(RwResourceMgrYang.get_schema()) + "/resource-info"
+
+            schema = RwResourceMgrYang.VirtualLinkEventData().schema()
+            pathentry = schema.keyspec_to_entry(ks_path)
+
+            if action == rwdts.QueryAction.CREATE:
+                try:
+                    response_info = yield from self._parent.allocate_virtual_network(pathentry.key00.event_id,
+                                                                                 request_msg.cloud_account,
+                                                                                 request_msg.request_info)
+                except Exception as e:
+                    self._log.error("Encountered exception: %s while creating virtual network", str(e))
+                    self._log.exception(e)
+                    response_info = RwResourceMgrYang.VirtualLinkEventData_ResourceInfo()
+                    response_info.resource_state = 'failed'
+                    response_info.resource_errors = str(e)
+                    yield from self._dts.query_update(response_xpath,
+                                                      rwdts.XactFlag.ADVISE,
+                                                      response_info)
+                else:
+                    request_msg.resource_info = response_info
+                    self.create_record_dts(self._link_reg, None, ks_path.to_xpath(RwResourceMgrYang.get_schema()), request_msg)
+            elif action == rwdts.QueryAction.DELETE:
+                yield from self._parent.release_virtual_network(pathentry.key00.event_id)
+                self.delete_record_dts(self._link_reg, None, ks_path.to_xpath(RwResourceMgrYang.get_schema()))
+            elif action == rwdts.QueryAction.READ:
+                response_info = yield from self._parent.read_virtual_network_info(pathentry.key00.event_id)
+            else:
+                raise ValueError("Only read/create/delete actions available. Received action: %s" %(action))
+
+            self._log.debug("Responding with VirtualLinkInfo at xpath %s: %s.",
+                            response_xpath, response_info)
+
+            xact_info.respond_xpath(rwdts.XactRspCode.ACK, response_xpath, response_info)
+
+
+        def on_vdu_request_commit(xact_info):
+            """ The transaction has been committed """
+            self._log.debug("Received vdu request commit (xact_info: %s)", xact_info)
+            return rwdts.MemberRspCode.ACTION_OK
+
+        def monitor_vdu_state(response_xpath, pathentry):
+            self._log.info("Initiating VDU state monitoring for xpath: %s ", response_xpath)
+            loop_cnt = 180
+            for i in range(loop_cnt):
+                self._log.debug("VDU state monitoring for xpath: %s. Sleeping for 1 second", response_xpath)
+                yield from asyncio.sleep(1, loop = self._loop)
+                try:
+                    response_info = yield from self._parent.read_virtual_compute_info(pathentry.key00.event_id)
+                except Exception as e:
+                    self._log.info("VDU state monitoring: Received exception %s in VDU state monitoring for %s. Aborting monitoring",
+                                   str(e),response_xpath)
+                    response_info = RwResourceMgrYang.VDUEventData_ResourceInfo()
+                    response_info.resource_state = 'failed'
+                    response_info.resource_errors = str(e)
+                    yield from self._dts.query_update(response_xpath,
+                                                      rwdts.XactFlag.ADVISE,
+                                                      response_info)
+                else:
+                    if response_info.resource_state == 'active' or response_info.resource_state == 'failed':
+                        self._log.info("VDU state monitoring: VDU reached terminal state. Publishing VDU info: %s at path: %s",
+                                       response_info, response_xpath)
+                        yield from self._dts.query_update(response_xpath,
+                                                          rwdts.XactFlag.ADVISE,
+                                                          response_info)
+                        return
+            else:
+                ### End of loop. This is only possible if VDU did not reach active state
+                err_msg = "VDU state monitoring: VDU at xpath :{} did not reached active state in {} seconds. Aborting monitoring".format(response_xpath, loop_cnt)
+                self._log.info(err_msg)
+                response_info = RwResourceMgrYang.VDUEventData_ResourceInfo()
+                response_info.resource_state = 'failed'
+                response_info.resource_errors = err_msg
+                yield from self._dts.query_update(response_xpath,
+                                                  rwdts.XactFlag.ADVISE,
+                                                  response_info)
+            return
+
+        def allocate_vdu_task(ks_path, event_id, cloud_account, request_msg):
+            response_xpath = ks_path.to_xpath(RwResourceMgrYang.get_schema()) + "/resource-info"
+            schema = RwResourceMgrYang.VDUEventData().schema()
+            pathentry = schema.keyspec_to_entry(ks_path)
+            try:
+                response_info = yield from self._parent.allocate_virtual_compute(event_id,
+                                                                                 cloud_account,
+                                                                                 request_msg,)
+            except Exception as e:
+                self._log.error("Encountered exception : %s while creating virtual compute", str(e))
+                response_info = RwResourceMgrYang.VDUEventData_ResourceInfo()
+                response_info.resource_state = 'failed'
+                response_info.resource_errors = str(e)
+                yield from self._dts.query_update(response_xpath,
+                                                  rwdts.XactFlag.ADVISE,
+                                                  response_info)
+            else:
+                if response_info.resource_state == 'failed' or response_info.resource_state == 'active' :
+                    self._log.info("Virtual compute create task completed. Publishing VDU info: %s at path: %s",
+                                   response_info, response_xpath)
+                    yield from self._dts.query_update(response_xpath,
+                                                      rwdts.XactFlag.ADVISE,
+                                                      response_info)
+                else:
+                    asyncio.ensure_future(monitor_vdu_state(response_xpath, pathentry),
+                                          loop = self._loop)
+
+
+        @asyncio.coroutine
+        def on_vdu_request_prepare(xact_info, action, ks_path, request_msg):
+            self._log.debug("Received vdu on_prepare callback (xact_info: %s, action: %s): %s",
+                            xact_info, action, request_msg)
+            response_xpath = ks_path.to_xpath(RwResourceMgrYang.get_schema()) + "/resource-info"
+            schema = RwResourceMgrYang.VDUEventData().schema()
+            pathentry = schema.keyspec_to_entry(ks_path)
+
+            if action == rwdts.QueryAction.CREATE:
+                response_info = RwResourceMgrYang.VDUEventData_ResourceInfo()
+                response_info.resource_state = 'pending'
+                request_msg.resource_info = response_info
+                self.create_record_dts(self._vdu_reg,
+                                       None,
+                                       ks_path.to_xpath(RwResourceMgrYang.get_schema()),
+                                       request_msg)
+                asyncio.ensure_future(allocate_vdu_task(ks_path,
+                                                        pathentry.key00.event_id,
+                                                        request_msg.cloud_account,
+                                                        request_msg.request_info),
+                                      loop = self._loop)
+            elif action == rwdts.QueryAction.DELETE:
+                response_info = None
+                yield from self._parent.release_virtual_compute(pathentry.key00.event_id)
+                self.delete_record_dts(self._vdu_reg, None, ks_path.to_xpath(RwResourceMgrYang.get_schema()))
+            elif action == rwdts.QueryAction.READ:
+                response_info = yield from self._parent.read_virtual_compute_info(pathentry.key00.event_id)
+            else:
+                raise ValueError("Only create/delete actions available. Received action: %s" %(action))
+
+            self._log.debug("Responding with VDUInfo at xpath %s: %s",
+                            response_xpath, response_info)
+
+            xact_info.respond_xpath(rwdts.XactRspCode.ACK, response_xpath, response_info)
+
+
+        @asyncio.coroutine
+        def on_request_ready(registration, status):
+            self._log.debug("Got request ready event (registration: %s) (status: %s)",
+                            registration, status)
+
+            if registration == self._link_reg:
+                self._link_reg_event.set()
+            elif registration == self._vdu_reg:
+                self._vdu_reg_event.set()
+            else:
+                self._log.error("Unknown registration ready event: %s", registration)
+
+        link_handlers = rift.tasklets.Group.Handler(on_event=onlink_event,)
+        with self._dts.group_create(handler=link_handlers) as link_group:
+            self._log.debug("Registering for Link Resource Request using xpath: %s",
+                            ResourceMgrEvent.VLINK_REQUEST_XPATH)
+
+            self._link_reg = link_group.register(xpath=ResourceMgrEvent.VLINK_REQUEST_XPATH,
+                                            handler=rift.tasklets.DTS.RegistrationHandler(on_ready=on_request_ready,
+                                                                                          on_commit=on_link_request_commit,
+                                                                                          on_prepare=on_link_request_prepare),
+                                            flags=rwdts.Flag.PUBLISHER | rwdts.Flag.DATASTORE,)
+
+        vdu_handlers = rift.tasklets.Group.Handler(on_event=onvdu_event, )
+        with self._dts.group_create(handler=vdu_handlers) as vdu_group:
+
+            self._log.debug("Registering for VDU Resource Request using xpath: %s",
+                            ResourceMgrEvent.VDU_REQUEST_XPATH)
+
+            self._vdu_reg = vdu_group.register(xpath=ResourceMgrEvent.VDU_REQUEST_XPATH,
+                                           handler=rift.tasklets.DTS.RegistrationHandler(on_ready=on_request_ready,
+                                                                                         on_commit=on_vdu_request_commit,
+                                                                                         on_prepare=on_vdu_request_prepare),
+                                           flags=rwdts.Flag.PUBLISHER | rwdts.Flag.DATASTORE,)
+