X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;ds=sidebyside;f=rwlaunchpad%2Fplugins%2Frwnsm%2Frift%2Ftasklets%2Frwnsmtasklet%2Frwvnffgmgr.py;fp=rwlaunchpad%2Fplugins%2Frwnsm%2Frift%2Ftasklets%2Frwnsmtasklet%2Frwvnffgmgr.py;h=0ebe9df35f6d74ac027ade5897f4ecc55a78d7af;hb=6f07e6f33f751ab4ffe624f6037f887b243bece2;hp=0000000000000000000000000000000000000000;hpb=72a563886272088feb7cb52e4aafbe6d2c580ff9;p=osm%2FSO.git diff --git a/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/rwvnffgmgr.py b/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/rwvnffgmgr.py new file mode 100755 index 00000000..0ebe9df3 --- /dev/null +++ b/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/rwvnffgmgr.py @@ -0,0 +1,422 @@ + +# +# Copyright 2016 RIFT.IO Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import asyncio + +from gi.repository import ( + RwDts as rwdts, + RwsdnYang, + RwTypes, + ProtobufC, +) + +from gi.repository.RwTypes import RwStatus +import rw_peas +import rift.tasklets + +class SdnGetPluginError(Exception): + """ Error while fetching SDN plugin """ + pass + + +class SdnGetInterfaceError(Exception): + """ Error while fetching SDN interface""" + pass + + +class SdnAccountError(Exception): + """ Error while creating/deleting/updating SDN Account""" + pass + +class VnffgrDoesNotExist(Exception): + """ Error while fetching SDN interface""" + pass + +class VnffgrAlreadyExist(Exception): + """ Vnffgr already exists Error""" + pass + +class VnffgrCreationFailed(Exception): + """ Error while creating VNFFGR""" + pass + + +class VnffgrUpdateFailed(Exception): + """ Error while updating VNFFGR""" + pass + +class VnffgMgr(object): + """ Implements the interface to backend plugins to fetch topology """ + def __init__(self, dts, log, log_hdl, loop): + self._account = {} + self._dts = dts + self._log = log + self._log_hdl = log_hdl + self._loop = loop + self._sdn = {} + self._sdn_handler = SDNAccountDtsHandler(self._dts,self._log,self) + self._vnffgr_list = {} + + @asyncio.coroutine + def register(self): + yield from self._sdn_handler.register() + + def set_sdn_account(self,account): + if (account.name in self._account): + self._log.error("SDN Account is already set") + else: + sdn_account = RwsdnYang.SDNAccount() + sdn_account.from_dict(account.as_dict()) + sdn_account.name = account.name + self._account[account.name] = sdn_account + self._log.debug("Account set is %s , %s",type(self._account), self._account) + + def del_sdn_account(self, name): + self._log.debug("Account deleted is %s , %s", type(self._account), name) + del self._account[name] + + def update_sdn_account(self,account): + self._log.debug("Account updated is %s , %s", type(self._account), account) + if account.name in self._account: + sdn_account = self._account[account.name] + + sdn_account.from_dict( + account.as_dict(), + ignore_missing_keys=True, + ) + self._account[account.name] = sdn_account + + def get_sdn_account(self, name): + """ + Creates an object for class RwsdnYang.SdnAccount() + """ + if (name in self._account): + return self._account[name] + else: + self._log.error("SDN account is not configured") + + + def get_sdn_plugin(self,name): + """ + Loads rw.sdn plugin via libpeas + """ + if (name in self._sdn): + return self._sdn[name] + account = self.get_sdn_account(name) + plugin_name = getattr(account, account.account_type).plugin_name + self._log.debug("SDN plugin being created") + plugin = rw_peas.PeasPlugin(plugin_name, 'RwSdn-1.0') + engine, info, extension = plugin() + + self._sdn[name] = plugin.get_interface("Topology") + try: + rc = self._sdn[name].init(self._log_hdl) + assert rc == RwStatus.SUCCESS + except: + self._log.error("ERROR:SDN plugin instantiation failed ") + else: + self._log.debug("SDN plugin successfully instantiated") + return self._sdn[name] + + def fetch_vnffgr(self,vnffgr_id): + if vnffgr_id not in self._vnffgr_list: + self._log.error("VNFFGR with id %s not present in VNFFGMgr", vnffgr_id) + msg = "VNFFGR with id {} not present in VNFFGMgr".format(vnffgr_id) + raise VnffgrDoesNotExist(msg) + self.update_vnffgrs(self._vnffgr_list[vnffgr_id].sdn_account) + vnffgr = self._vnffgr_list[vnffgr_id].deep_copy() + self._log.debug("VNFFGR for id %s is %s",vnffgr_id,vnffgr) + return vnffgr + + def create_vnffgr(self, vnffgr,classifier_list,sff_list): + """ + """ + self._log.debug("Received VNFFG chain Create msg %s",vnffgr) + if vnffgr.id in self._vnffgr_list: + self._log.error("VNFFGR with id %s already present in VNFFGMgr", vnffgr.id) + vnffgr.operational_status = 'failed' + msg = "VNFFGR with id {} already present in VNFFGMgr".format(vnffgr.id) + raise VnffgrAlreadyExist(msg) + + self._vnffgr_list[vnffgr.id] = vnffgr + vnffgr.operational_status = 'init' + if len(self._account) == 0: + self._log.error("SDN Account not configured") + vnffgr.operational_status = 'failed' + return + if vnffgr.sdn_account: + sdn_acct_name = vnffgr.sdn_account + else: + self._log.error("SDN Account is not associated to create VNFFGR") + # TODO Fail the VNFFGR creation if SDN account is not associated + #vnffgr.operational_status = 'failed' + #msg = "SDN Account is not associated to create VNFFGR" + #raise VnffgrCreationFailed(msg) + sdn_account = [sdn_account.name for _,sdn_account in self._account.items()] + sdn_acct_name = sdn_account[0] + vnffgr.sdn_account = sdn_acct_name + sdn_plugin = self.get_sdn_plugin(sdn_acct_name) + + for rsp in vnffgr.rsp: + vnffg = RwsdnYang.VNFFGChain() + vnffg.name = rsp.name + vnffg.classifier_name = rsp.classifier_name + + vnfr_list = list() + for index,cp_ref in enumerate(rsp.vnfr_connection_point_ref): + cpath = vnffg.vnf_chain_path.add() + cpath.order=cp_ref.hop_number + cpath.service_function_type = cp_ref.service_function_type + cpath.nsh_aware=True + cpath.transport_type = 'vxlan-gpe' + + vnfr=cpath.vnfr_ids.add() + vnfr.vnfr_id = cp_ref.vnfr_id_ref + vnfr.vnfr_name = cp_ref.vnfr_name_ref + vnfr.mgmt_address = cp_ref.connection_point_params.mgmt_address + vnfr.mgmt_port = 5000 + vnfr_list.append(vnfr) + + vdu = vnfr.vdu_list.add() + vdu.name = cp_ref.connection_point_params.name + vdu.port_id = cp_ref.connection_point_params.port_id + vdu.vm_id = cp_ref.connection_point_params.vm_id + vdu.address = cp_ref.connection_point_params.address + vdu.port = cp_ref.connection_point_params.port + + for sff in sff_list.values(): + _sff = vnffg.sff.add() + _sff.from_dict(sff.as_dict()) + if sff.function_type == 'SFF': + for vnfr in vnfr_list: + vnfr.sff_name = sff.name + self._log.debug("Recevied SFF %s, Created SFF is %s",sff, _sff) + + self._log.debug("VNFFG chain msg is %s",vnffg) + rc,rs = sdn_plugin.create_vnffg_chain(self._account[sdn_acct_name],vnffg) + if rc != RwTypes.RwStatus.SUCCESS: + vnffgr.operational_status = 'failed' + msg = "Instantiation of VNFFGR with id {} failed".format(vnffgr.id) + raise VnffgrCreationFailed(msg) + + self._log.info("VNFFG chain created successfully for rsp with id %s",rsp.id) + + + meta = {} + if(len(classifier_list) == 2): + meta[vnffgr.classifier[0].id] = '0x' + ''.join(str("%0.2X"%int(i)) for i in vnffgr.classifier[1].ip_address.split('.')) + meta[vnffgr.classifier[1].id] = '0x' + ''.join(str("%0.2X"%int(i)) for i in vnffgr.classifier[0].ip_address.split('.')) + + self._log.debug("VNFFG Meta VNFFG chain is {}".format(meta)) + + for classifier in classifier_list: + vnffgr_cl = [_classifier for _classifier in vnffgr.classifier if classifier.id == _classifier.id] + if len(vnffgr_cl) > 0: + cl_rsp_name = vnffgr_cl[0].rsp_name + else: + self._log.error("No RSP wiht name %s found; Skipping classifier %s creation",classifier.rsp_id_ref,classifier.name) + continue + vnffgcl = RwsdnYang.VNFFGClassifier() + vnffgcl.name = classifier.name + vnffgcl.rsp_name = cl_rsp_name + vnffgcl.port_id = vnffgr_cl[0].port_id + vnffgcl.vm_id = vnffgr_cl[0].vm_id + # Get the symmetric classifier endpoint ip and set it in nsh ctx1 + + vnffgcl.vnffg_metadata.ctx1 = meta.get(vnffgr_cl[0].id,'0') + vnffgcl.vnffg_metadata.ctx2 = '0' + vnffgcl.vnffg_metadata.ctx3 = '0' + vnffgcl.vnffg_metadata.ctx4 = '0' + if vnffgr_cl[0].has_field('sff_name'): + vnffgcl.sff_name = vnffgr_cl[0].sff_name + for index,match_rule in enumerate(classifier.match_attributes): + acl = vnffgcl.match_attributes.add() + #acl.name = vnffgcl.name + str(index) + acl.name = match_rule.id + acl.ip_proto = match_rule.ip_proto + acl.source_ip_address = match_rule.source_ip_address + '/32' + acl.source_port = match_rule.source_port + acl.destination_ip_address = match_rule.destination_ip_address + '/32' + acl.destination_port = match_rule.destination_port + + self._log.debug(" Creating VNFFG Classifier Classifier %s for RSP: %s",vnffgcl.name,vnffgcl.rsp_name) + rc,rs = sdn_plugin.create_vnffg_classifier(self._account[sdn_acct_name],vnffgcl) + if rc != RwTypes.RwStatus.SUCCESS: + self._log.error("VNFFG Classifier cretaion failed for Classifier %s for RSP ID: %s",classifier.name,classifier.rsp_id_ref) + #vnffgr.operational_status = 'failed' + #msg = "Instantiation of VNFFGR with id {} failed".format(vnffgr.id) + #raise VnffgrCreationFailed(msg) + + vnffgr.operational_status = 'running' + self.update_vnffgrs(vnffgr.sdn_account) + return vnffgr + + def update_vnffgrs(self,sdn_acct_name): + """ + Update VNFFGR by reading data from SDN Plugin + """ + sdn_plugin = self.get_sdn_plugin(sdn_acct_name) + rc,rs = sdn_plugin.get_vnffg_rendered_paths(self._account[sdn_acct_name]) + if rc != RwTypes.RwStatus.SUCCESS: + msg = "Reading of VNFFGR from SDN Plugin failed" + raise VnffgrUpdateFailed(msg) + + vnffgr_list = [_vnffgr for _vnffgr in self._vnffgr_list.values() if _vnffgr.sdn_account == sdn_acct_name and _vnffgr.operational_status == 'running'] + + for _vnffgr in vnffgr_list: + for _vnffgr_rsp in _vnffgr.rsp: + vnffg_rsp_list = [vnffg_rsp for vnffg_rsp in rs.vnffg_rendered_path if vnffg_rsp.name == _vnffgr_rsp.name] + if vnffg_rsp_list is not None and len(vnffg_rsp_list) > 0: + vnffg_rsp = vnffg_rsp_list[0] + if len(vnffg_rsp.rendered_path_hop) != len(_vnffgr_rsp.vnfr_connection_point_ref): + _vnffgr.operational_status = 'failed' + self._log.error("Received hop count %d doesnt match the VNFFGD hop count %d", len(vnffg_rsp.rendered_path_hop), + len(_vnffgr_rsp.vnfr_connection_point_ref)) + msg = "Fetching of VNFFGR with id {} failed".format(_vnffgr.id) + raise VnffgrUpdateFailed(msg) + _vnffgr_rsp.path_id = vnffg_rsp.path_id + for index, rendered_hop in enumerate(vnffg_rsp.rendered_path_hop): + for vnfr_cp_ref in _vnffgr_rsp.vnfr_connection_point_ref: + if rendered_hop.vnfr_name == vnfr_cp_ref.vnfr_name_ref: + vnfr_cp_ref.hop_number = rendered_hop.hop_number + vnfr_cp_ref.service_index = rendered_hop.service_index + vnfr_cp_ref.service_function_forwarder.name = rendered_hop.service_function_forwarder.name + vnfr_cp_ref.service_function_forwarder.ip_address = rendered_hop.service_function_forwarder.ip_address + vnfr_cp_ref.service_function_forwarder.port = rendered_hop.service_function_forwarder.port + else: + _vnffgr.operational_status = 'failed' + self._log.error("VNFFGR RSP with name %s in VNFFG %s not found",_vnffgr_rsp.name, _vnffgr.id) + msg = "Fetching of VNFFGR with name {} failed".format(_vnffgr_rsp.name) + raise VnffgrUpdateFailed(msg) + + + def terminate_vnffgr(self,vnffgr_id,sdn_account_name = None): + """ + Deletet the VNFFG chain + """ + if vnffgr_id not in self._vnffgr_list: + self._log.error("VNFFGR with id %s not present in VNFFGMgr during termination", vnffgr_id) + msg = "VNFFGR with id {} not present in VNFFGMgr during termination".format(vnffgr_id) + return + #raise VnffgrDoesNotExist(msg) + self._log.info("Received VNFFG chain terminate for id %s",vnffgr_id) + if sdn_account_name is None: + sdn_account = [sdn_account.name for _,sdn_account in self._account.items()] + sdn_account_name = sdn_account[0] + sdn_plugin = self.get_sdn_plugin(sdn_account_name) + sdn_plugin.terminate_vnffg_chain(self._account[sdn_account_name],vnffgr_id) + sdn_plugin.terminate_vnffg_classifier(self._account[sdn_account_name],vnffgr_id) + del self._vnffgr_list[vnffgr_id] + +class SDNAccountDtsHandler(object): + XPATH = "C,/rw-sdn:sdn-account" + + def __init__(self, dts, log, parent): + self._dts = dts + self._log = log + self._parent = parent + + self._sdn_account = {} + + def _set_sdn_account(self, account): + self._log.info("Setting sdn account: {}".format(account)) + if account.name in self._sdn_account: + self._log.error("SDN Account with name %s already exists. Ignoring config", account.name); + self._sdn_account[account.name] = account + self._parent.set_sdn_account(account) + + def _del_sdn_account(self, account_name): + self._log.info("Deleting sdn account: {}".format(account_name)) + del self._sdn_account[account_name] + + self._parent.del_sdn_account(account_name) + + def _update_sdn_account(self, account): + self._log.info("Updating sdn account: {}".format(account)) + # No need to update locally saved sdn_account's updated fields, as they + # are not used anywhere. Call the parent's update callback. + self._parent.update_sdn_account(account) + + @asyncio.coroutine + def register(self): + def apply_config(dts, acg, xact, action, _): + self._log.debug("Got sdn account apply config (xact: %s) (action: %s)", xact, action) + if action == rwdts.AppconfAction.INSTALL and xact.id is None: + self._log.debug("No xact handle. Skipping apply config") + return RwTypes.RwStatus.SUCCESS + + return RwTypes.RwStatus.SUCCESS + + @asyncio.coroutine + def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch): + """ Prepare callback from DTS for SDN Account config """ + + self._log.info("SDN Cloud account config received: %s", msg) + + fref = ProtobufC.FieldReference.alloc() + fref.goto_whole_message(msg.to_pbcm()) + + if fref.is_field_deleted(): + # Delete the sdn account record + self._del_sdn_account(msg.name) + else: + # If the account already exists, then this is an update. + if msg.name in self._sdn_account: + self._log.debug("SDN account already exists. Invoking on_prepare update request") + if msg.has_field("account_type"): + errmsg = "Cannot update SDN account's account-type." + self._log.error(errmsg) + xact_info.send_error_xpath(RwTypes.RwStatus.FAILURE, + SDNAccountDtsHandler.XPATH, + errmsg) + raise SdnAccountError(errmsg) + + # Update the sdn account record + self._update_sdn_account(msg) + else: + self._log.debug("SDN account does not already exist. Invoking on_prepare add request") + if not msg.has_field('account_type'): + errmsg = "New SDN account must contain account-type field." + self._log.error(errmsg) + xact_info.send_error_xpath(RwTypes.RwStatus.FAILURE, + SDNAccountDtsHandler.XPATH, + errmsg) + raise SdnAccountError(errmsg) + + # Set the sdn account record + self._set_sdn_account(msg) + + xact_info.respond_xpath(rwdts.XactRspCode.ACK) + + + self._log.debug("Registering for Sdn Account config using xpath: %s", + SDNAccountDtsHandler.XPATH, + ) + + acg_handler = rift.tasklets.AppConfGroup.Handler( + on_apply=apply_config, + ) + + with self._dts.appconf_group_create(acg_handler) as acg: + acg.register( + xpath=SDNAccountDtsHandler.XPATH, + flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY, + on_prepare=on_prepare + ) + + +