X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=rwlaunchpad%2Fplugins%2Frwnsm%2Frift%2Ftasklets%2Frwnsmtasklet%2Frwnsmtasklet.py;h=29676d19289cdc00370c79522597374e5ab62fa1;hb=f314b4af9744068a7ed7a6a6314220c3aa857523;hp=e600b9af38d2701101f09fe6f5f9c6de6ee3dab7;hpb=6f1a3fe149e4a6b9803382cb299c902f4cf58ec9;p=osm%2FSO.git diff --git a/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/rwnsmtasklet.py b/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/rwnsmtasklet.py index e600b9af..29676d19 100755 --- a/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/rwnsmtasklet.py +++ b/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/rwnsmtasklet.py @@ -16,37 +16,46 @@ # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 import asyncio +import gi +import json import ncclient import ncclient.asyncio_manager import os +import requests import shutil import sys import tempfile import time import uuid import yaml -import requests -import json - -from collections import deque from collections import defaultdict +from collections import deque from enum import Enum +from urllib.parse import urlparse + +# disable unsigned certificate warning +from requests.packages.urllib3.exceptions import InsecureRequestWarning +requests.packages.urllib3.disable_warnings(InsecureRequestWarning) -import gi gi.require_version('RwYang', '1.0') -gi.require_version('RwNsdYang', '1.0') +gi.require_version('NsdBaseYang', '1.0') +gi.require_version('ProjectNsdYang', '1.0') gi.require_version('RwDts', '1.0') gi.require_version('RwNsmYang', '1.0') gi.require_version('RwNsrYang', '1.0') +gi.require_version('NsrYang', '1.0') gi.require_version('RwTypes', '1.0') gi.require_version('RwVlrYang', '1.0') gi.require_version('RwVnfrYang', '1.0') +gi.require_version('VnfrYang', '1.0') +gi.require_version('ProjectVnfdYang', '1.0') from gi.repository import ( RwYang, RwNsrYang, NsrYang, - NsdYang, + NsdBaseYang, + ProjectNsdYang as NsdYang, RwVlrYang, VnfrYang, RwVnfrYang, @@ -54,22 +63,36 @@ from gi.repository import ( RwsdnalYang, RwDts as rwdts, RwTypes, + ProjectVnfdYang, ProtobufC, ) +gi.require_version('RwKeyspec', '1.0') +from gi.repository.RwKeyspec import quoted_key -import rift.tasklets +from rift.mano.utils.ssh_keys import ManoSshKey import rift.mano.ncclient import rift.mano.config_data.config import rift.mano.dts as mano_dts +import rift.tasklets +from rift.mano.utils.project import ( + ManoProject, + ProjectHandler, + get_add_delete_update_cfgs, + DEFAULT_PROJECT, + ) from . import rwnsm_conman as conman from . import cloud from . import publisher +from . import subscriber from . import xpath from . import config_value_pool from . import rwvnffgmgr from . import scale_group - +from . import rwnsmplugin +from . import openmano_nsm +import functools +import collections class NetworkServiceRecordState(Enum): """ Network Service Record State """ @@ -154,6 +177,10 @@ class NsrNsdUpdateError(Exception): class NsrVlUpdateError(NsrNsdUpdateError): pass +class VirtualLinkRecordError(Exception): + """ Virtual Links Record Error """ + pass + class VlRecordState(Enum): """ VL Record State """ @@ -179,7 +206,7 @@ class VnffgRecord(object): """ Vnffg Records class""" SFF_DP_PORT = 4790 SFF_MGMT_PORT = 5000 - def __init__(self, dts, log, loop, vnffgmgr, nsr, nsr_name, vnffgd_msg, sdn_account_name): + def __init__(self, dts, log, loop, vnffgmgr, nsr, nsr_name, vnffgd_msg, sdn_account_name,cloud_account_name): self._dts = dts self._log = log @@ -188,6 +215,7 @@ class VnffgRecord(object): self._nsr = nsr self._nsr_name = nsr_name self._vnffgd_msg = vnffgd_msg + self._cloud_account_name = cloud_account_name if sdn_account_name is None: self._sdn_account_name = '' else: @@ -219,7 +247,7 @@ class VnffgRecord(object): "sdn_account": self._sdn_account_name, "operational_status": 'init', } - vnffgr = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_Vnffgr.from_dict(vnffgr_dict) + vnffgr = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_Vnffgr.from_dict(vnffgr_dict) elif self._vnffgr_state == VnffgRecordState.TERMINATED: vnffgr_dict = {"id": self._vnffgr_id, "vnffgd_id_ref": self._vnffgd_msg.id, @@ -227,7 +255,7 @@ class VnffgRecord(object): "sdn_account": self._sdn_account_name, "operational_status": 'terminated', } - vnffgr = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_Vnffgr.from_dict(vnffgr_dict) + vnffgr = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_Vnffgr.from_dict(vnffgr_dict) else: try: vnffgr = self._vnffgmgr.fetch_vnffgr(self._vnffgr_id) @@ -240,7 +268,7 @@ class VnffgRecord(object): "sdn_account": self._sdn_account_name, "operational_status": 'failed', } - vnffgr = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_Vnffgr.from_dict(vnffgr_dict) + vnffgr = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_Vnffgr.from_dict(vnffgr_dict) return vnffgr @@ -251,8 +279,9 @@ class VnffgRecord(object): "vnffgd_id_ref": self._vnffgd_msg.id, "vnffgd_name_ref": self._vnffgd_msg.name, "sdn_account": self._sdn_account_name, + "cloud_account": self._cloud_account_name, } - vnffgr = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_Vnffgr.from_dict(vnffgr_dict) + vnffgr = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_Vnffgr.from_dict(vnffgr_dict) for rsp in self._vnffgd_msg.rsp: vnffgr_rsp = vnffgr.rsp.add() vnffgr_rsp.id = str(uuid.uuid4()) @@ -264,9 +293,11 @@ class VnffgRecord(object): vnfd = [vnfr.vnfd for vnfr in self._nsr.vnfrs.values() if vnfr.vnfd.id == rsp_cp_ref.vnfd_id_ref] self._log.debug("VNFD message during VNFFG instantiation is %s",vnfd) if len(vnfd) > 0 and vnfd[0].has_field('service_function_type'): - self._log.debug("Service Function Type for VNFD ID %s is %s",rsp_cp_ref.vnfd_id_ref, vnfd[0].service_function_type) + self._log.debug("Service Function Type for VNFD ID %s is %s", + rsp_cp_ref.vnfd_id_ref, vnfd[0].service_function_type) else: - self._log.error("Service Function Type not available for VNFD ID %s; Skipping in chain",rsp_cp_ref.vnfd_id_ref) + self._log.error("Service Function Type not available for VNFD ID %s; Skipping in chain", + rsp_cp_ref.vnfd_id_ref) continue vnfr_cp_ref = vnffgr_rsp.vnfr_connection_point_ref.add() @@ -287,7 +318,8 @@ class VnffgRecord(object): self._log.info("Received vnf op status is %s; retrying",vnfr.operational_status) if vnfr.operational_status == 'failed': self._log.error("Fetching VNFR for %s failed", vnfr.id) - raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" % (self.id, vnfr.id)) + raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" % + (self.id, vnfr.id)) yield from asyncio.sleep(2, loop=self._loop) vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath) self._log.debug("Received VNFR is %s", vnfr) @@ -298,8 +330,8 @@ class VnffgRecord(object): vnfr_cp_ref.connection_point_params.port_id = cp.connection_point_id vnfr_cp_ref.connection_point_params.name = self._nsr.name + '.' + cp.name for vdu in vnfr.vdur: - for ext_intf in vdu.external_interface: - if ext_intf.name == vnfr_cp_ref.vnfr_connection_point_ref: + for intf in vdu.interface: + if intf.type_yang == "EXTERNAL" and intf.external_connection_point_ref == vnfr_cp_ref.vnfr_connection_point_ref: vnfr_cp_ref.connection_point_params.vm_id = vdu.vim_id self._log.debug("VIM ID for CP %s in VNFR %s is %s",cp.name,nsr_vnfr.id, vnfr_cp_ref.connection_point_params.vm_id) @@ -314,7 +346,8 @@ class VnffgRecord(object): rsp_id_ref = _rsp[0].id rsp_name = _rsp[0].name else: - self._log.error("RSP with ID %s not found during classifier creation for classifier id %s",vnffgd_classifier.rsp_id_ref,vnffgd_classifier.id) + self._log.error("RSP with ID %s not found during classifier creation for classifier id %s", + vnffgd_classifier.rsp_id_ref,vnffgd_classifier.id) continue vnffgr_classifier = vnffgr.classifier.add() vnffgr_classifier.id = vnffgd_classifier.id @@ -338,7 +371,8 @@ class VnffgRecord(object): self._log.info("Received vnf op status is %s; retrying",vnfr.operational_status) if vnfr.operational_status == 'failed': self._log.error("Fetching VNFR for %s failed", vnfr.id) - raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" % (self.id, vnfr.id)) + raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" % + (self.id, vnfr.id)) yield from asyncio.sleep(2, loop=self._loop) vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath) self._log.debug("Received VNFR is %s", vnfr) @@ -348,11 +382,12 @@ class VnffgRecord(object): vnffgr_classifier.port_id = cp.connection_point_id vnffgr_classifier.ip_address = cp.ip_address for vdu in vnfr.vdur: - for ext_intf in vdu.external_interface: - if ext_intf.name == vnffgr_classifier.vnfr_connection_point_ref: + for intf in vdu.interface: + if intf.type_yang == "EXTERNAL" and intf.external_connection_point_ref == vnffgr_classifier.vnfr_connection_point_ref: vnffgr_classifier.vm_id = vdu.vim_id - self._log.debug("VIM ID for CP %s in VNFR %s is %s",cp.name,nsr_vnfr.id, - vnfr_cp_ref.connection_point_params.vm_id) + self._log.debug("VIM ID for CP %s in VNFR %s is %s", + cp.name,nsr_vnfr.id, + vnfr_cp_ref.connection_point_params.vm_id) break self._log.info("VNFFGR msg to be sent is %s", vnffgr) @@ -377,7 +412,7 @@ class VnffgRecord(object): vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath) self._log.debug("Received VNFR is %s", vnfr) - sff = RwsdnalYang.VNFFGSff() + sff = RwsdnalYang.YangData_RwProject_Project_Vnffgs_VnffgChain_Sff() sff_list[nsr_vnfr.vnfd.id] = sff sff.name = nsr_vnfr.name sff.function_type = nsr_vnfr.vnfd.service_function_chain @@ -453,7 +488,8 @@ class VirtualLinkRecord(object): XPATH = "D,/vlr:vlr-catalog/vlr:vlr" @staticmethod @asyncio.coroutine - def create_record(dts, log, loop, nsr_name, vld_msg, cloud_account_name, om_datacenter, ip_profile, nsr_id, restart_mode=False): + def create_record(dts, log, loop, project, nsr_name, vld_msg, + datacenter, ip_profile, nsr_id, restart_mode=False): """Creates a new VLR object based on the given data. If restart mode is enabled, then we look for existing records in the @@ -466,17 +502,17 @@ class VirtualLinkRecord(object): dts, log, loop, + project, nsr_name, vld_msg, - cloud_account_name, - om_datacenter, + datacenter, ip_profile, nsr_id, ) if restart_mode: res_iter = yield from dts.query_read( - "D,/vlr:vlr-catalog/vlr:vlr", + project.add_project("D,/vlr:vlr-catalog/vlr:vlr"), rwdts.XactFlag.MERGE) for fut in res_iter: @@ -492,14 +528,15 @@ class VirtualLinkRecord(object): return vlr_obj - def __init__(self, dts, log, loop, nsr_name, vld_msg, cloud_account_name, om_datacenter, ip_profile, nsr_id): + def __init__(self, dts, log, loop, project, nsr_name, vld_msg, + datacenter, ip_profile, nsr_id): self._dts = dts self._log = log self._loop = loop + self._project = project self._nsr_name = nsr_name self._vld_msg = vld_msg - self._cloud_account_name = cloud_account_name - self._om_datacenter_name = om_datacenter + self._datacenter_name = datacenter self._assigned_subnet = None self._nsr_id = nsr_id self._ip_profile = ip_profile @@ -507,11 +544,13 @@ class VirtualLinkRecord(object): self._state = VlRecordState.INIT self._prev_state = None self._create_time = int(time.time()) + self.state_failed_reason = None @property def xpath(self): """ path for this object """ - return "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']".format(self._vlr_id) + return self._project.add_project("D,/vlr:vlr-catalog/vlr:vlr[vlr:id={}]". + format(quoted_key(self._vlr_id))) @property def id(self): @@ -545,22 +584,17 @@ class VirtualLinkRecord(object): # This is a temporary hack to identify manually provisioned inter-site network return self.vld_msg.name else: - return self._nsr_name + "." + self.vld_msg.name + return self._project.name + "." +self._nsr_name + "." + self.vld_msg.name @property - def cloud_account_name(self): - """ Cloud account that this VLR should be created in """ - return self._cloud_account_name - - @property - def om_datacenter_name(self): + def datacenter_name(self): """ Datacenter that this VLR should be created in """ - return self._om_datacenter_name + return self._datacenter_name @staticmethod def vlr_xpath(vlr): """ Get the VLR path from VLR """ - return (VirtualLinkRecord.XPATH + "[vlr:id = '{}']").format(vlr.id) + return (VirtualLinkRecord.XPATH + "[vlr:id={}]").format(quoted_key(vlr.id)) @property def state(self): @@ -601,15 +635,20 @@ class VirtualLinkRecord(object): "vld_ref": self.vld_msg.id, "name": self.name, "create_time": self._create_time, - "cloud_account": self.cloud_account_name, - "om_datacenter": self.om_datacenter_name, + "datacenter": self._datacenter_name, } if self._ip_profile and self._ip_profile.has_field('ip_profile_params'): vlr_dict['ip_profile_params' ] = self._ip_profile.ip_profile_params.as_dict() + vlr_dict.update(vld_copy_dict) - vlr = RwVlrYang.YangData_Vlr_VlrCatalog_Vlr.from_dict(vlr_dict) + vlr = RwVlrYang.YangData_RwProject_Project_VlrCatalog_Vlr.from_dict(vlr_dict) + + if self.vld_msg.has_field('virtual_connection_points'): + for cp in self.vld_msg.virtual_connection_points: + vcp = vlr.virtual_connection_points.add() + vcp.from_dict(cp.as_dict()) return vlr def reset_id(self, vlr_id): @@ -617,18 +656,16 @@ class VirtualLinkRecord(object): def create_nsr_vlr_msg(self, vnfrs): """ The VLR message""" - nsr_vlr = RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_Vlr() + nsr_vlr = RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_Vlr() nsr_vlr.vlr_ref = self._vlr_id nsr_vlr.assigned_subnet = self.assigned_subnet - nsr_vlr.cloud_account = self.cloud_account_name - nsr_vlr.om_datacenter = self.om_datacenter_name + nsr_vlr.datacenter = self._datacenter_name for conn in self.vld_msg.vnfd_connection_point_ref: for vnfr in vnfrs: if (vnfr.vnfd.id == conn.vnfd_id_ref and vnfr.member_vnf_index == conn.member_vnf_index_ref and - self.cloud_account_name == vnfr.cloud_account_name and - self.om_datacenter_name == vnfr.om_datacenter_name): + self._datacenter_name == vnfr._datacenter_name): cp_entry = nsr_vlr.vnfr_connection_point_ref.add() cp_entry.vnfr_id = vnfr.id cp_entry.connection_point = conn.vnfd_connection_point_ref @@ -666,7 +703,6 @@ class VirtualLinkRecord(object): self._log.info("Instantiated VL with xpath %s and vlr:%s", self.xpath, vlr) - self._state = VlRecordState.ACTIVE self._assigned_subnet = vlr.assigned_subnet def vlr_in_vns(self): @@ -698,6 +734,18 @@ class VirtualLinkRecord(object): self._state = VlRecordState.TERMINATED self._log.debug("Terminated VL id:%s", self.id) + def set_state_from_op_status(self, operational_status): + """ Set the state of this VL based on operational_status""" + + self._log.debug("set_state_from_op_status called for vlr id %s with value %s", self.id, operational_status) + if operational_status == 'running': + self._state = VlRecordState.ACTIVE + elif operational_status == 'failed': + self._state = VlRecordState.FAILED + elif operational_status == 'vl_alloc_pending': + self._state = VlRecordState.INSTANTIATION_PENDING + else: + raise VirtualLinkRecordError("Unknown operational_status %s" % (operational_status)) class VnfRecordState(Enum): """ Vnf Record State """ @@ -715,9 +763,9 @@ class VirtualNetworkFunctionRecord(object): @staticmethod @asyncio.coroutine - def create_record(dts, log, loop, vnfd, const_vnfd_msg, nsd_id, nsr_name, - cloud_account_name, om_datacenter_name, nsr_id, group_name, group_instance_id, - placement_groups, restart_mode=False): + def create_record(dts, log, loop, project, vnfd, nsr_config, const_vnfd_msg, nsd_id, nsr_name, + datacenter_name, nsr_id, group_name, group_instance_id, + placement_groups, cloud_config, restart_mode=False): """Creates a new VNFR object based on the given data. If restart mode is enabled, then we look for existing records in the @@ -726,25 +774,28 @@ class VirtualNetworkFunctionRecord(object): Returns: VirtualNetworkFunctionRecord """ + vnfr_obj = VirtualNetworkFunctionRecord( dts, log, loop, + project, vnfd, + nsr_config, const_vnfd_msg, nsd_id, nsr_name, - cloud_account_name, - om_datacenter_name, + datacenter_name, nsr_id, group_name, group_instance_id, placement_groups, + cloud_config, restart_mode=restart_mode) if restart_mode: res_iter = yield from dts.query_read( - "D,/vnfr:vnfr-catalog/vnfr:vnfr", + project.add_project("D,/vnfr:vnfr-catalog/vnfr:vnfr"), rwdts.XactFlag.MERGE) for fut in res_iter: @@ -761,30 +812,36 @@ class VirtualNetworkFunctionRecord(object): dts, log, loop, + project, vnfd, + nsr_config, const_vnfd_msg, nsd_id, nsr_name, - cloud_account_name, - om_datacenter_name, + datacenter_name, nsr_id, group_name=None, group_instance_id=None, placement_groups = [], + cloud_config = None, restart_mode = False): self._dts = dts self._log = log self._loop = loop + self._project = project self._vnfd = vnfd + self._nsr_config = nsr_config self._const_vnfd_msg = const_vnfd_msg self._nsd_id = nsd_id self._nsr_name = nsr_name self._nsr_id = nsr_id - self._cloud_account_name = cloud_account_name - self._om_datacenter_name = om_datacenter_name + self._datacenter_name = datacenter_name self._group_name = group_name self._group_instance_id = group_instance_id self._placement_groups = placement_groups + self._cloud_config = cloud_config + self.restart_mode = restart_mode + self._config_status = NsrYang.ConfigStates.INIT self._create_time = int(time.time()) @@ -792,15 +849,20 @@ class VirtualNetworkFunctionRecord(object): self._state = VnfRecordState.INIT self._state_failed_reason = None + self._active_vdus = 0 + self.config_store = rift.mano.config_data.config.ConfigStore(self._log) self.configure() self._vnfr_id = str(uuid.uuid4()) self._name = None + + self.substitute_vnf_input_parameters = VnfInputParameterSubstitution(self._log, + self._const_vnfd_msg, + self._project) self._vnfr_msg = self.create_vnfr_msg() self._log.debug("Set VNFR {} config type to {}". format(self.name, self.config_type)) - self.restart_mode = restart_mode if group_name is None and group_instance_id is not None: @@ -814,7 +876,8 @@ class VirtualNetworkFunctionRecord(object): @property def xpath(self): """ VNFR xpath """ - return "D,/vnfr:vnfr-catalog/vnfr:vnfr[vnfr:id = '{}']".format(self.id) + return self._project.add_project("D,/vnfr:vnfr-catalog/vnfr:vnfr[vnfr:id={}]" + .format(quoted_key(self.id))) @property def vnfr_msg(self): @@ -824,7 +887,8 @@ class VirtualNetworkFunctionRecord(object): @property def const_vnfr_msg(self): """ VNFR message """ - return RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ConstituentVnfrRef(vnfr_id=self.id,cloud_account=self.cloud_account_name,om_datacenter=self._om_datacenter_name) + return RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConstituentVnfrRef( + vnfr_id=self.id, datacenter=self._datacenter_name) @property def vnfd(self): @@ -832,14 +896,9 @@ class VirtualNetworkFunctionRecord(object): return self._vnfd @property - def cloud_account_name(self): - """ Cloud account that this VNF should be created in """ - return self._cloud_account_name - - @property - def om_datacenter_name(self): + def datacenter_name(self): """ Datacenter that this VNF should be created in """ - return self._om_datacenter_name + return self._datacenter_name @property @@ -873,7 +932,7 @@ class VirtualNetworkFunctionRecord(object): if self._name is not None: return self._name - name_tags = [self._nsr_name] + name_tags = [self._project.name, self._nsr_name] if self._group_name is not None: name_tags.append(self._group_name) @@ -890,7 +949,8 @@ class VirtualNetworkFunctionRecord(object): @staticmethod def vnfr_xpath(vnfr): """ Get the VNFR path from VNFR """ - return (VirtualNetworkFunctionRecord.XPATH + "[vnfr:id = '{}']").format(vnfr.id) + return (VirtualNetworkFunctionRecord.XPATH + + "[vnfr:id={}]").format(quoted_key(vnfr.id)) @property def config_type(self): @@ -925,6 +985,7 @@ class VirtualNetworkFunctionRecord(object): def configure(self): self.config_store.merge_vnfd_config( + self._project.name, self._nsd_id, self._vnfd, self.member_vnf_index, @@ -944,15 +1005,14 @@ class VirtualNetworkFunctionRecord(object): "id": self.id, "nsr_id_ref": self._nsr_id, "name": self.name, - "cloud_account": self._cloud_account_name, - "om_datacenter": self._om_datacenter_name, + "datacenter": self._datacenter_name, "config_status": self.config_status } vnfr_dict.update(vnfd_copy_dict) - vnfr = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.from_dict(vnfr_dict) - vnfr.vnfd = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vnfd.from_dict(self.vnfd.as_dict(), - ignore_missing_keys=True) + vnfr = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr.from_dict(vnfr_dict) + vnfr.vnfd = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vnfd. \ + from_dict(self.vnfd.as_dict()) vnfr.member_vnf_index_ref = self.member_vnf_index vnfr.vnf_configuration.from_dict(self._vnfd.vnf_configuration.as_dict()) @@ -963,10 +1023,21 @@ class VirtualNetworkFunctionRecord(object): group = vnfr.placement_groups_info.add() group.from_dict(group_info.as_dict()) + if self._cloud_config and len(self._cloud_config.as_dict()): + self._log.debug("Cloud config during vnfr create is {}".format(self._cloud_config)) + vnfr.cloud_config = self._cloud_config + # UI expects the monitoring param field to exist vnfr.monitoring_param = [] self._log.debug("Get vnfr_msg for VNFR {} : {}".format(self.name, vnfr)) + + if self.restart_mode: + vnfr.operational_status = 'init' + else: + # Set Operational Status as pre-init for Input Param Substitution + vnfr.operational_status = 'pre_init' + return vnfr @asyncio.coroutine @@ -975,7 +1046,7 @@ class VirtualNetworkFunctionRecord(object): format(self.name, self.vnfr_msg)) yield from self._dts.query_update( self.xpath, - rwdts.XactFlag.TRACE, + rwdts.XactFlag.REPLACE, self.vnfr_msg ) @@ -1001,8 +1072,8 @@ class VirtualNetworkFunctionRecord(object): format(self.name, self._config_status, self.config_type, status)) if self._config_status == NsrYang.ConfigStates.CONFIGURED: - self._log.error("Updating already configured VNFR {}". - format(self.name)) + self._log.warning("Updating already configured VNFR {}". + format(self.name)) return if self._config_status != status: @@ -1013,8 +1084,7 @@ class VirtualNetworkFunctionRecord(object): # But not sure whats the use of this variable? self.vnfr_msg.config_status = status_to_string(status) except Exception as e: - self._log.error("Exception=%s", str(e)) - pass + self._log.exception("Exception=%s", str(e)) self._log.debug("Updated VNFR {} status to {}".format(self.name, status)) @@ -1036,6 +1106,49 @@ class VirtualNetworkFunctionRecord(object): return False + @asyncio.coroutine + def update_config_primitives(self, vnf_config, nsr): + # Update only after we are configured + if self._config_status == NsrYang.ConfigStates.INIT: + return + + if not vnf_config.as_dict(): + return + + self._log.debug("Update VNFR {} config: {}". + format(self.name, vnf_config.as_dict())) + + # Update config primitive + updated = False + for prim in self._vnfd.vnf_configuration.config_primitive: + for p in vnf_config.config_primitive: + if prim.name == p.name: + for param in prim.parameter: + for pa in p.parameter: + if pa.name == param.name: + if pa.default_value and \ + (pa.default_value != param.default_value): + param.default_value = pa.default_value + param.read_only = pa.read_only + updated = True + break + self._log.debug("Prim: {}".format(prim.as_dict())) + break + + if updated: + self._log.debug("Updated VNFD {} config: {}". + format(self._vnfd.name, + self._vnfd.vnf_configuration)) + self._vnfr_msg = self.create_vnfr_msg() + + try: + yield from nsr.nsm_plugin.update_vnfr(self) + except Exception as e: + self._log.error("Exception updating VNFM with new config " + "primitive for VNFR {}: {}". + format(self.name, e)) + self._log.exception(e) + @asyncio.coroutine def instantiate(self, nsr): """ Instantiate this VNFR""" @@ -1050,20 +1163,22 @@ class VirtualNetworkFunctionRecord(object): def find_vlr_for_cp(conn): """ Find VLR for the given connection point """ - for vlr in nsr.vlrs: + for vlr_id, vlr in nsr.vlrs.items(): for vnfd_cp in vlr.vld_msg.vnfd_connection_point_ref: if (vnfd_cp.vnfd_id_ref == self._vnfd.id and vnfd_cp.vnfd_connection_point_ref == conn.name and vnfd_cp.member_vnf_index_ref == self.member_vnf_index and - vlr.cloud_account_name == self.cloud_account_name): + vlr._datacenter_name == self._datacenter_name): self._log.debug("Found VLR for cp_name:%s and vnf-index:%d", conn.name, self.member_vnf_index) return vlr return None # For every connection point in the VNFD fill in the identifier + self._log.debug("Add connection point for VNF %s: %s", + self.vnfr_msg.name, self._vnfd.connection_point) for conn_p in self._vnfd.connection_point: - cpr = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint() + cpr = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_ConnectionPoint() cpr.name = conn_p.name cpr.type_yang = conn_p.type_yang if conn_p.has_field('port_security_enabled'): @@ -1077,24 +1192,30 @@ class VirtualNetworkFunctionRecord(object): continue cpr.vlr_ref = vlr_ref.id + self.vnfr_msg.connection_point.append(cpr) self._log.debug("Connection point [%s] added, vnf id=%s vnfd id=%s", cpr, self.vnfr_msg.id, self.vnfr_msg.vnfd.id) + self._log.debug("VNFR {} restart mode {}". + format(self.vnfr_msg.id, self.restart_mode)) if not self.restart_mode: - yield from self._dts.query_create(self.xpath, - 0, # this is sub - self.vnfr_msg) + # Checking for NS Terminate. + if nsr._ns_terminate_received == False: + # Create with pre-init operational state publishes the vnfr for substitution. + yield from self._dts.query_create(self.xpath, 0, self.vnfr_msg) + # Call to substitute VNF Input Parameter + self.substitute_vnf_input_parameters(self.vnfr_msg, self._nsr_config) + # Calling Update with pre-init operational data after Param substitution to instatntiate vnfr + yield from self._dts.query_update(self.xpath, 0, self.vnfr_msg) + else: yield from self._dts.query_update(self.xpath, 0, self.vnfr_msg) self._log.info("Created VNF with xpath %s and vnfr %s", - self.xpath, self.vnfr_msg) - - self._log.info("Instantiated VNFR with xpath %s and vnfd %s, vnfr %s", - self.xpath, self._vnfd, self.vnfr_msg) + self.xpath, self.vnfr_msg) @asyncio.coroutine def update_state(self, vnfr_msg): @@ -1114,7 +1235,7 @@ class VirtualNetworkFunctionRecord(object): @asyncio.coroutine def instantiation_failed(self, failed_reason=None): """ This VNFR instantiation failed""" - self._log.error("VNFR %s instantiation failed", self._vnfr_id) + self._log.debug("VNFR %s instantiation failed", self._vnfr_id) self.set_state(VnfRecordState.FAILED) self._state_failed_reason = failed_reason @@ -1216,7 +1337,7 @@ class NetworkServiceStatus(object): event_list = [] idx = 1 for entry in self._events: - event = RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_OperationalEvents() + event = RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_OperationalEvents() event.id = idx idx += 1 event.timestamp, event.event, event.description, event.details = entry @@ -1228,7 +1349,8 @@ class NetworkServiceRecord(object): """ Network service record """ XPATH = "D,/nsr:ns-instance-opdata/nsr:nsr" - def __init__(self, dts, log, loop, nsm, nsm_plugin, nsr_cfg_msg, sdn_account_name, key_pairs, restart_mode=False, + def __init__(self, dts, log, loop, nsm, nsm_plugin, nsr_cfg_msg, + sdn_account_name, key_pairs, project, restart_mode=False, vlr_handler=None): self._dts = dts self._log = log @@ -1238,12 +1360,15 @@ class NetworkServiceRecord(object): self._nsm_plugin = nsm_plugin self._sdn_account_name = sdn_account_name self._vlr_handler = vlr_handler + self._project = project self._nsd = None self._nsr_msg = None self._nsr_regh = None self._key_pairs = key_pairs - self._vlrs = [] + self._ssh_key_file = None + self._ssh_pub_key = None + self._vlrs = {} self._vnfrs = {} self._vnfds = {} self._vnffgrs = {} @@ -1260,6 +1385,16 @@ class NetworkServiceRecord(object): self._is_active = False self._vl_phase_completed = False self._vnf_phase_completed = False + self.instantiated = set() + + # Used for orchestration_progress + self._active_vms = 0 + self._active_networks = 0 + + # A flag to indicate if the NS has failed, currently it is recorded in + # operational status, but at the time of termination this field is + # over-written making it difficult to identify the failure. + self._is_failed = False # Initalise the state to init # The NSR moves through the following transitions @@ -1269,7 +1404,14 @@ class NetworkServiceRecord(object): self.set_state(NetworkServiceRecordState.INIT) - self.substitute_input_parameters = InputParameterSubstitution(self._log) + self.substitute_input_parameters = InputParameterSubstitution(self._log, self._project) + + # Create an asyncio loop to know when the virtual links are ready + self._vls_ready = asyncio.Event(loop=self._loop) + + # This variable stores all the terminate events received per NS. This is then used to prevent any + # further nsr non-terminate updates received in case of terminate being called bedore ns in in running state. + self._ns_terminate_received = False @property def nsm_plugin(self): @@ -1278,7 +1420,6 @@ class NetworkServiceRecord(object): def set_state(self, state): """ Set state for this NSR""" - self._log.debug("Setting state to %s", state) # We are in init phase and is moving to the next state # The new state could be a FAILED state or VNF_INIIT_PHASE if self.state == NetworkServiceRecordState.VL_INIT_PHASE: @@ -1288,6 +1429,7 @@ class NetworkServiceRecord(object): self._vnf_phase_completed = True self._op_status.set_state(state) + self._nsm_plugin.set_state(self.id, state) @property @@ -1301,13 +1443,9 @@ class NetworkServiceRecord(object): return self._nsr_cfg_msg.name @property - def cloud_account_name(self): - return self._nsr_cfg_msg.cloud_account - - @property - def om_datacenter_name(self): - if self._nsr_cfg_msg.has_field('om_datacenter'): - return self._nsr_cfg_msg.om_datacenter + def _datacenter_name(self): + if self._nsr_cfg_msg.has_field('datacenter'): + return self._nsr_cfg_msg.datacenter return None @property @@ -1377,6 +1515,23 @@ class NetworkServiceRecord(object): """ Config status for NSR """ return self._config_status + @property + def nsm(self): + """NS Manager""" + return self._nsm + + @property + def is_failed(self): + return self._is_failed + + @property + def public_key(self): + return self._ssh_pub_key + + @property + def private_key(self): + return self._ssh_key_file + def resolve_placement_group_cloud_construct(self, input_group): """ Returns the cloud specific construct for placement group @@ -1385,7 +1540,7 @@ class NetworkServiceRecord(object): for group_info in self._nsr_cfg_msg.nsd_placement_group_maps: if group_info.placement_group_ref == input_group.name: - group = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_PlacementGroupsInfo() + group = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_PlacementGroupsInfo() group_dict = {k:v for k,v in group_info.as_dict().items() if k != 'placement_group_ref'} for param in copy_dict: @@ -1396,22 +1551,22 @@ class NetworkServiceRecord(object): def __str__(self): - return "NSR(name={}, nsd_id={}, cloud_account={})".format( - self.name, self.nsd_id, self.cloud_account_name + return "NSR(name={}, nsd_id={}, data center={})".format( + self.name, self.nsd_id, self._datacenter_name ) def _get_vnfd(self, vnfd_id, config_xact): """ Fetch vnfd msg for the passed vnfd id """ return self._nsm.get_vnfd(vnfd_id, config_xact) - def _get_vnfd_cloud_account(self, vnfd_member_index): - """ Fetch Cloud Account for the passed vnfd id """ - if self._nsr_cfg_msg.vnf_cloud_account_map: - vim_accounts = [(vnf.cloud_account,vnf.om_datacenter) for vnf in self._nsr_cfg_msg.vnf_cloud_account_map \ - if vnfd_member_index == vnf.member_vnf_index_ref] + def _get_vnfd_datacenter(self, vnfd_member_index): + """ Fetch datacenter for the passed vnfd id """ + if self._nsr_cfg_msg.vnf_datacenter_map: + vim_accounts = [vnf.datacenter for vnf in self._nsr_cfg_msg.vnf_datacenter_map \ + if str(vnfd_member_index) == str(vnf.member_vnf_index_ref)] if vim_accounts and vim_accounts[0]: return vim_accounts[0] - return (self.cloud_account_name,self.om_datacenter_name) + return self._datacenter_name def _get_constituent_vnfd_msg(self, vnf_index): for const_vnfd in self.nsd_msg.constituent_vnfd: @@ -1428,10 +1583,10 @@ class NetworkServiceRecord(object): def scaling_trigger_str(self, trigger): SCALING_TRIGGER_STRS = { - NsdYang.ScalingTrigger.PRE_SCALE_IN : 'pre-scale-in', - NsdYang.ScalingTrigger.POST_SCALE_IN : 'post-scale-in', - NsdYang.ScalingTrigger.PRE_SCALE_OUT : 'pre-scale-out', - NsdYang.ScalingTrigger.POST_SCALE_OUT : 'post-scale-out', + NsdBaseYang.ScalingTrigger.PRE_SCALE_IN : 'pre-scale-in', + NsdBaseYang.ScalingTrigger.POST_SCALE_IN : 'post-scale-in', + NsdBaseYang.ScalingTrigger.PRE_SCALE_OUT : 'pre-scale-out', + NsdBaseYang.ScalingTrigger.POST_SCALE_OUT : 'post-scale-out', } try: return SCALING_TRIGGER_STRS[trigger] @@ -1441,6 +1596,32 @@ class NetworkServiceRecord(object): self._log.exception(e) return "Unknown trigger" + def generate_ssh_key_pair(self, config_xact): + '''Generate a ssh key pair if required''' + if self._ssh_key_file: + self._log.debug("Key pair already generated") + return + + gen_key = False + for cv in self.nsd_msg.constituent_vnfd: + vnfd = self._get_vnfd(cv.vnfd_id_ref, config_xact) + if vnfd and vnfd.mgmt_interface.ssh_key: + gen_key = True + break + + if not gen_key: + return + + try: + key = ManoSshKey(self._log) + path = tempfile.mkdtemp() + key.write_to_disk(name=self.id, directory=path) + self._ssh_key_file = "file://{}".format(key.private_key_file) + self._ssh_pub_key = key.public_key + except Exception as e: + self._log.exception("Error generating ssh key for {}: {}". + format(self.nsr_cfg_msg.name, e)) + @asyncio.coroutine def instantiate_vls(self): """ @@ -1448,14 +1629,30 @@ class NetworkServiceRecord(object): """ self._log.debug("Instantiating %d VLs in NSD id %s", len(self._vlrs), self.id) - for vlr in self._vlrs: + for vlr_id, vlr in self._vlrs.items(): yield from self.nsm_plugin.instantiate_vl(self, vlr) - vlr.state = VlRecordState.ACTIVE + if not isinstance(self.nsm_plugin, rwnsmplugin.RwNsPlugin): + self._vls_ready.set() + + # Wait for the VLs to be ready before yielding control out + self._log.debug("Waitng for %d VLs in NSR id %s to be active", + len(self._vlrs), self.id) + if self._vlrs: + self._log.debug("NSR id:%s, name:%s - Waiting for %d VLs to be ready", + self.id, self.name, len(self._vlrs)) + yield from self._vls_ready.wait() + else: + self._log.debug("NSR id:%s, name:%s, No virtual links found", + self.id, self.name) + self._vls_ready.set() + self._log.info("All %d VLs in NSR id %s are active, start the VNFs", + len(self._vlrs), self.id) @asyncio.coroutine def create(self, config_xact): """ Create this network service""" + self._log.debug("Create NS {} for {}".format(self.name, self._project.name)) # Create virtual links for all the external vnf # connection points in this NS yield from self.create_vls() @@ -1475,22 +1672,32 @@ class NetworkServiceRecord(object): @asyncio.coroutine def apply_scale_group_config_script(self, script, group, scale_instance, trigger, vnfrs=None): """ Apply config based on script for scale group """ + rift_var_root_dir = os.environ['RIFT_VAR_ROOT'] @asyncio.coroutine def add_vnfrs_data(vnfrs_list): """ Add as a dict each of the VNFRs data """ vnfrs_data = [] + for vnfr in vnfrs_list: self._log.debug("Add VNFR {} data".format(vnfr)) vnfr_data = dict() vnfr_data['name'] = vnfr.name - if trigger in [NsdYang.ScalingTrigger.PRE_SCALE_IN, NsdYang.ScalingTrigger.POST_SCALE_OUT]: + if trigger in [NsdBaseYang.ScalingTrigger.PRE_SCALE_IN, + NsdBaseYang.ScalingTrigger.POST_SCALE_OUT]: # Get VNF management and other IPs, etc opdata = yield from self.fetch_vnfr(vnfr.xpath) self._log.debug("VNFR {} op data: {}".format(vnfr.name, opdata)) try: vnfr_data['rw_mgmt_ip'] = opdata.mgmt_interface.ip_address vnfr_data['rw_mgmt_port'] = opdata.mgmt_interface.port + vnfr_data['member_vnf_index_ref'] = opdata.member_vnf_index_ref + vnfr_data['vdur_data'] = [] + for vdur in opdata.vdur: + vdur_data = dict() + vdur_data['vm_name'] = vdur.name + vdur_data['vm_mgmt_ip'] = vdur.vm_management_ip + vnfr_data['vdur_data'].append(vdur_data) except Exception as e: self._log.error("Unable to get management IP for vnfr {}:{}". format(vnfr.name, e)) @@ -1523,9 +1730,14 @@ class NetworkServiceRecord(object): if script[0] == '/': path = script else: - path = os.path.join(os.environ['RIFT_INSTALL'], "usr/bin", script) + path = os.path.join(rift_var_root_dir, + 'launchpad/packages/nsd', + self._project.name, + self.nsd_id, 'scripts', + script) + if not os.path.exists(path): - self._log.error("Config faled for scale group {}: Script does not exist at {}". + self._log.error("Config failed for scale group {}: Script does not exist at {}". format(group.name, path)) return False @@ -1577,7 +1789,11 @@ class NetworkServiceRecord(object): @asyncio.coroutine def update_config_status(success=True, err_msg=None): - self._log.debug("Update %s config status to %r : %s", + """ This is ugly!!! + We are trying to determine the scaling instance's config status + as a collation of the config status associated with 4 different triggers + """ + self._log.debug("Update %s scaling config status to %r : %s", scale_instance, success, err_msg) if (scale_instance.config_status == "failed"): # Do not update the config status if it is already in failed state @@ -1592,21 +1808,32 @@ class NetworkServiceRecord(object): else: # We are in configuring state # Only after post scale out mark instance as configured - if trigger == NsdYang.ScalingTrigger.POST_SCALE_OUT: + if trigger == NsdBaseYang.ScalingTrigger.POST_SCALE_OUT: if success: scale_instance.config_status = "configured" + for vnfr in scale_instance.vnfrs: + if vnfr.config_status == "configuring": + vnfr.vnfr_msg.config_status = "configured" + yield from vnfr.update_vnfm() else: scale_instance.config_status = "failed" scale_instance.config_err_msg = err_msg + yield from self.update_state() + # Publish config state as update_state seems to care only operational status + yield from self.publish() config = group.trigger_config(trigger) if config is None: + if trigger == NsdBaseYang.ScalingTrigger.POST_SCALE_OUT: + self._log.debug("No config needed, update %s scaling config status to configured", + scale_instance) + scale_instance.config_status = "configured" return True self._log.debug("Scaling group {} config: {}".format(group.name, config)) - if config.has_field("ns_config_primitive_name_ref"): - config_name = config.ns_config_primitive_name_ref + if config.has_field("ns_service_primitive_name_ref"): + config_name = config.ns_service_primitive_name_ref nsd_msg = self.nsd_msg config_primitive = None for ns_cfg_prim in nsd_msg.service_primitive: @@ -1619,7 +1846,8 @@ class NetworkServiceRecord(object): self._log.debug("Scaling group {} config primitive: {}".format(group.name, config_primitive)) if config_primitive.has_field("user_defined_script"): - rc = yield from self.apply_scale_group_config_script(config_primitive.user_defined_script, + script_path = '/'.join(["launchpad/packages/nsd", self._project.name, nsd_msg.id, "scripts", config_primitive.user_defined_script]) + rc = yield from self.apply_scale_group_config_script(script_path, group, scale_instance, trigger, vnfrs) err_msg = None if not rc: @@ -1672,11 +1900,11 @@ class NetworkServiceRecord(object): const_vnfd_msg = self._get_constituent_vnfd_msg(vnf_index) vnfd_msg = self._get_vnfd(const_vnfd_msg.vnfd_id_ref, config_xact) - cloud_account_name, om_datacenter_name = self._get_vnfd_cloud_account(const_vnfd_msg.member_vnf_index) - if cloud_account_name is None: - cloud_account_name = self.cloud_account_name + datacenter_name = self._get_vnfd_datacenter(const_vnfd_msg.member_vnf_index) + if datacenter_name is None: + datacenter_name = self._datacenter_name for _ in range(count): - vnfr = yield from self.create_vnf_record(vnfd_msg, const_vnfd_msg, cloud_account_name, om_datacenter_name, group_name, index) + vnfr = yield from self.create_vnf_record(vnfd_msg, const_vnfd_msg, datacenter_name, group_name, index) scale_instance.add_vnfr(vnfr) vnfrs.append(vnfr) return vnfrs @@ -1692,7 +1920,7 @@ class NetworkServiceRecord(object): yield from self.update_state() try: - rc = yield from self.apply_scaling_group_config(NsdYang.ScalingTrigger.PRE_SCALE_OUT, + rc = yield from self.apply_scaling_group_config(NsdBaseYang.ScalingTrigger.PRE_SCALE_OUT, group, scale_instance, vnfrs) if not rc: self._log.error("Pre scale out config for scale group {} ({}) failed". @@ -1724,8 +1952,8 @@ class NetworkServiceRecord(object): @asyncio.coroutine def terminate_instance(): - self._log.debug("Terminating %s VNFRS" % scale_instance) - rc = yield from self.apply_scaling_group_config(NsdYang.ScalingTrigger.PRE_SCALE_IN, + self._log.debug("Terminating scaling instance %s VNFRS" % scale_instance) + rc = yield from self.apply_scaling_group_config(NsdBaseYang.ScalingTrigger.PRE_SCALE_IN, group, scale_instance) if not rc: self._log.error("Pre scale in config for scale group {} ({}) failed". @@ -1746,7 +1974,7 @@ class NetworkServiceRecord(object): @asyncio.coroutine def post_scale_out_task(group, instance): # Apply post scale out config once all VNFRs are active - rc = yield from self.apply_scaling_group_config(NsdYang.ScalingTrigger.POST_SCALE_OUT, + rc = yield from self.apply_scaling_group_config(NsdBaseYang.ScalingTrigger.POST_SCALE_OUT, group, instance) instance.operational_status = "running" if rc: @@ -1780,7 +2008,7 @@ class NetworkServiceRecord(object): elif instance.operational_status == "vnf_terminate_phase": if all([state == VnfRecordState.TERMINATED for state in instance_vnf_state_list]): instance.operational_status = "terminated" - rc = yield from self.apply_scaling_group_config(NsdYang.ScalingTrigger.POST_SCALE_IN, + rc = yield from self.apply_scaling_group_config(NsdBaseYang.ScalingTrigger.POST_SCALE_IN, group, instance) if rc: self._log.debug("Scale in for group {} and instance {} succeeded". @@ -1802,7 +2030,8 @@ class NetworkServiceRecord(object): self, self.name, vnffgd, - self._sdn_account_name + self._sdn_account_name, + self._datacenter_name ) self._vnffgrs[vnffgr.id] = vnffgr @@ -1814,12 +2043,12 @@ class NetworkServiceRecord(object): return profile[0] if profile else None @asyncio.coroutine - def _create_vls(self, vld, cloud_account,om_datacenter): + def _create_vls(self, vld, datacenter): """Create a VLR in the cloud account specified using the given VLD Args: vld : VLD yang obj - cloud_account : Cloud account name + datacenter : Cloud account name Returns: VirtualLinkRecord @@ -1828,60 +2057,58 @@ class NetworkServiceRecord(object): self._dts, self._log, self._loop, + self._project, self.name, vld, - cloud_account, - om_datacenter, + datacenter, self.resolve_vld_ip_profile(self.nsd_msg, vld), self.id, restart_mode=self.restart_mode) return vlr - def _extract_cloud_accounts_for_vl(self, vld): + def _extract_datacenters_for_vl(self, vld): """ Extracts the list of cloud accounts from the NS Config obj Rules: - 1. Cloud accounts based connection point (vnf_cloud_account_map) + 1. Cloud accounts based connection point (vnf_datacenter_map) Args: vld : VLD yang object Returns: TYPE: Description """ - cloud_account_list = [] + datacenter_list = [] - if self._nsr_cfg_msg.vnf_cloud_account_map: - # Handle case where cloud_account is None - vnf_cloud_map = {} - for vnf in self._nsr_cfg_msg.vnf_cloud_account_map: - if vnf.cloud_account is not None or vnf.om_datacenter is not None: - vnf_cloud_map[vnf.member_vnf_index_ref] = (vnf.cloud_account,vnf.om_datacenter) + if self._nsr_cfg_msg.vnf_datacenter_map: + # Handle case where datacenter is None + vnf_datacenter_map = {} + for vnf in self._nsr_cfg_msg.vnf_datacenter_map: + if vnf.datacenter is not None or vnf.datacenter is not None: + vnf_datacenter_map[vnf.member_vnf_index_ref] = \ + vnf.datacenter for vnfc in vld.vnfd_connection_point_ref: - cloud_account = vnf_cloud_map.get( - vnfc.member_vnf_index_ref, - (self.cloud_account_name,self.om_datacenter_name)) + datacenter = vnf_datacenter_map.get( + vnfc.member_vnf_index_ref, self._datacenter_name) - cloud_account_list.append(cloud_account) + datacenter_list.append(datacenter) - if self._nsr_cfg_msg.vl_cloud_account_map: - for vld_map in self._nsr_cfg_msg.vl_cloud_account_map: + if self._nsr_cfg_msg.vl_datacenter_map: + for vld_map in self._nsr_cfg_msg.vl_datacenter_map: if vld_map.vld_id_ref == vld.id: - for cloud_account in vld_map.cloud_accounts: - cloud_account_list.extend((cloud_account,None)) - for om_datacenter in vld_map.om_datacenters: - cloud_account_list.extend((None,om_datacenter)) + for datacenter in vld_map.datacenters: + datacenter_list.append(datacenter) # If no config has been provided then fall-back to the default # account - if not cloud_account_list: - cloud_account_list = [(self.cloud_account_name,self.om_datacenter_name)] + if not datacenter_list: + datacenter_list.append(self._datacenter_name) - self._log.debug("VL {} cloud accounts: {}". - format(vld.name, cloud_account_list)) - return set(cloud_account_list) + self._log.debug("VL {} data center list: {}". + format(vld.name, datacenter_list)) + return set(datacenter_list) @asyncio.coroutine def create_vls(self): @@ -1890,41 +2117,41 @@ class NetworkServiceRecord(object): for vld in self.nsd_msg.vld: self._log.debug("Found vld %s in nsr id %s", vld, self.id) - cloud_account_list = self._extract_cloud_accounts_for_vl(vld) - for cloud_account,om_datacenter in cloud_account_list: - vlr = yield from self._create_vls(vld, cloud_account,om_datacenter) - self._vlrs.append(vlr) - + datacenter_list = self._extract_datacenters_for_vl(vld) + for datacenter in datacenter_list: + vlr = yield from self._create_vls(vld, datacenter) + self._vlrs[vlr.id] = vlr + self._nsm.add_vlr_id_nsr_map(vlr.id, self) @asyncio.coroutine def create_vl_instance(self, vld): - self._log.debug("Create VL for {}: {}".format(self.id, vld.as_dict())) + self._log.error("Create VL for {}: {}".format(self.id, vld.as_dict())) # Check if the VL is already present vlr = None - for vl in self._vlrs: + for vl_id, vl in self._vlrs.items(): if vl.vld_msg.id == vld.id: - self._log.debug("The VLD %s already in NSR %s as VLR %s with status %s", + self._log.error("The VLD %s already in NSR %s as VLR %s with status %s", vld.id, self.id, vl.id, vl.state) vlr = vl if vlr.state != VlRecordState.TERMINATED: - err_msg = "VLR for VL %s in NSR %s already instantiated", \ - vld, self.id + err_msg = "VLR for VL {} in NSR {} already instantiated". \ + format(vld, self.id) self._log.error(err_msg) raise NsrVlUpdateError(err_msg) break if vlr is None: - cloud_account_list = self._extract_cloud_accounts_for_vl(vld) - for account,om_datacenter in cloud_account_list: - vlr = yield from self._create_vls(vld, account,om_datacenter) - self._vlrs.append(vlr) + datacenter_list = self._extract_datacenters_for_vl(vld) + for datacenter in datacenter_list: + vlr = yield from self._create_vls(vld, account, datacenter) + self._vlrs[vlr.id] = vlr + self._nsm.add_vlr_id_nsr_map(vlr.id, self) vlr.state = VlRecordState.INSTANTIATION_PENDING yield from self.update_state() try: yield from self.nsm_plugin.instantiate_vl(self, vlr) - vlr.state = VlRecordState.ACTIVE except Exception as e: err_msg = "Error instantiating VL for NSR {} and VLD {}: {}". \ @@ -1937,7 +2164,7 @@ class NetworkServiceRecord(object): @asyncio.coroutine def delete_vl_instance(self, vld): - for vlr in self._vlrs: + for vlr_id, vlr in self._vlrs.items(): if vlr.vld_msg.id == vld.id: self._log.debug("Found VLR %s for VLD %s in NSR %s", vlr.id, vld.id, self.id) @@ -1947,7 +2174,8 @@ class NetworkServiceRecord(object): try: yield from self.nsm_plugin.terminate_vl(vlr) vlr.state = VlRecordState.TERMINATED - self._vlrs.remove(vlr) + del self._vlrs[vlr] + self.remove_vlr_id_nsr_map(vlr.id) except Exception as e: err_msg = "Error terminating VL for NSR {} and VLD {}: {}". \ @@ -1975,18 +2203,17 @@ class NetworkServiceRecord(object): continue vnfd_msg = self._get_vnfd(const_vnfd.vnfd_id_ref, config_xact) - cloud_account_name,om_datacenter_name = self._get_vnfd_cloud_account(const_vnfd.member_vnf_index) - if cloud_account_name is None: - cloud_account_name = self.cloud_account_name - yield from self.create_vnf_record(vnfd_msg, const_vnfd, cloud_account_name, om_datacenter_name) - + datacenter_name = self._get_vnfd_datacenter(const_vnfd.member_vnf_index) + if datacenter_name is None: + datacenter_name = self._datacenter_name + yield from self.create_vnf_record(vnfd_msg, const_vnfd, datacenter_name) def get_placement_groups(self, vnfd_msg, const_vnfd): placement_groups = [] for group in self.nsd_msg.placement_groups: for member_vnfd in group.member_vnfd: if (member_vnfd.vnfd_id_ref == vnfd_msg.id) and \ - (member_vnfd.member_vnf_index_ref == const_vnfd.member_vnf_index): + (member_vnfd.member_vnf_index_ref == str(const_vnfd.member_vnf_index)): group_info = self.resolve_placement_group_cloud_construct(group) if group_info is None: self._log.info("Could not resolve cloud-construct for placement group: %s", group.name) @@ -1999,28 +2226,58 @@ class NetworkServiceRecord(object): placement_groups.append(group_info) return placement_groups + def get_cloud_config(self): + cloud_config = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_CloudConfig() + self._log.debug("Received key pair is {}".format(self._key_pairs)) + + for authorized_key in self.nsr_cfg_msg.ssh_authorized_key: + if authorized_key.key_pair_ref in self._key_pairs: + key_pair = cloud_config.key_pair.add() + key_pair.from_dict(self._key_pairs[authorized_key.key_pair_ref].as_dict()) + for nsd_key_pair in self.nsd_msg.key_pair: + key_pair = cloud_config.key_pair.add() + key_pair.from_dict(key_pair.as_dict()) + for nsr_cfg_user in self.nsr_cfg_msg.user: + user = cloud_config.user.add() + user.name = nsr_cfg_user.name + user.user_info = nsr_cfg_user.user_info + for ssh_key in nsr_cfg_user.ssh_authorized_key: + if ssh_key.key_pair_ref in self._key_pairs: + key_pair = user.key_pair.add() + key_pair.from_dict(self._key_pairs[ssh_key.key_pair_ref].as_dict()) + for nsd_user in self.nsd_msg.user: + user = cloud_config.user.add() + user.from_dict(nsd_user.as_dict()) + + self._log.debug("Formed cloud-config msg is {}".format(cloud_config)) + return cloud_config + @asyncio.coroutine - def create_vnf_record(self, vnfd_msg, const_vnfd, cloud_account_name, om_datacenter_name, group_name=None, group_instance_id=None): + def create_vnf_record(self, vnfd_msg, const_vnfd, datacenter_name, group_name=None, group_instance_id=None): # Fetch the VNFD associated with this VNF placement_groups = self.get_placement_groups(vnfd_msg, const_vnfd) - self._log.info("Cloud Account for VNF %d is %s",const_vnfd.member_vnf_index,cloud_account_name) + cloud_config = self.get_cloud_config() + self._log.info("Cloud Account for VNF %d is %s",const_vnfd.member_vnf_index,datacenter_name) self._log.info("Launching VNF: %s (Member Index: %s) in NSD plancement Groups: %s", vnfd_msg.name, const_vnfd.member_vnf_index, [ group.name for group in placement_groups]) + vnfr = yield from VirtualNetworkFunctionRecord.create_record(self._dts, self._log, self._loop, + self._project, vnfd_msg, + self._nsr_cfg_msg, const_vnfd, self.nsd_id, self.name, - cloud_account_name, - om_datacenter_name, + datacenter_name, self.id, group_name, group_instance_id, placement_groups, + cloud_config, restart_mode=self.restart_mode, ) if vnfr.id in self._vnfrs: @@ -2076,10 +2333,38 @@ class NetworkServiceRecord(object): """ This function instantiates VNFs for every VNF in this Network Service """ - self._log.debug("Instantiating %u VNFs in NS %s", len(vnfrs), self.id) - for vnf in vnfrs: + @asyncio.coroutine + def instantiate_vnf(vnf): self._log.debug("Instantiating VNF: %s in NS %s", vnf, self.id) + vnfd_id = vnf.vnfr_msg.vnfd.id + for dependency_vnf in dependencies[vnfd_id]: + while dependency_vnf not in self.instantiated: + yield from asyncio.sleep(1, loop=self._loop) + yield from self.nsm_plugin.instantiate_vnf(self, vnf,scaleout) + self.instantiated.add(vnfd_id) + + self._log.debug("Instantiating %u VNFs in NS %s", len(vnfrs), self.id) + dependencies = collections.defaultdict(list) + for dependency_vnf in self._nsr_cfg_msg.nsd.vnf_dependency: + dependencies[dependency_vnf.vnf_source_ref].append(dependency_vnf.vnf_depends_on_ref) + + # The dictionary copy is to ensure that if a terminate is initiated right after instantiation, the + # Runtime error for "dictionary changed size during iteration" does not occur. + # vnfrs - 'dict_values' object + # vnfrs_copy - list object + vnfrs_copy = list(vnfrs) + tasks = [] + for vnf in vnfrs_copy: + vnf_task = self._loop.create_task(instantiate_vnf(vnf)) + tasks.append(vnf_task) + + if len(tasks) > 0: + self._log.debug("Waiting for %s instantiate_vnf tasks to complete", len(tasks)) + done, pending = yield from asyncio.wait(tasks, loop=self._loop, timeout=30) + if pending: + self._log.error("The Instantiate vnf task timed out after 30 seconds.") + raise VirtualNetworkFunctionRecordError("Task tied out : ", pending) @asyncio.coroutine def instantiate_vnffgs(self): @@ -2142,6 +2427,7 @@ class NetworkServiceRecord(object): @asyncio.coroutine def publish(self): """ This function publishes this NSR """ + self._nsr_msg = self.create_msg() self._log.debug("Publishing the NSR with xpath %s and nsr %s", @@ -2152,37 +2438,37 @@ class NetworkServiceRecord(object): self._log.debug("Publishing NSR in RUNNING state!") #raise() - with self._dts.transaction() as xact: - yield from self._nsm.nsr_handler.update(xact, self.nsr_xpath, self._nsr_msg) - if self._op_status.state == NetworkServiceRecordState.RUNNING: - self._debug_running = True + yield from self._nsm.nsr_handler.update(None, self.nsr_xpath, self._nsr_msg) + if self._op_status.state == NetworkServiceRecordState.RUNNING: + self._debug_running = True @asyncio.coroutine - def unpublish(self, xact): + def unpublish(self, xact=None): """ Unpublish this NSR object """ self._log.debug("Unpublishing Network service id %s", self.id) + yield from self._nsm.nsr_handler.delete(xact, self.nsr_xpath) @property def nsr_xpath(self): """ Returns the xpath associated with this NSR """ - return( + return self._project.add_project(( "D,/nsr:ns-instance-opdata" + - "/nsr:nsr[nsr:ns-instance-config-ref = '{}']" - ).format(self.id) + "/nsr:nsr[nsr:ns-instance-config-ref={}]" + ).format(quoted_key(self.id))) @staticmethod def xpath_from_nsr(nsr): """ Returns the xpath associated with this NSR op data""" return (NetworkServiceRecord.XPATH + - "[nsr:ns-instance-config-ref = '{}']").format(nsr.id) + "[nsr:ns-instance-config-ref={}]").format(quoted_key(nsr.id)) @property def nsd_xpath(self): """ Return NSD config xpath.""" - return( - "C,/nsd:nsd-catalog/nsd:nsd[nsd:id = '{}']" - ).format(self.nsd_id) + return self._project.add_project(( + "C,/project-nsd:nsd-catalog/project-nsd:nsd[project-nsd:id={}]" + ).format(quoted_key(self.nsd_id))) @asyncio.coroutine def instantiate(self, config_xact): @@ -2211,17 +2497,17 @@ class NetworkServiceRecord(object): # Move the state to INIITALIZING self.set_state(NetworkServiceRecordState.INIT) - event_descr = "Instantiation Request Received NSR Id:%s" % self.id + event_descr = "Instantiation Request Received NSR Id: %s, NS Name: %s" % (self.id, self.name) self.record_event("instantiating", event_descr) # Find the NSD self._nsd = self._nsr_cfg_msg.nsd # Merge any config and initial config primitive values - self.config_store.merge_nsd_config(self.nsd_msg) + self.config_store.merge_nsd_config(self.nsd_msg, self._project.name) self._log.debug("Merged NSD: {}".format(self.nsd_msg.as_dict())) - event_descr = "Fetched NSD with descriptor id %s" % self.nsd_id + event_descr = "Fetched NSD with descriptor id %s, NS Name: %s" % (self.nsd_id, self.name) self.record_event("nsd-fetched", event_descr) if self._nsd is None: @@ -2249,59 +2535,86 @@ class NetworkServiceRecord(object): self.id, self.nsd_id) # instantiate the VLs - event_descr = ("Instantiating %s external VLs for NSR id %s" % - (len(self.nsd_msg.vld), self.id)) + event_descr = ("Instantiating %s external VLs for NSR id: %s, NS Name: %s " % + (len(self.nsd_msg.vld), self.id, self.name)) self.record_event("begin-external-vls-instantiation", event_descr) self.set_state(NetworkServiceRecordState.VL_INIT_PHASE) - yield from self.instantiate_vls() - # Publish the NSR to DTS yield from self.publish() - event_descr = ("Finished instantiating %s external VLs for NSR id %s" % - (len(self.nsd_msg.vld), self.id)) + if self._ns_terminate_received: + self._log.debug("Terminate Received. Interrupting Instantiation at event : begin-external-vls-instantiation.") + # Setting this flag as False again as this is a state where neither VL or VNF have been instantiated. + self._ns_terminate_received = False + # At this stage only ns-instance opdata is published. Cleaning up the record. + yield from self.unpublish() + return + + yield from self.instantiate_vls() + + event_descr = ("Finished instantiating %s external VLs for NSR id: %s, NS Name: %s " % + (len(self.nsd_msg.vld), self.id, self.name)) self.record_event("end-external-vls-instantiation", event_descr) self.set_state(NetworkServiceRecordState.VNF_INIT_PHASE) + # Publish the NSR to DTS + yield from self.publish() + self._log.debug("Instantiating VNFs ...... nsr[%s], nsd[%s]", self.id, self.nsd_id) # instantiate the VNFs - event_descr = ("Instantiating %s VNFS for NSR id %s" % - (len(self.nsd_msg.constituent_vnfd), self.id)) + event_descr = ("Instantiating %s VNFS for NSR id: %s, NS Name: %s " % + (len(self.nsd_msg.constituent_vnfd), self.id, self.name)) self.record_event("begin-vnf-instantiation", event_descr) + if self._ns_terminate_received: + self._log.debug("Terminate Received. Interrupting Instantiation at event : end-external-vls-instantiation.") + return + yield from self.instantiate_vnfs(self._vnfrs.values()) - self._log.debug(" Finished instantiating %d VNFs for NSR id %s", - len(self.nsd_msg.constituent_vnfd), self.id) + self._log.debug(" Finished instantiating %d VNFs for NSR id: %s, NS Name: %s", + len(self.nsd_msg.constituent_vnfd), self.id, self.name) - event_descr = ("Finished instantiating %s VNFs for NSR id %s" % - (len(self.nsd_msg.constituent_vnfd), self.id)) + event_descr = ("Finished instantiating %s VNFs for NSR id: %s, NS Name: %s" % + (len(self.nsd_msg.constituent_vnfd), self.id, self.name)) self.record_event("end-vnf-instantiation", event_descr) + # Publish the NSR to DTS + yield from self.publish() + if len(self.vnffgrs) > 0: #self.set_state(NetworkServiceRecordState.VNFFG_INIT_PHASE) - event_descr = ("Instantiating %s VNFFGS for NSR id %s" % - (len(self.nsd_msg.vnffgd), self.id)) + event_descr = ("Instantiating %s VNFFGS for NSR id: %s, NS Name: %s" % + (len(self.nsd_msg.vnffgd), self.id, self.name)) self.record_event("begin-vnffg-instantiation", event_descr) + if self._ns_terminate_received: + self._log.debug("Terminate Received. Interrupting Instantiation at event : begin-vnffg-instantiation.") + return + yield from self.instantiate_vnffgs() - event_descr = ("Finished instantiating %s VNFFGDs for NSR id %s" % - (len(self.nsd_msg.vnffgd), self.id)) + event_descr = ("Finished instantiating %s VNFFGDs for NSR id: %s, NS Name: %s" % + (len(self.nsd_msg.vnffgd), self.id, self.name)) self.record_event("end-vnffg-instantiation", event_descr) if self.has_scaling_instances(): - event_descr = ("Instantiating %s Scaling Groups for NSR id %s" % - (len(self._scaling_groups), self.id)) + event_descr = ("Instantiating %s Scaling Groups for NSR id: %s, NS Name: %s" % + (len(self._scaling_groups), self.id, self.name)) self.record_event("begin-scaling-group-instantiation", event_descr) + + if self._ns_terminate_received: + self._log.debug("Terminate Received. Interrupting Instantiation at event : begin-scaling-group-instantiation.") + return + yield from self.instantiate_scaling_instances(config_xact) self.record_event("end-scaling-group-instantiation", event_descr) @@ -2309,14 +2622,14 @@ class NetworkServiceRecord(object): # virtual links and vnfs are instantiated yield from self.nsm_plugin.deploy(self._nsr_msg) - self._log.debug("Publishing NSR...... nsr[%s], nsd[%s]", - self.id, self.nsd_id) + self._log.debug("Publishing NSR...... nsr[%s], nsd[%s], for NS[%s]", + self.id, self.nsd_id, self.name) # Publish the NSR to DTS yield from self.publish() - self._log.debug("Published NSR...... nsr[%s], nsd[%s]", - self.id, self.nsd_id) + self._log.debug("Published NSR...... nsr[%s], nsd[%s], for NS[%s]", + self.id, self.nsd_id, self.name) def on_instantiate_done(fut): # If the do_instantiate fails, then publish NSR with failed result @@ -2344,6 +2657,9 @@ class NetworkServiceRecord(object): yield from self.publish() + if status == NsrYang.ConfigStates.TERMINATE: + yield from self.terminate_ns_cont() + @asyncio.coroutine def is_active(self): """ This NS is active """ @@ -2355,7 +2671,7 @@ class NetworkServiceRecord(object): self._log.debug("Network service %s is active ", self.id) self._is_active = True - event_descr = "NSR in running state for NSR id %s" % self.id + event_descr = "NSR in running state for NSR id: %s, NS Name: %s" % (self.id, self.name) self.record_event("ns-running", event_descr) yield from self.publish() @@ -2366,8 +2682,9 @@ class NetworkServiceRecord(object): self._log.error("Network service id:%s, name:%s instantiation failed", self.id, self.name) self.set_state(NetworkServiceRecordState.FAILED) + self._is_failed = True - event_descr = "Instantiation of NS %s failed" % self.id + event_descr = "Instantiation of NS %s - %s failed" % (self.id, self.name) self.record_event("ns-failed", event_descr, evt_details=failed_reason) # Publish the NSR to DTS @@ -2376,59 +2693,94 @@ class NetworkServiceRecord(object): @asyncio.coroutine def terminate_vnfrs(self, vnfrs, scalein=False): """ Terminate VNFRS in this network service """ - self._log.debug("Terminating VNFs in network service %s", self.id) - for vnfr in vnfrs: + self._log.debug("Terminating VNFs in network service %s - %s", self.id, self.name) + vnfr_ids = [] + for vnfr in list(vnfrs): self._log.debug("Terminating VNFs in network service %s %s", vnfr.id, self.id) - if scalein: - yield from self.nsm_plugin.terminate_vnf(self, vnfr, scalein=True) + yield from self.nsm_plugin.terminate_vnf(self, vnfr, scalein=scalein) + vnfr_ids.append(vnfr.id) + + for vnfr_id in vnfr_ids: + self._vnfrs.pop(vnfr_id, None) @asyncio.coroutine def terminate(self): - """ Terminate a NetworkServiceRecord.""" + """Start terminate of a NetworkServiceRecord.""" + # Move the state to TERMINATE + self.set_state(NetworkServiceRecordState.TERMINATE) + event_descr = "Terminate being processed for NS Id: %s, NS Name: %s" % (self.id, self.name) + self.record_event("terminate", event_descr) + self._log.debug("Terminating network service id: %s, NS Name: %s", self.id, self.name) + + # Adding the NSR ID on terminate Evet. This will be checked to halt the instantiation if not already finished. + self._ns_terminate_received = True + + yield from self.publish() + + if self._is_failed: + # IN case the instantiation failed, then trigger a cleanup immediately + # don't wait for Cfg manager, as it will have no idea of this NSR. + # Due to the failure + yield from self.terminate_ns_cont() + + + @asyncio.coroutine + def terminate_ns_cont(self): + """Config script related to terminate finished, continue termination""" def terminate_vnffgrs(): """ Terminate VNFFGRS in this network service """ - self._log.debug("Terminating VNFFGRs in network service %s", self.id) + self._log.debug("Terminating VNFFGRs in network service %s - %s", self.id, self.name) for vnffgr in self.vnffgrs.values(): yield from vnffgr.terminate() def terminate_vlrs(): """ Terminate VLRs in this netork service """ - self._log.debug("Terminating VLs in network service %s", self.id) - for vlr in self.vlrs: + self._log.debug("Terminating VLs in network service %s - %s", self.id, self.name) + for vlr_id, vlr in self.vlrs.items(): yield from self.nsm_plugin.terminate_vl(vlr) vlr.state = VlRecordState.TERMINATED - self._log.debug("Terminating network service id %s", self.id) - - # Move the state to TERMINATE - self.set_state(NetworkServiceRecordState.TERMINATE) - event_descr = "Terminate being processed for NS Id:%s" % self.id - self.record_event("terminate", event_descr) - # Move the state to VNF_TERMINATE_PHASE - self._log.debug("Terminating VNFFGs in NS ID: %s", self.id) + self._log.debug("Terminating VNFFGs in NS ID: %s, NS Name: %s", self.id, self.name) self.set_state(NetworkServiceRecordState.VNFFG_TERMINATE_PHASE) - event_descr = "Terminating VNFFGS in NS Id:%s" % self.id + event_descr = "Terminating VNFFGS in NS Id: %s, NS Name: %s" % (self.id, self.name) self.record_event("terminating-vnffgss", event_descr) yield from terminate_vnffgrs() # Move the state to VNF_TERMINATE_PHASE self.set_state(NetworkServiceRecordState.VNF_TERMINATE_PHASE) - event_descr = "Terminating VNFS in NS Id:%s" % self.id + event_descr = "Terminating VNFS in NS Id: %s, NS Name: %s" % (self.id, self.name) self.record_event("terminating-vnfs", event_descr) yield from self.terminate_vnfrs(self.vnfrs.values()) # Move the state to VL_TERMINATE_PHASE self.set_state(NetworkServiceRecordState.VL_TERMINATE_PHASE) - event_descr = "Terminating VLs in NS Id:%s" % self.id + event_descr = "Terminating VLs in NS Id: %s, NS Name: %s" % (self.id, self.name) self.record_event("terminating-vls", event_descr) yield from terminate_vlrs() yield from self.nsm_plugin.terminate_ns(self) + # Remove the generated SSH key + if self._ssh_key_file: + p = urlparse(self._ssh_key_file) + if p[0] == 'file': + path = os.path.dirname(p[2]) + self._log.debug("NSR {}: Removing keys in {}".format(self.name, + path)) + shutil.rmtree(path, ignore_errors=True) + # Move the state to TERMINATED self.set_state(NetworkServiceRecordState.TERMINATED) - event_descr = "Terminated NS Id:%s" % self.id + event_descr = "Terminated NS Id: %s, NS Name: %s" % (self.id, self.name) self.record_event("terminated", event_descr) + # Unpublish the NSR record + self._log.debug("Unpublishing the network service %s - %s", self.id, self.name) + yield from self.unpublish() + + # Finaly delete the NS instance from this NS Manager + self._log.debug("Deleting the network service %s - %s", self.id, self.name) + self.nsm.delete_nsr(self.id) + def enable(self): """"Enable a NetworkServiceRecord.""" pass @@ -2457,8 +2809,8 @@ class NetworkServiceRecord(object): def create_msg(self): """ The network serice record as a message """ nsr_dict = {"ns_instance_config_ref": self.id} - nsr = RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr.from_dict(nsr_dict) - #nsr.cloud_account = self.cloud_account_name + nsr = RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr.from_dict(nsr_dict) + #nsr.datacenter = self.cloud_account_name nsr.sdn_account = self._sdn_account_name nsr.name_ref = self.name nsr.nsd_ref = self.nsd_id @@ -2470,18 +2822,47 @@ class NetworkServiceRecord(object): nsr.create_time = self._create_time nsr.uptime = int(time.time()) - self._create_time + # Added for OpenMano + + nsr.orchestration_progress.networks.total = len(self.nsd_msg.vld) + if isinstance(self.nsm_plugin, openmano_nsm.OpenmanoNsPlugin): + # Taking the last update by OpenMano + nsr.orchestration_progress.networks.active = self.nsm_plugin._openmano_nsrs[self.id]._active_nets + else: + nsr.orchestration_progress.networks.active = self._active_networks + no_of_vdus = 0 + for vnfr_id, vnfr in self._vnfrs.items(): + no_of_vdus += len(vnfr.vnfd.vdu) + + nsr.orchestration_progress.vms.total = no_of_vdus + if isinstance(self.nsm_plugin, openmano_nsm.OpenmanoNsPlugin): + # Taking the last update by OpenMano + nsr.orchestration_progress.vms.active = self.nsm_plugin._openmano_nsrs[self.id]._active_vms + else: + nsr.orchestration_progress.vms.active = self._active_vms + + # Generated SSH key + if self._ssh_pub_key: + nsr.ssh_key_generated.private_key_file = self._ssh_key_file + nsr.ssh_key_generated.public_key = self._ssh_pub_key + for cfg_prim in self.nsd_msg.service_primitive: - cfg_prim = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ServicePrimitive.from_dict( + cfg_prim = RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ServicePrimitive.from_dict( cfg_prim.as_dict()) nsr.service_primitive.append(cfg_prim) - for init_cfg in self.nsd_msg.initial_config_primitive: - prim = NsrYang.NsrInitialConfigPrimitive.from_dict( + for init_cfg in self.nsd_msg.initial_service_primitive: + prim = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_InitialServicePrimitive.from_dict( init_cfg.as_dict()) - nsr.initial_config_primitive.append(prim) + nsr.initial_service_primitive.append(prim) + + for term_cfg in self.nsd_msg.terminate_service_primitive: + prim = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_TerminateServicePrimitive.from_dict( + term_cfg.as_dict()) + nsr.terminate_service_primitive.append(prim) if self.vl_phase_completed(): - for vlr in self.vlrs: + for vlr_id, vlr in self.vlrs.items(): nsr.vlr.append(vlr.create_nsr_vlr_msg(self.vnfrs.values())) if self.vnf_phase_completed(): @@ -2506,104 +2887,187 @@ class NetworkServiceRecord(object): """ Re-evaluate this NS's state """ curr_state = self._op_status.state - if curr_state == NetworkServiceRecordState.TERMINATED: - self._log.debug("NS (%s) in terminated state, not updating state", self.id) - return + # This means that the terminate has been fired before the NS was UP. + if self._ns_terminate_received: + # Resetting this flag so that terminate ns is not called via subsequent DTS Handlers after the intial call. + self._ns_terminate_received = False + yield from self.terminate_ns_cont() + else: + if curr_state == NetworkServiceRecordState.TERMINATED: + self._log.debug("NS (%s - %s) in terminated state, not updating state", self.id, self.name) + return - new_state = NetworkServiceRecordState.RUNNING - self._log.info("Received update_state for nsr: %s, curr-state: %s", - self.id, curr_state) + new_state = NetworkServiceRecordState.RUNNING + self._log.debug("Received update_state for nsr: %s, curr-state: %s", + self.id, curr_state) - # Check all the VNFRs are present - for _, vnfr in self.vnfrs.items(): - if vnfr.state in [VnfRecordState.ACTIVE, VnfRecordState.TERMINATED]: - pass - elif vnfr.state == VnfRecordState.FAILED: - if vnfr._prev_state != vnfr.state: - event_descr = "Instantiation of VNF %s failed" % vnfr.id - event_error_details = vnfr.state_failed_reason - self.record_event("vnf-failed", event_descr, evt_details=event_error_details) - vnfr.set_state(VnfRecordState.FAILED) - else: - self._log.info("VNF state did not change, curr=%s, prev=%s", - vnfr.state, vnfr._prev_state) - new_state = NetworkServiceRecordState.FAILED - break - else: - self._log.info("VNF %s in NSR %s is still not active; current state is: %s", - vnfr.id, self.id, vnfr.state) - new_state = curr_state - - # If new state is RUNNING; check all VLs - if new_state == NetworkServiceRecordState.RUNNING: - for vl in self.vlrs: - - if vl.state in [VlRecordState.ACTIVE, VlRecordState.TERMINATED]: - pass - elif vl.state == VlRecordState.FAILED: - if vl.prev_state != vl.state: - event_descr = "Instantiation of VL %s failed" % vl.id - event_error_details = vl.state_failed_reason - self.record_event("vl-failed", event_descr, evt_details=event_error_details) - vl.prev_state = vl.state + # check all VLs + if (isinstance(self.nsm_plugin, rwnsmplugin.RwNsPlugin)): + for vlr_id, vl in self.vlrs.items(): + self._log.debug("VLR %s state %s", vlr_id, vl.state) + if vl.state in [VlRecordState.ACTIVE, VlRecordState.TERMINATED]: + continue + elif vl.state == VlRecordState.FAILED: + if vl.prev_state != vl.state: + event_descr = "Instantiation of VL %s failed" % vl.id + event_error_details = vl.state_failed_reason + self.record_event("vl-failed", event_descr, evt_details=event_error_details) + vl.prev_state = vl.state + new_state = NetworkServiceRecordState.FAILED + break + else: + self._log.debug("VL already in failed state") else: - self._log.debug("VL %s already in failed state") - else: - if vl.state in [VlRecordState.INSTANTIATION_PENDING, VlRecordState.INIT]: - new_state = NetworkServiceRecordState.VL_INSTANTIATE + if vl.state in [VlRecordState.INSTANTIATION_PENDING, VlRecordState.INIT]: + new_state = NetworkServiceRecordState.VL_INSTANTIATE + break + + if vl.state in [VlRecordState.TERMINATE_PENDING]: + new_state = NetworkServiceRecordState.VL_TERMINATE + break + + # Check all the VNFRs are present + if new_state == NetworkServiceRecordState.RUNNING: + for _, vnfr in self.vnfrs.items(): + self._log.debug("VNFR state %s", vnfr.state) + if vnfr.state in [VnfRecordState.ACTIVE, VnfRecordState.TERMINATED]: + active_vdus = 0 + for vnfr in self.vnfrs: + active_vdus += self.nsm._vnfrs[vnfr]._active_vdus + + if self._active_vms != active_vdus: + self._active_vms = active_vdus + yield from self.publish() + + continue + + elif vnfr.state == VnfRecordState.FAILED: + if vnfr._prev_state != vnfr.state: + event_descr = "Instantiation of VNF %s for NS: %s failed" % (vnfr.id, self.name) + event_error_details = vnfr.state_failed_reason + self.record_event("vnf-failed", event_descr, evt_details=event_error_details) + vnfr.set_state(VnfRecordState.FAILED) + else: + self._log.info("VNF state did not change, curr=%s, prev=%s", + vnfr.state, vnfr._prev_state) + new_state = NetworkServiceRecordState.FAILED break - - if vl.state in [VlRecordState.TERMINATE_PENDING]: - new_state = NetworkServiceRecordState.VL_TERMINATE + else: + self._log.debug("VNF %s in NSR %s - %s is still not active; current state is: %s", + vnfr.id, self.id, self.name, vnfr.state) + new_state = curr_state + + # If new state is RUNNING; check VNFFGRs are also active + if new_state == NetworkServiceRecordState.RUNNING: + for _, vnffgr in self.vnffgrs.items(): + self._log.debug("Checking vnffgr state for nsr %s is: %s", + self.id, vnffgr.state) + if vnffgr.state == VnffgRecordState.ACTIVE: + continue + elif vnffgr.state == VnffgRecordState.FAILED: + event_descr = "Instantiation of VNFFGR %s failed" % vnffgr.id + self.record_event("vnffg-failed", event_descr) + new_state = NetworkServiceRecordState.FAILED break + else: + self._log.info("VNFFGR %s in NSR %s - %s is still not active; current state is: %s", + vnffgr.id, self.id, self.name, vnffgr.state) + new_state = curr_state + + # Update all the scaling group instance operational status to + # reflect the state of all VNFR within that instance + yield from self._update_scale_group_instances_status() - # If new state is RUNNING; check VNFFGRs are also active - if new_state == NetworkServiceRecordState.RUNNING: - for _, vnffgr in self.vnffgrs.items(): - self._log.info("Checking vnffgr state for nsr %s is: %s", - self.id, vnffgr.state) - if vnffgr.state == VnffgRecordState.ACTIVE: - pass - elif vnffgr.state == VnffgRecordState.FAILED: - event_descr = "Instantiation of VNFFGR %s failed" % vnffgr.id - self.record_event("vnffg-failed", event_descr) - new_state = NetworkServiceRecordState.FAILED + for _, group in self._scaling_groups.items(): + if group.state == scale_group.ScaleGroupState.SCALING_OUT: + new_state = NetworkServiceRecordState.SCALING_OUT + break + elif group.state == scale_group.ScaleGroupState.SCALING_IN: + new_state = NetworkServiceRecordState.SCALING_IN break + + if new_state != curr_state: + self._log.debug("Changing state of Network service %s - %s from %s to %s", + self.id, self.name, curr_state, new_state) + if new_state == NetworkServiceRecordState.RUNNING: + yield from self.is_active() + elif new_state == NetworkServiceRecordState.FAILED: + # If the NS is already active and we entered scaling_in, scaling_out, + # do not mark the NS as failing if scaling operation failed. + if curr_state in [NetworkServiceRecordState.SCALING_OUT, + NetworkServiceRecordState.SCALING_IN] and self._is_active: + new_state = NetworkServiceRecordState.RUNNING + self.set_state(new_state) + else: + yield from self.instantiation_failed() else: - self._log.info("VNFFGR %s in NSR %s is still not active; current state is: %s", - vnffgr.id, self.id, vnffgr.state) - new_state = curr_state + self.set_state(new_state) - # Update all the scaling group instance operational status to - # reflect the state of all VNFR within that instance - yield from self._update_scale_group_instances_status() + yield from self.publish() - for _, group in self._scaling_groups.items(): - if group.state == scale_group.ScaleGroupState.SCALING_OUT: - new_state = NetworkServiceRecordState.SCALING_OUT - break - elif group.state == scale_group.ScaleGroupState.SCALING_IN: - new_state = NetworkServiceRecordState.SCALING_IN - break + def vl_instantiation_state(self): + """ Check if all VLs in this NS are active """ + for vl_id, vlr in self.vlrs.items(): + if vlr.state == VlRecordState.ACTIVE: + continue + elif vlr.state == VlRecordState.FAILED: + return VlRecordState.FAILED + elif vlr.state == VlRecordState.TERMINATED: + return VlRecordState.TERMINATED + elif vlr.state == VlRecordState.INSTANTIATION_PENDING: + return VlRecordState.INSTANTIATION_PENDING + else: + self._log.error("vlr %s still in state %s", vlr, vlr.state) + raise VirtualLinkRecordError("Invalid state %s" %(vlr.state)) + return VlRecordState.ACTIVE + + def vl_instantiation_successful(self): + """ Mark that all VLs in this NS are active """ + if self._vls_ready.is_set(): + self._log.error("NSR id %s, vls_ready is already set", self.id) + + if self.vl_instantiation_state() == VlRecordState.ACTIVE: + self._log.debug("NSR id %s, All %d vlrs are in active state %s", + self.id, len(self.vlrs), self.vl_instantiation_state) + self._vls_ready.set() + + def vlr_event(self, vlr, action): + self._log.debug("Received VLR %s with action:%s", vlr, action) + + if vlr.id not in self.vlrs: + self._log.error("VLR %s:%s received for unknown id, state:%s", + vlr.id, vlr.name, vlr.operational_status) + return - if new_state != curr_state: - self._log.debug("Changing state of Network service %s from %s to %s", - self.id, curr_state, new_state) - if new_state == NetworkServiceRecordState.RUNNING: - yield from self.is_active() - elif new_state == NetworkServiceRecordState.FAILED: - # If the NS is already active and we entered scaling_in, scaling_out, - # do not mark the NS as failing if scaling operation failed. - if curr_state in [NetworkServiceRecordState.SCALING_OUT, - NetworkServiceRecordState.SCALING_IN] and self._is_active: - new_state = NetworkServiceRecordState.RUNNING - self.set_state(new_state) - else: - yield from self.instantiation_failed() + vlr_local = self.vlrs[vlr.id] + + if action == rwdts.QueryAction.CREATE or action == rwdts.QueryAction.UPDATE: + if vlr.operational_status == 'running': + vlr_local.set_state_from_op_status(vlr.operational_status) + self._active_networks += 1 + self._log.info("VLR %s:%s moving to active state", + vlr.id,vlr.name) + elif vlr.operational_status == 'failed': + vlr_local.set_state_from_op_status(vlr.operational_status) + vlr_local.state_failed_reason = vlr.operational_status_details + asyncio.ensure_future(self.update_state(), loop=self._loop) + self._log.info("VLR %s:%s moving to failed state", + vlr.id,vlr.name) else: - self.set_state(new_state) + self._log.warning("VLR %s:%s received state:%s", + vlr.id, vlr.name, vlr.operational_status) - yield from self.publish() + if isinstance(self.nsm_plugin, rwnsmplugin.RwNsPlugin): + self.vl_instantiation_successful() + + # self.update_state() is responsible for publishing the NSR state. Its being called by vlr_event and update_vnfr. + # The call from vlr_event occurs only if vlr reaches a failed state. Hence implementing the check here to handle + # ns terminate received after other vlr states as vl-alloc-pending, vl-init, running. + if self._ns_terminate_received: + # Resetting this flag so that terminate ns is not called via subsequent DTS Handlers after the intial call. + if vlr.operational_status in ['running', 'failed']: + self._ns_terminate_received = False + asyncio.ensure_future(self.terminate_ns_cont(), loop=self._loop) class InputParameterSubstitution(object): @@ -2611,7 +3075,7 @@ class InputParameterSubstitution(object): This class is responsible for substituting input parameters into an NSD. """ - def __init__(self, log): + def __init__(self, log, project): """Create an instance of InputParameterSubstitution Arguments: @@ -2619,6 +3083,29 @@ class InputParameterSubstitution(object): """ self.log = log + self.project = project + + def _fix_xpath(self, xpath): + # Fix the parameter.xpath to include project and correct namespace + self.log.debug("Provided xpath: {}".format(xpath)) + #Split the xpath at the / + attrs = xpath.split('/') + new_xp = attrs[0] + for attr in attrs[1:]: + new_ns = 'project-nsd' + name = attr + if ':' in attr: + # Includes namespace + ns, name = attr.split(':', 2) + if ns == "rw-nsd": + ns = "rw-project-nsd" + + new_xp = new_xp + '/' + new_ns + ':' + name + + updated_xpath = self.project.add_project(new_xp) + + self.log.error("Updated xpath: {}".format(updated_xpath)) + return updated_xpath def __call__(self, nsd, nsr_config): """Substitutes input parameters from the NSR config into the NSD @@ -2656,12 +3143,108 @@ class InputParameterSubstitution(object): ) try: - xpath.setxattr(nsd, param.xpath, param.value) + xp = self._fix_xpath(param.xpath) + xpath.setxattr(nsd, xp, param.value) except Exception as e: self.log.exception(e) +class VnfInputParameterSubstitution(object): + """ + This class is responsible for substituting input parameters into a VNFD. + """ + + def __init__(self, log, const_vnfd, project): + """Create an instance of VnfInputParameterSubstitution + + Arguments: + log - a logger for this object to use + const_vnfd - id refs for vnfs in a ns + project - project for the VNFs + """ + + self.log = log + self.member_vnf_index = const_vnfd.member_vnf_index + self.vnfd_id_ref = const_vnfd.vnfd_id_ref + self.project = project + + def __call__(self, vnfr, nsr_config): + """Substitutes vnf input parameters from the NSR config into the VNFD + + This call modifies the provided VNFD with the input parameters that are + contained in the NSR config. + + Arguments: + vnfr - a GI VNFR object + nsr_config - a GI NSR Config object + + """ + + def compose_xpath(xpath, id): + prefix = "/rw-project:project[rw-project:name={}]".format(quoted_key(self.project.name)) + \ + "/vnfr:vnfr-catalog/vnfr:vnfr[vnfr:id={}]/vnfr:vnfd/".format(quoted_key(id)) + + suffix = '/'.join(xpath.split('/')[3:]).replace('vnfd', 'vnfr') + return prefix + suffix + + def substitute_xpath(ip_xpath, substitute_value, vnfr): + vnfr_xpath = compose_xpath(ip_xpath, vnfr.id) + + try: + verify_xpath_wildcarded = xpath.getxattr(vnfr, vnfr_xpath) + + self.log.debug( + "vnf-input-parameter:{} = {}, for VNF : [member-vnf-index : {}, vnfd-id-ref : {}]".format( + ip_xpath, + substitute_value, + self.member_vnf_index, + self.vnfd_id_ref + ) + ) + try: + xpath.setxattr(vnfr, vnfr_xpath, substitute_value) + + except Exception as e: + self.log.exception(e) + + except Exception as e: + self.log.exception("Wildcarded xpath {} is listy in nature. Can not update. Exception => {}" + .format(ip_xpath, e)) + + if vnfr is None or nsr_config is None: + return + + optional_input_parameters = set() + for input_parameter in nsr_config.nsd.input_parameter_xpath: + optional_input_parameters.add(input_parameter.xpath) + + # Apply the input parameters to the vnfr + if nsr_config.vnf_input_parameter: + for param in nsr_config.vnf_input_parameter: + if (param.member_vnf_index_ref == self.member_vnf_index and param.vnfd_id_ref == self.vnfd_id_ref): + if param.input_parameter: + for ip in param.input_parameter: + if ip.xpath not in optional_input_parameters: + msg = "Substitution Failed. Tried to set an invalid vnf input parameter ({}) for vnf [member-vnf-index : {}, vnfd-id-ref : {}]" + self.log.error(msg.format(ip.xpath, self.member_vnf_index, self.vnfd_id_ref)) + continue + + try: + substitute_xpath(ip.xpath, ip.value, vnfr) + except Exception as e: + self.log.exception(e) + else: + self.log.debug("Substituting Xpaths with default Values") + for input_parameter in nsr_config.nsd.input_parameter_xpath: + if input_parameter.default_value is not None: + try: + if "vnfd-catalog" in input_parameter.xpath: + substitute_xpath(input_parameter.xpath, input_parameter.default_value, vnfr) + except Exception as e: + self.log.exception(e) + + class NetworkServiceDescriptor(object): """ Network service descriptor class @@ -2693,7 +3276,9 @@ class NetworkServiceDescriptor(object): @staticmethod def path_for_id(nsd_id): """ Return path for the passed nsd_id""" - return "C,/nsd:nsd-catalog/nsd:nsd[nsd:id = '{}'".format(nsd_id) + return self._nsm._project.add_project( + "C,/project-nsd:nsd-catalog/project-nsd:nsd[project-nsd:id = '{}'". + format(nsd_id)) def path(self): """ Return the message associated with this NetworkServiceDescriptor""" @@ -2706,7 +3291,7 @@ class NetworkServiceDescriptor(object): class NsdDtsHandler(object): """ The network service descriptor DTS handler """ - XPATH = "C,/nsd:nsd-catalog/nsd:nsd" + XPATH = "C,/project-nsd:nsd-catalog/project-nsd:nsd" def __init__(self, dts, log, loop, nsm): self._dts = dts @@ -2715,6 +3300,7 @@ class NsdDtsHandler(object): self._nsm = nsm self._regh = None + self._project = nsm._project @property def regh(self): @@ -2725,34 +3311,33 @@ class NsdDtsHandler(object): def register(self): """ Register for Nsd create/update/delete/read requests from dts """ + if self._regh: + self._log.warning("DTS handler already registered for project {}". + format(self._project.name)) + return + def on_apply(dts, acg, xact, action, scratch): """Apply the configuration""" is_recovery = xact.xact is None and action == rwdts.AppconfAction.INSTALL self._log.debug("Got nsd apply cfg (xact:%s) (action:%s)", xact, action) - # Create/Update an NSD record - for cfg in self._regh.get_xact_elements(xact): - # Only interested in those NSD cfgs whose ID was received in prepare callback - if cfg.id in scratch.get('nsds', []) or is_recovery: - self._nsm.update_nsd(cfg) - scratch.pop('nsds', None) + if self._regh: + # Create/Update an NSD record + for cfg in self._regh.get_xact_elements(xact): + # Only interested in those NSD cfgs whose ID was received in prepare callback + if cfg.id in scratch.get('nsds', []) or is_recovery: + self._nsm.update_nsd(cfg) - return RwTypes.RwStatus.SUCCESS + else: + # This can happen if we do the deregister + # during project delete before this is called + self._log.debug("No reg handle for {} for project {}". + format(self.__class__, self._project.name)) - @asyncio.coroutine - def delete_nsd_libs(nsd_id): - """ Remove any files uploaded with NSD and stored under $RIFT_ARTIFACTS/libs/ """ - try: - rift_artifacts_dir = os.environ['RIFT_ARTIFACTS'] - nsd_dir = os.path.join(rift_artifacts_dir, 'launchpad/libs', nsd_id) + scratch.pop('nsds', None) - if os.path.exists (nsd_dir): - shutil.rmtree(nsd_dir, ignore_errors=True) - except Exception as e: - self._log.error("Exception in cleaning up NSD libs {}: {}". - format(nsd_id, e)) - self._log.excpetion(e) + return RwTypes.RwStatus.SUCCESS @asyncio.coroutine def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch): @@ -2767,7 +3352,6 @@ class NsdDtsHandler(object): if fref.is_field_deleted(): # Delete an NSD record self._log.debug("Deleting NSD with id %s", msg.id) - yield from delete_nsd_libs(msg.id) self._nsm.delete_nsd(msg.id) else: # Add this NSD to scratch to create/update in apply callback @@ -2777,9 +3361,10 @@ class NsdDtsHandler(object): xact_info.respond_xpath(rwdts.XactRspCode.ACK) + xpath = self._project.add_project(NsdDtsHandler.XPATH) self._log.debug( "Registering for NSD config using xpath: %s", - NsdDtsHandler.XPATH, + xpath, ) acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply) @@ -2787,14 +3372,21 @@ class NsdDtsHandler(object): # Need a list in scratch to store NSDs to create/update later # acg._scratch['nsds'] = list() self._regh = acg.register( - xpath=NsdDtsHandler.XPATH, + xpath=xpath, flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE, on_prepare=on_prepare) + def deregister(self): + self._log.debug("De-register NSD handler for project {}". + format(self._project.name)) + if self._regh: + self._regh.deregister() + self._regh = None + class VnfdDtsHandler(object): """ DTS handler for VNFD config changes """ - XPATH = "C,/vnfd:vnfd-catalog/vnfd:vnfd" + XPATH = "C,/project-vnfd:vnfd-catalog/project-vnfd:vnfd" def __init__(self, dts, log, loop, nsm): self._dts = dts @@ -2802,6 +3394,7 @@ class VnfdDtsHandler(object): self._loop = loop self._nsm = nsm self._regh = None + self._project = nsm._project @property def regh(self): @@ -2812,21 +3405,33 @@ class VnfdDtsHandler(object): def register(self): """ Register for VNFD configuration""" + if self._regh: + self._log.warning("DTS handler already registered for project {}". + format(self._project.name)) + return + @asyncio.coroutine def on_apply(dts, acg, xact, action, scratch): """Apply the configuration""" self._log.debug("Got NSM VNFD apply (xact: %s) (action: %s)(scr: %s)", xact, action, scratch) - # Create/Update a VNFD record - for cfg in self._regh.get_xact_elements(xact): - # Only interested in those VNFD cfgs whose ID was received in prepare callback - if cfg.id in scratch.get('vnfds', []): - self._nsm.update_vnfd(cfg) + is_recovery = xact.xact is None and action == rwdts.AppconfAction.INSTALL + + if self._regh: + # Create/Update a VNFD record + for cfg in self._regh.get_xact_elements(xact): + # Only interested in those VNFD cfgs whose ID was received in prepare callback + if cfg.id in scratch.get('vnfds', []) or is_recovery: + self._nsm.update_vnfd(cfg) - for cfg in self._regh.elements: - if cfg.id in scratch.get('deleted_vnfds', []): - yield from self._nsm.delete_vnfd(cfg.id) + for cfg in self._regh.elements: + if cfg.id in scratch.get('deleted_vnfds', []): + yield from self._nsm.delete_vnfd(cfg.id) + + else: + self._log.warning("Reg handle none for {} in project {}". + format(self.__class__, self._project)) scratch.pop('vnfds', None) scratch.pop('deleted_vnfds', None) @@ -2834,8 +3439,9 @@ class VnfdDtsHandler(object): @asyncio.coroutine def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch): """ on prepare callback """ + xpath = ks_path.to_xpath(NsdYang.get_schema()) self._log.debug("Got on prepare for VNFD (path: %s) (action: %s) (msg: %s)", - ks_path.to_xpath(RwNsmYang.get_schema()), xact_info.query_action, msg) + xpath, xact_info.query_action, msg) fref = ProtobufC.FieldReference.alloc() fref.goto_whole_message(msg.to_pbcm()) @@ -2850,44 +3456,62 @@ class VnfdDtsHandler(object): vnfds = scratch.setdefault('vnfds', []) vnfds.append(msg.id) - xact_info.respond_xpath(rwdts.XactRspCode.ACK) + try: + xact_info.respond_xpath(rwdts.XactRspCode.ACK) + except rift.tasklets.dts.ResponseError as e: + self._log.warning( + "VnfdDtsHandler in project {} with path {} for action {} failed: {}". + format(self._project, xpath, xact_info.query_action, e)) + + xpath = self._project.add_project(VnfdDtsHandler.XPATH) self._log.debug( - "Registering for VNFD config using xpath: %s", - VnfdDtsHandler.XPATH, - ) + "Registering for VNFD config using xpath {} for project {}" + .format(xpath, self._project)) acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply) with self._dts.appconf_group_create(handler=acg_hdl) as acg: # Need a list in scratch to store VNFDs to create/update later # acg._scratch['vnfds'] = list() # acg._scratch['deleted_vnfds'] = list() self._regh = acg.register( - xpath=VnfdDtsHandler.XPATH, + xpath=xpath, flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY, on_prepare=on_prepare) + def deregister(self): + self._log.debug("De-register VNFD handler for project {}". + format(self._project.name)) + if self._regh: + self._regh.deregister() + self._regh = None + + class NsrRpcDtsHandler(object): """ The network service instantiation RPC DTS handler """ EXEC_NSR_CONF_XPATH = "I,/nsr:start-network-service" EXEC_NSR_CONF_O_XPATH = "O,/nsr:start-network-service" NETCONF_IP_ADDRESS = "127.0.0.1" NETCONF_PORT = 2022 - RESTCONF_PORT = 8888 - NETCONF_USER = "admin" - NETCONF_PW = "admin" - REST_BASE_V2_URL = 'https://{}:{}/v2/api/'.format("127.0.0.1",8888) + RESTCONF_PORT = 8008 + NETCONF_USER = "@rift" + NETCONF_PW = "rift" + REST_BASE_V2_URL = 'https://{}:{}/v2/api/'.format("127.0.0.1", + RESTCONF_PORT) def __init__(self, dts, log, loop, nsm): self._dts = dts self._log = log self._loop = loop self._nsm = nsm + self._project = nsm._project self._nsd = None self._ns_regh = None self._manager = None - self._nsr_config_url = NsrRpcDtsHandler.REST_BASE_V2_URL + 'config/ns-instance-config' + self._nsr_config_url = NsrRpcDtsHandler.REST_BASE_V2_URL + \ + 'project/{}/'.format(self._project) + \ + 'config/ns-instance-config' self._model = RwYang.Model.create_libncx() self._model.load_schema_ypbc(RwNsrYang.get_schema()) @@ -2934,26 +3558,43 @@ class NsrRpcDtsHandler(object): timeout_secs) def _apply_ns_instance_config(self,payload_dict): - #self._log.debug("At apply NS instance config with payload %s",payload_dict) req_hdr= {'accept':'application/vnd.yang.data+json','content-type':'application/vnd.yang.data+json'} - response=requests.post(self._nsr_config_url, headers=req_hdr, auth=('admin', 'admin'),data=payload_dict,verify=False) + response=requests.post(self._nsr_config_url, + headers=req_hdr, + auth=(NsrRpcDtsHandler.NETCONF_USER, NsrRpcDtsHandler.NETCONF_PW), + data=payload_dict, + verify=False) return response @asyncio.coroutine def register(self): """ Register for NS monitoring read from dts """ + @asyncio.coroutine def on_ns_config_prepare(xact_info, action, ks_path, msg): """ prepare callback from dts start-network-service""" assert action == rwdts.QueryAction.RPC + + if not self._project.rpc_check(msg, xact_info): + return + rpc_ip = msg rpc_op = NsrYang.YangOutput_Nsr_StartNetworkService.from_dict({ "nsr_id":str(uuid.uuid4()) }) - if not ('name' in rpc_ip and 'nsd_ref' in rpc_ip and ('cloud_account' in rpc_ip or 'om_datacenter' in rpc_ip)): - self._log.error("Mandatory parameters name or nsd_ref or cloud account not found in start-network-service {}".format(rpc_ip)) - + if not ('name' in rpc_ip and 'nsd_ref' in rpc_ip and + ('cloud_account' in rpc_ip or 'om_datacenter' in rpc_ip)): + errmsg = ( + "Mandatory parameters name or nsd_ref or cloud account not found in start-network-service {}". + format(rpc_ip)) + self._log.error(errmsg) + xact_info.send_error_xpath(RwTypes.RwStatus.FAILURE, + NsrRpcDtsHandler.EXEC_NSR_CONF_O_XPATH, + errmsg) + xact_info.respond_xpath(rwdts.XactRspCode.NACK, + NsrRpcDtsHandler.EXEC_NSR_CONF_O_XPATH) + return self._log.debug("start-network-service RPC input: {}".format(rpc_ip)) @@ -2963,34 +3604,23 @@ class NsrRpcDtsHandler(object): nsd_copy = self.nsm.get_nsd(rpc_ip.nsd_ref) - #if not self._manager: - # self._manager = yield from self._connect() - self._log.debug("Configuring ns-instance-config with name %s nsd-ref: %s", rpc_ip.name, rpc_ip.nsd_ref) ns_instance_config_dict = {"id":rpc_op.nsr_id, "admin_status":"ENABLED"} ns_instance_config_copy_dict = {k:v for k, v in rpc_ip.as_dict().items() - if k in RwNsrYang.YangData_Nsr_NsInstanceConfig_Nsr().fields} + if k in RwNsrYang.YangData_RwProject_Project_NsInstanceConfig_Nsr().fields} ns_instance_config_dict.update(ns_instance_config_copy_dict) - ns_instance_config = RwNsrYang.YangData_Nsr_NsInstanceConfig_Nsr.from_dict(ns_instance_config_dict) - ns_instance_config.nsd = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_Nsd() + ns_instance_config = RwNsrYang.YangData_RwProject_Project_NsInstanceConfig_Nsr.from_dict(ns_instance_config_dict) + ns_instance_config.nsd = RwNsrYang.YangData_RwProject_Project_NsInstanceConfig_Nsr_Nsd() ns_instance_config.nsd.from_dict(nsd_copy.msg.as_dict()) payload_dict = ns_instance_config.to_json(self._model) - #xml = ns_instance_config.to_xml_v2(self._model) - #netconf_xml = self.wrap_netconf_config_xml(xml) - #self._log.debug("Sending configure ns-instance-config xml to %s: %s", - # netconf_xml, NsrRpcDtsHandler.NETCONF_IP_ADDRESS) self._log.debug("Sending configure ns-instance-config json to %s: %s", self._nsr_config_url,ns_instance_config) - #response = yield from self._manager.edit_config( - # target="running", - # config=netconf_xml, - # ) response = yield from self._loop.run_in_executor( None, self._apply_ns_instance_config, @@ -3003,20 +3633,26 @@ class NsrRpcDtsHandler(object): NsrRpcDtsHandler.EXEC_NSR_CONF_O_XPATH, rpc_op) except Exception as e: - self._log.error("Exception processing the " - "start-network-service: {}".format(e)) - self._log.exception(e) + errmsg = ("Exception processing the " + "start-network-service: {}".format(e)) + self._log.exception(errmsg) + xact_info.send_error_xpath(RwTypes.RwStatus.FAILURE, + NsrRpcDtsHandler.EXEC_NSR_CONF_O_XPATH, + errmsg) xact_info.respond_xpath(rwdts.XactRspCode.NACK, NsrRpcDtsHandler.EXEC_NSR_CONF_O_XPATH) + self._ns_regh = yield from self._dts.register( + xpath=NsrRpcDtsHandler.EXEC_NSR_CONF_XPATH, + handler=rift.tasklets.DTS.RegistrationHandler( + on_prepare=on_ns_config_prepare), + flags=rwdts.Flag.PUBLISHER, + ) - hdl_ns = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_ns_config_prepare,) - - with self._dts.group_create() as group: - self._ns_regh = group.register(xpath=NsrRpcDtsHandler.EXEC_NSR_CONF_XPATH, - handler=hdl_ns, - flags=rwdts.Flag.PUBLISHER, - ) + def deregister(self): + if self._ns_regh: + self._ns_regh.deregister() + self._ns_regh = None class NsrDtsHandler(object): @@ -3030,6 +3666,7 @@ class NsrDtsHandler(object): self._log = log self._loop = loop self._nsm = nsm + self._project = self._nsm._project self._nsr_regh = None self._scale_regh = None @@ -3044,13 +3681,18 @@ class NsrDtsHandler(object): def register(self): """ Register for Nsr create/update/delete/read requests from dts """ + if self._nsr_regh: + self._log.warning("DTS handler already registered for project {}". + format(self._project.name)) + return + def nsr_id_from_keyspec(ks): - nsr_path_entry = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr.schema().keyspec_to_entry(ks) + nsr_path_entry = RwNsrYang.YangData_RwProject_Project_NsInstanceConfig_Nsr.schema().keyspec_to_entry(ks) nsr_id = nsr_path_entry.key00.id return nsr_id def group_name_from_keyspec(ks): - group_path_entry = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup.schema().keyspec_to_entry(ks) + group_path_entry = NsrYang.YangData_RwProject_Project_NsInstanceConfig_Nsr_ScalingGroup.schema().keyspec_to_entry(ks) group_name = group_path_entry.key00.scaling_group_name_ref return group_name @@ -3141,32 +3783,6 @@ class NsrDtsHandler(object): for vld in vl_delta["deleted"]: yield from self._nsm.nsr_terminate_vl(nsr_id, vld) - def get_add_delete_update_cfgs(dts_member_reg, xact, key_name, scratch): - # Unfortunately, it is currently difficult to figure out what has exactly - # changed in this xact without Pbdelta support (RIFT-4916) - # As a workaround, we can fetch the pre and post xact elements and - # perform a comparison to figure out adds/deletes/updates - xact_cfgs = list(dts_member_reg.get_xact_elements(xact)) - curr_cfgs = list(dts_member_reg.elements) - - xact_key_map = {getattr(cfg, key_name): cfg for cfg in xact_cfgs} - curr_key_map = {getattr(cfg, key_name): cfg for cfg in curr_cfgs} - - # Find Adds - added_keys = set(xact_key_map) - set(curr_key_map) - added_cfgs = [xact_key_map[key] for key in added_keys] - - # Find Deletes - deleted_keys = set(curr_key_map) - set(xact_key_map) - deleted_cfgs = [curr_key_map[key] for key in deleted_keys] - - # Find Updates - updated_keys = set(curr_key_map) & set(xact_key_map) - updated_cfgs = [xact_key_map[key] for key in updated_keys - if xact_key_map[key] != curr_key_map[key]] - - return added_cfgs, deleted_cfgs, updated_cfgs - def get_nsr_key_pairs(dts_member_reg, xact): key_pairs = {} for instance_cfg, keyspec in dts_member_reg.get_xact_elements(xact, include_keyspec=True): @@ -3180,6 +3796,7 @@ class NsrDtsHandler(object): self._log.debug("Got nsr apply (xact: %s) (action: %s)(scr: %s)", xact, action, scratch) + @asyncio.coroutine def handle_create_nsr(msg, key_pairs=None, restart_mode=False): # Handle create nsr requests """ # Do some validations @@ -3190,15 +3807,17 @@ class NsrDtsHandler(object): self._log.debug("Creating NetworkServiceRecord %s from nsr config %s", msg.id, msg.as_dict()) - nsr = self.nsm.create_nsr(msg, key_pairs=key_pairs, restart_mode=restart_mode) + nsr = yield from self.nsm.create_nsr(msg, + xact, + key_pairs=key_pairs, + restart_mode=restart_mode) return nsr def handle_delete_nsr(msg): @asyncio.coroutine def delete_instantiation(ns_id): """ Delete instantiation """ - with self._dts.transaction() as xact: - yield from self._nsm.terminate_ns(ns_id, xact) + yield from self._nsm.terminate_ns(ns_id, None) # Handle delete NSR requests self._log.info("Delete req for NSR Id: %s received", msg.id) @@ -3206,7 +3825,7 @@ class NsrDtsHandler(object): nsr = self._nsm.get_ns_by_nsr_id(msg.id) nsr.set_state(NetworkServiceRecordState.TERMINATE_RCVD) - event_descr = "Terminate rcvd for NS Id:%s" % msg.id + event_descr = "Terminate rcvd for NS Id: %s, NS Name: %s" % (msg.id, msg.name) nsr.record_event("terminate-rcvd", event_descr) self._loop.create_task(delete_instantiation(msg.id)) @@ -3215,9 +3834,18 @@ class NsrDtsHandler(object): def begin_instantiation(nsr): # Begin instantiation self._log.info("Beginning NS instantiation: %s", nsr.id) - yield from self._nsm.instantiate_ns(nsr.id, xact) + try: + yield from self._nsm.instantiate_ns(nsr.id, xact) + except Exception as e: + self._log.exception(e) + raise e + + @asyncio.coroutine + def instantiate_ns(msg, key_pairs, restart_mode=False): + nsr = yield from handle_create_nsr(msg, key_pairs, restart_mode=restart_mode) + yield from begin_instantiation(nsr) - def on_instantiate_done(fut): + def on_instantiate_done(fut, msg): # If the do_instantiate fails, then publish NSR with failed result e = fut.exception() if e is not None: @@ -3233,18 +3861,28 @@ class NsrDtsHandler(object): if action == rwdts.AppconfAction.INSTALL and xact.id is None: key_pairs = [] - for element in self._key_pair_regh.elements: - key_pairs.append(element) - for element in self._nsr_regh.elements: - nsr = handle_create_nsr(element, key_pairs, restart_mode=True) - instantiate_task = self._loop.create_task(begin_instantiation(nsr)) - instantiate_task.add_done_callback(on_instantiate_done) + if self._key_pair_regh: + for element in self._key_pair_regh.elements: + key_pairs.append(element) + else: + self._log.error("Reg handle none for key pair in project {}". + format(self._project)) + + if self._nsr_regh: + for element in self._nsr_regh.elements: + if element.id not in self.nsm._nsrs: + instantiate_task = self._loop.create_task(instantiate_ns(element, key_pairs, + restart_mode=True)) + instantiate_task.add_done_callback(functools.partial(on_instantiate_done, msg=element)) + else: + self._log.error("Reg handle none for NSR in project {}". + format(self._project)) + return RwTypes.RwStatus.SUCCESS (added_msgs, deleted_msgs, updated_msgs) = get_add_delete_update_cfgs(self._nsr_regh, xact, - "id", - scratch) + "id") self._log.debug("Added: %s, Deleted: %s, Updated: %s", added_msgs, deleted_msgs, updated_msgs) @@ -3252,9 +3890,8 @@ class NsrDtsHandler(object): if msg.id not in self._nsm.nsrs: self._log.info("Create NSR received in on_apply to instantiate NS:%s", msg.id) key_pairs = get_nsr_key_pairs(self._key_pair_regh, xact) - nsr = handle_create_nsr(msg,key_pairs) - instantiate_task = self._loop.create_task(begin_instantiation(nsr)) - instantiate_task.add_done_callback(on_instantiate_done) + instantiate_task = self._loop.create_task(instantiate_ns(msg,key_pairs)) + instantiate_task.add_done_callback(functools.partial(on_instantiate_done, msg=msg)) for msg in deleted_msgs: self._log.info("Delete NSR received in on_apply to terminate NS:%s", msg.id) @@ -3265,7 +3902,6 @@ class NsrDtsHandler(object): for msg in updated_msgs: self._log.info("Update NSR received in on_apply: %s", msg) - self._nsm.nsr_update_cfg(msg.id, msg) if 'nsd' in msg: @@ -3295,149 +3931,118 @@ class NsrDtsHandler(object): xact, action, xact_info, xpath, msg ) - @asyncio.coroutine - def delete_instantiation(ns_id): - """ Delete instantiation """ - yield from self._nsm.terminate_ns(ns_id, None) - - def handle_delete_nsr(): - """ Handle delete NSR requests """ - self._log.info("Delete req for NSR Id: %s received", msg.id) - # Terminate the NSR instance - nsr = self._nsm.get_ns_by_nsr_id(msg.id) - - nsr.set_state(NetworkServiceRecordState.TERMINATE_RCVD) - event_descr = "Terminate rcvd for NS Id:%s" % msg.id - nsr.record_event("terminate-rcvd", event_descr) - - self._loop.create_task(delete_instantiation(msg.id)) - fref = ProtobufC.FieldReference.alloc() fref.goto_whole_message(msg.to_pbcm()) + def send_err_msg(err_msg): + self._log.error(errmsg) + xact_info.send_error_xpath(RwTypes.RwStatus.FAILURE, + xpath, + errmsg) + xact_info.respond_xpath(rwdts.XactRspCode.NACK) + + if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE, rwdts.QueryAction.DELETE]: # if this is an NSR create if action != rwdts.QueryAction.DELETE and msg.id not in self._nsm.nsrs: # Ensure the Cloud account/datacenter has been specified - if not msg.has_field("cloud_account") and not msg.has_field("om_datacenter"): - raise NsrInstantiationFailed("Cloud account or datacenter not specified in NSR") + if not msg.has_field("datacenter") and not msg.has_field("datacenter"): + errmsg = ("Cloud account or datacenter not specified in NS {}". + format(msg.name)) + send_err_msg(errmsg) + return # Check if nsd is specified if not msg.has_field("nsd"): - raise NsrInstantiationFailed("NSD not specified in NSR") + errmsg = ("NSD not specified in NS {}". + format(msg.name)) + send_err_msg(errmsg) + return else: nsr = self._nsm.nsrs[msg.id] - if msg.has_field("nsd"): if nsr.state != NetworkServiceRecordState.RUNNING: - raise NsrVlUpdateError("Unable to update VL when NSR not in running state") + errmsg = ("Unable to update VL when NS {} not in running state". + format(msg.name)) + send_err_msg(errmsg) + return + if 'vld' not in msg.nsd or len(msg.nsd.vld) == 0: - raise NsrVlUpdateError("NS config NSD should have atleast 1 VLD defined") + errmsg = ("NS config {} NSD should have atleast 1 VLD". + format(msg.name)) + send_err_msg(errmsg) + return if msg.has_field("scaling_group"): self._log.debug("ScaleMsg %s", msg) self._log.debug("NSSCALINGSTATE %s", nsr.state) if nsr.state != NetworkServiceRecordState.RUNNING: - raise ScalingOperationError("Unable to perform scaling action when NS is not in running state") + errmsg = ("Unable to perform scaling action when NS {} not in running state". + format(msg.name)) + send_err_msg(errmsg) + return if len(msg.scaling_group) > 1: - raise ScalingOperationError("Only a single scaling group can be configured at a time") + errmsg = ("Only a single scaling group can be configured at a time for NS {}". + format(msg.name)) + send_err_msg(errmsg) + return for group_msg in msg.scaling_group: num_new_group_instances = len(group_msg.instance) if num_new_group_instances > 1: - raise ScalingOperationError("Only a single scaling instance can be modified at a time") + errmsg = ("Only a single scaling instance can be modified at a time for NS {}". + format(msg.name)) + send_err_msg(errmsg) + return elif num_new_group_instances == 1: scale_group = nsr.scaling_groups[group_msg.scaling_group_name_ref] if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]: if len(scale_group.instances) == scale_group.max_instance_count: - raise ScalingOperationError("Max instances for %s reached" % scale_group) + errmsg = (" Max instances for {} reached for NS {}". + format(str(scale_group), msg.name)) + send_err_msg(errmsg) + return acg.handle.prepare_complete_ok(xact_info.handle) - self._log.debug("Registering for NSR config using xpath: %s", - NsrDtsHandler.NSR_XPATH) + xpath = self._project.add_project(NsrDtsHandler.NSR_XPATH) + self._log.debug("Registering for NSR config using xpath: {}". + format(xpath)) acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply) with self._dts.appconf_group_create(handler=acg_hdl) as acg: - self._nsr_regh = acg.register(xpath=NsrDtsHandler.NSR_XPATH, - flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE, - on_prepare=on_prepare) + self._nsr_regh = acg.register( + xpath=xpath, + flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE, + on_prepare=on_prepare + ) self._scale_regh = acg.register( - xpath=NsrDtsHandler.SCALE_INSTANCE_XPATH, - flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY| rwdts.Flag.CACHE, - ) + xpath=self._project.add_project(NsrDtsHandler.SCALE_INSTANCE_XPATH), + flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY| rwdts.Flag.CACHE, + ) self._key_pair_regh = acg.register( - xpath=NsrDtsHandler.KEY_PAIR_XPATH, - flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE, - ) - - -class NsrOpDataDtsHandler(object): - """ The network service op data DTS handler """ - XPATH = "D,/nsr:ns-instance-opdata/nsr:nsr" - - def __init__(self, dts, log, loop, nsm): - self._dts = dts - self._log = log - self._loop = loop - self._nsm = nsm - self._regh = None - - @property - def regh(self): - """ Return the registration handle""" - return self._regh - - @property - def nsm(self): - """ Return the NS manager instance """ - return self._nsm - - @asyncio.coroutine - def register(self): - """ Register for Nsr op data publisher registration""" - self._log.debug("Registering Nsr op data path %s as publisher", - NsrOpDataDtsHandler.XPATH) - - hdl = rift.tasklets.DTS.RegistrationHandler() - handlers = rift.tasklets.Group.Handler() - with self._dts.group_create(handler=handlers) as group: - self._regh = group.register(xpath=NsrOpDataDtsHandler.XPATH, - handler=hdl, - flags=rwdts.Flag.PUBLISHER | rwdts.Flag.NO_PREP_READ | rwdts.Flag.DATASTORE) - - @asyncio.coroutine - def create(self, path, msg): - """ - Create an NS record in DTS with the path and message - """ - self._log.debug("Creating NSR %s:%s", path, msg) - self.regh.create_element(path, msg) - self._log.debug("Created NSR, %s:%s", path, msg) - - @asyncio.coroutine - def update(self, path, msg, flags=rwdts.XactFlag.REPLACE): - """ - Update an NS record in DTS with the path and message - """ - self._log.debug("Updating NSR, %s:%s regh = %s", path, msg, self.regh) - self.regh.update_element(path, msg, flags) - self._log.debug("Updated NSR, %s:%s", path, msg) + xpath=self._project.add_project(NsrDtsHandler.KEY_PAIR_XPATH), + flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE, + ) - @asyncio.coroutine - def delete(self, path): - """ - Update an NS record in DTS with the path and message - """ - self._log.debug("Deleting NSR path:%s", path) - self.regh.delete_element(path) - self._log.debug("Deleted NSR path:%s", path) + def deregister(self): + self._log.debug("De-register NSR config for project {}". + format(self._project.name)) + if self._nsr_regh: + self._nsr_regh.deregister() + self._nsr_regh = None + if self._scale_regh: + self._scale_regh.deregister() + self._scale_regh = None + if self._key_pair_regh: + self._key_pair_regh.deregister() + self._key_pair_regh = None class VnfrDtsHandler(object): @@ -3465,11 +4070,10 @@ class VnfrDtsHandler(object): @asyncio.coroutine def register(self): """ Register for vnfr create/update/delete/ advises from dts """ - - def on_commit(xact_info): - """ The transaction has been committed """ - self._log.debug("Got vnfr commit (xact_info: %s)", xact_info) - return rwdts.MemberRspCode.ACTION_OK + if self._regh: + self._log.warning("VNFR DTS handler already registered for project {}". + format(self._project.name)) + return @asyncio.coroutine def on_prepare(xact_info, action, ks_path, msg): @@ -3480,43 +4084,51 @@ class VnfrDtsHandler(object): xact_info, action, ks_path, msg ) - schema = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema() + schema = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr.schema() path_entry = schema.keyspec_to_entry(ks_path) - if path_entry.key00.id not in self._nsm._vnfrs: - self._log.error("%s request for non existent record path %s", + if not path_entry or (path_entry.key00.id not in self._nsm._vnfrs): + # This can happen when using external RO or after delete with monitoring params + self._log.debug("%s request for non existent record path %s", action, xpath) xact_info.respond_xpath(rwdts.XactRspCode.NA, xpath) return - self._log.debug("Deleting VNFR with id %s", path_entry.key00.id) if action == rwdts.QueryAction.CREATE or action == rwdts.QueryAction.UPDATE: yield from self._nsm.update_vnfr(msg) elif action == rwdts.QueryAction.DELETE: self._log.debug("Deleting VNFR with id %s", path_entry.key00.id) + self._nsm.delete_vnfr(path_entry.key00.id) xact_info.respond_xpath(rwdts.XactRspCode.ACK, xpath) self._log.debug("Registering for VNFR using xpath: %s", - VnfrDtsHandler.XPATH,) + VnfrDtsHandler.XPATH) - hdl = rift.tasklets.DTS.RegistrationHandler(on_commit=on_commit, - on_prepare=on_prepare,) + hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,) with self._dts.group_create() as group: - self._regh = group.register(xpath=VnfrDtsHandler.XPATH, + self._regh = group.register(xpath=self._nsm._project.add_project( + VnfrDtsHandler.XPATH), handler=hdl, flags=(rwdts.Flag.SUBSCRIBER),) + def deregister(self): + self._log.debug("De-register VNFR for project {}". + format(self._nsm._project.name)) + if self._regh: + self._regh.deregister() + self._regh = None class NsManager(object): """ The Network Service Manager class""" - def __init__(self, dts, log, loop, + def __init__(self, dts, log, loop, project, nsr_handler, vnfr_handler, vlr_handler, ro_plugin_selector, vnffgmgr, vnfd_pub_handler, cloud_account_handler): self._dts = dts self._log = log self._loop = loop + self._project = project self._nsr_handler = nsr_handler self._vnfr_pub_handler = vnfr_handler self._vlr_pub_handler = vlr_handler @@ -3528,19 +4140,20 @@ class NsManager(object): # Intialize the set of variables for implementing Scaling RPC using REST. self._headers = {"content-type":"application/json", "accept":"application/json"} - #This will break when we have rbac in the rift code and admin user password is changed or admin it self is removed. - self._user = 'admin' - self._password = 'admin' + self._user = '@rift' + self._password = 'rift' self._ip = 'localhost' self._rport = 8008 - self._conf_url = "https://{ip}:{port}/api/config". \ + self._conf_url = "https://{ip}:{port}/api/config/project/{project}". \ format(ip=self._ip, - port=self._rport) + port=self._rport, + project=self._project.name) self._nsrs = {} self._nsds = {} self._vnfds = {} self._vnfrs = {} + self._nsr_for_vlr = {} self.cfgmgr_obj = conman.ROConfigManager(log, loop, dts, self) @@ -3551,8 +4164,8 @@ class NsManager(object): self._dts_handlers = [self._nsd_dts_handler, VnfrDtsHandler(dts, log, loop, self), NsrDtsHandler(dts, log, loop, self), - ScalingRpcHandler(log, dts, loop, self.scale_rpc_callback), - NsrRpcDtsHandler(dts,log,loop,self), + ScalingRpcHandler(log, dts, loop, self, self.scale_rpc_callback), + # NsrRpcDtsHandler(dts, log, loop, self), self._vnfd_dts_handler, self.cfgmgr_obj, ] @@ -3625,8 +4238,17 @@ class NsManager(object): @asyncio.coroutine def register(self): """ Register all static DTS handlers """ + self._log.debug("Register DTS handlers for project {}".format(self._project)) for dts_handle in self._dts_handlers: - yield from dts_handle.register() + if asyncio.iscoroutinefunction(dts_handle.register): + yield from dts_handle.register() + else: + dts_handle.register() + + def deregister(self): + """ Register all static DTS handlers """ + for dts_handle in self._dts_handlers: + dts_handle.deregister() def get_ns_by_nsr_id(self, nsr_id): @@ -3670,7 +4292,7 @@ class NsManager(object): def get_scaling_group_information(): scaling_group_url = "{url}/ns-instance-config/nsr/{nsr_id}".format(url=self._conf_url, nsr_id=msg.nsr_id_ref) output = requests.get(scaling_group_url, headers=self._headers, auth=(self._user, self._password), verify=False) - if output.text == None or len(output.text) == 0: + if output.text is None or len(output.text) == 0: self.log.error("nsr id %s information not present", self._nsr_id) return None scaling_group_info = json.loads(output.text) @@ -3678,14 +4300,15 @@ class NsManager(object): def config_scaling_group_information(scaling_group_info): data_str = json.dumps(scaling_group_info) - self.log.debug("scaling group Info %s", data_str) scale_out_url = "{url}/ns-instance-config/nsr/{nsr_id}".format(url=self._conf_url, nsr_id=msg.nsr_id_ref) - response = requests.put(scale_out_url, data=data_str, verify=False, auth=(self._user, self._password), headers=self._headers) + response = requests.put(scale_out_url, data=data_str, verify=False, + auth=(self._user, self._password), headers=self._headers) response.raise_for_status() def scale_out(): scaling_group_info = get_scaling_group_information() + self._log.debug("Scale out info: {}".format(scaling_group_info)) if scaling_group_info is None: return @@ -3704,7 +4327,8 @@ class NsManager(object): scaling_group["instance"].append({"id": int(msg.instance_id)}) if not scaling_group_present: - scaling_group_info["nsr:nsr"]["scaling-group"] = [{"scaling-group-name-ref": msg.scaling_group_name_ref, "instance": [{"id": msg.instance_id}]}] + scaling_group_info["nsr:nsr"]["scaling-group"] = [{"scaling-group-name-ref": msg.scaling_group_name_ref, + "instance": [{"id": msg.instance_id}]}] config_scaling_group_information(scaling_group_info) return @@ -3749,7 +4373,7 @@ class NsManager(object): nsr.nsr_cfg_msg= msg def nsr_instantiate_vl(self, nsr_id, vld): - self.log.debug("NSR {} create VL {}".format(nsr_id, vld)) + self.log.error("NSR {} create VL {}".format(nsr_id, vld)) nsr = self._nsrs[nsr_id] if nsr.state != NetworkServiceRecordState.RUNNING: raise NsrVlUpdateError("Cannot perform VL instantiate if NSR is not in running state") @@ -3766,7 +4390,8 @@ class NsManager(object): # Not calling in a separate task as this is called from a separate task yield from nsr.delete_vl_instance(vld) - def create_nsr(self, nsr_msg, key_pairs=None,restart_mode=False): + @asyncio.coroutine + def create_nsr(self, nsr_msg, config_xact, key_pairs=None,restart_mode=False): """ Create an NSR instance """ self._log.debug("NSRMSG %s", nsr_msg) if nsr_msg.id in self._nsrs: @@ -3774,12 +4399,18 @@ class NsManager(object): self._log.error(msg) raise NetworkServiceRecordError(msg) - self._log.info("Create NetworkServiceRecord nsr id %s from nsd_id %s", + self._log.debug("Create NetworkServiceRecord nsr id %s from nsd_id %s", nsr_msg.id, nsr_msg.nsd.id) - nsm_plugin = self._ro_plugin_selector.ro_plugin - sdn_account_name = self._cloud_account_handler.get_cloud_account_sdn_name(nsr_msg.cloud_account) + nsm_plugin = self._ro_plugin_selector.get_ro_plugin(nsr_msg.resource_orchestrator) + #Work Around - openmano expects datacenter id instead of datacenter name + if isinstance(nsm_plugin, openmano_nsm.OpenmanoNsPlugin): + for uuid, name in nsm_plugin._cli_api.datacenter_list(): + if name == nsr_msg.datacenter: + nsr_msg.datacenter = uuid + + sdn_account_name = self._cloud_account_handler.get_cloud_account_sdn_name(nsr_msg.datacenter) nsr = NetworkServiceRecord(self._dts, self._log, @@ -3789,11 +4420,26 @@ class NsManager(object): nsr_msg, sdn_account_name, key_pairs, + self._project, restart_mode=restart_mode, - vlr_handler=self._ro_plugin_selector._records_publisher._vlr_pub_hdlr + vlr_handler=self._vlr_pub_handler ) self._nsrs[nsr_msg.id] = nsr - nsm_plugin.create_nsr(nsr_msg, nsr_msg.nsd, key_pairs) + + try: + # Generate ssh key pair if required + nsr.generate_ssh_key_pair(config_xact) + except Exception as e: + self._log.exception("SSH key: {}".format(e)) + + self._log.debug("NSR {}: SSh key generated: {}".format(nsr_msg.name, + nsr.public_key)) + + ssh_key = {'private_key': nsr.private_key, + 'public_key': nsr.public_key + } + + nsm_plugin.create_nsr(nsr_msg, nsr_msg.nsd, key_pairs, ssh_key=ssh_key) return nsr @@ -3813,7 +4459,11 @@ class NsManager(object): raise NetworkServiceRecordError(err) nsr = self._nsrs[nsr_id] - yield from nsr.nsm_plugin.instantiate_ns(nsr, config_xact) + try: + yield from nsr.nsm_plugin.instantiate_ns(nsr, config_xact) + except Exception as e: + self._log.exception("NS instantiate: {}".format(e)) + raise e @asyncio.coroutine def update_vnfr(self, vnfr): @@ -3821,10 +4471,18 @@ class NsManager(object): vnfr_state = self._vnfrs[vnfr.id].state self._log.debug("Updating VNFR with state %s: vnfr %s", vnfr_state, vnfr) - + + no_of_active_vms = 0 + for vdur in vnfr.vdur: + if vdur.operational_status == 'running': + no_of_active_vms += 1 + + self._vnfrs[vnfr.id]._active_vdus = no_of_active_vms yield from self._vnfrs[vnfr.id].update_state(vnfr) nsr = self.find_nsr_for_vnfr(vnfr.id) - yield from nsr.update_state() + if nsr is not None: + nsr._vnf_inst_started = False + yield from nsr.update_state() def find_nsr_for_vnfr(self, vnfr_id): """ Find the NSR which )has the passed vnfr id""" @@ -3840,7 +4498,7 @@ class NsManager(object): @asyncio.coroutine def get_nsr_config(self, nsd_id): - xpath = "C,/nsr:ns-instance-config" + xpath = self._project.add_project("C,/nsr:ns-instance-config") results = yield from self._dts.query_read(xpath, rwdts.XactFlag.MERGE) for result in results: @@ -3976,6 +4634,9 @@ class NsManager(object): Terminate network service for the given NSR Id """ + if nsr_id not in self._nsrs: + return + # Terminate the instances/networks assocaited with this nw service self._log.debug("Terminating the network service %s", nsr_id) try : @@ -3983,64 +4644,96 @@ class NsManager(object): except Exception as e: self.log.exception("Failed to terminate NSR[id=%s]", nsr_id) - # Unpublish the NSR record - self._log.debug("Unpublishing the network service %s", nsr_id) - yield from self._nsrs[nsr_id].unpublish(xact) - - # Finaly delete the NS instance from this NS Manager - self._log.debug("Deletng the network service %s", nsr_id) - self.delete_nsr(nsr_id) + def vlr_event(self, vlr, action): + self._log.debug("Received VLR %s with action:%s", vlr, action) + # Find the NS and see if we can proceed + nsr = self.find_nsr_for_vlr_id(vlr.id) + if nsr is None: + self._log.error("VLR %s:%s received for NSR, state:%s", + vlr.id, vlr.name, vlr.operational_status) + return + nsr.vlr_event(vlr, action) + + def add_vlr_id_nsr_map(self, vlr_id, nsr): + """ Add a mapping for vlr_id into NSR """ + self._nsr_for_vlr[vlr_id] = nsr + + def remove_vlr_id_nsr_map(self, vlr_id): + """ Remove a mapping for vlr_id into NSR """ + if vlr_id in self._nsr_for_vlr: + del self._nsr_for_vlr[vlr_id] + + def find_nsr_for_vlr_id(self, vlr_id): + """ Find NSR for VLR id """ + nsr = None + if vlr_id in self._nsr_for_vlr: + nsr = self._nsr_for_vlr[vlr_id] + return nsr class NsmRecordsPublisherProxy(object): """ This class provides a publisher interface that allows plugin objects to publish NSR/VNFR/VLR""" - def __init__(self, dts, log, loop, nsr_pub_hdlr, vnfr_pub_hdlr, vlr_pub_hdlr): + def __init__(self, dts, log, loop, project, nsr_pub_hdlr, + vnfr_pub_hdlr, vlr_pub_hdlr,): self._dts = dts self._log = log self._loop = loop + self._project = project self._nsr_pub_hdlr = nsr_pub_hdlr self._vlr_pub_hdlr = vlr_pub_hdlr self._vnfr_pub_hdlr = vnfr_pub_hdlr + @asyncio.coroutine + def publish_nsr_opdata(self, xact, nsr): + """ Publish an NSR """ + path = ("D,/nsr:ns-instance-opdata" + "/nsr:nsr[nsr:ns-instance-config-ref={}]" + ).format(quoted_key(nsr.ns_instance_config_ref)) + return (yield from self._nsr_pub_hdlr.update(xact, path, nsr)) + @asyncio.coroutine def publish_nsr(self, xact, nsr): """ Publish an NSR """ - path = NetworkServiceRecord.xpath_from_nsr(nsr) + path = self._project.add_project(NetworkServiceRecord.xpath_from_nsr(nsr)) return (yield from self._nsr_pub_hdlr.update(xact, path, nsr)) @asyncio.coroutine def unpublish_nsr(self, xact, nsr): """ Unpublish an NSR """ - path = NetworkServiceRecord.xpath_from_nsr(nsr) + path = self._project.add_project(NetworkServiceRecord.xpath_from_nsr(nsr)) return (yield from self._nsr_pub_hdlr.delete(xact, path)) @asyncio.coroutine def publish_vnfr(self, xact, vnfr): """ Publish an VNFR """ - path = VirtualNetworkFunctionRecord.vnfr_xpath(vnfr) + path = self._project.add_project(VirtualNetworkFunctionRecord.vnfr_xpath(vnfr)) return (yield from self._vnfr_pub_hdlr.update(xact, path, vnfr)) @asyncio.coroutine def unpublish_vnfr(self, xact, vnfr): """ Unpublish a VNFR """ - path = VirtualNetworkFunctionRecord.vnfr_xpath(vnfr) - return (yield from self._vnfr_pub_hdlr.delete(xact, path)) + path = self._project.add_project(VirtualNetworkFunctionRecord.vnfr_xpath(vnfr)) + yield from self._vnfr_pub_hdlr.delete(xact, path) + # NOTE: The regh delete does not send the on_prepare to VNFM tasklet as well + # as remove all the VNFR elements. So need to send this additional delete block. + with self._dts.transaction(flags = 0) as xact: + block = xact.block_create() + block.add_query_delete(path) + yield from block.execute(flags=0, now=True) @asyncio.coroutine def publish_vlr(self, xact, vlr): """ Publish a VLR """ - path = VirtualLinkRecord.vlr_xpath(vlr) + path = self._project.add_project(VirtualLinkRecord.vlr_xpath(vlr)) return (yield from self._vlr_pub_hdlr.update(xact, path, vlr)) @asyncio.coroutine def unpublish_vlr(self, xact, vlr): """ Unpublish a VLR """ - path = VirtualLinkRecord.vlr_xpath(vlr) + path = self._project.add_project(VirtualLinkRecord.vlr_xpath(vlr)) return (yield from self._vlr_pub_hdlr.delete(xact, path)) - class ScalingRpcHandler(mano_dts.DtsHandler): """ The Network service Monitor DTS handler """ SCALE_IN_INPUT_XPATH = "I,/nsr:exec-scale-in" @@ -4051,22 +4744,48 @@ class ScalingRpcHandler(mano_dts.DtsHandler): ACTION = Enum('ACTION', 'SCALE_IN SCALE_OUT') - def __init__(self, log, dts, loop, callback=None): - super().__init__(log, dts, loop) + def __init__(self, log, dts, loop, nsm, callback=None): + super().__init__(log, dts, loop, nsm._project) + self._nsm = nsm self.callback = callback self.last_instance_id = defaultdict(int) + self._reg_in = None + self._reg_out = None + @asyncio.coroutine def register(self): + def send_err_msg(err_msg, xact_info, ks_path, e=False): + xpath = ks_path.to_xpath(NsrYang.get_schema()) + if e: + self._log.exception(err_msg) + else: + self._log.error(err_msg) + xact_info.send_error_xpath(RwTypes.RwStatus.FAILURE, + xpath, + err_msg) + xact_info.respond_xpath(rwdts.XactRspCode.NACK) + @asyncio.coroutine def on_scale_in_prepare(xact_info, action, ks_path, msg): assert action == rwdts.QueryAction.RPC + self._log.debug("Scale in called: {}".format(msg.as_dict())) + if not self.project.rpc_check(msg, xact_info): + return + try: rpc_op = NsrYang.YangOutput_Nsr_ExecScaleIn.from_dict({ "instance_id": msg.instance_id}) + nsr = self._nsm.nsrs[msg.nsr_id_ref] + if nsr.state != NetworkServiceRecordState.RUNNING: + errmsg = ("Unable to perform scaling action when NS {}({}) not in running state". + format(nsr.name, nsr.id)) + send_err_msg(errmsg, xact_info, ks_path) + return + xact_info.respond_xpath( rwdts.XactRspCode.ACK, self.__class__.SCALE_IN_OUTPUT_XPATH, @@ -4074,16 +4793,20 @@ class ScalingRpcHandler(mano_dts.DtsHandler): if self.callback: self.callback(xact_info.xact, msg, self.ACTION.SCALE_IN) + except Exception as e: - self.log.exception(e) - xact_info.respond_xpath( - rwdts.XactRspCode.NACK, - self.__class__.SCALE_IN_OUTPUT_XPATH) + errmsg = ("Exception doing scale in using {}: {}". + format(msg, e)) + send_err_msg(errmsg, xact_info, ks_path, e=True) @asyncio.coroutine def on_scale_out_prepare(xact_info, action, ks_path, msg): assert action == rwdts.QueryAction.RPC + self._log.debug("Scale out called: {}".format(msg.as_dict())) + if not self.project.rpc_check(msg, xact_info): + return + try: scaling_group = msg.scaling_group_name_ref if not msg.instance_id: @@ -4091,6 +4814,13 @@ class ScalingRpcHandler(mano_dts.DtsHandler): msg.instance_id = last_instance_id + 1 self.last_instance_id[scale_group] += 1 + nsr = self._nsm.nsrs[msg.nsr_id_ref] + if nsr.state != NetworkServiceRecordState.RUNNING: + errmsg = ("Unable to perform scaling action when NS {}({}) not in running state". + format(nsr.name, nsr.id)) + send_err_msg(errmsg, xact_info, ks_path) + return + rpc_op = NsrYang.YangOutput_Nsr_ExecScaleOut.from_dict({ "instance_id": msg.instance_id}) @@ -4101,44 +4831,45 @@ class ScalingRpcHandler(mano_dts.DtsHandler): if self.callback: self.callback(xact_info.xact, msg, self.ACTION.SCALE_OUT) + except Exception as e: - self.log.exception(e) - xact_info.respond_xpath( - rwdts.XactRspCode.NACK, - self.__class__.SCALE_OUT_OUTPUT_XPATH) + errmsg = ("Exception doing scale in using {}: {}". + format(msg, e)) + send_err_msg(errmsg, xact_info, ks_path, e=True) - scale_in_hdl = rift.tasklets.DTS.RegistrationHandler( - on_prepare=on_scale_in_prepare) - scale_out_hdl = rift.tasklets.DTS.RegistrationHandler( - on_prepare=on_scale_out_prepare) + self._reg_in = yield from self.dts.register( + xpath=self.__class__.SCALE_IN_INPUT_XPATH, + handler=rift.tasklets.DTS.RegistrationHandler( + on_prepare=on_scale_in_prepare), + flags=rwdts.Flag.PUBLISHER) - with self.dts.group_create() as group: - group.register( - xpath=self.__class__.SCALE_IN_INPUT_XPATH, - handler=scale_in_hdl, - flags=rwdts.Flag.PUBLISHER) - group.register( - xpath=self.__class__.SCALE_OUT_INPUT_XPATH, - handler=scale_out_hdl, - flags=rwdts.Flag.PUBLISHER) + self._reg_out = yield from self.dts.register( + xpath=self.__class__.SCALE_OUT_INPUT_XPATH, + handler=rift.tasklets.DTS.RegistrationHandler( + on_prepare=on_scale_out_prepare), + flags=rwdts.Flag.PUBLISHER) + def deregister(self): + if self._reg_in: + self._reg_in.deregister() + self._reg_in = None -class NsmTasklet(rift.tasklets.Tasklet): - """ - The network service manager tasklet - """ - def __init__(self, *args, **kwargs): - super(NsmTasklet, self).__init__(*args, **kwargs) - self.rwlog.set_category("rw-mano-log") - self.rwlog.set_subcategory("nsm") + if self._reg_out: + self._reg_out.deregister() + self._reg_out = None - self._dts = None + +class NsmProject(ManoProject): + + def __init__(self, name, tasklet, **kw): + super(NsmProject, self).__init__(tasklet.log, name) + self.update(tasklet) self._nsm = None self._ro_plugin_selector = None self._vnffgmgr = None - self._nsr_handler = None + self._nsr_pub_handler = None self._vnfr_pub_handler = None self._vlr_pub_handler = None self._vnfd_pub_handler = None @@ -4146,57 +4877,48 @@ class NsmTasklet(rift.tasklets.Tasklet): self._records_publisher_proxy = None - def start(self): - """ The task start callback """ - super(NsmTasklet, self).start() - self.log.info("Starting NsmTasklet") - - self.log.debug("Registering with dts") - self._dts = rift.tasklets.DTS(self.tasklet_info, - RwNsmYang.get_schema(), - self.loop, - self.on_dts_state_change) - - self.log.debug("Created DTS Api GI Object: %s", self._dts) - - def stop(self): - try: - self._dts.deinit() - except Exception: - print("Caught Exception in NSM stop:", sys.exc_info()[0]) - raise - - def on_instance_started(self): - """ Task instance started callback """ - self.log.debug("Got instance started callback") + def vlr_event(self, vlr, action): + """ VLR Event callback """ + self.log.debug("VLR Event received for VLR %s with action %s", vlr, action) + self._nsm.vlr_event(vlr, action) @asyncio.coroutine - def init(self): - """ Task init callback """ - self.log.debug("Got instance started callback") - - self.log.debug("creating config account handler") + def register(self): + self.log.debug("Register NsmProject for {}".format(self.name)) - self._nsr_pub_handler = publisher.NsrOpDataDtsHandler(self._dts, self.log, self.loop) + self._nsr_pub_handler = publisher.NsrOpDataDtsHandler( + self._dts, self.log, self.loop, self) yield from self._nsr_pub_handler.register() - self._vnfr_pub_handler = publisher.VnfrPublisherDtsHandler(self._dts, self.log, self.loop) + self._vnfr_pub_handler = publisher.VnfrPublisherDtsHandler( + self._dts, self.log, self.loop, self) yield from self._vnfr_pub_handler.register() - self._vlr_pub_handler = publisher.VlrPublisherDtsHandler(self._dts, self.log, self.loop) + self._vlr_pub_handler = publisher.VlrPublisherDtsHandler( + self._dts, self.log, self.loop, self) yield from self._vlr_pub_handler.register() - manifest = self.tasklet_info.get_pb_manifest() + self._vlr_sub_handler = subscriber.VlrSubscriberDtsHandler(self.log, + self._dts, + self.loop, + self, + self.vlr_event, + ) + yield from self._vlr_sub_handler.register() + + manifest = self._tasklet.tasklet_info.get_pb_manifest() use_ssl = manifest.bootstrap_phase.rwsecurity.use_ssl ssl_cert = manifest.bootstrap_phase.rwsecurity.cert ssl_key = manifest.bootstrap_phase.rwsecurity.key - self._vnfd_pub_handler = publisher.VnfdPublisher(use_ssl, ssl_cert, ssl_key, self.loop) + self._vnfd_pub_handler = publisher.VnfdPublisher( + use_ssl, ssl_cert, ssl_key, self.loop, self) self._records_publisher_proxy = NsmRecordsPublisherProxy( self._dts, self.log, self.loop, + self, self._nsr_pub_handler, self._vnfr_pub_handler, self._vlr_pub_handler, @@ -4204,38 +4926,116 @@ class NsmTasklet(rift.tasklets.Tasklet): # Register the NSM to receive the nsm plugin # when cloud account is configured - self._ro_plugin_selector = cloud.ROAccountPluginSelector( + self._ro_plugin_selector = cloud.ROAccountConfigSubscriber( self._dts, self.log, self.loop, - self._records_publisher_proxy, + self, + self._records_publisher_proxy ) yield from self._ro_plugin_selector.register() self._cloud_account_handler = cloud.CloudAccountConfigSubscriber( self._log, self._dts, - self.log_hdl) + self.log_hdl, + self, + ) yield from self._cloud_account_handler.register() - self._vnffgmgr = rwvnffgmgr.VnffgMgr(self._dts, self.log, self.log_hdl, self.loop) + self._vnffgmgr = rwvnffgmgr.VnffgMgr(self._dts, self.log, self.log_hdl, self.loop, + self, self._cloud_account_handler) yield from self._vnffgmgr.register() self._nsm = NsManager( self._dts, self.log, self.loop, + self, self._nsr_pub_handler, self._vnfr_pub_handler, self._vlr_pub_handler, self._ro_plugin_selector, self._vnffgmgr, self._vnfd_pub_handler, - self._cloud_account_handler + self._cloud_account_handler, ) yield from self._nsm.register() + self.log.debug("Register NsmProject for {} complete".format(self.name)) + + def deregister(self): + self._log.debug("Project {} de-register".format(self.name)) + self._nsm.deregister() + self._vnffgmgr.deregister() + self._cloud_account_handler.deregister() + self._ro_plugin_selector.deregister() + self._nsr_pub_handler.deregister() + self._vnfr_pub_handler.deregister() + self._vlr_pub_handler.deregister() + self._vlr_sub_handler.deregister() + self._nsm = None + + @asyncio.coroutine + def delete_prepare(self): + if self._nsm and self._nsm._nsrs: + delete_msg = "Project has NSR associated with it. Delete all Project NSR and try again." + return False, delete_msg + return True, "True" + + +class NsmTasklet(rift.tasklets.Tasklet): + """ + The network service manager tasklet + """ + def __init__(self, *args, **kwargs): + super(NsmTasklet, self).__init__(*args, **kwargs) + self.rwlog.set_category("rw-mano-log") + self.rwlog.set_subcategory("nsm") + + self._dts = None + self.project_handler = None + self.projects = {} + + @property + def dts(self): + return self._dts + + def start(self): + """ The task start callback """ + super(NsmTasklet, self).start() + self.log.info("Starting NsmTasklet") + + self.log.debug("Registering with dts") + self._dts = rift.tasklets.DTS(self.tasklet_info, + RwNsmYang.get_schema(), + self.loop, + self.on_dts_state_change) + + self.log.debug("Created DTS Api GI Object: %s", self._dts) + + def stop(self): + try: + self._dts.deinit() + except Exception: + print("Caught Exception in NSM stop:", sys.exc_info()[0]) + raise + + def on_instance_started(self): + """ Task instance started callback """ + self.log.debug("Got instance started callback") + + @asyncio.coroutine + def init(self): + """ Task init callback """ + self.log.debug("Got instance started callback") + + self.log.debug("creating project handler") + self.project_handler = ProjectHandler(self, NsmProject) + self.project_handler.register() + + @asyncio.coroutine def run(self):