X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=rwlaunchpad%2Fplugins%2Frwresmgr%2Frift%2Ftasklets%2Frwresmgrtasklet%2Frwresmgr_events.py;h=360390b0e95c02d42e7dfa733f5eae77f5e50925;hb=a3bb91f092d378448cb870eccd45d43865de143c;hp=c80925c6a35ab7f048e5afeffd6ded695c487957;hpb=66ed44eebbbe7bbc23d44780ae11e7feaa4b43ba;p=osm%2FSO.git diff --git a/rwlaunchpad/plugins/rwresmgr/rift/tasklets/rwresmgrtasklet/rwresmgr_events.py b/rwlaunchpad/plugins/rwresmgr/rift/tasklets/rwresmgrtasklet/rwresmgr_events.py index c80925c6..360390b0 100755 --- a/rwlaunchpad/plugins/rwresmgr/rift/tasklets/rwresmgrtasklet/rwresmgr_events.py +++ b/rwlaunchpad/plugins/rwresmgr/rift/tasklets/rwresmgrtasklet/rwresmgr_events.py @@ -48,6 +48,7 @@ class ResourceMgrEvent(object): self._dts = dts self._loop = loop self._parent = parent + self._project = parent._project self._vdu_reg = None self._link_reg = None @@ -60,22 +61,34 @@ class ResourceMgrEvent(object): yield from asyncio.wait([self._link_reg_event.wait(), self._vdu_reg_event.wait()], timeout=timeout, loop=self._loop) - def create_record_dts(self, regh, xact, path, msg): + def _add_config_flag(self, xpath, config=False): + if xpath[0] == '/': + if config: + return 'C,' + xpath + else: + return 'D,' + xpath + + return xpath + + def create_record_dts(self, regh, xact, xpath, msg): """ Create a record in DTS with path and message """ + path = self._add_config_flag(self._project.add_project(xpath)) self._log.debug("Creating Resource Record xact = %s, %s:%s", xact, path, msg) regh.create_element(path, msg) - def delete_record_dts(self, regh, xact, path): + def delete_record_dts(self, regh, xact, xpath): """ Delete a VNFR record in DTS with path and message """ + path = self._add_config_flag(self._project.add_project(xpath)) self._log.debug("Deleting Resource Record xact = %s, %s", xact, path) regh.delete_element(path) + @asyncio.coroutine def register(self): @asyncio.coroutine @@ -161,12 +174,17 @@ class ResourceMgrEvent(object): yield from self._parent.release_virtual_network(pathentry.key00.event_id) self.delete_record_dts(self._link_reg, None, ks_path.to_xpath(RwResourceMgrYang.get_schema())) elif action == rwdts.QueryAction.READ: - response_info = yield from self._parent.read_virtual_network_info(pathentry.key00.event_id) + # TODO: Check why we are getting null event id request + if pathentry.key00.event_id: + response_info = yield from self._parent.read_virtual_network_info(pathentry.key00.event_id) + else: + xact_info.respond_xpath(rwdts.XactRspCode.NA) + return else: raise ValueError("Only read/create/delete actions available. Received action: %s" %(action)) - self._log.debug("Responding with VirtualLinkInfo at xpath %s: %s.", - response_xpath, response_info) + self._log.info("Responding with VirtualLinkInfo at xpath %s: %s.", + response_xpath, response_info) xact_info.respond_xpath(rwdts.XactRspCode.ACK, response_xpath, response_info) @@ -177,7 +195,7 @@ class ResourceMgrEvent(object): return rwdts.MemberRspCode.ACTION_OK def monitor_vdu_state(response_xpath, pathentry): - self._log.info("Initiating VDU state monitoring for xpath: %s ", response_xpath) + self._log.debug("Initiating VDU state monitoring for xpath: %s ", response_xpath) time_to_wait = 300 sleep_time = 2 loop_cnt = int(time_to_wait/sleep_time) @@ -197,7 +215,8 @@ class ResourceMgrEvent(object): response_info) else: if response_info.resource_state == 'active' or response_info.resource_state == 'failed': - self._log.info("VDU state monitoring: VDU reached terminal state. Publishing VDU info: %s at path: %s", + self._log.info("VDU state monitoring: VDU reached terminal state. " + + "Publishing VDU info: %s at path: %s", response_info, response_xpath) yield from self._dts.query_update(response_xpath, rwdts.XactFlag.ADVISE, @@ -205,7 +224,9 @@ class ResourceMgrEvent(object): return else: ### End of loop. This is only possible if VDU did not reach active state - err_msg = "VDU state monitoring: VDU at xpath :{} did not reached active state in {} seconds. Aborting monitoring".format(response_xpath, time_to_wait) + err_msg = ("VDU state monitoring: VDU at xpath :{} did not reached active " + + "state in {} seconds. Aborting monitoring". + format(response_xpath, time_to_wait)) self._log.info(err_msg) response_info = RwResourceMgrYang.VDUEventData_ResourceInfo() response_info.resource_state = 'failed' @@ -217,6 +238,7 @@ class ResourceMgrEvent(object): def allocate_vdu_task(ks_path, event_id, cloud_account, request_msg): response_xpath = ks_path.to_xpath(RwResourceMgrYang.get_schema()) + "/resource-info" + response_xpath = self._add_config_flag(response_xpath) schema = RwResourceMgrYang.VDUEventData().schema() pathentry = schema.keyspec_to_entry(ks_path) try: @@ -233,21 +255,22 @@ class ResourceMgrEvent(object): response_info) else: if response_info.resource_state == 'failed' or response_info.resource_state == 'active' : - self._log.info("Virtual compute create task completed. Publishing VDU info: %s at path: %s", - response_info, response_xpath) + self._log.debug("Virtual compute create task completed. Publishing VDU info: %s at path: %s", + response_info, response_xpath) yield from self._dts.query_update(response_xpath, rwdts.XactFlag.ADVISE, response_info) else: + self._log.debug("VDU create monitor at {}".format(response_xpath)) asyncio.ensure_future(monitor_vdu_state(response_xpath, pathentry), loop = self._loop) - @asyncio.coroutine def on_vdu_request_prepare(xact_info, action, ks_path, request_msg): self._log.debug("Received vdu on_prepare callback (xact_info: %s, action: %s): %s", xact_info, action, request_msg) response_xpath = ks_path.to_xpath(RwResourceMgrYang.get_schema()) + "/resource-info" + response_xpath = self._add_config_flag(response_xpath) schema = RwResourceMgrYang.VDUEventData().schema() pathentry = schema.keyspec_to_entry(ks_path) @@ -269,7 +292,12 @@ class ResourceMgrEvent(object): yield from self._parent.release_virtual_compute(pathentry.key00.event_id) self.delete_record_dts(self._vdu_reg, None, ks_path.to_xpath(RwResourceMgrYang.get_schema())) elif action == rwdts.QueryAction.READ: - response_info = yield from self._parent.read_virtual_compute_info(pathentry.key00.event_id) + # TODO: Check why we are getting null event id request + if pathentry.key00.event_id: + response_info = yield from self._parent.read_virtual_compute_info(pathentry.key00.event_id) + else: + xact_info.respond_xpath(rwdts.XactRspCode.NA) + return else: raise ValueError("Only create/delete actions available. Received action: %s" %(action)) @@ -293,24 +321,37 @@ class ResourceMgrEvent(object): link_handlers = rift.tasklets.Group.Handler(on_event=onlink_event,) with self._dts.group_create(handler=link_handlers) as link_group: - self._log.debug("Registering for Link Resource Request using xpath: %s", - ResourceMgrEvent.VLINK_REQUEST_XPATH) + xpath = self._project.add_project(ResourceMgrEvent.VLINK_REQUEST_XPATH) + self._log.debug("Registering for Link Resource Request using xpath: {}". + format(xpath)) - self._link_reg = link_group.register(xpath=ResourceMgrEvent.VLINK_REQUEST_XPATH, - handler=rift.tasklets.DTS.RegistrationHandler(on_ready=on_request_ready, - on_commit=on_link_request_commit, - on_prepare=on_link_request_prepare), - flags=rwdts.Flag.PUBLISHER | rwdts.Flag.DATASTORE,) + self._link_reg = link_group.register(xpath=xpath, + handler=rift.tasklets.DTS.RegistrationHandler(on_ready=on_request_ready, + on_commit=on_link_request_commit, + on_prepare=on_link_request_prepare), + flags=rwdts.Flag.PUBLISHER | rwdts.Flag.DATASTORE,) vdu_handlers = rift.tasklets.Group.Handler(on_event=onvdu_event, ) with self._dts.group_create(handler=vdu_handlers) as vdu_group: - self._log.debug("Registering for VDU Resource Request using xpath: %s", - ResourceMgrEvent.VDU_REQUEST_XPATH) + xpath = self._project.add_project(ResourceMgrEvent.VDU_REQUEST_XPATH) + self._log.debug("Registering for VDU Resource Request using xpath: {}". + format(xpath)) + + self._vdu_reg = vdu_group.register(xpath=xpath, + handler=rift.tasklets.DTS.RegistrationHandler(on_ready=on_request_ready, + on_commit=on_vdu_request_commit, + on_prepare=on_vdu_request_prepare), + flags=rwdts.Flag.PUBLISHER | rwdts.Flag.DATASTORE,) + + + def deregister(self): + self._log.debug("De-register for project {}".format(self._project.name)) - self._vdu_reg = vdu_group.register(xpath=ResourceMgrEvent.VDU_REQUEST_XPATH, - handler=rift.tasklets.DTS.RegistrationHandler(on_ready=on_request_ready, - on_commit=on_vdu_request_commit, - on_prepare=on_vdu_request_prepare), - flags=rwdts.Flag.PUBLISHER | rwdts.Flag.DATASTORE,) + if self._vdu_reg: + self._vdu_reg.deregister() + self._vdu_reg = None + if self._link_reg: + self._link_reg.deregister() + self._link_reg = None