X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=rwlaunchpad%2Fplugins%2Frwvns%2Frift%2Fvlmgr%2Frwvlmgr.py;h=271ed39e57f272c299e81aaa3f62175563ad05eb;hb=f314b4af9744068a7ed7a6a6314220c3aa857523;hp=bdea4efce6f7a39dca3b4bfbcb86e8a6d1a0a623;hpb=6f1a3fe149e4a6b9803382cb299c902f4cf58ec9;p=osm%2FSO.git diff --git a/rwlaunchpad/plugins/rwvns/rift/vlmgr/rwvlmgr.py b/rwlaunchpad/plugins/rwvns/rift/vlmgr/rwvlmgr.py index bdea4efc..271ed39e 100755 --- a/rwlaunchpad/plugins/rwvns/rift/vlmgr/rwvlmgr.py +++ b/rwlaunchpad/plugins/rwvns/rift/vlmgr/rwvlmgr.py @@ -1,5 +1,4 @@ - -# +# # Copyright 2016 RIFT.IO Inc # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -17,10 +16,10 @@ import asyncio import enum -import uuid +import gi import time +import uuid -import gi gi.require_version('RwVlrYang', '1.0') gi.require_version('RwDts', '1.0') gi.require_version('RwResourceMgrYang', '1.0') @@ -30,6 +29,8 @@ from gi.repository import ( RwDts as rwdts, RwResourceMgrYang, ) +gi.require_version('RwKeyspec', '1.0') +from gi.repository.RwKeyspec import quoted_key import rift.tasklets @@ -63,29 +64,30 @@ class VirtualLinkRecord(object): """ Virtual Link Record object """ - def __init__(self, dts, log, loop, vnsm, vlr_msg, req_id=None): + def __init__(self, dts, log, loop, vnsm, vlr_msg): self._dts = dts self._log = log self._loop = loop self._vnsm = vnsm self._vlr_msg = vlr_msg + self._vlr_id = self._vlr_msg.id + self._project = vnsm._project self._network_id = None self._network_pool = None self._assigned_subnet = None + self._virtual_cps = list() self._create_time = int(time.time()) - if req_id == None: - self._request_id = str(uuid.uuid4()) - else: - self._request_id = req_id self._state = VirtualLinkRecordState.INIT self._state_failed_reason = None + self._name = self._vlr_msg.name @property def vld_xpath(self): """ VLD xpath associated with this VLR record """ - return "C,/vld:vld-catalog/vld:vld[id='{}']".format(self.vld_id) + return self._project.add_project("C,/vld:vld-catalog/vld:vld[id={}]". + format(quoted_key(self.vld_id))) @property def vld_id(self): @@ -95,29 +97,34 @@ class VirtualLinkRecord(object): @property def vlr_id(self): """ VLR id associated with this VLR record """ - return self._vlr_msg.id + return self._vlr_id @property def xpath(self): """ path for this VLR """ - return("D,/vlr:vlr-catalog" - "/vlr:vlr[vlr:id='{}']".format(self.vlr_id)) + return self._project.add_project("D,/vlr:vlr-catalog" + "/vlr:vlr[vlr:id={}]".format(quoted_key(self.vlr_id))) @property def name(self): """ Name of this VLR """ - return self._vlr_msg.name + return self._name + + @property + def datacenter(self): + """ RO Account to instantiate the virtual link on """ + return self._vlr_msg.datacenter @property - def cloud_account_name(self): - """ Cloud Account to instantiate the virtual link on """ - return self._vlr_msg.cloud_account + def event_id(self): + """ Event Identifier for this virtual link """ + return self._vlr_id @property def resmgr_path(self): """ path for resource-mgr""" - return ("D,/rw-resource-mgr:resource-mgmt" + - "/vlink-event/vlink-event-data[event-id='{}']".format(self._request_id)) + return self._project.add_project("D,/rw-resource-mgr:resource-mgmt" + + "/vlink-event/vlink-event-data[event-id={}]".format(quoted_key(self.event_id))) @property def operational_status(self): @@ -135,7 +142,7 @@ class VirtualLinkRecord(object): @property def msg(self): """ VLR message for this VLR """ - msg = RwVlrYang.YangData_Vlr_VlrCatalog_Vlr() + msg = RwVlrYang.YangData_RwProject_Project_VlrCatalog_Vlr() msg.copy_from(self._vlr_msg) if self._network_id is not None: @@ -147,18 +154,25 @@ class VirtualLinkRecord(object): if self._assigned_subnet is not None: msg.assigned_subnet = self._assigned_subnet + if self._virtual_cps: + for cp in msg.virtual_connection_points: + for vcp in self._virtual_cps: + if cp.name == vcp['name']: + cp.ip_address = vcp['ip_address'] + cp.mac_address = vcp['mac_address'] + cp.connection_point_id = vcp['connection_point_id'] + break msg.operational_status = self.operational_status msg.operational_status_details = self._state_failed_reason - msg.res_id = self._request_id - + msg.res_id = self.event_id return msg @property def resmgr_msg(self): """ VLR message for this VLR """ - msg = RwResourceMgrYang.VirtualLinkEventData() - msg.event_id = self._request_id - msg.cloud_account = self.cloud_account_name + msg = RwResourceMgrYang.YangData_RwProject_Project_ResourceMgmt_VlinkEvent_VlinkEventData() + msg.event_id = self.event_id + msg.cloud_account = self.datacenter msg.request_info.name = self.name msg.request_info.vim_network_name = self._vlr_msg.vim_network_name msg.request_info.provider_network.from_dict( @@ -167,24 +181,32 @@ class VirtualLinkRecord(object): if self._vlr_msg.has_field('ip_profile_params'): msg.request_info.ip_profile_params.from_dict(self._vlr_msg.ip_profile_params.as_dict()) + for cp in self._vlr_msg.virtual_connection_points: + vcp = msg.request_info.virtual_cps.add() + vcp.from_dict({k:v for k,v in cp.as_dict().items() + if k in ['name','port_security_enabled','type_yang']}) + if (self._vlr_msg.has_field('ip_profile_params')) and (self._vlr_msg.ip_profile_params.has_field('security_group')): + vcp.security_group = self._vlr_msg.ip_profile_params.security_group + return msg @asyncio.coroutine def create_network(self, xact): """ Create network for this VL """ - self._log.debug("Creating network req-id: %s", self._request_id) - return (yield from self.request_network(xact, "create")) + self._log.debug("Creating network event-id: %s:%s", self.event_id, self._vlr_msg) + network_rsp = yield from self.request_network(xact, "create") + return network_rsp @asyncio.coroutine def delete_network(self, xact): """ Delete network for this VL """ - self._log.debug("Deleting network - req-id: %s", self._request_id) + self._log.debug("Deleting network - event-id: %s", self.event_id) return (yield from self.request_network(xact, "delete")) @asyncio.coroutine def read_network(self, xact): """ Read network for this VL """ - self._log.debug("Reading network - req-id: %s", self._request_id) + self._log.debug("Reading network - event-id: %s", self.event_id) return (yield from self.request_network(xact, "read")) @asyncio.coroutine @@ -199,8 +221,7 @@ class VirtualLinkRecord(object): block.add_query_create(self.resmgr_path, self.resmgr_msg) elif action == "delete": self._log.debug("Deleting network path:%s", self.resmgr_path) - if self.resmgr_msg.request_info.name != "multisite": - block.add_query_delete(self.resmgr_path) + block.add_query_delete(self.resmgr_path) elif action == "read": self._log.debug("Reading network path:%s", self.resmgr_path) block.add_query_read(self.resmgr_path) @@ -222,8 +243,7 @@ class VirtualLinkRecord(object): if resp.has_field('resource_info') and resp.resource_info.resource_state == "failed": raise NetworkResourceError(resp.resource_info.resource_errors) - if not (resp.has_field('resource_info') and - resp.resource_info.has_field('virtual_link_id')): + if not resp.has_field('resource_info') : raise NetworkResourceError("Did not get a valid network resource response (resp: %s)", resp) self._log.debug("Got network request response: %s", resp) @@ -240,29 +260,70 @@ class VirtualLinkRecord(object): try: self._state = VirtualLinkRecordState.RESOURCE_ALLOC_PENDING + network_rsp = None if restart == 0: network_resp = yield from self.create_network(xact) else: network_resp = yield from self.read_network(xact) if network_resp == None: - network_resp = yield from self.create_network(xact) - - # Note network_resp.virtual_link_id is CAL assigned network_id. + network_resp = yield from self.create_network(xact) - self._network_id = network_resp.resource_info.virtual_link_id - self._network_pool = network_resp.resource_info.pool_name - self._assigned_subnet = network_resp.resource_info.subnet - - self._state = VirtualLinkRecordState.READY - - yield from self.publish(xact) + if network_resp: + self._state = self.vl_state_from_network_resp(network_resp) + if self._state == VirtualLinkRecordState.READY: + # Move this VL into ready state + yield from self.ready(network_resp, xact) + else: + yield from self.publish(xact) except Exception as e: self._log.error("Instantiatiation of VLR record failed: %s", str(e)) self._state = VirtualLinkRecordState.FAILED self._state_failed_reason = str(e) yield from self.publish(xact) + def vl_state_from_network_resp(self, network_resp): + """ Determine VL state from network response """ + if network_resp.resource_info.resource_state == 'pending': + return VirtualLinkRecordState.RESOURCE_ALLOC_PENDING + elif network_resp.resource_info.resource_state == 'active': + return VirtualLinkRecordState.READY + elif network_resp.resource_info.resource_state == 'failed': + return VirtualLinkRecordState.FAILED + return VirtualLinkRecordState.RESOURCE_ALLOC_PENDING + + @asyncio.coroutine + def ready(self, event_resp, xact): + """ This virtual link is ready """ + # Note network_resp.virtual_link_id is CAL assigned network_id. + self._log.debug("Virtual Link id %s name %s in ready state, event_rsp:%s", + self.vlr_id, + self.name, + event_resp) + self._network_id = event_resp.resource_info.virtual_link_id + self._network_pool = event_resp.resource_info.pool_name + self._assigned_subnet = event_resp.resource_info.subnet + self._virtual_cps = [ vcp.as_dict() + for vcp in event_resp.resource_info.virtual_connection_points ] + + yield from self.publish(xact) + + self._state = VirtualLinkRecordState.READY + + yield from self.publish(xact) + + @asyncio.coroutine + def failed(self, event_resp, xact): + """ This virtual link Failed """ + self._log.debug("Virtual Link id %s name %s failed to instantiate, event_rsp:%s", + self.vlr_id, + self.name, + event_resp) + + self._state = VirtualLinkRecordState.FAILED + + yield from self.publish(xact) + @asyncio.coroutine def publish(self, xact): """ publish this VLR """ @@ -313,6 +374,7 @@ class VlrDtsHandler(object): self._vnsm = vnsm self._regh = None + self._project = vnsm._project @property def regh(self): @@ -322,11 +384,6 @@ class VlrDtsHandler(object): @asyncio.coroutine def register(self): """ Register for the VLR path """ - def on_commit(xact_info): - """ The transaction has been committed """ - self._log.debug("Got vlr commit (xact_info: %s)", xact_info) - - return rwdts.MemberRspCode.ACTION_OK @asyncio.coroutine def on_event(dts, g_reg, xact, xact_event, scratch_data): @@ -369,7 +426,7 @@ class VlrDtsHandler(object): return elif action == rwdts.QueryAction.DELETE: # Delete an VLR record - schema = RwVlrYang.YangData_Vlr_VlrCatalog_Vlr.schema() + schema = RwVlrYang.YangData_RwProject_Project_VlrCatalog_Vlr.schema() path_entry = schema.keyspec_to_entry(ks_path) self._log.debug("Terminating VLR id %s", path_entry.key00.id) yield from self._vnsm.delete_vlr(path_entry.key00.id, xact_info.xact) @@ -379,26 +436,32 @@ class VlrDtsHandler(object): xact_info.respond_xpath(rwdts.XactRspCode.ACK) return - self._log.debug("Registering for VLR using xpath: %s", - VlrDtsHandler.XPATH) + xpath = self._project.add_project(VlrDtsHandler.XPATH) + self._log.debug("Registering for VLR using xpath: {}". + format(xpath)) - reg_handle = rift.tasklets.DTS.RegistrationHandler( - on_commit=on_commit, - on_prepare=on_prepare, - ) + reg_handle = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,) handlers = rift.tasklets.Group.Handler(on_event=on_event,) with self._dts.group_create(handler=handlers) as group: self._regh = group.register( - xpath=VlrDtsHandler.XPATH, + xpath=xpath, handler=reg_handle, flags=rwdts.Flag.PUBLISHER | rwdts.Flag.NO_PREP_READ| rwdts.Flag.DATASTORE, ) + def deregister(self): + self._log.debug("De-register VLR handler for project {}". + format(self._project.name)) + if self._regh: + self._regh.deregister() + self._regh = None + @asyncio.coroutine - def create(self, xact, path, msg): + def create(self, xact, xpath, msg): """ Create a VLR record in DTS with path and message """ + path = self._project.add_project(xpath) self._log.debug("Creating VLR xact = %s, %s:%s", xact, path, msg) self.regh.create_element(path, msg) @@ -406,10 +469,11 @@ class VlrDtsHandler(object): xact, path, msg) @asyncio.coroutine - def update(self, xact, path, msg): + def update(self, xact, xpath, msg): """ Update a VLR record in DTS with path and message """ + path = self._project.add_project(xpath) self._log.debug("Updating VLR xact = %s, %s:%s", xact, path, msg) self.regh.update_element(path, msg) @@ -417,10 +481,11 @@ class VlrDtsHandler(object): xact, path, msg) @asyncio.coroutine - def delete(self, xact, path): + def delete(self, xact, xpath): """ Delete a VLR record in DTS with path and message """ + path = self._project.add_project(xpath) self._log.debug("Deleting VLR xact = %s, %s", xact, path) self.regh.delete_element(path) self._log.debug("Deleted VLR xact = %s, %s", xact, path) @@ -453,8 +518,13 @@ class VldDtsHandler(object): "Got on prepare for VLD update (ks_path: %s) (action: %s)", ks_path.to_xpath(VldYang.get_schema()), msg) - schema = VldYang.YangData_Vld_VldCatalog_Vld.schema() + schema = VldYang.YangData_RwProject_Project_VldCatalog_Vld.schema() path_entry = schema.keyspec_to_entry(ks_path) + # TODO: Check why on project delete this gets called + if not path_entry: + xact_info.respond_xpath(rwdts.XactRspCode.ACK) + return + vld_id = path_entry.key00.id disabled_actions = [rwdts.QueryAction.DELETE, rwdts.QueryAction.UPDATE] @@ -476,8 +546,75 @@ class VldDtsHandler(object): handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare) - yield from self._dts.register( - VldDtsHandler.XPATH, + self._regh = yield from self._dts.register( + self._vnsm._project.add_project(VldDtsHandler.XPATH), flags=rwdts.Flag.SUBSCRIBER, handler=handler ) + + def deregister(self): + self._log.debug("De-register VLD handler for project {}". + format(self._vnsm._project.name)) + if self._regh: + self._regh.deregister() + self._regh = None + +class VirtualLinkEventListener(object): + """ DTS Listener to listen on Virtual Link related events """ + XPATH = "D,/rw-resource-mgr:resource-mgmt/vlink-event/vlink-event-data" + def __init__(self, dts, log, loop, vnsm): + self._dts = dts + self._log = log + self._loop = loop + self._vnsm = vnsm + self._regh = None + + @property + def regh(self): + """ The registration handle assocaited with this Handler""" + return self._regh + + def event_id_from_keyspec(self, ks): + """ Get the event id from the keyspec """ + event_pe = RwResourceMgrYang.YangData_RwProject_Project_ResourceMgmt_VlinkEvent_VlinkEventData.schema().keyspec_to_entry(ks) + try: + # Can get just path without event id when + # deleting project + event_id = event_pe.key00.event_id + except AttributeError: + return None + return event_id + + @asyncio.coroutine + def register(self): + """ Register the Virtual Link Event path """ + @asyncio.coroutine + def on_prepare(xact_info, query_action, ks_path, msg): + """ prepare callback on Virtual Link Events """ + try: + self._log.debug( + "Got on prepare for Virtual Link Event id (ks_path: %s) (msg: %s)", + ks_path.to_xpath(RwResourceMgrYang.get_schema()), msg) + event_id = self.event_id_from_keyspec(ks_path) + if event_id: + if query_action == rwdts.QueryAction.CREATE or query_action == rwdts.QueryAction.UPDATE: + yield from self._vnsm.update_virual_link_event(event_id, msg) + elif query_action == rwdts.QueryAction.DELETE: + self._vnsm.delete_virual_link_event(event_id) + except Exception as e: + self._log.exception("Caught execption in Virtual Link Event handler", e) + + xact_info.respond_xpath(rwdts.XactRspCode.ACK) + + handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare) + + self._regh = yield from self._dts.register( + self._vnsm._project.add_project(VirtualLinkEventListener.XPATH), + flags=rwdts.Flag.SUBSCRIBER, + handler=handler + ) + + def deregister(self): + if self._regh: + self._regh.deregister() + self._regh = None