self._dts = dts
self._loop = loop
self._parent = parent
+ self._project = parent._project
self._vdu_reg = None
self._link_reg = None
yield from asyncio.wait([self._link_reg_event.wait(), self._vdu_reg_event.wait()],
timeout=timeout, loop=self._loop)
- def create_record_dts(self, regh, xact, path, msg):
+ def _add_config_flag(self, xpath, config=False):
+ if xpath[0] == '/':
+ if config:
+ return 'C,' + xpath
+ else:
+ return 'D,' + xpath
+
+ return xpath
+
+ def create_record_dts(self, regh, xact, xpath, msg):
"""
Create a record in DTS with path and message
"""
+ path = self._add_config_flag(self._project.add_project(xpath))
self._log.debug("Creating Resource Record xact = %s, %s:%s",
xact, path, msg)
regh.create_element(path, msg)
- def delete_record_dts(self, regh, xact, path):
+ def delete_record_dts(self, regh, xact, xpath):
"""
Delete a VNFR record in DTS with path and message
"""
+ path = self._add_config_flag(self._project.add_project(xpath))
self._log.debug("Deleting Resource Record xact = %s, %s",
xact, path)
regh.delete_element(path)
+
@asyncio.coroutine
def register(self):
@asyncio.coroutine
yield from self._parent.release_virtual_network(pathentry.key00.event_id)
self.delete_record_dts(self._link_reg, None, ks_path.to_xpath(RwResourceMgrYang.get_schema()))
elif action == rwdts.QueryAction.READ:
- response_info = yield from self._parent.read_virtual_network_info(pathentry.key00.event_id)
+ # TODO: Check why we are getting null event id request
+ if pathentry.key00.event_id:
+ response_info = yield from self._parent.read_virtual_network_info(pathentry.key00.event_id)
+ else:
+ xact_info.respond_xpath(rwdts.XactRspCode.NA)
+ return
else:
raise ValueError("Only read/create/delete actions available. Received action: %s" %(action))
- self._log.debug("Responding with VirtualLinkInfo at xpath %s: %s.",
- response_xpath, response_info)
+ self._log.info("Responding with VirtualLinkInfo at xpath %s: %s.",
+ response_xpath, response_info)
xact_info.respond_xpath(rwdts.XactRspCode.ACK, response_xpath, response_info)
return rwdts.MemberRspCode.ACTION_OK
def monitor_vdu_state(response_xpath, pathentry):
- self._log.info("Initiating VDU state monitoring for xpath: %s ", response_xpath)
+ self._log.debug("Initiating VDU state monitoring for xpath: %s ", response_xpath)
time_to_wait = 300
sleep_time = 2
loop_cnt = int(time_to_wait/sleep_time)
response_info)
else:
if response_info.resource_state == 'active' or response_info.resource_state == 'failed':
- self._log.info("VDU state monitoring: VDU reached terminal state. Publishing VDU info: %s at path: %s",
+ self._log.info("VDU state monitoring: VDU reached terminal state. " +
+ "Publishing VDU info: %s at path: %s",
response_info, response_xpath)
yield from self._dts.query_update(response_xpath,
rwdts.XactFlag.ADVISE,
return
else:
### End of loop. This is only possible if VDU did not reach active state
- err_msg = "VDU state monitoring: VDU at xpath :{} did not reached active state in {} seconds. Aborting monitoring".format(response_xpath, time_to_wait)
+ err_msg = ("VDU state monitoring: VDU at xpath :{} did not reached active " +
+ "state in {} seconds. Aborting monitoring".
+ format(response_xpath, time_to_wait))
self._log.info(err_msg)
response_info = RwResourceMgrYang.VDUEventData_ResourceInfo()
response_info.resource_state = 'failed'
def allocate_vdu_task(ks_path, event_id, cloud_account, request_msg):
response_xpath = ks_path.to_xpath(RwResourceMgrYang.get_schema()) + "/resource-info"
+ response_xpath = self._add_config_flag(response_xpath)
schema = RwResourceMgrYang.VDUEventData().schema()
pathentry = schema.keyspec_to_entry(ks_path)
try:
response_info)
else:
if response_info.resource_state == 'failed' or response_info.resource_state == 'active' :
- self._log.info("Virtual compute create task completed. Publishing VDU info: %s at path: %s",
- response_info, response_xpath)
+ self._log.debug("Virtual compute create task completed. Publishing VDU info: %s at path: %s",
+ response_info, response_xpath)
yield from self._dts.query_update(response_xpath,
rwdts.XactFlag.ADVISE,
response_info)
else:
+ self._log.debug("VDU create monitor at {}".format(response_xpath))
asyncio.ensure_future(monitor_vdu_state(response_xpath, pathentry),
loop = self._loop)
-
@asyncio.coroutine
def on_vdu_request_prepare(xact_info, action, ks_path, request_msg):
self._log.debug("Received vdu on_prepare callback (xact_info: %s, action: %s): %s",
xact_info, action, request_msg)
response_xpath = ks_path.to_xpath(RwResourceMgrYang.get_schema()) + "/resource-info"
+ response_xpath = self._add_config_flag(response_xpath)
schema = RwResourceMgrYang.VDUEventData().schema()
pathentry = schema.keyspec_to_entry(ks_path)
yield from self._parent.release_virtual_compute(pathentry.key00.event_id)
self.delete_record_dts(self._vdu_reg, None, ks_path.to_xpath(RwResourceMgrYang.get_schema()))
elif action == rwdts.QueryAction.READ:
- response_info = yield from self._parent.read_virtual_compute_info(pathentry.key00.event_id)
+ # TODO: Check why we are getting null event id request
+ if pathentry.key00.event_id:
+ response_info = yield from self._parent.read_virtual_compute_info(pathentry.key00.event_id)
+ else:
+ xact_info.respond_xpath(rwdts.XactRspCode.NA)
+ return
else:
raise ValueError("Only create/delete actions available. Received action: %s" %(action))
link_handlers = rift.tasklets.Group.Handler(on_event=onlink_event,)
with self._dts.group_create(handler=link_handlers) as link_group:
- self._log.debug("Registering for Link Resource Request using xpath: %s",
- ResourceMgrEvent.VLINK_REQUEST_XPATH)
+ xpath = self._project.add_project(ResourceMgrEvent.VLINK_REQUEST_XPATH)
+ self._log.debug("Registering for Link Resource Request using xpath: {}".
+ format(xpath))
- self._link_reg = link_group.register(xpath=ResourceMgrEvent.VLINK_REQUEST_XPATH,
- handler=rift.tasklets.DTS.RegistrationHandler(on_ready=on_request_ready,
- on_commit=on_link_request_commit,
- on_prepare=on_link_request_prepare),
- flags=rwdts.Flag.PUBLISHER | rwdts.Flag.DATASTORE,)
+ self._link_reg = link_group.register(xpath=xpath,
+ handler=rift.tasklets.DTS.RegistrationHandler(on_ready=on_request_ready,
+ on_commit=on_link_request_commit,
+ on_prepare=on_link_request_prepare),
+ flags=rwdts.Flag.PUBLISHER | rwdts.Flag.DATASTORE,)
vdu_handlers = rift.tasklets.Group.Handler(on_event=onvdu_event, )
with self._dts.group_create(handler=vdu_handlers) as vdu_group:
- self._log.debug("Registering for VDU Resource Request using xpath: %s",
- ResourceMgrEvent.VDU_REQUEST_XPATH)
+ xpath = self._project.add_project(ResourceMgrEvent.VDU_REQUEST_XPATH)
+ self._log.debug("Registering for VDU Resource Request using xpath: {}".
+ format(xpath))
+
+ self._vdu_reg = vdu_group.register(xpath=xpath,
+ handler=rift.tasklets.DTS.RegistrationHandler(on_ready=on_request_ready,
+ on_commit=on_vdu_request_commit,
+ on_prepare=on_vdu_request_prepare),
+ flags=rwdts.Flag.PUBLISHER | rwdts.Flag.DATASTORE,)
+
+
+ def deregister(self):
+ self._log.debug("De-register for project {}".format(self._project.name))
- self._vdu_reg = vdu_group.register(xpath=ResourceMgrEvent.VDU_REQUEST_XPATH,
- handler=rift.tasklets.DTS.RegistrationHandler(on_ready=on_request_ready,
- on_commit=on_vdu_request_commit,
- on_prepare=on_vdu_request_prepare),
- flags=rwdts.Flag.PUBLISHER | rwdts.Flag.DATASTORE,)
+ if self._vdu_reg:
+ self._vdu_reg.deregister()
+ self._vdu_reg = None
+ if self._link_reg:
+ self._link_reg.deregister()
+ self._link_reg = None