X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=rwlaunchpad%2Fplugins%2Frwvnfm%2Frift%2Ftasklets%2Frwvnfmtasklet%2Frwvnfmtasklet.py;h=c00f91e1c63b9d7d6138ddddc81b24b45e6e08fd;hb=f314b4af9744068a7ed7a6a6314220c3aa857523;hp=2cbe240429ad9e4a9afb72c9afd47a8b2fd81892;hpb=6f1a3fe149e4a6b9803382cb299c902f4cf58ec9;p=osm%2FSO.git diff --git a/rwlaunchpad/plugins/rwvnfm/rift/tasklets/rwvnfmtasklet/rwvnfmtasklet.py b/rwlaunchpad/plugins/rwvnfm/rift/tasklets/rwvnfmtasklet/rwvnfmtasklet.py index 2cbe2404..c00f91e1 100755 --- a/rwlaunchpad/plugins/rwvnfm/rift/tasklets/rwvnfmtasklet/rwvnfmtasklet.py +++ b/rwlaunchpad/plugins/rwvnfm/rift/tasklets/rwvnfmtasklet/rwvnfmtasklet.py @@ -1,4 +1,3 @@ -# # Copyright 2016 RIFT.IO Inc # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -17,18 +16,21 @@ import asyncio import collections import enum +import gi import logging -import uuid -import time import os.path import re import shutil import sys +import time +import uuid +import yaml -import gi gi.require_version('RwDts', '1.0') gi.require_version('RwVnfrYang', '1.0') +gi.require_version('VnfrYang', '1.0') gi.require_version('RwVnfmYang', '1.0') +gi.require_version('RwVnfdYang', '1.0') gi.require_version('RwVlrYang', '1.0') gi.require_version('RwManifestYang', '1.0') gi.require_version('RwBaseYang', '1.0') @@ -37,6 +39,8 @@ gi.require_version('RwResourceMgrYang', '1.0') from gi.repository import ( RwDts as rwdts, RwVnfrYang, + RwVnfdYang, + VnfdYang, RwVnfmYang, RwVlrYang, VnfrYang, @@ -44,15 +48,24 @@ from gi.repository import ( RwBaseYang, RwResourceMgrYang, ProtobufC, + RwTypes ) +gi.require_version('RwKeyspec', '1.0') +from gi.repository.RwKeyspec import quoted_key import rift.tasklets import rift.package.store import rift.package.cloud_init import rift.package.script import rift.mano.dts as mano_dts +from rift.mano.utils.project import ( + ManoProject, + ProjectHandler, + ) import rift.mano.utils.short_name as mano_short_name +from . import subscriber +VCP_FIELDS = ['name', 'id', 'connection_point_id', 'type_yang', 'ip_address', 'mac_address'] class VMResourceError(Exception): """ VM resource Error""" @@ -135,10 +148,18 @@ class VnfrInstantiationFailed(Exception): class VNFMPlacementGroupError(Exception): + """ VNF placement group Error """ pass + +class VlrError(Exception): + """ Virtual Link Record Error """ + pass + + class VirtualNetworkFunctionRecordState(enum.Enum): """ VNFR state """ + PRE_INIT = 0 INIT = 1 VL_INIT_PHASE = 2 VM_INIT_PHASE = 3 @@ -160,133 +181,31 @@ class VDURecordState(enum.Enum): TERMINATED = 6 FAILED = 10 - -class VcsComponent(object): - """ VCS Component within the VNF descriptor """ - def __init__(self, dts, log, loop, cluster_name, vcs_handler, component, mangled_name): - self._dts = dts - self._log = log - self._loop = loop - self._component = component - self._cluster_name = cluster_name - self._vcs_handler = vcs_handler - self._mangled_name = mangled_name - - @staticmethod - def mangle_name(component_name, vnf_name, vnfd_id): - """ mangled component name """ - return vnf_name + ":" + component_name + ":" + vnfd_id - - @property - def name(self): - """ name of this component""" - return self._mangled_name - - @property - def path(self): - """ The path for this object """ - return("D,/rw-manifest:manifest" + - "/rw-manifest:operational-inventory" + - "/rw-manifest:component" + - "[rw-manifest:component-name = '{}']").format(self.name) - - @property - def instance_xpath(self): - """ The path for this object """ - return("D,/rw-base:vcs" + - "/instances" + - "/instance" + - "[instance-name = '{}']".format(self._cluster_name)) - - @property - def start_comp_xpath(self): - """ start component xpath """ - return (self.instance_xpath + - "/child-n[instance-name = 'START-REQ']") - - def get_start_comp_msg(self, ip_address): - """ start this component """ - start_msg = RwBaseYang.VcsInstance_Instance_ChildN() - start_msg.instance_name = 'START-REQ' - start_msg.component_name = self.name - start_msg.admin_command = "START" - start_msg.ip_address = ip_address - - return start_msg - - @property - def msg(self): - """ Returns the message for this vcs component""" - - vcs_comp_dict = self._component.as_dict() - - def mangle_comp_names(comp_dict): - """ mangle component name with VNF name, id""" - for key, val in comp_dict.items(): - if isinstance(val, dict): - comp_dict[key] = mangle_comp_names(val) - elif isinstance(val, list): - i = 0 - for ent in val: - if isinstance(ent, dict): - val[i] = mangle_comp_names(ent) - else: - val[i] = ent - i += 1 - elif key == "component_name": - comp_dict[key] = VcsComponent.mangle_name(val, - self._vnfd_name, - self._vnfd_id) - return comp_dict - - mangled_dict = mangle_comp_names(vcs_comp_dict) - msg = RwManifestYang.OpInventory_Component.from_dict(mangled_dict) - return msg - - @asyncio.coroutine - def publish(self, xact): - """ Publishes the VCS component """ - self._log.debug("Publishing the VcsComponent %s, path = %s comp = %s", - self.name, self.path, self.msg) - yield from self._vcs_handler.publish(xact, self.path, self.msg) - - @asyncio.coroutine - def start(self, xact, parent, ip_addr=None): - """ Starts this VCS component """ - # ATTN RV - replace with block add - start_msg = self.get_start_comp_msg(ip_addr) - self._log.debug("starting component %s %s", - self.start_comp_xpath, start_msg) - yield from self._dts.query_create(self.start_comp_xpath, - 0, - start_msg) - self._log.debug("started component %s, %s", - self.start_comp_xpath, start_msg) - - class VirtualDeploymentUnitRecord(object): """ Virtual Deployment Unit Record """ def __init__(self, dts, log, loop, + project, vdud, vnfr, nsr_config, mgmt_intf, mgmt_network, - cloud_account_name, + datacenter_name, vnfd_package_store, vdur_id=None, placement_groups=[]): self._dts = dts self._log = log self._loop = loop + self._project = project self._vdud = vdud self._vnfr = vnfr self._nsr_config = nsr_config self._mgmt_intf = mgmt_intf - self._cloud_account_name = cloud_account_name + self._datacenter_name = datacenter_name self._vnfd_package_store = vnfd_package_store self._mgmt_network = mgmt_network @@ -301,35 +220,46 @@ class VirtualDeploymentUnitRecord(object): self._rm_regh = None self._vm_resp = None self._vdud_cloud_init = None - self._vdur_console_handler = VnfrConsoleOperdataDtsHandler(dts, log, loop, self._vnfr._vnfm, self._vnfr.vnfr_id, self._vdur_id,self.vdu_id) + self._vdur_console_handler = VnfrConsoleOperdataDtsHandler( + dts, log, loop, self._vnfr._vnfm, self._vnfr.vnfr_id, self._vdur_id,self.vdu_id) + @asyncio.coroutine def vdu_opdata_register(self): yield from self._vdur_console_handler.register() - def cp_ip_addr(self, cp_name): - """ Find ip address by connection point name """ + def vm_cp_info(self, cp_name): + """ Find the VM Connection info by connection point name """ if self._vm_resp is not None: for conn_point in self._vm_resp.connection_points: if conn_point.name == cp_name: - return conn_point.ip_address - return "0.0.0.0" + return conn_point + return None + + def cp_ip_addr(self, cp_name): + """ Find ip address by connection point name """ + vm_cp_info = self.vm_cp_info(cp_name) + if vm_cp_info: + return vm_cp_info.ip_address + else: + return "0.0.0.0" def cp_mac_addr(self, cp_name): """ Find mac address by connection point name """ - if self._vm_resp is not None: - for conn_point in self._vm_resp.connection_points: - if conn_point.name == cp_name: - return conn_point.mac_addr - return "00:00:00:00:00:00" + vm_cp_info = self.vm_cp_info(cp_name) + if vm_cp_info: + return vm_cp_info.mac_addr + else: + return "00:00:00:00:00:00" def cp_id(self, cp_name): """ Find connection point id by connection point name """ - if self._vm_resp is not None: - for conn_point in self._vm_resp.connection_points: - if conn_point.name == cp_name: - return conn_point.connection_point_id - return '' + vm_cp_info = self.vm_cp_info(cp_name) + if vm_cp_info: + return vm_cp_info.connection_point_id + else: + return str() + @property def vdu_id(self): @@ -350,9 +280,9 @@ class VirtualDeploymentUnitRecord(object): """ Return this VDUR's unique short name """ # Impose these restrictions on Unique name # Max 64 - # - Max 10 of NSR name (remove all specialcharacters, only numbers and alphabets) - # - 6 chars of shortened name - # - Max 10 of VDU name (remove all specialcharacters, only numbers and alphabets) + # - Max trailing 10 chars of NSR name (remove all specialcharacters, only numbers and alphabets) + # - 9 chars of shortened name + # - Max trailing 10 of VDU name (remove all specialcharacters, only numbers and alphabets) # def _restrict_tag(input_str): # Exclude all characters except a-zA-Z0-9 @@ -370,9 +300,9 @@ class VirtualDeploymentUnitRecord(object): return shortstr @property - def cloud_account_name(self): + def datacenter_name(self): """ Cloud account this VDU should be created in """ - return self._cloud_account_name + return self._datacenter_name @property def image_name(self): @@ -419,8 +349,9 @@ class VirtualDeploymentUnitRecord(object): "vswitch_epa", "hypervisor_epa", "host_epa", - "volumes", + "volumes" ] + vdu_copy_dict = {k: v for k, v in self._vdud.as_dict().items() if k in vdu_fields} vdur_dict = {"id": self._vdur_id, @@ -431,6 +362,7 @@ class VirtualDeploymentUnitRecord(object): "unique_short_name": self.unique_short_name } + if self.vm_resp is not None: vdur_dict.update({"vim_id": self.vm_resp.vdu_id, "flavor_id": self.vm_resp.flavor_id @@ -438,14 +370,15 @@ class VirtualDeploymentUnitRecord(object): if self._vm_resp.has_field('image_id'): vdur_dict.update({ "image_id": self.vm_resp.image_id }) - if self.management_ip is not None: + if self.management_ip: vdur_dict["management_ip"] = self.management_ip - if self.vm_management_ip is not None: + if self.vm_management_ip: vdur_dict["vm_management_ip"] = self.vm_management_ip vdur_dict.update(vdu_copy_dict) + if self.vm_resp is not None: if self._vm_resp.has_field('volumes'): for opvolume in self._vm_resp.volumes: @@ -464,9 +397,18 @@ class VirtualDeploymentUnitRecord(object): vdur_dict['supplemental_boot_data']['boot_data_drive'] = self._vm_resp.supplemental_boot_data.boot_data_drive if self._vm_resp.supplemental_boot_data.has_field('custom_meta_data'): metadata_list = list() + + # supplemental_boot_data below is returned by Openstack. + # The self._vm_resp version of supplemental data is defaulting to CLOUD_METADATA + # as Openstack does not repond with 'destination' attribute of custom meta data elements. + # Therefore the vdur when published does not specify the destination of the custom-meta-data. + # Should we add this field (destination) explicitly here by comparig the keys with the already obtained + # details in self._vdud ? + for metadata_item in self._vm_resp.supplemental_boot_data.custom_meta_data: - metadata_list.append(metadata_item.as_dict()) + metadata_list.append(metadata_item.as_dict()) vdur_dict['supplemental_boot_data']['custom_meta_data'] = metadata_list + if self._vm_resp.supplemental_boot_data.has_field('config_file'): file_list = list() for file_item in self._vm_resp.supplemental_boot_data.config_file: @@ -479,45 +421,77 @@ class VirtualDeploymentUnitRecord(object): for intf, cp_id, vlr in self._int_intf: cp = self.find_internal_cp_by_cp_id(cp_id) - icp_list.append({"name": cp.name, - "id": cp.id, - "type_yang": "VPORT", - "ip_address": self.cp_ip_addr(cp.id), - "mac_address": self.cp_mac_addr(cp.id)}) + cp_info = dict(name=cp.name, + id=cp.id, + type_yang='VPORT', + ip_address=self.cp_ip_addr(cp.name), + mac_address=self.cp_mac_addr(cp.name), + connection_point_id=self.cp_id(cp.name)) + + virtual_cps = [ vcp for vcp in vlr._vlr.virtual_connection_points + if [ True for cp_ref in vcp.associated_cps if cp.name == cp_ref ]] + + if virtual_cps: + for vcp in virtual_cps: + cp_info['virtual_cps'] = [ {k:v for k,v in vcp.as_dict().items() if k in VCP_FIELDS} + for vcp in virtual_cps ] + + icp_list.append(cp_info) + + ii_dict = {"name": intf.name, + "internal_connection_point_ref": cp.id, + "virtual_interface": {}} - ii_list.append({"name": intf.name, - "vdur_internal_connection_point_ref": cp.id, - "virtual_interface": {}}) + if "position" in intf.as_dict(): + ii_dict["position"] = intf.position + + ii_list.append(ii_dict) vdur_dict["internal_connection_point"] = icp_list self._log.debug("internal_connection_point:%s", vdur_dict["internal_connection_point"]) - vdur_dict["internal_interface"] = ii_list + ei_list = [] for intf, cp, vlr in self._ext_intf: - ei_list.append({"name": cp.name, - "vnfd_connection_point_ref": cp.name, - "virtual_interface": {}}) + ei_dict = {"name": intf.name, + "external_connection_point_ref": cp.name, + "virtual_interface": {}} + if "position" in intf.as_dict(): + ei_dict["position"] = intf.position + + ei_list.append(ei_dict) + + virtual_cps = [ vcp for vcp in vlr.virtual_connection_points + if [ True for cp_ref in vcp.associated_cps if cp.name == cp_ref ]] + + if virtual_cps: + for vcp in virtual_cps: + virtual_cp_info = [ {k:v for k,v in vcp.as_dict().items() if k in VCP_FIELDS} + for vcp in virtual_cps ] + else: + virtual_cp_info = [] + self._vnfr.update_cp(cp.name, self.cp_ip_addr(cp.name), self.cp_mac_addr(cp.name), - self.cp_id(cp.name)) + self.cp_id(cp.name), + virtual_cp_info) - vdur_dict["external_interface"] = ei_list + vdur_dict["interface"] = ei_list + ii_list - placement_groups = [] - for group in self._placement_groups: - placement_groups.append(group.as_dict()) - vdur_dict['placement_groups_info'] = placement_groups - return RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur.from_dict(vdur_dict) + vdur_dict['placement_groups_info'] = [group.as_dict() + for group in self._placement_groups] + + return RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vdur.from_dict(vdur_dict) @property def resmgr_path(self): """ path for resource-mgr""" - return ("D,/rw-resource-mgr:resource-mgmt" + - "/vdu-event" + - "/vdu-event-data[event-id='{}']".format(self._request_id)) + xpath = self._project.add_project("D,/rw-resource-mgr:resource-mgmt" + + "/vdu-event" + + "/vdu-event-data[event-id={}]".format(quoted_key(self._request_id))) + return xpath @property def vm_flavor_msg(self): @@ -531,7 +505,20 @@ class VirtualDeploymentUnitRecord(object): def vdud_cloud_init(self): """ Return the cloud-init contents for the VDU """ if self._vdud_cloud_init is None: - self._vdud_cloud_init = self.cloud_init() + ci = self.cloud_init() + + # VNFR ssh public key, if available + if self._vnfr.public_key: + if not ci: + ci = "#cloud-config" + self._vdud_cloud_init = """{} +ssh_authorized_keys: + - {}""". \ + format(ci, self._vnfr.public_key) + else: + self._vdud_cloud_init = ci + + self._log.debug("Cloud init: {}".format(self._vdud_cloud_init)) return self._vdud_cloud_init @@ -539,9 +526,10 @@ class VirtualDeploymentUnitRecord(object): """ Populate cloud_init with cloud-config script from either the inline contents or from the file provided """ + cloud_init_msg = None if self._vdud.cloud_init is not None: self._log.debug("cloud_init script provided inline %s", self._vdud.cloud_init) - return self._vdud.cloud_init + cloud_init_msg = self._vdud.cloud_init elif self._vdud.cloud_init_file is not None: # Get cloud-init script contents from the file provided in the cloud_init_file param self._log.debug("cloud_init script provided in file %s", self._vdud.cloud_init_file) @@ -550,12 +538,52 @@ class VirtualDeploymentUnitRecord(object): stored_package = self._vnfd_package_store.get_package(self._vnfr.vnfd_id) cloud_init_extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log) try: - return cloud_init_extractor.read_script(stored_package, filename) + cloud_init_msg = cloud_init_extractor.read_script(stored_package, filename) except rift.package.cloud_init.CloudInitExtractionError as e: self.instantiation_failed(str(e)) raise VirtualDeploymentUnitRecordError(e) else: - self._log.debug("VDU Instantiation: cloud-init script not provided") + if not self._vnfr._vnfr_msg.cloud_config.key_pair and not self._vnfr._vnfr_msg.cloud_config.user: + self._log.debug("VDU Instantiation: cloud-init script not provided") + return + + self._log.debug("Current cloud init msg is {}".format(cloud_init_msg)) + if not self._vnfr._vnfr_msg.cloud_config.key_pair and not self._vnfr._vnfr_msg.cloud_config.user: + return cloud_init_msg + + cloud_init_dict = {} + if cloud_init_msg: + try: + cloud_init_dict = yaml.load(cloud_init_msg) + except Exception as e: + self._log.exception(e) + self._log.error("Error loading cloud init Yaml file with exception %s", str(e)) + return cloud_init_msg + + self._log.debug("Current cloud init dict is {}".format(cloud_init_dict)) + + for key_pair in self._vnfr._vnfr_msg.cloud_config.key_pair: + if "ssh_authorized_keys" not in cloud_init_dict: + cloud_init_dict["ssh_authorized_keys"] = list() + cloud_init_dict["ssh_authorized_keys"].append(key_pair.key) + + users = list() + for user_entry in self._vnfr._vnfr_msg.cloud_config.user: + if "users" not in cloud_init_dict: + cloud_init_dict["users"] = list() + user = {} + user["name"] = user_entry.name + user["gecos"] = user_entry.user_info + user["sudo"] = "ALL=(ALL) NOPASSWD:ALL" + user["ssh-authorized-keys"] = list() + for ssh_key in user_entry.key_pair: + user["ssh-authorized-keys"].append(ssh_key.key) + cloud_init_dict["users"].append(user) + + cloud_msg = yaml.safe_dump(cloud_init_dict,width=1000,default_flow_style=False) + cloud_init = "#cloud-config\n"+cloud_msg + self._log.debug("Cloud init msg is {}".format(cloud_init)) + return cloud_init def process_openstack_placement_group_construct(self, vm_create_msg_dict): host_aggregates = [] @@ -572,15 +600,19 @@ class VirtualDeploymentUnitRecord(object): if availability_zones: if len(availability_zones) > 1: - self._log.error("Can not launch VDU: %s in multiple availability zones. Requested Zones: %s", self.name, availability_zones) - raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple availability zones. Requsted Zones".format(self.name, availability_zones)) + self._log.error("Can not launch VDU: %s in multiple availability zones. " + + "Requested Zones: %s", self.name, availability_zones) + raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple availability" + + " zones. Requsted Zones".format(self.name, availability_zones)) else: vm_create_msg_dict['availability_zone'] = availability_zones[0] if server_groups: if len(server_groups) > 1: - self._log.error("Can not launch VDU: %s in multiple Server Group. Requested Groups: %s", self.name, server_groups) - raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple Server Groups. Requsted Groups".format(self.name, server_groups)) + self._log.error("Can not launch VDU: %s in multiple Server Group. " + + "Requested Groups: %s", self.name, server_groups) + raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple " + + "Server Groups. Requsted Groups".format(self.name, server_groups)) else: vm_create_msg_dict['server_group'] = server_groups[0] @@ -620,11 +652,15 @@ class VirtualDeploymentUnitRecord(object): # Find source file in scripts dir of VNFD self._log.debug("Checking for source config file at %s", source) try: - source_file_str = cloud_init_extractor.read_script(stored_package, source) + try: + source_file_str = cloud_init_extractor.read_script(stored_package, source) + file_item['source'] = source_file_str + except rift.package.package.PackageError as e: + self._log.info("Invalid package with Package descriptor id") + except rift.package.cloud_init.CloudInitExtractionError as e: raise VirtualDeploymentUnitRecordError(e) # Update source file location with file contents - file_item['source'] = source_file_str return @@ -637,6 +673,48 @@ class VirtualDeploymentUnitRecord(object): "volumes", "supplemental_boot_data"] + def make_resmgr_cp_args(intf, cp, vlr): + cp_info = dict(name = cp.name, + virtual_link_id = vlr.network_id, + type_yang = intf.virtual_interface.type_yang) + + if vlr.network_id is None: + raise VlrError("Unresolved virtual link id for vlr id:%s, name:%s", + (vlr.id, vlr.name)) + + if cp.has_field('port_security_enabled'): + cp_info["port_security_enabled"] = cp.port_security_enabled + + try: + if intf.static_ip_address: + cp_info["static_ip_address"] = intf.static_ip_address + except AttributeError as e: + ### This can happen because of model difference between OSM and RIFT. Ignore exception + self._log.debug(str(e)) + + if (intf.virtual_interface.has_field('vpci') and + intf.virtual_interface.vpci is not None): + cp_info["vpci"] = intf.virtual_interface.vpci + + if (vlr.has_field('ip_profile_params')) and (vlr.ip_profile_params.has_field('security_group')): + cp_info['security_group'] = vlr.ip_profile_params.security_group + + if vlr.has_field('virtual_connection_points'): + virtual_cps = [ vcp for vcp in vlr.virtual_connection_points + if [ True for cp_ref in vcp.associated_cps if cp.name == cp_ref ]] + if virtual_cps: + fields = ['connection_point_id', 'name', 'ip_address', 'mac_address'] + cp_info['virtual_cps'] = [ {k:v for k,v in vcp.as_dict().items() if k in fields} + for vcp in virtual_cps ] + + # Adding Port Sequence Information to cp_info + intf_dict = intf.as_dict() + if "position" in intf_dict: + cp_info["port_order"] = intf.position + + self._log.debug("CP info {}".format(cp_info)) + return cp_info + self._log.debug("Creating params based on VDUD: %s", self._vdud) vdu_copy_dict = {k: v for k, v in self._vdud.as_dict().items() if k in vdu_fields} @@ -662,41 +740,13 @@ class VirtualDeploymentUnitRecord(object): if self._mgmt_network: vm_create_msg_dict['mgmt_network'] = self._mgmt_network - cp_list = [] + cp_list = list() for intf, cp, vlr in self._ext_intf: - cp_info = { "name": cp.name, - "virtual_link_id": vlr.network_id, - "type_yang": intf.virtual_interface.type_yang } - - if cp.has_field('port_security_enabled'): - cp_info["port_security_enabled"] = cp.port_security_enabled - - if (intf.virtual_interface.has_field('vpci') and - intf.virtual_interface.vpci is not None): - cp_info["vpci"] = intf.virtual_interface.vpci - - if (vlr.has_field('ip_profile_params')) and (vlr.ip_profile_params.has_field('security_group')): - cp_info['security_group'] = vlr.ip_profile_params.security_group + cp_list.append(make_resmgr_cp_args(intf, cp, vlr)) - cp_list.append(cp_info) - - for intf, cp, vlr in self._int_intf: - if (intf.virtual_interface.has_field('vpci') and - intf.virtual_interface.vpci is not None): - cp_list.append({"name": cp, - "virtual_link_id": vlr.network_id, - "type_yang": intf.virtual_interface.type_yang, - "vpci": intf.virtual_interface.vpci}) - else: - if cp.has_field('port_security_enabled'): - cp_list.append({"name": cp, - "virtual_link_id": vlr.network_id, - "type_yang": intf.virtual_interface.type_yang, - "port_security_enabled": cp.port_security_enabled}) - else: - cp_list.append({"name": cp, - "virtual_link_id": vlr.network_id, - "type_yang": intf.virtual_interface.type_yang}) + for intf, cp_id, vlr in self._int_intf: + cp = self.find_internal_cp_by_cp_id(cp_id) + cp_list.append(make_resmgr_cp_args(intf, cp, vlr.msg())) vm_create_msg_dict["connection_points"] = cp_list @@ -704,13 +754,18 @@ class VirtualDeploymentUnitRecord(object): self.process_placement_groups(vm_create_msg_dict) if 'supplemental_boot_data' in vm_create_msg_dict: - self.process_custom_bootdata(vm_create_msg_dict) + self.process_custom_bootdata(vm_create_msg_dict) - msg = RwResourceMgrYang.VDUEventData() + msg = RwResourceMgrYang.YangData_RwProject_Project_ResourceMgmt_VduEvent_VduEventData() msg.event_id = self._request_id - msg.cloud_account = self.cloud_account_name + msg.cloud_account = self.datacenter_name + msg.request_info.from_dict(vm_create_msg_dict) + for volume in self._vdud.volumes: + v = msg.request_info.volumes.add() + v.from_dict(volume.as_dict()) + return msg @asyncio.coroutine @@ -734,7 +789,7 @@ class VirtualDeploymentUnitRecord(object): self._rm_regh = None if self._vdur_console_handler is not None: - self._log.error("Deregistering vnfr vdur registration handle") + self._log.debug("Deregistering vnfr vdur console registration handle") self._vdur_console_handler._regh.deregister() self._vdur_console_handler._regh = None @@ -780,71 +835,92 @@ class VirtualDeploymentUnitRecord(object): cp_name) return cp - def find_internal_vlr_by_cp_name(cp_name): - """ Find the VLR corresponding to the connection point name""" - cp = None - - self._log.debug("find_internal_vlr_by_cp_name(%s) called", - cp_name) - - for int_cp in self._vdud.internal_connection_point: - self._log.debug("Checking for int cp %s in internal connection points", - int_cp.id) - if int_cp.id == cp_name: - cp = int_cp - break + def find_internal_vlr_by_cp_id(cp_id): + self._log.debug("find_internal_vlr_by_cp_id(%s) called", + cp_id) - if cp is None: - self._log.debug("Failed to find cp %s in internal connection points", - cp_name) - msg = "Failed to find cp %s in internal connection points" % cp_name - raise VduRecordError(msg) + # Validate the cp + cp = self.find_internal_cp_by_cp_id(cp_id) # return the VLR associated with the connection point - return vnfr.find_vlr_by_cp(cp_name) + return vnfr.find_vlr_by_cp(cp_id) - block = xact.block_create() - self._log.debug("Executing vm request id: %s, action: create", - self._request_id) - - # Resolve the networks associated external interfaces - for ext_intf in self._vdud.external_interface: - self._log.debug("Resolving external interface name [%s], cp[%s]", - ext_intf.name, ext_intf.vnfd_connection_point_ref) - cp = find_cp_by_name(ext_intf.vnfd_connection_point_ref) + def add_external_interface(interface): + # Add an external interface from vdu interface list + cp = find_cp_by_name(interface.external_connection_point_ref) if cp is None: self._log.debug("Failed to find connection point - %s", - ext_intf.vnfd_connection_point_ref) - continue + interface.external_connection_point_ref) + return + self._log.debug("Connection point name [%s], type[%s]", cp.name, cp.type_yang) vlr = vnfr.ext_vlr_by_id(cp.vlr_ref) - etuple = (ext_intf, cp, vlr) + etuple = (interface, cp, vlr) self._ext_intf.append(etuple) self._log.debug("Created external interface tuple : %s", etuple) - # Resolve the networks associated internal interfaces - for intf in self._vdud.internal_interface: - cp_id = intf.vdu_internal_connection_point_ref + @asyncio.coroutine + def add_internal_interface(interface): + # Add an internal interface from vdu interface list + cp_id = interface.internal_connection_point_ref self._log.debug("Resolving internal interface name [%s], cp[%s]", - intf.name, cp_id) - + interface.name, cp_id) + + if cp_id is None: + msg = "The Internal Interface : %s is not mapped to an internal connection point." % (interface.name) + self._log.error(msg) + raise VduRecordError(msg) + try: - vlr = find_internal_vlr_by_cp_name(cp_id) + vlr = find_internal_vlr_by_cp_id(cp_id) + iter = yield from self._dts.query_read(vlr.vlr_path()) + for itr in iter: + vlr._vlr = (yield from itr).result except Exception as e: self._log.debug("Failed to find cp %s in internal VLR list", cp_id) msg = "Failed to find cp %s in internal VLR list, e = %s" % (cp_id, e) raise VduRecordError(msg) - ituple = (intf, cp_id, vlr) + ituple = (interface, cp_id, vlr) self._int_intf.append(ituple) self._log.debug("Created internal interface tuple : %s", ituple) + + block = xact.block_create() + + self._log.debug("Executing vm request id: %s, action: create", + self._request_id) + + # Resolve the networks associated with interfaces ( both internal and external) + + for intf in self._vdud.interface: + if intf.type_yang == 'EXTERNAL': + self._log.debug("Resolving external interface name [%s], cp[%s]", + intf.name, intf.external_connection_point_ref) + try: + add_external_interface(intf) + except Exception as e: + msg = "Failed to add external interface %s from vdu interface list, e = %s" % (intf.name, e) + self._log.error(msg) + raise VduRecordError(msg) + elif intf.type_yang == 'INTERNAL': + self._log.debug("Resolving internal interface name [%s], cp[%s]", + intf.name, intf.internal_connection_point_ref) + try: + yield from add_internal_interface(intf) + except Exception as e: + msg = "Failed to add internal interface %s from vdu interface list, e = %s" % (intf.name, e) + self._log.error(msg) + raise VduRecordError(msg) + + + resmgr_path = self.resmgr_path resmgr_msg = self.resmgr_msg(config) @@ -895,17 +971,6 @@ class VirtualDeploymentUnitRecord(object): #self._vm_resp = resp.resource_info return resp.resource_info - - @asyncio.coroutine - def start_component(self): - """ This VDUR is active """ - self._log.debug("Starting component %s for vdud %s vdur %s", - self._vdud.vcs_component_ref, - self._vdud, - self._vdur_id) - yield from self._vnfr.start_component(self._vdud.vcs_component_ref, - self.vm_resp.management_ip) - @property def active(self): """ Is this VDU active """ @@ -928,9 +993,6 @@ class VirtualDeploymentUnitRecord(object): self._log.debug("VDUR id %s in VNFR %s is active", self._vdur_id, self._vnfr.vnfr_id) - if self._vdud.vcs_component_ref is not None: - yield from self.start_component() - self._state = VDURecordState.READY if self._vnfr.all_vdus_active(): @@ -969,6 +1031,10 @@ class VirtualDeploymentUnitRecord(object): xact_info.respond_xpath(rwdts.XactRspCode.ACK) try: + #Check if resource orchestrator is not rift so that resource manager tasklet is not invoked + if self._nsr_config.resource_orchestrator is not None: + return + reg_event = asyncio.Event(loop=self._loop) @asyncio.coroutine @@ -1027,18 +1093,23 @@ class VlRecordState(enum.Enum): class InternalVirtualLinkRecord(object): """ Internal Virtual Link record """ - def __init__(self, dts, log, loop, ivld_msg, vnfr_name, cloud_account_name, ip_profile=None): + def __init__(self, dts, log, loop, project, vnfm, + ivld_msg, vnfr_name, datacenter_name, ip_profile=None): self._dts = dts self._log = log self._loop = loop + self._project = project + self._vnfm = vnfm self._ivld_msg = ivld_msg self._vnfr_name = vnfr_name - self._cloud_account_name = cloud_account_name + self._datacenter_name = datacenter_name self._ip_profile = ip_profile self._vlr_req = self.create_vlr() self._vlr = None + self._network_id = None self._state = VlRecordState.INIT + self._state_details = "" @property def vlr_id(self): @@ -1056,11 +1127,32 @@ class InternalVirtualLinkRecord(object): @property def network_id(self): """ Find VLR by id """ - return self._vlr.network_id if self._vlr else None + return self._network_id + + @network_id.setter + def network_id(self, network_id): + """ network id setter""" + self._network_id = network_id + + @property + def active(self): + """ """ + return self._state == VlRecordState.ACTIVE + + @property + def state(self): + """ state for this VLR """ + return self._state + + @property + def state_details(self): + """ state details for this VLR """ + return self._state_details def vlr_path(self): """ VLR path for this VLR instance""" - return "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']".format(self.vlr_id) + return self._project.add_project("D,/vlr:vlr-catalog/vlr:vlr[vlr:id={}]". + format(quoted_key(self.vlr_id))) def create_vlr(self): """ Create the VLR record which will be instantiated """ @@ -1077,7 +1169,7 @@ class InternalVirtualLinkRecord(object): vlr_dict = {"id": str(uuid.uuid4()), "name": self.name, - "cloud_account": self._cloud_account_name, + "datacenter": self._datacenter_name, } if self._ip_profile and self._ip_profile.has_field('ip_profile_params'): @@ -1085,7 +1177,13 @@ class InternalVirtualLinkRecord(object): vlr_dict.update(vld_copy_dict) - vlr = RwVlrYang.YangData_Vlr_VlrCatalog_Vlr.from_dict(vlr_dict) + vlr = RwVlrYang.YangData_RwProject_Project_VlrCatalog_Vlr.from_dict(vlr_dict) + + if self._ivld_msg.has_field('virtual_connection_points'): + for cp in self._ivld_msg.virtual_connection_points: + vcp = vlr.virtual_connection_points.add() + vcp.from_dict(cp.as_dict()) + return vlr @asyncio.coroutine @@ -1098,28 +1196,39 @@ class InternalVirtualLinkRecord(object): self._log.debug("Create VL with xpath %s and vlr %s", self.vlr_path(), self._vlr_req) - with self._dts.transaction(flags=0) as xact: - block = xact.block_create() - block.add_query_create(xpath=self.vlr_path(), msg=self._vlr_req) - self._log.debug("Executing VL create path:%s msg:%s", - self.vlr_path(), self._vlr_req) - - res_iter = None - try: - res_iter = yield from block.execute() - except Exception: + try: + with self._dts.transaction(flags=0) as xact: + block = xact.block_create() + block.add_query_create(xpath=self.vlr_path(), msg=self._vlr_req) + self._log.debug("Executing VL create path:%s msg:%s", + self.vlr_path(), self._vlr_req) + + self._state = VlRecordState.INSTANTIATION_PENDING + self._state_details = "Oustanding VL create request:%s".format(self.vlr_path()) + res_iter = None + try: + res_iter = yield from block.execute() + except Exception as e: + self._state = VlRecordState.FAILED + self._state_details = str(e) + self._log.exception("Caught exception while instantial VL") + raise + + for ent in res_iter: + res = yield from ent + self._vlr = res.result + + if self._vlr.operational_status == 'failed': + self._log.debug("VL creation failed for vlr id %s", self._vlr.id) self._state = VlRecordState.FAILED - self._log.exception("Caught exception while instantial VL") - raise + self._state_details = self._vlr.operational_status_details + raise VnfrInstantiationFailed("instantiation due to VL failure %s" % (self._vlr.id)) - for ent in res_iter: - res = yield from ent - self._vlr = res.result - - if self._vlr.operational_status == 'failed': - self._log.debug("VL creation failed for vlr id %s", self._vlr.id) - self._state = VlRecordState.FAILED - raise VnfrInstantiationFailed("instantiation due to VL failure %s" % (self._vlr.id)) + except Exception as e: + self._log.error("Caught exception while instantiating VL:%s:%s, e:%s", + self.vlr_id, self._vlr.name, e) + self._state_details = str(e) + raise self._log.info("Created VL with xpath %s and vlr %s", self.vlr_path(), self._vlr) @@ -1148,13 +1257,12 @@ class InternalVirtualLinkRecord(object): else: yield from instantiate_vlr() - self._state = VlRecordState.ACTIVE def vlr_in_vns(self): """ Is there a VLR record in VNS """ if (self._state == VlRecordState.ACTIVE or - self._state == VlRecordState.INSTANTIATION_PENDING or - self._state == VlRecordState.FAILED): + self._state == VlRecordState.INSTANTIATION_PENDING or + self._state == VlRecordState.FAILED): return True return False @@ -1169,25 +1277,50 @@ class InternalVirtualLinkRecord(object): self._log.debug("Terminating VL with path %s", self.vlr_path()) self._state = VlRecordState.TERMINATE_PENDING + self._state_details = "VL Terminate pending" block = xact.block_create() block.add_query_delete(self.vlr_path()) yield from block.execute(flags=0, now=True) self._state = VlRecordState.TERMINATED + self._state_details = "VL Terminated" self._log.debug("Terminated VL with path %s", self.vlr_path()) + def set_state_from_op_status(self, operational_status, operational_status_details): + """ Set the state of this VL based on operational_status""" + + self._state_details = operational_status_details + + if operational_status == 'running': + self._log.info("VL %s moved to active state", self.vlr_id) + self._state = VlRecordState.ACTIVE + elif operational_status == 'failed': + self._log.info("VL %s moved to failed state", self.vlr_id) + self._state = VlRecordState.FAILED + elif operational_status == 'vl_alloc_pending': + self._log.debug("VL %s is in alloc pending state", self.vlr_id) + self._state = VlRecordState.INSTANTIATION_PENDING + else: + raise VirtualLinkRecordError("Unknown operational_status %s" % (operational_status)) + + def msg(self): + """ Get a proto corresponding to this VLR """ + msg = self._vlr + return msg + class VirtualNetworkFunctionRecord(object): """ Virtual Network Function Record """ - def __init__(self, dts, log, loop, cluster_name, vnfm, vcs_handler, vnfr_msg, mgmt_network=None): + def __init__(self, dts, log, loop, cluster_name, vnfm, vnfr_msg, + mgmt_network=None, external_ro=False): self._dts = dts self._log = log - self._loop = loop + self._loop = loop### + self._project = vnfm._project self._cluster_name = cluster_name self._vnfr_msg = vnfr_msg self._vnfr_id = vnfr_msg.id self._vnfd_id = vnfr_msg.vnfd.id self._vnfm = vnfm - self._vcs_handler = vcs_handler self._vnfr = vnfr_msg self._mgmt_network = mgmt_network @@ -1195,7 +1328,7 @@ class VirtualNetworkFunctionRecord(object): self._state = VirtualNetworkFunctionRecordState.INIT self._state_failed_reason = None self._ext_vlrs = {} # The list of external virtual links - self._vlrs = [] # The list of internal virtual links + self._vlrs = {} # The list of internal virtual links self._vdus = [] # The list of vdu self._vlr_by_cp = {} self._cprs = [] @@ -1203,10 +1336,20 @@ class VirtualNetworkFunctionRecord(object): self._create_time = int(time.time()) self._vnf_mon = None self._config_status = vnfr_msg.config_status - self._vnfd_package_store = rift.package.store.VnfdPackageFilesystemStore(self._log) + self._vnfd_package_store = rift.package.store.VnfdPackageFilesystemStore(self._log, project=self._project.name) self._rw_vnfd = None self._vnfd_ref_count = 0 + self._ssh_pub_key = None + self._ssh_key_file = None + self._task = None + # Create an asyncio loop to know when the virtual links are ready + self._vls_ready = asyncio.Event(loop=self._loop) + + # Counter for pre-init VNFR State Update DTS Query + self._init = False + self._external_ro = external_ro + def _get_vdur_from_vdu_id(self, vdu_id): self._log.debug("Finding vdur for vdu_id %s", vdu_id) self._log.debug("Searching through vdus: %s", self._vdus) @@ -1220,7 +1363,8 @@ class VirtualNetworkFunctionRecord(object): @property def operational_status(self): """ Operational status of this VNFR """ - op_status_map = {"INIT": "init", + op_status_map = {"PRE_INIT": "pre_init", + "INIT": "init", "VL_INIT_PHASE": "vl_init_phase", "VM_INIT_PHASE": "vm_init_phase", "READY": "running", @@ -1234,7 +1378,20 @@ class VirtualNetworkFunctionRecord(object): @staticmethod def vnfd_xpath(vnfd_id): """ VNFD xpath associated with this VNFR """ - return "C,/vnfd:vnfd-catalog/vnfd:vnfd[vnfd:id = '{}']".format(vnfd_id) + return ("C,/project-vnfd:vnfd-catalog/project-vnfd:vnfd[project-vnfd:id={}]". + format(quoted_key(vnfd_id))) + + @property + def external_ro(self): + return self._external_ro + + @property + def task(self): + return self._task + + @task.setter + def task(self, task): + self._task = task @property def vnfd_ref_count(self): @@ -1278,9 +1435,9 @@ class VirtualNetworkFunctionRecord(object): return self._vnfr.name @property - def cloud_account_name(self): + def datacenter_name(self): """ Name of the cloud account this VNFR is instantiated in """ - return self._vnfr.cloud_account + return self._vnfr.datacenter @property def vnfd_id(self): @@ -1302,20 +1459,15 @@ class VirtualNetworkFunctionRecord(object): """ Config agent status for this VNFR """ return self._config_status - def component_by_name(self, component_name): - """ Find a component by name in the inventory list""" - mangled_name = VcsComponent.mangle_name(component_name, - self.vnf_name, - self.vnfd_id) - return self._inventory[mangled_name] - - + @property + def public_key(self): + return self._ssh_pub_key @asyncio.coroutine def get_nsr_config(self): ### Need access to NS instance configuration for runtime resolution. ### This shall be replaced when deployment flavors are implemented - xpath = "C,/nsr:ns-instance-config" + xpath = self._project.add_project("C,/nsr:ns-instance-config") results = yield from self._dts.query_read(xpath, rwdts.XactFlag.MERGE) for result in results: @@ -1327,10 +1479,22 @@ class VirtualNetworkFunctionRecord(object): return None @asyncio.coroutine - def start_component(self, component_name, ip_addr): - """ Start a component in the VNFR by name """ - comp = self.component_by_name(component_name) - yield from comp.start(None, None, ip_addr) + def get_nsr_opdata(self): + """ NSR opdata associated with this VNFR """ + xpath = self._project.add_project( + "D,/nsr:ns-instance-opdata/nsr:nsr" \ + "[nsr:ns-instance-config-ref={}]". \ + format(quoted_key(self._vnfr_msg.nsr_id_ref))) + + results = yield from self._dts.query_read(xpath, rwdts.XactFlag.MERGE) + + for result in results: + entry = yield from result + nsr_op = entry.result + return nsr_op + + return None + def cp_ip_addr(self, cp_name): """ Get ip address for connection point """ @@ -1365,39 +1529,43 @@ class VirtualNetworkFunctionRecord(object): vnfd_fields = ["short_name", "vendor", "description", "version"] vnfd_copy_dict = {k: v for k, v in self.vnfd.as_dict().items() if k in vnfd_fields} - mgmt_intf = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_MgmtInterface() + mgmt_intf = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_MgmtInterface() ip_address, port = self.mgmt_intf_info() - if ip_address is not None: + if ip_address: mgmt_intf.ip_address = ip_address if port is not None: mgmt_intf.port = port + if self._ssh_pub_key: + mgmt_intf.ssh_key.public_key = self._ssh_pub_key + mgmt_intf.ssh_key.private_key_file = self._ssh_key_file + vnfr_dict = {"id": self._vnfr_id, "nsr_id_ref": self._vnfr_msg.nsr_id_ref, "name": self.name, "member_vnf_index_ref": self.member_vnf_index, "operational_status": self.operational_status, "operational_status_details": self._state_failed_reason, - "cloud_account": self.cloud_account_name, + "datacenter": self.datacenter_name, "config_status": self._config_status } vnfr_dict.update(vnfd_copy_dict) - vnfr_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.from_dict(vnfr_dict) - vnfr_msg.vnfd = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vnfd.from_dict(self.vnfd.as_dict()) + vnfr_msg = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr.from_dict(vnfr_dict) + vnfr_msg.vnfd = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vnfd.from_dict(self.vnfd.as_dict()) vnfr_msg.create_time = self._create_time vnfr_msg.uptime = int(time.time()) - self._create_time vnfr_msg.mgmt_interface = mgmt_intf # Add all the VLRs to VNFR - for vlr in self._vlrs: + for vlr_id, vlr in self._vlrs.items(): ivlr = vnfr_msg.internal_vlr.add() ivlr.vlr_ref = vlr.vlr_id - # Add all the VDURs to VDUR + # Add all the VDUs to VDUR if self._vdus is not None: for vdu in self._vdus: vdur = vnfr_msg.vdur.add() @@ -1407,27 +1575,50 @@ class VirtualNetworkFunctionRecord(object): vnfr_msg.dashboard_url = self.dashboard_url for cpr in self._cprs: - new_cp = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint.from_dict(cpr.as_dict()) + new_cp = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_ConnectionPoint.from_dict(cpr.as_dict()) vnfr_msg.connection_point.append(new_cp) if self._vnf_mon is not None: for monp in self._vnf_mon.msg: vnfr_msg.monitoring_param.append( - VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_MonitoringParam.from_dict(monp.as_dict())) + VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_MonitoringParam.from_dict(monp.as_dict())) if self._vnfr.vnf_configuration is not None: vnfr_msg.vnf_configuration.from_dict(self._vnfr.vnf_configuration.as_dict()) - if (ip_address is not None and - vnfr_msg.vnf_configuration.config_access.mgmt_ip_address is None): - vnfr_msg.vnf_configuration.config_access.mgmt_ip_address = ip_address for group in self._vnfr_msg.placement_groups_info: - group_info = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_PlacementGroupsInfo() + group_info = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_PlacementGroupsInfo() group_info.from_dict(group.as_dict()) vnfr_msg.placement_groups_info.append(group_info) return vnfr_msg + @asyncio.coroutine + def update_config(self, msg, xact): + self._log.debug("VNFM vnf config: {}". + format(msg.vnf_configuration.as_dict())) + self._config_status = msg.config_status + self._vnfr = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr.from_dict( + msg.as_dict()) + self._log.debug("VNFR msg config: {}". + format(self._vnfr.as_dict())) + + yield from self.publish(xact) + + @asyncio.coroutine + def update_vnfr_after_substitution(self, msg, xact): + self._log.debug("Updating VNFR after Input Param Substitution: {}". + format(msg.as_dict())) + self._state = VirtualNetworkFunctionRecordState.INIT + self._vnfd = msg.vnfd + msg.operational_status = 'init' + self._vnfr = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr.from_dict( + msg.as_dict()) + + self._log.debug("VNFR updated: {}". + format(self._vnfr.as_dict())) + yield from self.publish(xact) + @property def dashboard_url(self): ip, cfg_port = self.mgmt_intf_info() @@ -1452,8 +1643,8 @@ class VirtualNetworkFunctionRecord(object): @property def xpath(self): """ path for this VNFR """ - return("D,/vnfr:vnfr-catalog" - "/vnfr:vnfr[vnfr:id='{}']".format(self.vnfr_id)) + return self._project.add_project("D,/vnfr:vnfr-catalog" + "/vnfr:vnfr[vnfr:id={}]".format(quoted_key(self.vnfr_id))) @asyncio.coroutine def publish(self, xact): @@ -1486,12 +1677,15 @@ class VirtualNetworkFunctionRecord(object): vlr = InternalVirtualLinkRecord(dts=self._dts, log=self._log, loop=self._loop, + project=self._project, + vnfm=self._vnfm, ivld_msg=ivld_msg, vnfr_name=self.name, - cloud_account_name=self.cloud_account_name, + datacenter_name=self.datacenter_name, ip_profile=self.resolve_vld_ip_profile(self.vnfd, ivld_msg) ) - self._vlrs.append(vlr) + self._vlrs[vlr.vlr_id] = vlr + self._vnfm.add_vlr_id_vnfr_map(vlr.vlr_id, self) for int_cp in ivld_msg.internal_connection_point: if int_cp.id_ref in self._vlr_by_cp: @@ -1508,10 +1702,20 @@ class VirtualNetworkFunctionRecord(object): self._log.debug("Instantiating Internal Virtual Links for vnfd id: %s", self.vnfd_id) - for vlr in self._vlrs: + for vlr_id, vlr in self._vlrs.items(): self._log.debug("Instantiating VLR %s", vlr) yield from vlr.instantiate(xact, restart_mode) + # Wait for the VLs to be ready before yielding control out + if self._vlrs: + self._log.debug("VNFR id:%s, name:%s - Waiting for %d VLs to be ready", + self.vnfr_id, self.name, len(self._vlrs)) + yield from self._vls_ready.wait() + else: + self._log.debug("VNFR id:%s, name:%s, No virtual links found", + self.vnfr_id, self.name) + self._vls_ready.set() + def find_vlr_by_cp(self, cp_name): """ Find the VLR associated with the cp name """ return self._vlr_by_cp[cp_name] @@ -1527,7 +1731,7 @@ class VirtualNetworkFunctionRecord(object): for group_info in nsr_config.vnfd_placement_group_maps: if group_info.placement_group_ref == input_group.name and \ group_info.vnfd_id_ref == self.vnfd_id: - group = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo() + group = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo() group_dict = {k:v for k,v in group_info.as_dict().items() if (k != 'placement_group_ref' and k !='vnfd_id_ref')} @@ -1542,7 +1746,7 @@ class VirtualNetworkFunctionRecord(object): placement_groups = [] ### Step-1: Get VNF level placement groups for group in self._vnfr_msg.placement_groups_info: - #group_info = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo() + #group_info = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo() #group_info.from_dict(group.as_dict()) placement_groups.append(group) @@ -1553,10 +1757,11 @@ class VirtualNetworkFunctionRecord(object): group_info = self.resolve_placement_group_cloud_construct(group, nsr_config) if group_info is None: - self._log.info("Could not resolve cloud-construct for placement group: %s", group.name) - ### raise VNFMPlacementGroupError("Could not resolve cloud-construct for placement group: {}".format(group.name)) + self._log.info("Could not resolve cloud-construct for " + + "placement group: %s", group.name) else: - self._log.info("Successfully resolved cloud construct for placement group: %s for VDU: %s in VNF: %s (Member Index: %s)", + self._log.info("Successfully resolved cloud construct for " + + "placement group: %s for VDU: %s in VNF: %s (Member Index: %s)", str(group_info), vdu.name, self.vnf_name, @@ -1565,6 +1770,17 @@ class VirtualNetworkFunctionRecord(object): return placement_groups + @asyncio.coroutine + def substitute_vdu_input_parameters(self, vdu): + result = vdu + for vdu_vnfr in self.vnfd.vdu: + if vdu["id"] == vdu_vnfr.id: + result = vdu_vnfr.as_dict() + break + + return RwVnfdYang.YangData_Vnfd_VnfdCatalog_Vnfd_Vdu.from_dict(result) + + @asyncio.coroutine def vdu_cloud_init_instantiation(self): [vdu.vdud_cloud_init for vdu in self._vdus] @@ -1610,16 +1826,20 @@ class VirtualNetworkFunctionRecord(object): [ group.name for group in placement_groups], vdur_id) + # Update VDU Info from VNFR (It contains the input parameter for VDUs as well) + vdu_updated = yield from self.substitute_vdu_input_parameters(vdu.as_dict()) + vdur = VirtualDeploymentUnitRecord( dts=self._dts, log=self._log, loop=self._loop, - vdud=vdu, + project = self._project, + vdud=vdu_updated, vnfr=vnfr, nsr_config=nsr_config, mgmt_intf=self.has_mgmt_interface(vdu), mgmt_network=self._mgmt_network, - cloud_account_name=self.cloud_account_name, + datacenter_name=self.datacenter_name, vnfd_package_store=self._vnfd_package_store, vdur_id=vdur_id, placement_groups = placement_groups, @@ -1688,6 +1908,7 @@ class VirtualNetworkFunctionRecord(object): VirtualDeploymentUnitRecordError is raised. """ + for dependency in dependencies[vdu.vdu_id]: self._log.debug("{}: waiting for {}".format(vdu.vdu_id, dependency.vdu_id)) @@ -1704,8 +1925,9 @@ class VirtualNetworkFunctionRecord(object): # Substitute any variables contained in the cloud config script config = str(vdu.vdud_cloud_init) if vdu.vdud_cloud_init is not None else "" - + parts = re.split("\{\{ ([^\}]+) \}\}", config) + if len(parts) > 1: # Extract the variable names @@ -1715,6 +1937,7 @@ class VirtualNetworkFunctionRecord(object): # Iterate of the variables and substitute values from the # datastore. + for variable in variables: # Handle a reference to a VDU by ID @@ -1733,6 +1956,19 @@ class VirtualNetworkFunctionRecord(object): config = config.replace("{{ %s }}" % variable, value) continue + # Handle a reference to Cloud Init Variables: Start with 'CI' + if variable.startswith('CI'): + custom_meta_data = datastore.get('vdu[{}]'.format(vdu.vdu_id) + ".custom_meta_data") + try: + for meta_data in custom_meta_data: + if meta_data.destination == 'CLOUD_INIT': + if meta_data.name == variable: + config = config.replace("{{ %s }}" % variable, meta_data.value) + except Exception: + raise ValueError("Unrecognized Cloud Init Variable") + + continue + # Handle unrecognized variables msg = 'unrecognized cloud-config variable: {}' raise ValueError(msg.format(variable)) @@ -1761,42 +1997,13 @@ class VirtualNetworkFunctionRecord(object): def vlr_xpath(self, vlr_id): """ vlr xpath """ - return( - "D,/vlr:vlr-catalog/" - "vlr:vlr[vlr:id = '{}']".format(vlr_id)) + return self._project.add_project("D,/vlr:vlr-catalog/" + "vlr:vlr[vlr:id={}]".format(quoted_key(vlr_id))) def ext_vlr_by_id(self, vlr_id): """ find ext vlr by id """ return self._ext_vlrs[vlr_id] - @asyncio.coroutine - def publish_inventory(self, xact): - """ Publish the inventory associated with this VNF """ - self._log.debug("Publishing inventory for VNFR id: %s", self._vnfr_id) - - for component in self._rw_vnfd.component: - self._log.debug("Creating inventory component %s", component) - mangled_name = VcsComponent.mangle_name(component.component_name, - self.vnf_name, - self.vnfd_id - ) - comp = VcsComponent(dts=self._dts, - log=self._log, - loop=self._loop, - cluster_name=self._cluster_name, - vcs_handler=self._vcs_handler, - component=component, - mangled_name=mangled_name, - ) - if comp.name in self._inventory: - self._log.debug("Duplicate entries in inventory %s for vnfr %s", - component, self._vnfd_id) - return - self._log.debug("Adding component %s for vnrf %s", - comp.name, self._vnfr_id) - self._inventory[comp.name] = comp - yield from comp.publish(xact) - def all_vdus_active(self): """ Are all VDUS in this VNFR active? """ for vdu in self._vdus: @@ -1830,7 +2037,7 @@ class VirtualNetworkFunctionRecord(object): # Update the VNFR with the changed status yield from self.publish(None) - def update_cp(self, cp_name, ip_address, mac_addr, cp_id): + def update_cp(self, cp_name, ip_address, mac_addr, cp_id, virtual_cps = list()): """Updated the connection point with ip address""" for cp in self._cprs: if cp.name == cp_name: @@ -1839,6 +2046,8 @@ class VirtualNetworkFunctionRecord(object): cp.ip_address = ip_address cp.mac_address = mac_addr cp.connection_point_id = cp_id + if virtual_cps: + cp.virtual_cps = [VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_ConnectionPoint_VirtualCps.from_dict(v) for v in virtual_cps] return err = "No connection point %s found in VNFR id %s" % (cp.name, self._vnfr_id) @@ -1852,9 +2061,15 @@ class VirtualNetworkFunctionRecord(object): @asyncio.coroutine def instantiate(self, xact, restart_mode=False): """ instantiate this VNF """ + self._log.info("Instantiate VNF {}: {}".format(self._vnfr_id, self._state)) self.set_state(VirtualNetworkFunctionRecordState.VL_INIT_PHASE) self._rw_vnfd = yield from self._vnfm.fetch_vnfd(self._vnfd_id) + nsr_op = yield from self.get_nsr_opdata() + if nsr_op: + self._ssh_key_file = nsr_op.ssh_key_generated.private_key_file + self._ssh_pub_key = nsr_op.ssh_key_generated.public_key + @asyncio.coroutine def fetch_vlrs(): """ Fetch VLRs """ @@ -1863,11 +2078,11 @@ class VirtualNetworkFunctionRecord(object): def cpr_from_cp(cp): """ Creates a record level connection point from the desciptor cp""" - cp_fields = ["name", "image", "vm-flavor", "port_security_enabled"] + cp_fields = ["name", "image", "vm-flavor", "port_security_enabled", "type_yang"] cp_copy_dict = {k: v for k, v in cp.as_dict().items() if k in cp_fields} cpr_dict = {} cpr_dict.update(cp_copy_dict) - return VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint.from_dict(cpr_dict) + return VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_ConnectionPoint.from_dict(cpr_dict) self._log.debug("Fetching VLRs for VNFR id = %s, cps = %s", self._vnfr_id, self._vnfr.connection_point) @@ -1879,7 +2094,7 @@ class VirtualNetworkFunctionRecord(object): vlr_path = self.vlr_xpath(cp.vlr_ref) self._log.debug("Fetching VLR with path = %s", vlr_path) - res_iter = yield from self._dts.query_read(self.vlr_xpath(cp.vlr_ref), + res_iter = yield from self._dts.query_read(vlr_path, rwdts.XactFlag.MERGE) for i in res_iter: r = yield from i @@ -1897,16 +2112,12 @@ class VirtualNetworkFunctionRecord(object): self._log.debug("VNFR-ID %s: Fetching vlrs", self._vnfr_id) yield from fetch_vlrs() - # Publish inventory - self._log.debug("VNFR-ID %s: Publishing Inventory", self._vnfr_id) - yield from self.publish_inventory(xact) - - # Publish inventory + # Publish VLs self._log.debug("VNFR-ID %s: Creating VLs", self._vnfr_id) yield from self.create_vls() # publish the VNFR - self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id) + self._log.debug("Publish VNFR {}: {}".format(self._vnfr_id, self._state)) yield from self.publish(xact) @@ -1919,6 +2130,12 @@ class VirtualNetworkFunctionRecord(object): yield from self.instantiation_failed(str(e)) return + vl_state, failed_vl = self.vl_instantiation_state() + if vl_state == VlRecordState.FAILED: + self._log.error("VL Instantiation failed for one or more of the internal virtual links, vl:%s",failed_vl) + yield from self.instantiation_failed(failed_vl.state_details) + return + self.set_state(VirtualNetworkFunctionRecordState.VM_INIT_PHASE) # instantiate VDUs @@ -1933,12 +2150,13 @@ class VirtualNetworkFunctionRecord(object): yield from self.publish(xact) # publish the VNFR - self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id) + self._log.debug("VNFR {}: Publish VNFR with state {}". + format(self._vnfr_id, self._state)) yield from self.publish(xact) # instantiate VDUs # ToDo: Check if this should be prevented during restart - self._log.debug("VNFR-ID %s: Instantiate VDUs", self._vnfr_id) + self._log.debug("Instantiate VDUs {}: {}".format(self._vnfr_id, self._state)) _ = self._loop.create_task(self.instantiate_vdus(xact, self)) # publish the VNFR @@ -1947,14 +2165,14 @@ class VirtualNetworkFunctionRecord(object): self._log.debug("VNFR-ID %s: Instantiation Done", self._vnfr_id) - # create task updating uptime for this vnfr - self._log.debug("VNFR-ID %s: Starting task to update uptime", self._vnfr_id) - self._loop.create_task(self.vnfr_uptime_update(xact)) - @asyncio.coroutine def terminate(self, xact): """ Terminate this virtual network function """ + if self._task: + self._log.debug("Canceling scheduled tasks for VNFR %s", self._vnfr_id) + self._task.cancel() + self._log.debug("Terminatng VNF id %s", self.vnfr_id) self.set_state(VirtualNetworkFunctionRecordState.TERMINATE) @@ -1968,7 +2186,8 @@ class VirtualNetworkFunctionRecord(object): @asyncio.coroutine def terminate_vls(): """ Terminate VLs in this VNF """ - for vl in self._vlrs: + for vlr_id, vl in self._vlrs.items(): + self._vnfm.remove_vlr_id_vnfr_map(vlr_id) yield from vl.terminate(xact) @asyncio.coroutine @@ -1988,23 +2207,83 @@ class VirtualNetworkFunctionRecord(object): self._log.debug("Terminated VNF id %s", self.vnfr_id) self.set_state(VirtualNetworkFunctionRecordState.TERMINATED) - @asyncio.coroutine - def vnfr_uptime_update(self, xact): - while True: - # Return when vnfr state is FAILED or TERMINATED etc - if self._state not in [VirtualNetworkFunctionRecordState.INIT, - VirtualNetworkFunctionRecordState.VL_INIT_PHASE, - VirtualNetworkFunctionRecordState.VM_INIT_PHASE, - VirtualNetworkFunctionRecordState.READY]: - return - yield from self.publish(xact) - yield from asyncio.sleep(2, loop=self._loop) + # Unref the VNFD + self.vnfd_unref() + + def vl_instantiation_state(self): + """ Get the state of VL instantiation of this VNF """ + failed_vl = None + for vl_id, vlr in self._vlrs.items(): + if vlr.state == VlRecordState.ACTIVE: + continue + elif vlr.state == VlRecordState.FAILED: + failed_vl = vlr + return VlRecordState.FAILED, failed_vl + elif vlr.state == VlRecordState.INSTANTIATION_PENDING: + failed_vl = vlr, failed_vl + return VlRecordState.INSTANTIATION_PENDING, failed_vl + else: + self._log.debug("vlr %s still in state %s", vlr, vlr.state) + raise VlRecordError("Invalid state %s", vlr.state) + return VlRecordState.ACTIVE, failed_vl + + def vl_instantiation_successful(self): + """ Mark that all VLs in this VNF are active """ + if self._vls_ready.is_set(): + self._log.debug("VNFR id %s, vls_ready is already set", self.id) + + vl_state, failed_vl = self.vl_instantiation_state() + + if vl_state == VlRecordState.ACTIVE: + self._log.info("VNFR id:%s name:%s has all Virtual Links in active state, Ready to orchestrate VDUs", + self.vnfr_id, self.name) + self._vls_ready.set() + + elif vl_state == VlRecordState.FAILED: + self._log.error("VNFR id:%s name:%s One of the Virtual Links failed to reach active state.Failed to orchestrate VNF", + self.vnfr_id, self.name) + self.instantiation_failed("VNFR id %s: failed since VL %s did not come up".format(self.vnfr_id, failed_vl.name)) + self._vls_ready.set() + + def find_vlr(self, vlr_id): + """ Find VLR matching the passed VLR id """ + if vlr_id in self._vlrs: + return self._vlrs[vlr_id] + return None + + def vlr_event(self, vlr, action): + self._log.debug("Received VLR %s with action:%s", vlr, action) + + vlr_local = self.find_vlr(vlr.id) + if vlr_local is None: + self._log.error("VLR %s:%s received for unknown id, state:%s ignoring event", + vlr.id, vlr.name, vlr.state) + return + + if action == rwdts.QueryAction.CREATE or action == rwdts.QueryAction.UPDATE: + if vlr.operational_status == 'running': + vlr_local.set_state_from_op_status(vlr.operational_status, vlr.operational_status_details) + self._log.info("VLR %s:%s moving to active state", + vlr.id, vlr.name) + elif vlr.operational_status == 'failed': + vlr_local.set_state_from_op_status(vlr.operational_status, vlr.operational_status_details) + self._log.info("VLR %s:%s moving to failed state", + vlr.id, vlr.name) + else: + self._log.warning("VLR %s:%s received state:%s", + vlr.id, vlr.name, vlr.operational_status) + + if vlr.has_field('network_id'): + vlr_local.network_id = vlr.network_id + + # Check if vl instantiation successful for this VNFR + self.vl_instantiation_successful() class VnfdDtsHandler(object): """ DTS handler for VNFD config changes """ - XPATH = "C,/vnfd:vnfd-catalog/vnfd:vnfd" + XPATH = "C,/project-vnfd:vnfd-catalog/project-vnfd:vnfd" def __init__(self, dts, log, loop, vnfm): self._dts = dts @@ -2012,28 +2291,56 @@ class VnfdDtsHandler(object): self._loop = loop self._vnfm = vnfm self._regh = None + self._reg_ready = 0 @asyncio.coroutine def regh(self): """ DTS registration handle """ return self._regh + def deregister(self): + '''De-register from DTS''' + self._log.debug("De-register VNFD DTS handler for project {}". + format(self._vnfm._project.name)) + if self._regh: + self._regh.deregister() + self._regh = None + @asyncio.coroutine def register(self): """ Register for VNFD configuration""" + @asyncio.coroutine def on_apply(dts, acg, xact, action, scratch): """Apply the configuration""" self._log.debug("Got VNFM VNFD apply (xact: %s) (action: %s)(scr: %s)", xact, action, scratch) is_recovery = xact.xact is None and action == rwdts.AppconfAction.INSTALL + # Create/Update a VNFD record + if self._regh: + for cfg in self._regh.get_xact_elements(xact): + # Only interested in those VNFD cfgs whose ID was received in prepare callback + if cfg.id in scratch.get('vnfds', []) or is_recovery: + self._vnfm.update_vnfd(cfg) + else: + self._log.warning("Reg handle none for {} in project {}". + format(self.__class__, self._vnfm._project)) + + scratch.pop('vnfds', None) + + if is_recovery: + #yield from self._vnfm.vnfr_handler.register() + #yield from self._vnfm.vnfr_ref_handler.register() + self._reg_ready = 1 @asyncio.coroutine def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch): """ on prepare callback """ - self._log.debug("Got on prepare for VNFD (path: %s) (action: %s)", - ks_path.to_xpath(RwVnfmYang.get_schema()), msg) + xpath = ks_path.to_xpath(RwVnfmYang.get_schema()) + self._log.debug("Got on prepare for VNFD (path: %s) (action: %s) (msg: %s)", + xpath, + xact_info.query_action, msg) fref = ProtobufC.FieldReference.alloc() fref.goto_whole_message(msg.to_pbcm()) @@ -2043,73 +2350,43 @@ class VnfdDtsHandler(object): self._log.debug("Deleting VNFD with id %s", msg.id) if self._vnfm.vnfd_in_use(msg.id): self._log.debug("Cannot delete VNFD in use - %s", msg) - err = "Cannot delete a VNFD in use - %s" % msg - raise VirtualNetworkFunctionDescriptorRefCountExists(err) + err_msg = "Cannot delete a VNFD in use - %s" % msg + xact_info.send_error_xpath(RwTypes.RwStatus.FAILURE, xpath, err_msg) + xact_info.respond_xpath(rwdts.XactRspCode.NACK, xpath) + return # Delete a VNFD record yield from self._vnfm.delete_vnfd(msg.id) - xact_info.respond_xpath(rwdts.XactRspCode.ACK) + try: + xact_info.respond_xpath(rwdts.XactRspCode.ACK) + except rift.tasklets.dts.ResponseError as e: + self._log.warning( + "VnfdDtsHandler in project {} with path {} for action {} failed: {}". + format(self._vnfm._project, xpath, xact_info.query_action, e)) + + xpath = self._vnfm._project.add_project(VnfdDtsHandler.XPATH) + self._log.debug("Registering for VNFD config using xpath: {}". + format(xpath)) - self._log.debug( - "Registering for VNFD config using xpath: %s", - VnfdDtsHandler.XPATH, - ) acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply) with self._dts.appconf_group_create(handler=acg_hdl) as acg: self._regh = acg.register( - xpath=VnfdDtsHandler.XPATH, + xpath=xpath, flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY, on_prepare=on_prepare) - -class VcsComponentDtsHandler(object): - """ Vcs Component DTS handler """ - XPATH = ("D,/rw-manifest:manifest" + - "/rw-manifest:operational-inventory" + - "/rw-manifest:component") - - def __init__(self, dts, log, loop, vnfm): - self._dts = dts - self._log = log - self._loop = loop - self._regh = None - self._vnfm = vnfm - - @property - def regh(self): - """ DTS registration handle """ - return self._regh - - @asyncio.coroutine - def register(self): - """ Registers VCS component dts publisher registration""" - self._log.debug("VCS Comp publisher DTS handler registering path %s", - VcsComponentDtsHandler.XPATH) - - hdl = rift.tasklets.DTS.RegistrationHandler() - handlers = rift.tasklets.Group.Handler() - with self._dts.group_create(handler=handlers) as group: - self._regh = group.register(xpath=VcsComponentDtsHandler.XPATH, - handler=hdl, - flags=(rwdts.Flag.PUBLISHER | - rwdts.Flag.NO_PREP_READ | - rwdts.Flag.DATASTORE),) - - @asyncio.coroutine - def publish(self, xact, path, msg): - """ Publishes the VCS component """ - self._log.debug("Publishing the VcsComponent xact = %s, %s:%s", - xact, path, msg) - self.regh.create_element(path, msg) - self._log.debug("Published the VcsComponent to %s xact = %s, %s:%s", - VcsComponentDtsHandler.XPATH, xact, path, msg) - class VnfrConsoleOperdataDtsHandler(object): - """ registers 'D,/vnfr:vnfr-console/vnfr:vnfr[id]/vdur[id]' and handles CRUD from DTS""" + """ + Registers 'D,/rw-project:project/vnfr:vnfr-console/vnfr:vnfr[id]/vdur[id]' + and handles CRUD from DTS + """ + @property def vnfr_vdu_console_xpath(self): """ path for resource-mgr""" - return ("D,/rw-vnfr:vnfr-console/rw-vnfr:vnfr[rw-vnfr:id='{}']/rw-vnfr:vdur[vnfr:id='{}']".format(self._vnfr_id,self._vdur_id)) + return self._project.add_project( + "D,/rw-vnfr:vnfr-console/rw-vnfr:vnfr[rw-vnfr:id={}]".format(quoted_key(self._vnfr_id)) + + "/rw-vnfr:vdur[vnfr:id={}]".format(quoted_key(self._vdur_id))) def __init__(self, dts, log, loop, vnfm, vnfr_id, vdur_id, vdu_id): self._dts = dts @@ -2122,6 +2399,16 @@ class VnfrConsoleOperdataDtsHandler(object): self._vdur_id = vdur_id self._vdu_id = vdu_id + self._project = vnfm._project + + def deregister(self): + '''De-register from DTS''' + self._log.debug("De-register VNFR console DTS handler for project {}". + format(self._project)) + if self._regh: + self._regh.deregister() + self._regh = None + @asyncio.coroutine def register(self): """ Register for VNFR VDU Operational Data read from dts """ @@ -2136,7 +2423,7 @@ class VnfrConsoleOperdataDtsHandler(object): ) if action == rwdts.QueryAction.READ: - schema = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur.schema() + schema = RwVnfrYang.YangData_RwProject_Project_VnfrConsole_Vnfr_Vdur.schema() path_entry = schema.keyspec_to_entry(ks_path) self._log.debug("VDU Opdata path is {}".format(path_entry.key00.id)) try: @@ -2153,7 +2440,7 @@ class VnfrConsoleOperdataDtsHandler(object): return with self._dts.transaction() as new_xact: resp = yield from vdur.read_resource(new_xact) - vdur_console = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur() + vdur_console = RwVnfrYang.YangData_RwProject_Project_VnfrConsole_Vnfr_Vdur() vdur_console.id = self._vdur_id if resp.console_url: vdur_console.console_url = resp.console_url @@ -2162,13 +2449,13 @@ class VnfrConsoleOperdataDtsHandler(object): self._log.debug("Recevied console URL for vdu {} is {}".format(self._vdu_id,vdur_console)) except Exception: self._log.exception("Caught exception while reading VDU %s", self._vdu_id) - vdur_console = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur() + vdur_console = RwVnfrYang.YangData_RwProject_Project_VnfrConsole_Vnfr_Vdur() vdur_console.id = self._vdur_id vdur_console.console_url = 'none' xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK, - xpath=self.vnfr_vdu_console_xpath, - msg=vdur_console) + xpath=self.vnfr_vdu_console_xpath, + msg=vdur_console) else: #raise VnfRecordError("Not supported operation %s" % action) self._log.error("Not supported operation %s" % action) @@ -2187,7 +2474,7 @@ class VnfrConsoleOperdataDtsHandler(object): class VnfrDtsHandler(object): - """ registers 'D,/vnfr:vnfr-catalog/vnfr:vnfr' and handles CRUD from DTS""" + """ registers 'D,/rw-project:project/vnfr:vnfr-catalog/vnfr:vnfr' and handles CRUD from DTS""" XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr" def __init__(self, dts, log, loop, vnfm): @@ -2197,6 +2484,7 @@ class VnfrDtsHandler(object): self._vnfm = vnfm self._regh = None + self._project = vnfm._project @property def regh(self): @@ -2208,17 +2496,17 @@ class VnfrDtsHandler(object): """ Return VNF manager instance """ return self._vnfm + def deregister(self): + '''De-register from DTS''' + self._log.debug("De-register VNFR DTS handler for project {}". + format(self._project)) + if self._regh: + self._regh.deregister() + self._regh = None + @asyncio.coroutine def register(self): """ Register for vnfr create/update/delete/read requests from dts """ - def on_commit(xact_info): - """ The transaction has been committed """ - self._log.debug("Got vnfr commit (xact_info: %s)", xact_info) - return rwdts.MemberRspCode.ACTION_OK - - def on_abort(*args): - """ Abort callback """ - self._log.debug("VNF transaction got aborted") @asyncio.coroutine def on_event(dts, g_reg, xact, xact_event, scratch_data): @@ -2234,13 +2522,22 @@ class VnfrDtsHandler(object): yield from vnfr.instantiate(None, restart_mode=True) + self._log.debug("Got on_event in vnfm: {}".format(xact_event)) + if xact_event == rwdts.MemberEvent.INSTALL: curr_cfg = self.regh.elements for cfg in curr_cfg: - vnfr = self.vnfm.create_vnfr(cfg) - self._loop.create_task(instantiate_realloc_vnfr(vnfr)) + try: + vnfr = self.vnfm.create_vnfr(cfg, restart_mode = True) + if vnfr is None: + self._log.error("Not Creating VNFR {} as corresponding NS is terminated".format(cfg.id)) + else: + self._log.debug("Creating VNFR {}".format(vnfr.vnfr_id)) + except Exception as e: + self._log.exception(e) + raise e - self._log.debug("Got on_event in vnfm") + self._loop.create_task(instantiate_realloc_vnfr(vnfr)) return rwdts.MemberRspCode.ACTION_OK @@ -2252,62 +2549,125 @@ class VnfrDtsHandler(object): xact_info, action, msg ) + @asyncio.coroutine + def create_vnf(vnfr): + + xact_info.respond_xpath(rwdts.XactRspCode.ACK) + + if msg.operational_status == 'pre_init': + vnfr.set_state(VirtualNetworkFunctionRecordState.PRE_INIT) + yield from vnfr.publish(None) + + if vnfr.external_ro: + return + + if msg.operational_status == 'init': + vnfr._init = True + def on_instantiate_done(fut): + # If the do_instantiate fails, then publish NSR with failed result + e = fut.exception() + if e is not None: + import traceback, sys + print(traceback.format_exception(None,e, e.__traceback__), file=sys.stderr, flush=True) + self._log.exception("VNFR instantiation failed for VNFR id %s: %s", vnfr.vnfr_id, str(e)) + self._loop.create_task(vnfr.instantiation_failed(failed_reason=str(e))) + + try: + # RIFT-9105: Unable to add a READ query under an existing transaction + # xact = xact_info.xact + assert vnfr.task is None + vnfr.task = self._loop.create_task(vnfr.instantiate(None)) + vnfr.task.add_done_callback(on_instantiate_done) + + + except Exception as e: + self._log.exception(e) + self._log.error("Error while instantiating vnfr:%s", vnfr.vnfr_id) + vnfr.set_state(VirtualNetworkFunctionRecordState.FAILED) + yield from vnfr.publish(None) + + return + if action == rwdts.QueryAction.CREATE: if not msg.has_field("vnfd"): err = "Vnfd not provided" self._log.error(err) raise VnfRecordError(err) - vnfr = self.vnfm.create_vnfr(msg) - try: - # RIFT-9105: Unable to add a READ query under an existing transaction - # xact = xact_info.xact - yield from vnfr.instantiate(None) - except Exception as e: - self._log.exception(e) - self._log.error("Error while instantiating vnfr:%s", vnfr.vnfr_id) - vnfr.set_state(VirtualNetworkFunctionRecordState.FAILED) - yield from vnfr.publish(None) + if vnfr is None: + self._log.error("Not Creating VNFR {} as corresponding NS is terminated".format(msg.id)) + xact_info.respond_xpath(rwdts.XactRspCode.ACK) + else: + yield from create_vnf(vnfr) + return + elif action == rwdts.QueryAction.DELETE: - schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema() + schema = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr.schema() path_entry = schema.keyspec_to_entry(ks_path) vnfr = self._vnfm.get_vnfr(path_entry.key00.id) if vnfr is None: - self._log.debug("VNFR id %s not found for delete", path_entry.key00.id) - raise VirtualNetworkFunctionRecordNotFound( - "VNFR id %s", path_entry.key00.id) + self._log.error("VNFR id %s not found for delete", path_entry.key00.id) + xact_info.respond_xpath(rwdts.XactRspCode.ACK) + return + # Preventing exception here if VNFR id is not found. This means delete is + # invoked before Creation. + # raise VirtualNetworkFunctionRecordNotFound( + # "VNFR id %s", path_entry.key00.id) try: - yield from vnfr.terminate(xact_info.xact) - # Unref the VNFD - vnfr.vnfd_unref() + if not vnfr.external_ro: + yield from vnfr.terminate(xact_info.xact) yield from self._vnfm.delete_vnfr(xact_info.xact, vnfr) except Exception as e: self._log.exception(e) self._log.error("Caught exception while deleting vnfr %s", path_entry.key00.id) elif action == rwdts.QueryAction.UPDATE: - schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema() + schema = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr.schema() path_entry = schema.keyspec_to_entry(ks_path) vnfr = None try: vnfr = self._vnfm.get_vnfr(path_entry.key00.id) + + if vnfr is None: + # This means one of two things : The VNFR has been deleted or its a Launchpad restart. + if msg.id in self._vnfm._deleted_vnfrs: + # VNFR is deleted. + self._log.error("Not Creating VNFR {} as corresponding NS is terminated".format(msg.id)) + return + + self._log.debug("Launchpad Restart - Recreating VNFR - %s", msg.id) + vnfr = self.vnfm.create_vnfr(msg) + if vnfr is None: + self._log.error("Not Creating VNFR {} as corresponding NS is terminated".format(msg.id)) + else: + yield from create_vnf(vnfr) + + return + except Exception as e: - self._log.debug("No vnfr found with id %s", path_entry.key00.id) + self._log.error("Exception in VNFR Update : %s", str(e)) xact_info.respond_xpath(rwdts.XactRspCode.NA) return - if vnfr is None: - self._log.debug("VNFR id %s not found for update", path_entry.key00.id) - xact_info.respond_xpath(rwdts.XactRspCode.NA) + if vnfr.external_ro: + xact_info.respond_xpath(rwdts.XactRspCode.ACK) + return + + if (msg.operational_status == 'pre_init' and not vnfr._init): + # Creating VNFR INSTANTIATION TASK + self._log.debug("VNFR {} update after substitution {} (operational_status {})". + format(vnfr.name, msg.vnfd, msg.operational_status)) + yield from vnfr.update_vnfr_after_substitution(msg, xact_info) + yield from create_vnf(vnfr) return - self._log.debug("VNFR {} update config status {} (current {})". - format(vnfr.name, msg.config_status, vnfr.config_status)) - # Update the config status and publish - vnfr._config_status = msg.config_status - yield from vnfr.publish(None) + else: + self._log.debug("VNFR {} update config status {} (current {})". + format(vnfr.name, msg.config_status, vnfr.config_status)) + # Update the config and publish + yield from vnfr.update_config(msg, xact_info) else: raise NotImplementedError( @@ -2316,25 +2676,26 @@ class VnfrDtsHandler(object): xact_info.respond_xpath(rwdts.XactRspCode.ACK) - self._log.debug("Registering for VNFR using xpath: %s", - VnfrDtsHandler.XPATH,) + xpath = self._project.add_project(VnfrDtsHandler.XPATH) + self._log.debug("Registering for VNFR using xpath: {}". + format(xpath)) - hdl = rift.tasklets.DTS.RegistrationHandler(on_commit=on_commit, - on_prepare=on_prepare,) + hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,) handlers = rift.tasklets.Group.Handler(on_event=on_event,) with self._dts.group_create(handler=handlers) as group: - self._regh = group.register(xpath=VnfrDtsHandler.XPATH, + self._regh = group.register(xpath=xpath, handler=hdl, flags=(rwdts.Flag.PUBLISHER | + rwdts.Flag.SHARED | rwdts.Flag.NO_PREP_READ | - rwdts.Flag.CACHE | rwdts.Flag.DATASTORE),) @asyncio.coroutine - def create(self, xact, path, msg): + def create(self, xact, xpath, msg): """ Create a VNFR record in DTS with path and message """ + path = self._project.add_project(xpath) self._log.debug("Creating VNFR xact = %s, %s:%s", xact, path, msg) @@ -2343,21 +2704,23 @@ class VnfrDtsHandler(object): xact, path, msg) @asyncio.coroutine - def update(self, xact, path, msg): + def update(self, xact, xpath, msg, flags=rwdts.XactFlag.REPLACE): """ Update a VNFR record in DTS with path and message """ + path = self._project.add_project(xpath) self._log.debug("Updating VNFR xact = %s, %s:%s", xact, path, msg) - self.regh.update_element(path, msg) + self.regh.update_element(path, msg, flags) self._log.debug("Updated VNFR xact = %s, %s:%s", xact, path, msg) @asyncio.coroutine - def delete(self, xact, path): + def delete(self, xact, xpath): """ Delete a VNFR record in DTS with path and message """ + path = self._project.add_project(xpath) self._log.debug("Deleting VNFR xact = %s, %s", xact, path) self.regh.delete_element(path) self._log.debug("Deleted VNFR xact = %s, %s", xact, path) @@ -2385,6 +2748,14 @@ class VnfdRefCountDtsHandler(object): """ Return the NS manager instance """ return self._vnfm + def deregister(self): + '''De-register from DTS''' + self._log.debug("De-register VNFD Ref DTS handler for project {}". + format(self._vnfm._project)) + if self._regh: + self._regh.deregister() + self._regh = None + @asyncio.coroutine def register(self): """ Register for VNFD ref count read from dts """ @@ -2399,7 +2770,7 @@ class VnfdRefCountDtsHandler(object): ) if action == rwdts.QueryAction.READ: - schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount.schema() + schema = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_VnfdRefCount.schema() path_entry = schema.keyspec_to_entry(ks_path) vnfd_list = yield from self._vnfm.get_vnfd_refcount(path_entry.key00.vnfd_id_ref) for xpath, msg in vnfd_list: @@ -2414,7 +2785,8 @@ class VnfdRefCountDtsHandler(object): hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,) with self._dts.group_create() as group: - self._regh = group.register(xpath=VnfdRefCountDtsHandler.XPATH, + self._regh = group.register(xpath=self._vnfm._project.add_project( + VnfdRefCountDtsHandler.XPATH), handler=hdl, flags=rwdts.Flag.PUBLISHER, ) @@ -2463,7 +2835,8 @@ class VdurDatastore(object): set_if_not_none('mgmt.ip', vdur.vm_management_ip) # The below can be used for hostname set_if_not_none('vdur_name', vdur.unique_short_name) - + set_if_not_none('custom_meta_data', vdur._vdud.supplemental_boot_data.custom_meta_data) + def update(self, vdur): """Update the VDUR information in the datastore @@ -2493,6 +2866,7 @@ class VdurDatastore(object): set_or_delete('mgmt.ip', vdur.vm_management_ip) # The below can be used for hostname set_or_delete('vdur_name', vdur.unique_short_name) + set_or_delete('custom_meta_data', vdur._vdud.supplemental_boot_data.custom_meta_data) def remove(self, vdur_id): """Remove all of the data associated with specified VDUR @@ -2532,6 +2906,7 @@ class VdurDatastore(object): The requested data or None """ + result = self._pattern.match(expr) if result is None: raise ValueError('data expression not recognized ({})'.format(expr)) @@ -2546,25 +2921,36 @@ class VdurDatastore(object): class VnfManager(object): """ The virtual network function manager class """ - def __init__(self, dts, log, loop, cluster_name): + def __init__(self, dts, log, loop, project, cluster_name): self._dts = dts self._log = log self._loop = loop + self._project = project self._cluster_name = cluster_name - self._vcs_handler = VcsComponentDtsHandler(dts, log, loop, self) - self._vnfr_handler = VnfrDtsHandler(dts, log, loop, self) + # This list maintains a list of all the deleted vnfrs' ids. This is done to be able to determine + # if the vnfr is not found because of restart or simply because it was deleted. In the first case we + # recreate the vnfr while in the latter we do not. + self._deleted_vnfrs = [] + + self._vnfr_handler = VnfrDtsHandler(dts, log, loop, self) + self._vnfd_handler = VnfdDtsHandler(dts, log, loop, self) self._vnfr_ref_handler = VnfdRefCountDtsHandler(dts, log, loop, self) - self._nsr_handler = mano_dts.NsInstanceConfigSubscriber(log, dts, loop, callback=self.handle_nsr) + self._nsr_handler = mano_dts.NsInstanceConfigSubscriber( + log, dts, loop, project, callback=self.handle_nsr) + self._vlr_handler = subscriber.VlrSubscriberDtsHandler(log, dts, loop, project, + callback=self.vlr_event) - self._dts_handlers = [VnfdDtsHandler(dts, log, loop, self), + self._dts_handlers = [self._vnfd_handler, self._vnfr_handler, - self._vcs_handler, self._vnfr_ref_handler, - self._nsr_handler] + self._nsr_handler, + self._vlr_handler + ] self._vnfrs = {} self._vnfds_to_vnfr = {} self._nsrs = {} + self._vnfr_for_vlr = {} @property def vnfr_handler(self): @@ -2572,9 +2958,9 @@ class VnfManager(object): return self._vnfr_handler @property - def vcs_handler(self): - """ VCS dts handler """ - return self._vcs_handler + def vnfr_ref_handler(self): + """ VNFR dts handler """ + return self._vnfr_ref_handler @asyncio.coroutine def register(self): @@ -2582,6 +2968,11 @@ class VnfManager(object): for hdl in self._dts_handlers: yield from hdl.register() + def deregister(self): + self._log.debug("De-register VNFM project {}".format(self._project.name)) + for hdl in self._dts_handlers: + hdl.deregister() + @asyncio.coroutine def run(self): """ Run this VNFM instance """ @@ -2589,19 +2980,48 @@ class VnfManager(object): yield from self.register() def handle_nsr(self, nsr, action): - if action in [rwdts.QueryAction.CREATE]: + if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]: self._nsrs[nsr.id] = nsr elif action == rwdts.QueryAction.DELETE: if nsr.id in self._nsrs: del self._nsrs[nsr.id] - def get_linked_mgmt_network(self, vnfr): + def get_nsr_config(self, nsr_id): + """ + Gets the NSR config from the DTS cache. + Called in recovery mode only. + """ + if nsr_id in self._nsrs: + return self._nsrs[nsr_id] + + if len(self._nsrs): + self._log.error("VNFR with id {} not found".format(nsr_id)) + return None + + curr_cfgs = list(self._nsr_handler.reg.elements) + key_map = { getattr(cfg, self._nsr_handler.key_name()): cfg for cfg in curr_cfgs } + curr_cfgs = [key_map[key] for key in key_map] + + for cfg in curr_cfgs: + self._nsrs[cfg.id] = cfg + + if nsr_id in self._nsrs: + return self._nsrs[nsr_id] + + self._log.error("VNFR with id {} not found in DTS cache".format(nsr_id)) + return None + + + def get_linked_mgmt_network(self, vnfr, restart_mode=False): """For the given VNFR get the related mgmt network from the NSD, if available. """ vnfd_id = vnfr.vnfd.id nsr_id = vnfr.nsr_id_ref + if restart_mode: + self._nsrs[nsr_id] = self.get_nsr_config(vnfr.nsr_id_ref) + # for the given related VNFR, get the corresponding NSR-config nsr_obj = None try: @@ -2613,7 +3033,13 @@ class VnfManager(object): # network for vld in nsr_obj.nsd.vld: if vld.mgmt_network: - return vld.name + for vnfd in vld.vnfd_connection_point_ref: + if vnfd.vnfd_id_ref == vnfd_id: + if vld.vim_network_name is not None: + mgmt_net = vld.vim_network_name + else: + mgmt_net = self._project.name + "." + nsr_obj.name + "." + vld.name + return mgmt_net return None @@ -2621,11 +3047,19 @@ class VnfManager(object): """ get VNFR by vnfr id """ if vnfr_id not in self._vnfrs: - raise VnfRecordError("VNFR id %s not found", vnfr_id) + self._log.error("VNFR id {} not found".format(vnfr_id)) + return None + # Returning None to prevent exception here. The caller raises the exception. + # raise VnfRecordError("VNFR id %s not found", vnfr_id) return self._vnfrs[vnfr_id] - def create_vnfr(self, vnfr): + def create_vnfr(self, vnfr, restart_mode=False): + # Check if NSR is present. This is a situation where the NS has been deleted before + # VNFR Create starts. + if vnfr.nsr_id_ref not in self._nsrs: + return None + """ Create a VNFR instance """ if vnfr.id in self._vnfrs: msg = "Vnfr id %s already exists" % vnfr.id @@ -2636,11 +3070,24 @@ class VnfManager(object): vnfr.id, vnfr.vnfd.id) - mgmt_network = self.get_linked_mgmt_network(vnfr) + try: + mgmt_network = self.get_linked_mgmt_network(vnfr, restart_mode) + except Exception as e: + self._log.exception(e) + raise e + + # Identify if we are using Rift RO or external RO + external_ro = False + nsr = self._nsrs[vnfr.nsr_id_ref] + if (nsr.resource_orchestrator and + nsr.resource_orchestrator != 'rift'): + self._log.debug("VNFR {} using external RO". + format(vnfr.name)) + external_ro = True self._vnfrs[vnfr.id] = VirtualNetworkFunctionRecord( - self._dts, self._log, self._loop, self._cluster_name, self, self.vcs_handler, vnfr, - mgmt_network=mgmt_network + self._dts, self._log, self._loop, self._cluster_name, self, vnfr, + mgmt_network=mgmt_network, external_ro=external_ro, ) #Update ref count @@ -2663,15 +3110,18 @@ class VnfManager(object): self._vnfds_to_vnfr[vnfr.vnfd.id] -= 1 del self._vnfrs[vnfr.vnfr_id] + self._deleted_vnfrs.append(vnfr.vnfr_id) @asyncio.coroutine def fetch_vnfd(self, vnfd_id): """ Fetch VNFDs based with the vnfd id""" - vnfd_path = VirtualNetworkFunctionRecord.vnfd_xpath(vnfd_id) + vnfd_path = self._project.add_project( + VirtualNetworkFunctionRecord.vnfd_xpath(vnfd_id)) self._log.debug("Fetch vnfd with path %s", vnfd_path) vnfd = None - res_iter = yield from self._dts.query_read(vnfd_path, rwdts.XactFlag.MERGE) + res_iter = yield from self._dts.query_read(vnfd_path, + rwdts.XactFlag.MERGE) for ent in res_iter: res = yield from ent @@ -2716,22 +3166,10 @@ class VnfManager(object): del self._vnfds_to_vnfr[vnfd_id] - # Remove any files uploaded with VNFD and stored under $RIFT_ARTIFACTS/libs/ - try: - rift_artifacts_dir = os.environ['RIFT_ARTIFACTS'] - vnfd_dir = os.path.join(rift_artifacts_dir, 'launchpad/libs', vnfd_id) - if os.path.exists(vnfd_dir): - shutil.rmtree(vnfd_dir, ignore_errors=True) - except Exception as e: - self._log.error("Exception in cleaning up VNFD {}: {}". - format(self._vnfds_to_vnfr[vnfd_id].vnfd.name, e)) - self._log.exception(e) - - def vnfd_refcount_xpath(self, vnfd_id): """ xpath for ref count entry """ - return (VnfdRefCountDtsHandler.XPATH + - "[rw-vnfr:vnfd-id-ref = '{}']").format(vnfd_id) + return self._project.add_project(VnfdRefCountDtsHandler.XPATH + + "[rw-vnfr:vnfd-id-ref={}]").format(quoted_key(vnfd_id)) @asyncio.coroutine def get_vnfd_refcount(self, vnfd_id): @@ -2739,18 +3177,75 @@ class VnfManager(object): vnfd_list = [] if vnfd_id is None or vnfd_id == "": for vnfd in self._vnfds_to_vnfr.keys(): - vnfd_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount() + vnfd_msg = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_VnfdRefCount() vnfd_msg.vnfd_id_ref = vnfd vnfd_msg.instance_ref_count = self._vnfds_to_vnfr[vnfd] vnfd_list.append((self.vnfd_refcount_xpath(vnfd), vnfd_msg)) elif vnfd_id in self._vnfds_to_vnfr: - vnfd_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount() + vnfd_msg = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_VnfdRefCount() vnfd_msg.vnfd_id_ref = vnfd_id vnfd_msg.instance_ref_count = self._vnfds_to_vnfr[vnfd_id] vnfd_list.append((self.vnfd_refcount_xpath(vnfd_id), vnfd_msg)) return vnfd_list + def add_vlr_id_vnfr_map(self, vlr_id, vnfr): + """ Add a mapping for vlr_id into VNFR """ + self._vnfr_for_vlr[vlr_id] = vnfr + + def remove_vlr_id_vnfr_map(self, vlr_id): + """ Remove a mapping for vlr_id into VNFR """ + del self._vnfr_for_vlr[vlr_id] + + def find_vnfr_for_vlr_id(self, vlr_id): + """ Find VNFR for VLR id """ + vnfr = None + if vlr_id in self._vnfr_for_vlr: + vnfr = self._vnfr_for_vlr[vlr_id] + + def vlr_event(self, vlr, action): + """ VLR event handler """ + self._log.debug("VnfManager: Received VLR %s with action:%s", vlr, action) + + if vlr.id not in self._vnfr_for_vlr: + self._log.warning("VLR %s:%s received for unknown id; %s", + vlr.id, vlr.name, vlr) + return + vnfr = self._vnfr_for_vlr[vlr.id] + + vnfr.vlr_event(vlr, action) + + +class VnfmProject(ManoProject): + + def __init__(self, name, tasklet, **kw): + super(VnfmProject, self).__init__(tasklet.log, name) + self.update(tasklet) + + self._vnfm = None + + @asyncio.coroutine + def register (self): + try: + vm_parent_name = self._tasklet.tasklet_info.get_parent_vm_parent_instance_name() + assert vm_parent_name is not None + self._vnfm = VnfManager(self._dts, self.log, self.loop, self, vm_parent_name) + yield from self._vnfm.run() + except Exception: + print("Caught Exception in VNFM init:", sys.exc_info()[0]) + raise + + def deregister(self): + self._log.debug("De-register project {} for VnfmProject". + format(self.name)) + self._vnfm.deregister() + + @asyncio.coroutine + def delete_prepare(self): + if self._vnfm and self._vnfm._vnfrs: + delete_msg = "Project has VNFR associated with it. Delete all Project NSR and try again." + return False, delete_msg + return True, "True" class VnfmTasklet(rift.tasklets.Tasklet): """ VNF Manager tasklet class """ @@ -2760,7 +3255,12 @@ class VnfmTasklet(rift.tasklets.Tasklet): self.rwlog.set_subcategory("vnfm") self._dts = None - self._vnfm = None + self._project_handler = None + self.projects = {} + + @property + def dts(self): + return self._dts def start(self): try: @@ -2777,7 +3277,7 @@ class VnfmTasklet(rift.tasklets.Tasklet): self.log.debug("Created DTS Api GI Object: %s", self._dts) except Exception: - print("Caught Exception in VNFM start:", sys.exc_info()[0]) + self._log.error("Caught Exception in VNFM start:", sys.exc_info()[0]) raise def on_instance_started(self): @@ -2788,20 +3288,15 @@ class VnfmTasklet(rift.tasklets.Tasklet): try: self._dts.deinit() except Exception: - print("Caught Exception in VNFM stop:", sys.exc_info()[0]) + self._log.error("Caught Exception in VNFM stop:", sys.exc_info()[0]) raise @asyncio.coroutine def init(self): """ Task init callback """ - try: - vm_parent_name = self.tasklet_info.get_parent_vm_parent_instance_name() - assert vm_parent_name is not None - self._vnfm = VnfManager(self._dts, self.log, self.loop, vm_parent_name) - yield from self._vnfm.run() - except Exception: - print("Caught Exception in VNFM init:", sys.exc_info()[0]) - raise + self.log.debug("creating project handler") + self.project_handler = ProjectHandler(self, VnfmProject) + self.project_handler.register() @asyncio.coroutine def run(self):