From b4aadc1e833e36a00d3ab47df7f4c6b37c087169 Mon Sep 17 00:00:00 2001 From: prithiv Date: Thu, 30 Mar 2017 14:17:45 -0400 Subject: [PATCH] Bug: #240 - NS Scaling Basic Changes Change-Id: I6ba1e4f6bc1bc21191250b6e005d2a37b21c95af Signed-off-by: prithiv --- .../python/rift/openmano/rift2openmano.py | 135 ++-- .../tasklets/rwnsmtasklet/openmano_nsm.py | 367 ++++++--- .../tasklets/rwnsmtasklet/rwnsmtasklet.py | 716 +++++++++++------- 3 files changed, 777 insertions(+), 441 deletions(-) diff --git a/models/openmano/python/rift/openmano/rift2openmano.py b/models/openmano/python/rift/openmano/rift2openmano.py index 2ce17c08..178cc1ef 100755 --- a/models/openmano/python/rift/openmano/rift2openmano.py +++ b/models/openmano/python/rift/openmano/rift2openmano.py @@ -34,7 +34,7 @@ from gi.repository import ( RwYang, RwVnfdYang, RwNsdYang, - ) +) import rift.package.store import rift.package.cloud_init @@ -221,13 +221,16 @@ def convert_vnfd_name(vnfd_name, member_idx): return vnfd_name + "__" + str(member_idx) -def rift2openmano_nsd(rift_nsd, rift_vnfds, openmano_vnfd_ids): - for vnfd_id in rift_nsd.vnfd_ids: - if vnfd_id not in rift_vnfds: - raise VNFNotFoundError("VNF id %s not provided" % vnfd_id) +def rift2openmano_nsd(rift_nsd, rift_vnfds, openmano_vnfd_ids, rift_vnfd_id=None): + if rift_vnfd_id is None: + for vnfd_id in rift_nsd.vnfd_ids: + if vnfd_id not in rift_vnfds: + raise VNFNotFoundError("VNF id %s not provided" % vnfd_id) openmano = {} openmano["name"] = rift_nsd.name + if rift_vnfd_id is not None: + openmano["name"] += "scale1" openmano["description"] = rift_nsd.description topology = {} openmano["topology"] = topology @@ -235,6 +238,8 @@ def rift2openmano_nsd(rift_nsd, rift_vnfds, openmano_vnfd_ids): topology["nodes"] = {} for vnfd in rift_nsd.constituent_vnfds: vnfd_id = vnfd.vnfd_id_ref + if rift_vnfd_id is not None and rift_vnfd_id != vnfd_id: + continue rift_vnfd = rift_vnfds[vnfd_id] member_idx = vnfd.member_vnf_index openmano_vnfd_id = openmano_vnfd_ids.get(vnfd_id,None) @@ -242,12 +247,12 @@ def rift2openmano_nsd(rift_nsd, rift_vnfds, openmano_vnfd_ids): topology["nodes"][rift_vnfd.name + "__" + str(member_idx)] = { "type": "VNF", "vnf_id": openmano_vnfd_id - } + } else: topology["nodes"][rift_vnfd.name + "__" + str(member_idx)] = { "type": "VNF", "VNF model": rift_vnfd.name - } + } for vld in rift_nsd.vlds: # Openmano has both bridge_net and dataplane_net models for network types @@ -267,14 +272,14 @@ def rift2openmano_nsd(rift_nsd, rift_vnfds, openmano_vnfd_ids): if True: if vld.name not in topology["nodes"]: topology["nodes"][vld.name] = { - "type": "external_network", - "model": vld.name, - } + "type": "external_network", + "model": vld.name, + } # Add the external network to the list of connection points topology["connections"][vld.name]["nodes"].append( - {vld.name: "0"} - ) + {vld.name: "0"} + ) elif vld.provider_network.has_field("physical_network"): # Add the external datacenter network to the topology # node list if it isn't already added @@ -285,14 +290,14 @@ def rift2openmano_nsd(rift_nsd, rift_vnfds, openmano_vnfd_ids): if ext_net_name not in topology["nodes"]: topology["nodes"][ext_net_name] = { - "type": "external_network", - "model": ext_net_name_with_seg, - } + "type": "external_network", + "model": ext_net_name_with_seg, + } # Add the external network to the list of connection points topology["connections"][vld.name]["nodes"].append( - {ext_net_name: "0"} - ) + {ext_net_name: "0"} + ) for vnfd_cp in vld.vnfd_connection_point_ref: @@ -310,9 +315,53 @@ def rift2openmano_nsd(rift_nsd, rift_vnfds, openmano_vnfd_ids): vnf_ref: vnfd_cp.vnfd_connection_point_ref } ) - return openmano +def rift2openmano_vnfd_nsd(rift_nsd, rift_vnfds, rift_vnfd_id): + + if rift_vnfd_id not in rift_vnfds: + print ("IDS", rift_vnfds) + raise VNFNotFoundError("VNF id %s not provided" % rift_vnfd_id) + + openmano_vnfd_nsd = {} + openmano_vnfd_nsd["name"] = rift_vnfd_id+'-'+'scaling_group' + openmano_vnfd_nsd["description"] = "Scaling Group" + topology = {} + openmano_vnfd_nsd["topology"] = topology + + topology["nodes"] = {} + openmano_vnfd_nsd["topology"] = topology + topology["connections"] = {} + topology["nodes"] = {} + topology["nodes"] = { + "type": "VNF", + "vnf_id": rift_vnfd_id + } + + for vld in rift_nsd.vlds: + + # Create a connections entry for each external VLD + topology["connections"][vld.name] = {} + topology["connections"][vld.name]["nodes"] = [] + + for vnfd_cp in vld.vnfd_connection_point_ref: + if not rit_vnfd_id in vnfd_cp.vnfd_id_ref: + continue + if rift_vnfd_id in vnfd_cp.vnfd_id_ref: + # Get the RIFT VNF for this external VLD connection point + vnfd = rift_vnfds[vnfd_cp.vnfd_id_ref] + # For each VNF in this connection, use the same interface name + topology["connections"][vld.name]["type"] = "link" + # Vnf ref is the vnf name with the member_vnf_idx appended + member_idx = vnfd_cp.member_vnf_index_ref + vnf_ref = rift_vnfd_id + "__" + str(member_idx) + topology["connections"][vld.name]["nodes"].append( + { + vnf_ref: vnfd_cp.vnfd_connection_point_ref + } + ) + return openmano_vnfd_nsd + def cloud_init(rift_vnfd_id, vdu): """ Populate cloud_init with script from either the inline contents or from the file provided @@ -324,7 +373,7 @@ def cloud_init(rift_vnfd_id, vdu): logger.debug("cloud_init script provided inline %s", vdu.cloud_init) cloud_init_msg = vdu.cloud_init elif vdu.cloud_init_file is not None: - # Get cloud-init script contents from the file provided in the cloud_init_file param + # Get cloud-init script contents from the file provided in the cloud_init_file param logger.debug("cloud_init script provided in file %s", vdu.cloud_init_file) filename = vdu.cloud_init_file vnfd_package_store.refresh() @@ -353,9 +402,9 @@ def config_file_init(rift_vnfd_id, vdu, cfg_file): stored_package = vnfd_package_store.get_package(rift_vnfd_id) cloud_init_extractor = rift.package.cloud_init.PackageCloudInitExtractor(logger) try: - cfg_file_msg = cloud_init_extractor.read_script(stored_package, filename) + cfg_file_msg = cloud_init_extractor.read_script(stored_package, filename) except rift.package.cloud_init.CloudInitExtractionError as e: - raise ValueError(e) + raise ValueError(e) logger.debug("Current config file msg is {}".format(cfg_file_msg)) return cfg_file_msg @@ -427,7 +476,7 @@ def rift2openmano_vnfd(rift_vnfd, rift_nsd): "VNFC": vdu.name, "local_iface_name": ext_if.name, "description": "%s iface on VDU %s" % (ext_if.name, vdu.name), - } + } vnf["external-connections"].append(connection) @@ -438,7 +487,7 @@ def rift2openmano_vnfd(rift_vnfd, rift_nsd): "description": vld.description, "type": "bridge", "elements": [], - } + } # Add the specific VDU connection points for int_cp in vld.internal_connection_point: @@ -446,7 +495,7 @@ def rift2openmano_vnfd(rift_vnfd, rift_nsd): connection["elements"].append({ "VNFC": vdu.name, "local_iface_name": int_if.name, - }) + }) if "internal-connections" not in vnf: vnf["internal-connections"] = [] @@ -459,7 +508,7 @@ def rift2openmano_vnfd(rift_vnfd, rift_nsd): "name": vdu.name, "description": vdu.name, "bridge-ifaces": [], - } + } if vdu.vm_flavor.has_field("storage_gb") and vdu.vm_flavor.storage_gb: vnfc["disk"] = vdu.vm_flavor.storage_gb @@ -478,9 +527,9 @@ def rift2openmano_vnfd(rift_vnfd, rift_nsd): dedicated_int = True if vdu.guest_epa.has_field("numa_node_policy") or dedicated_int: vnfc["numas"] = [{ - "memory": max(int(vdu.vm_flavor.memory_mb/1024), 1), - "interfaces":[], - }] + "memory": max(int(vdu.vm_flavor.memory_mb/1024), 1), + "interfaces":[], + }] numa_node_policy = vdu.guest_epa.numa_node_policy if numa_node_policy.has_field("node"): numa_node = numa_node_policy.node[0] @@ -491,9 +540,9 @@ def rift2openmano_vnfd(rift_vnfd, rift_nsd): if len(numa_node.paired_threads.paired_thread_ids) > 0: vnfc["numas"][0]["paired-threads-id"] = [] for pair in numa_node.paired_threads.paired_thread_ids: - vnfc["numas"][0]["paired-threads-id"].append( - [pair.thread_a, pair.thread_b] - ) + vnfc["numas"][0]["paired-threads-id"].append( + [pair.thread_a, pair.thread_b] + ) else: if vdu.vm_flavor.has_field("vcpu_count"): @@ -528,7 +577,7 @@ def rift2openmano_vnfd(rift_vnfd, rift_nsd): if vdu.has_field("volumes"): vnfc["devices"] = [] # Sort volumes as device-list is implictly ordered by Openmano - newvollist = sorted(vdu.volumes, key=lambda k: k.name) + newvollist = sorted(vdu.volumes, key=lambda k: k.name) for iter_num, volume in enumerate(newvollist): if iter_num == 0: # Convert the first volume to vnfc.image @@ -543,7 +592,7 @@ def rift2openmano_vnfd(rift_vnfd, rift_nsd): device = {} device["type"] = volume.device_type device["image"] = volume.image - vnfc["devices"].append(device) + vnfc["devices"].append(device) vnfc_boot_data_init = False if vdu.has_field("cloud_init") or vdu.has_field("cloud_init_file"): @@ -565,14 +614,14 @@ def rift2openmano_vnfd(rift_vnfd, rift_nsd): cfg_source = config_file_init(rift_vnfd.id, vdu, custom_config_file.source) om_cfgfile_list.append({"dest":custom_config_file.dest, "content": cfg_source}) vnfc['boot-data']['config-files'] = om_cfgfile_list - + vnf["VNFC"].append(vnfc) for int_if in list(vdu.internal_interface) + list(vdu.external_interface): intf = { "name": int_if.name, - } + } if int_if.virtual_interface.has_field("vpci"): intf["vpci"] = int_if.virtual_interface.vpci @@ -627,14 +676,14 @@ def parse_args(argv=sys.argv[1:]): '-o', '--outdir', default='-', help="Directory to output converted descriptors. Default is stdout", - ) + ) parser.add_argument( '-n', '--nsd-file-hdl', metavar="nsd_file", type=argparse.FileType('r'), help="Rift NSD Descriptor File", - ) + ) parser.add_argument( '-v', '--vnfd-file-hdls', @@ -642,7 +691,7 @@ def parse_args(argv=sys.argv[1:]): action='append', type=argparse.FileType('r'), help="Rift VNFD Descriptor File", - ) + ) args = parser.parse_args(argv) @@ -676,8 +725,8 @@ def write_yaml_to_file(name, outdir, desc_dict): def main(argv=sys.argv[1:]): args = parse_args(argv) - nsd = None + rift_vnfd_id = 'test_vnfd' openmano_vnfr_ids = dict() vnf_dict = None if args.vnfd_file_hdls is not None: @@ -689,10 +738,10 @@ def main(argv=sys.argv[1:]): if args.nsd_file_hdl is not None: nsd = create_nsd_from_file(args.nsd_file_hdl) - openmano_nsd = rift2openmano_nsd(nsd, vnf_dict, openmano_vnfr_ids) - + openmano_nsd = rift2openmano_nsd(nsd, vnf_dict, openmano_vnfr_ids,rift_vnfd_id) + vnfd_nsd = rift2openmano_vnfd_nsd(nsd, vnf_dict, openmano_vnfr_ids, rift_vnfd_id) write_yaml_to_file(openmano_nsd["name"], args.outdir, openmano_nsd) - + write_yaml_to_file(vnfd_nsd["name"], args.outdir, vnfd_nsd) for vnf in vnf_dict.values(): openmano_vnf = rift2openmano_vnfd(vnf, nsd) write_yaml_to_file(openmano_vnf["vnf"]["name"], args.outdir, openmano_vnf) @@ -700,4 +749,4 @@ def main(argv=sys.argv[1:]): if __name__ == "__main__": logging.basicConfig(level=logging.WARNING) - main() + main() \ No newline at end of file diff --git a/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/openmano_nsm.py b/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/openmano_nsm.py index d53d701d..6a19cb93 100644 --- a/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/openmano_nsm.py +++ b/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/openmano_nsm.py @@ -8,6 +8,7 @@ # # http://www.apache.org/licenses/LICENSE-2.0 # +#test # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -42,21 +43,21 @@ if sys.version_info < (3, 4, 4): DUMP_OPENMANO_DIR = os.path.join( - os.environ["RIFT_ARTIFACTS"], - "openmano_descriptors" - ) + os.environ["RIFT_ARTIFACTS"], + "openmano_descriptors" +) def dump_openmano_descriptor(name, descriptor_str): filename = "{}_{}.yaml".format( time.strftime("%Y%m%d-%H%M%S"), name - ) + ) filepath = os.path.join( - DUMP_OPENMANO_DIR, - filename - ) + DUMP_OPENMANO_DIR, + filename + ) try: if not os.path.exists(DUMP_OPENMANO_DIR): @@ -99,7 +100,7 @@ class VnfrConsoleOperdataDtsHandler(object): self._log.debug( "Got VNFR VDU Opdata xact_info: %s, action: %s): %s:%s", xact_info, action, xpath, msg - ) + ) if action == rwdts.QueryAction.READ: schema = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur.schema() @@ -107,17 +108,17 @@ class VnfrConsoleOperdataDtsHandler(object): try: console_url = yield from self._loop.run_in_executor( - None, - self._nsr._http_api.get_instance_vm_console_url, - self._nsr._nsr_uuid, - self._vdur_id - ) + None, + self._nsr._http_api.get_instance_vm_console_url, + self._nsr._nsr_uuid, + self._vdur_id, + ) self._log.debug("Got console response: %s for NSR ID %s vdur ID %s", - console_url, - self._nsr._nsr_uuid, - self._vdur_id - ) + console_url, + self._nsr._nsr_uuid, + self._vdur_id + ) vdur_console = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur() vdur_console.id = self._vdur_id if console_url: @@ -127,14 +128,14 @@ class VnfrConsoleOperdataDtsHandler(object): self._log.debug("Recevied console URL for vdu {} is {}".format(self._vdu_id,vdur_console)) except openmano_client.InstanceStatusError as e: self._log.error("Could not get NS instance console URL: %s", - str(e)) + str(e)) vdur_console = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur() vdur_console.id = self._vdur_id vdur_console.console_url = 'none' xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK, - xpath=self.vnfr_vdu_console_xpath, - msg=vdur_console) + xpath=self.vnfr_vdu_console_xpath, + msg=vdur_console) else: #raise VnfRecordError("Not supported operation %s" % action) self._log.error("Not supported operation %s" % action) @@ -200,9 +201,9 @@ class OpenmanoVnfr(object): # If the name already exists, get the openmano vnfd id name_uuid_map = yield from self._loop.run_in_executor( - None, - self._cli_api.vnf_list, - ) + None, + self._cli_api.vnf_list, + ) if name in name_uuid_map: vnf_id = name_uuid_map[name] @@ -211,15 +212,15 @@ class OpenmanoVnfr(object): return self._vnf_id, _ = yield from self._loop.run_in_executor( - None, - self._cli_api.vnf_create, - self.openmano_vnfd_yaml, - ) + None, + self._cli_api.vnf_create, + self.openmano_vnfd_yaml, + ) fpath = dump_openmano_descriptor( - "{}_vnf".format(name), - self.openmano_vnfd_yaml - ) + "{}_vnf".format(name), + self.openmano_vnfd_yaml + ) self._log.debug("Dumped Openmano VNF descriptor to: %s", fpath) @@ -236,10 +237,10 @@ class OpenmanoVnfr(object): return yield from self._loop.run_in_executor( - None, - self._cli_api.vnf_delete, - self._vnf_id, - ) + None, + self._cli_api.vnf_delete, + self._vnf_id, + ) class OpenmanoNSRecordState(Enum): @@ -261,7 +262,7 @@ class OpenmanoNSRecordState(Enum): class OpenmanoNsr(object): TIMEOUT_SECS = 300 - def __init__(self, dts, log, loop, publisher, cli_api, http_api, nsd_msg, nsr_config_msg,key_pairs): + def __init__(self, dts, log, loop, publisher, cli_api, http_api, nsd_msg, nsr_config_msg,key_pairs, rift_vnfd_id=None): self._dts = dts self._log = log self._loop = loop @@ -274,6 +275,7 @@ class OpenmanoNsr(object): self._vlrs = [] self._vnfrs = [] + self._nsrs = {} self._vdur_console_handler = {} self._key_pairs = key_pairs @@ -285,12 +287,29 @@ class OpenmanoNsr(object): self._created = False self._monitor_task = None + self._rift_vnfd_id = rift_vnfd_id self._state = OpenmanoNSRecordState.INIT @property def nsd(self): return rift2openmano.RiftNSD(self._nsd_msg) + @property + def nsd_msg(self): + return self._nsd_msg + + @property + def nsr_config_msg(self): + return self._nsr_config_msg + + @property + def nsr_msg(self): + return self._nsr_msg + + @property + def key_pairs(self): + return self._key_pairs + @property def vnfds(self): return {v.rift_vnfd_id: v.vnfd for v in self._vnfrs} @@ -309,6 +328,13 @@ class OpenmanoNsr(object): openmano_nsd = rift2openmano.rift2openmano_nsd(self.nsd, self.vnfds,self.vnfr_ids) return yaml.safe_dump(openmano_nsd, default_flow_style=False) + @property + def scaling_group_nsd_yaml(self): + self._log.debug("Creating Scaling Group Scenarion Descriptor for VNF %s", self._rift_vnfd_id) + openmano_vnfd_nsd = rift2openmano.rift2openmano_vnfd_nsd(self.nsd, self.vnfds, self.vnfr_ids, self._rift_vnfd_id) + return yaml.safe_dump(vnfd_nsd, default_flow_style=False) + + @property def get_ssh_key_pairs(self): cloud_config = {} key_pairs = list() @@ -353,15 +379,15 @@ class OpenmanoNsr(object): @property def openmano_instance_create_yaml(self): - self._log.debug("Creating instance-scenario-create input file for nsd %s with name %s", self.nsd.id, self._nsr_config_msg.name) + self._log.debug("Creating instance-scenario-create input file for nsd %s with name %s", self.nsd.id, self._nsr_config_msg.name, self._rift_vnfd_id) openmano_instance_create = {} openmano_instance_create["name"] = self._nsr_config_msg.name openmano_instance_create["description"] = self._nsr_config_msg.description openmano_instance_create["scenario"] = self._nsd_uuid - cloud_config = self.get_ssh_key_pairs() - if cloud_config: - openmano_instance_create["cloud-config"] = cloud_config + #cloud_config = self.get_ssh_key_pairs() + #if cloud_config: + # openmano_instance_create["cloud-config"] = cloud_config if self._nsr_config_msg.has_field("om_datacenter"): openmano_instance_create["datacenter"] = self._nsr_config_msg.om_datacenter openmano_instance_create["vnfs"] = {} @@ -376,7 +402,7 @@ class OpenmanoNsr(object): for vlr in self._vlrs: if vlr.vld_msg.name == vld_msg.name: self._log.debug("Received VLR name %s, VLR DC: %s for VLD: %s",vlr.vld_msg.name, - vlr.om_datacenter_name,vld_msg.name) + vlr.om_datacenter_name,vld_msg.name) #network["vim-network-name"] = vld_msg.name network = {} ip_profile = {} @@ -414,6 +440,38 @@ class OpenmanoNsr(object): return yaml.safe_dump(openmano_instance_create, default_flow_style=False,width=1000) + @property + def scaling_instance_create_yaml(self): + self._log.debug("Creating instance-scenario-create input file for nsd %s with name %s", self.nsd.id, self._nsr_config_msg.name) + scaling_instance_create = {} + scaling_instance_create["vnfs"] = {} + scaling_instance_create["name"] = self._nsr_config_msg.name+"scal1" + scaling_instance_create["description"] = "Scaling Group Instance Scenario File" + scaling_instance_create["scenario"] = self._rift_vnfd_id + + if self._nsr_config_msg.has_field("om_datacenter"): + scaling_instance_create["datacenter"] = self._nsr_config_msg.om_datacenter + scaling_instance_create["vnfs"] = {} + for vnfr in self._vnfrs: + if "om_datacenter" in vnfr.vnfr.vnfr_msg: + vnfr_name = vnfr.vnfr.vnfd.name + "__" + str(vnfr.vnfr.vnfr_msg.member_vnf_index_ref) + scaling_instance_create["vnfs"][vnfr_name] = {"datacenter": vnfr.vnfr.vnfr_msg.om_datacenter} + scaling_instance_create["networks"] = {} + for vld_msg in self._nsd_msg.vld: + scaling_instance_create["networks"][vld_msg.name] = {} + scaling_instance_create["networks"][vld_msg.name]["sites"] = list() + for vlr in self._vlrs: + if vlr.vld_msg.name == vld_msg.name: + self._log.debug("Received VLR name %s, VLR DC: %s for VLD: %s",vlr.vld_msg.name, + vlr.om_datacenter_name,vld_msg.name) + #network["vim-network-name"] = vld_msg.name + network = {} + ip_profile = {} + if vld_msg.vim_network_name: + network["netmap-use"] = vld_msg.vim_network_name + return yaml.safe_dump(scaling_instance_create, default_flow_style=False,width=1000) + + @asyncio.coroutine def add_vlr(self, vlr): self._vlrs.append(vlr) @@ -446,6 +504,10 @@ class OpenmanoNsr(object): yield from vnfr.create() self._vnfrs.append(vnfr) + @asyncio.coroutine + def add_nsr(self, nsr, vnfr): + self._nsrs[vnfr.id] = nsr + @asyncio.coroutine def delete(self): if not self._created: @@ -455,10 +517,10 @@ class OpenmanoNsr(object): self._log.debug("Deleting openmano nsr") yield from self._loop.run_in_executor( - None, - self._cli_api.ns_delete, - self._nsd_uuid, - ) + None, + self._cli_api.ns_delete, + self._nsd_uuid, + ) self._log.debug("Deleting openmano vnfrs") for vnfr in self._vnfrs: @@ -469,9 +531,9 @@ class OpenmanoNsr(object): def create(self): self._log.debug("Creating openmano scenario") name_uuid_map = yield from self._loop.run_in_executor( - None, - self._cli_api.ns_list, - ) + None, + self._cli_api.ns_list, + ) if self._nsd_msg.name in name_uuid_map: self._log.debug("Found existing openmano scenario") @@ -483,20 +545,36 @@ class OpenmanoNsr(object): # scenario on reload or to support muliple instances of the name # nsd self._nsd_uuid, _ = yield from self._loop.run_in_executor( - None, - self._cli_api.ns_create, - self.openmano_nsd_yaml, - self._nsd_msg.name - ) + None, + self._cli_api.ns_create, + self.openmano_nsd_yaml, + self._nsd_msg.name + ) fpath = dump_openmano_descriptor( - "{}_nsd".format(self._nsd_msg.name), - self.openmano_nsd_yaml, - ) + "{}_nsd".format(self._nsd_msg.name), + self.openmano_nsd_yaml, + ) self._log.debug("Dumped Openmano NS descriptor to: %s", fpath) self._created = True + @asyncio.coroutine + def scaling_scenario_create(self): + self._log.debug("Creating Openmano Scenario for the Scaling Group VNF") + self._vnfd_nsd_uuid, _ = yield from self._loop.run_in_executor( + None, + self._cli_api.ns_create, + self.scaling_group_nsd_yaml, + self._rift_vnfd_id + ) + fpath = dump_openmano_descriptor( + "{}_scaling_group_nsd".format(self._rift_vnfd_id), + self.scaling_group_nsd_yaml, + ) + self._log.debug("Dumped Scaling group scenario to '%s'", fpath) + self._created = True + @asyncio.coroutine def instance_monitor_task(self): self._log.debug("Starting Instance monitoring task") @@ -509,14 +587,14 @@ class OpenmanoNsr(object): try: instance_resp_json = yield from self._loop.run_in_executor( - None, - self._http_api.get_instance, - self._nsr_uuid, - ) + None, + self._http_api.get_instance, + self._nsr_uuid, + ) self._log.debug("Got instance response: %s for NSR ID %s", - instance_resp_json, - self._nsr_uuid) + instance_resp_json, + self._nsr_uuid) except openmano_client.InstanceStatusError as e: self._log.error("Could not get NS instance status: %s", str(e)) @@ -593,8 +671,8 @@ class OpenmanoNsr(object): # to come up with openmano constituent VNF name. Use this # knowledge to map the vnfr back. openmano_vnfr_suffix = "__{}".format( - vnfr.vnfr.vnfr_msg.member_vnf_index_ref - ) + vnfr.vnfr.vnfr_msg.member_vnf_index_ref + ) for vnf in instance_resp_json["vnfs"]: if vnf["vnf_name"].endswith(openmano_vnfr_suffix): @@ -643,8 +721,8 @@ class OpenmanoNsr(object): if vnf_ip_address is None: self._log.warning("No IP address obtained " - "for VNF: {}, will retry.".format( - vnf_status['vnf_name'])) + "for VNF: {}, will retry.".format( + vnf_status['vnf_name'])) continue self._log.debug("All VMs in VNF are active. Marking as running.") @@ -658,7 +736,7 @@ class OpenmanoNsr(object): for vm in vnf_status["vms"]: if vm["uuid"] not in self._vdur_console_handler: vdur_console_handler = VnfrConsoleOperdataDtsHandler(self._dts, self._log, self._loop, - self, vnfr_msg.id,vm["uuid"],vm["name"]) + self, vnfr_msg.id,vm["uuid"],vm["name"]) yield from vdur_console_handler.register() self._vdur_console_handler[vm["uuid"]] = vdur_console_handler @@ -699,9 +777,9 @@ class OpenmanoNsr(object): self._log.debug("Deploying openmano scenario") name_uuid_map = yield from self._loop.run_in_executor( - None, - self._cli_api.ns_instance_list, - ) + None, + self._cli_api.ns_instance_list, + ) if self._nsr_config_msg.name in name_uuid_map: self._log.debug("Found existing instance with nsr name: %s", self._nsr_config_msg.name) @@ -709,21 +787,37 @@ class OpenmanoNsr(object): else: self._nsr_msg = nsr_msg fpath = dump_openmano_descriptor( - "{}_instance_sce_create".format(self._nsr_config_msg.name), - self.openmano_instance_create_yaml, - ) + "{}_instance_sce_create".format(self._nsr_config_msg.name), + self.openmano_instance_create_yaml, + ) self._log.debug("Dumped Openmano NS Scenario Cretae to: %s", fpath) self._nsr_uuid = yield from self._loop.run_in_executor( - None, - self._cli_api.ns_instance_scenario_create, - self.openmano_instance_create_yaml) + None, + self._cli_api.ns_instance_scenario_create, + self.scaling_instance_create_yaml) + self._state = OpenmanoNSRecordState.INSTANTIATION_PENDING self._monitor_task = asyncio.ensure_future( - self.instance_monitor_task(), loop=self._loop - ) + self.instance_monitor_task(), loop=self._loop + ) + + @asyncio.coroutine + def scaling_scenario_deploy(self, nsr_msg): + self._log.debug("Deploying Scaling scenario") + self._nsr_msg = nsr_msg + self._rift_vnfd_id = rift_vnfd_id + fpath = dump_openmano_descriptor( + "{}_scale_instance".format(self._nsr_config_msg.name), + self.scaling_instance_create_yaml + ) + self._state = OpenmanoNSRecordState.INIT + + self._monitor_task = asyncio.ensure_future( + self.instance_monitor_task(), loop=self._loop + ) @asyncio.coroutine def terminate(self): @@ -741,15 +835,15 @@ class OpenmanoNsr(object): self._log.debug("Terminating openmano nsr") yield from self._loop.run_in_executor( - None, - self._cli_api.ns_terminate, - self._nsr_uuid, - ) + None, + self._cli_api.ns_terminate, + self._nsr_uuid, + ) @asyncio.coroutine def create_vlr(self,vlr): self._log.debug("Creating openmano vim network VLR name %s, VLR DC: %s",vlr.vld_msg.name, - vlr.om_datacenter_name) + vlr.om_datacenter_name) net_create = {} net = {} net['name'] = vlr.name @@ -762,7 +856,7 @@ class OpenmanoNsr(object): if ip_profile_params.ip_version == "ipv6": ip_profile['ip_version'] = "IPv6" else: - ip_profile['ip_version'] = "IPv4" + ip_profile['ip_version'] = "IPv4" if ip_profile_params.has_field('subnet_address'): ip_profile['subnet_address'] = ip_profile_params.subnet_address if ip_profile_params.has_field('gateway_address'): @@ -774,22 +868,23 @@ class OpenmanoNsr(object): ip_profile['dhcp_start_address'] = ip_profile_params.dhcp_params.start_address ip_profile['dhcp_count'] = ip_profile_params.dhcp_params.count net['ip_profile'] = ip_profile - net_create["network"]= net + net_create["network"]= net net_create_msg = yaml.safe_dump(net_create,default_flow_style=False) fpath = dump_openmano_descriptor( "{}_vim_net_create_{}".format(self._nsr_config_msg.name,vlr.name), - net_create_msg) + net_create_msg) self._log.debug("Dumped Openmano VIM Net create to: %s", fpath) vim_network_uuid = yield from self._loop.run_in_executor( - None, - self._cli_api.ns_vim_network_create, - net_create_msg, - vlr.om_datacenter_name) + None, + self._cli_api.ns_vim_network_create, + net_create_msg, + vlr.om_datacenter_name) self._vlrs.append(vlr) - + + class OpenmanoNsPlugin(rwnsmplugin.NsmPluginBase): """ @@ -800,7 +895,6 @@ class OpenmanoNsPlugin(rwnsmplugin.NsmPluginBase): self._log = log self._loop = loop self._publisher = publisher - self._cli_api = None self._http_api = None self._openmano_nsrs = {} @@ -811,18 +905,18 @@ class OpenmanoNsPlugin(rwnsmplugin.NsmPluginBase): def _set_ro_account(self, ro_account): self._log.debug("Setting openmano plugin cloud account: %s", ro_account) self._cli_api = openmano_client.OpenmanoCliAPI( - self.log, - ro_account.openmano.host, - ro_account.openmano.port, - ro_account.openmano.tenant_id, - ) + self.log, + ro_account.openmano.host, + ro_account.openmano.port, + ro_account.openmano.tenant_id, + ) self._http_api = openmano_client.OpenmanoHttpAPI( - self.log, - ro_account.openmano.host, - ro_account.openmano.port, - ro_account.openmano.tenant_id, - ) + self.log, + ro_account.openmano.host, + ro_account.openmano.port, + ro_account.openmano.tenant_id, + ) def set_state(self, nsr_id, state): # Currently we update only during terminate to @@ -833,21 +927,21 @@ class OpenmanoNsPlugin(rwnsmplugin.NsmPluginBase): OpenmanoNSRecordState.__members__.items() \ if member.value == state.value] - def create_nsr(self, nsr_config_msg, nsd_msg, key_pairs=None): + def create_nsr(self, nsr_config_msg, nsd_msg, key_pairs=None): """ Create Network service record """ openmano_nsr = OpenmanoNsr( - self._dts, - self._log, - self._loop, - self._publisher, - self._cli_api, - self._http_api, - nsd_msg, - nsr_config_msg, - key_pairs - ) + self._dts, + self._log, + self._loop, + self._publisher, + self._cli_api, + self._http_api, + nsd_msg, + nsr_config_msg, + key_pairs + ) self._openmano_nsrs[nsr_config_msg.id] = openmano_nsr @asyncio.coroutine @@ -857,6 +951,13 @@ class OpenmanoNsPlugin(rwnsmplugin.NsmPluginBase): yield from openmano_nsr.create() yield from openmano_nsr.deploy(nsr_msg) + @asyncio.coroutine + def deploy(self, nsr_msg): + self._log.debug("Received Scale out msg") + openmano_nsr = self._openmano_nsrs[nsr_msg.ns_instance_config_ref] + yield from openmano_nsr.scaling_scenario_create() + yield from openmano_nsr.scaling_scenario_deploy(nsr_msg) + @asyncio.coroutine def instantiate_ns(self, nsr, xact): """ @@ -865,14 +966,44 @@ class OpenmanoNsPlugin(rwnsmplugin.NsmPluginBase): yield from nsr.instantiate(xact) @asyncio.coroutine - def instantiate_vnf(self, nsr, vnfr): + def instantiate_vnf(self, nsr, vnfr, scaleout=False): """ Instantiate NSR with the passed nsr id """ openmano_nsr = self._openmano_nsrs[nsr.id] - yield from openmano_nsr.add_vnfr(vnfr) + if scaleout: + openmano_vnf_nsr = OpenmanoNsr( + self._dts, + self._log, + self._loop, + self._publisher, + self._cli_api, + self._http_api, + openmano_nsr.nsd_msg, + openmano_nsr.nsr_config_msg, + openmano_nsr.key_pairs, + vnfr.vnfd.id + ) + # Add new nsr to parent nsr + try: + yield from openmano_nsr.add_nsr(openmano_vnf_nsr, vnfr) + except Exception as e: + self.log.exception("Add VNFR Error ", str(e)) + try: + yield from openmano_vnf_nsr.add_vnfr(vnfr) + except Exception as e: + self.log.exception("Scenario Create Error", str(e)) + try: + yield from openmano_vnf_nsr.create() + except Exception as e: + self.log.exception("Starting instance scenario create ", str(e)) + try: + yield from openmano_vnf_nsr.deploy(openmano_vnf_nsr.nsr_msg) + except Exception as e: + self.log.exception("Unable to deploy scaling scenario ", str(e)) + else: + yield from openmano_nsr.add_vnfr(vnfr) - # Mark the VNFR as running # TODO: Create a task to monitor nsr/vnfr status vnfr_msg = vnfr.vnfr_msg.deep_copy() vnfr_msg.operational_status = "init" @@ -882,6 +1013,8 @@ class OpenmanoNsPlugin(rwnsmplugin.NsmPluginBase): yield from self._publisher.publish_vnfr(xact, vnfr_msg) self._log.debug("Creating a task to update uptime for vnfr: %s", vnfr.id) self._vnfr_uptime_tasks[vnfr.id] = self._loop.create_task(self.vnfr_uptime_update(vnfr)) + with self._dts.transaction() as xact: + yield from openmano_vnf_nsr._publisher.publish_vnfr(xact, vnfr_msg) def vnfr_uptime_update(self, vnfr): try: @@ -903,7 +1036,7 @@ class OpenmanoNsPlugin(rwnsmplugin.NsmPluginBase): if openmano_nsr._state == OpenmanoNSRecordState.RUNNING: yield from openmano_nsr.create_vlr(vlr) yield from self._publisher.publish_vlr(None, vlr.vlr_msg) - else: + else: yield from openmano_nsr.add_vlr(vlr) @asyncio.coroutine @@ -942,4 +1075,4 @@ class OpenmanoNsPlugin(rwnsmplugin.NsmPluginBase): if openmano_nsr._state == OpenmanoNSRecordState.RUNNING: yield from openmano_nsr.delete_vlr(vlr) else: - yield from openmano_nsr.remove_vlr(vlr) + yield from openmano_nsr.remove_vlr(vlr) \ No newline at end of file diff --git a/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/rwnsmtasklet.py b/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/rwnsmtasklet.py index b78a279b..78a84e82 100755 --- a/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/rwnsmtasklet.py +++ b/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/rwnsmtasklet.py @@ -257,7 +257,7 @@ class VnffgRecord(object): "vnffgd_id_ref": self._vnffgd_msg.id, "vnffgd_name_ref": self._vnffgd_msg.name, "sdn_account": self._sdn_account_name, - } + } vnffgr = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_Vnffgr.from_dict(vnffgr_dict) for rsp in self._vnffgd_msg.rsp: vnffgr_rsp = vnffgr.rsp.add() @@ -281,38 +281,38 @@ class VnffgRecord(object): vnfr_cp_ref.vnfd_id_ref =rsp_cp_ref.vnfd_id_ref vnfr_cp_ref.service_function_type = vnfd[0].service_function_type for nsr_vnfr in self._nsr.vnfrs.values(): - if (nsr_vnfr.vnfd.id == vnfr_cp_ref.vnfd_id_ref and - nsr_vnfr.member_vnf_index == vnfr_cp_ref.member_vnf_index_ref): - vnfr_cp_ref.vnfr_id_ref = nsr_vnfr.id - vnfr_cp_ref.vnfr_name_ref = nsr_vnfr.name - vnfr_cp_ref.vnfr_connection_point_ref = rsp_cp_ref.vnfd_connection_point_ref - - vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath) - self._log.debug(" Received VNFR is %s", vnfr) - while vnfr.operational_status != 'running': - self._log.info("Received vnf op status is %s; retrying",vnfr.operational_status) - if vnfr.operational_status == 'failed': - self._log.error("Fetching VNFR for %s failed", vnfr.id) - raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" % (self.id, vnfr.id)) - yield from asyncio.sleep(2, loop=self._loop) - vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath) - self._log.debug("Received VNFR is %s", vnfr) - - vnfr_cp_ref.connection_point_params.mgmt_address = vnfr.mgmt_interface.ip_address - for cp in vnfr.connection_point: - if cp.name == vnfr_cp_ref.vnfr_connection_point_ref: - vnfr_cp_ref.connection_point_params.port_id = cp.connection_point_id - vnfr_cp_ref.connection_point_params.name = self._nsr.name + '.' + cp.name - for vdu in vnfr.vdur: - for ext_intf in vdu.external_interface: - if ext_intf.name == vnfr_cp_ref.vnfr_connection_point_ref: - vnfr_cp_ref.connection_point_params.vm_id = vdu.vim_id - self._log.debug("VIM ID for CP %s in VNFR %s is %s",cp.name,nsr_vnfr.id, + if (nsr_vnfr.vnfd.id == vnfr_cp_ref.vnfd_id_ref and + nsr_vnfr.member_vnf_index == vnfr_cp_ref.member_vnf_index_ref): + vnfr_cp_ref.vnfr_id_ref = nsr_vnfr.id + vnfr_cp_ref.vnfr_name_ref = nsr_vnfr.name + vnfr_cp_ref.vnfr_connection_point_ref = rsp_cp_ref.vnfd_connection_point_ref + + vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath) + self._log.debug(" Received VNFR is %s", vnfr) + while vnfr.operational_status != 'running': + self._log.info("Received vnf op status is %s; retrying",vnfr.operational_status) + if vnfr.operational_status == 'failed': + self._log.error("Fetching VNFR for %s failed", vnfr.id) + raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" % (self.id, vnfr.id)) + yield from asyncio.sleep(2, loop=self._loop) + vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath) + self._log.debug("Received VNFR is %s", vnfr) + + vnfr_cp_ref.connection_point_params.mgmt_address = vnfr.mgmt_interface.ip_address + for cp in vnfr.connection_point: + if cp.name == vnfr_cp_ref.vnfr_connection_point_ref: + vnfr_cp_ref.connection_point_params.port_id = cp.connection_point_id + vnfr_cp_ref.connection_point_params.name = self._nsr.name + '.' + cp.name + for vdu in vnfr.vdur: + for ext_intf in vdu.external_interface: + if ext_intf.name == vnfr_cp_ref.vnfr_connection_point_ref: + vnfr_cp_ref.connection_point_params.vm_id = vdu.vim_id + self._log.debug("VIM ID for CP %s in VNFR %s is %s",cp.name,nsr_vnfr.id, vnfr_cp_ref.connection_point_params.vm_id) - break + break - vnfr_cp_ref.connection_point_params.address = cp.ip_address - vnfr_cp_ref.connection_point_params.port = VnffgRecord.SFF_DP_PORT + vnfr_cp_ref.connection_point_params.address = cp.ip_address + vnfr_cp_ref.connection_point_params.port = VnffgRecord.SFF_DP_PORT for vnffgd_classifier in self._vnffgd_msg.classifier: _rsp = [rsp for rsp in vnffgr.rsp if rsp.vnffgd_rsp_id_ref == vnffgd_classifier.rsp_id_ref] @@ -329,37 +329,37 @@ class VnffgRecord(object): vnffgr_classifier.rsp_id_ref = rsp_id_ref vnffgr_classifier.rsp_name = rsp_name for nsr_vnfr in self._nsr.vnfrs.values(): - if (nsr_vnfr.vnfd.id == vnffgd_classifier.vnfd_id_ref and - nsr_vnfr.member_vnf_index == vnffgd_classifier.member_vnf_index_ref): - vnffgr_classifier.vnfr_id_ref = nsr_vnfr.id - vnffgr_classifier.vnfr_name_ref = nsr_vnfr.name - vnffgr_classifier.vnfr_connection_point_ref = vnffgd_classifier.vnfd_connection_point_ref - - if nsr_vnfr.vnfd.service_function_chain == 'CLASSIFIER': - vnffgr_classifier.sff_name = nsr_vnfr.name - - vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath) - self._log.debug(" Received VNFR is %s", vnfr) - while vnfr.operational_status != 'running': - self._log.info("Received vnf op status is %s; retrying",vnfr.operational_status) - if vnfr.operational_status == 'failed': - self._log.error("Fetching VNFR for %s failed", vnfr.id) - raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" % (self.id, vnfr.id)) - yield from asyncio.sleep(2, loop=self._loop) - vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath) - self._log.debug("Received VNFR is %s", vnfr) - - for cp in vnfr.connection_point: - if cp.name == vnffgr_classifier.vnfr_connection_point_ref: - vnffgr_classifier.port_id = cp.connection_point_id - vnffgr_classifier.ip_address = cp.ip_address - for vdu in vnfr.vdur: - for ext_intf in vdu.external_interface: - if ext_intf.name == vnffgr_classifier.vnfr_connection_point_ref: - vnffgr_classifier.vm_id = vdu.vim_id - self._log.debug("VIM ID for CP %s in VNFR %s is %s",cp.name,nsr_vnfr.id, - vnfr_cp_ref.connection_point_params.vm_id) - break + if (nsr_vnfr.vnfd.id == vnffgd_classifier.vnfd_id_ref and + nsr_vnfr.member_vnf_index == vnffgd_classifier.member_vnf_index_ref): + vnffgr_classifier.vnfr_id_ref = nsr_vnfr.id + vnffgr_classifier.vnfr_name_ref = nsr_vnfr.name + vnffgr_classifier.vnfr_connection_point_ref = vnffgd_classifier.vnfd_connection_point_ref + + if nsr_vnfr.vnfd.service_function_chain == 'CLASSIFIER': + vnffgr_classifier.sff_name = nsr_vnfr.name + + vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath) + self._log.debug(" Received VNFR is %s", vnfr) + while vnfr.operational_status != 'running': + self._log.info("Received vnf op status is %s; retrying",vnfr.operational_status) + if vnfr.operational_status == 'failed': + self._log.error("Fetching VNFR for %s failed", vnfr.id) + raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" % (self.id, vnfr.id)) + yield from asyncio.sleep(2, loop=self._loop) + vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath) + self._log.debug("Received VNFR is %s", vnfr) + + for cp in vnfr.connection_point: + if cp.name == vnffgr_classifier.vnfr_connection_point_ref: + vnffgr_classifier.port_id = cp.connection_point_id + vnffgr_classifier.ip_address = cp.ip_address + for vdu in vnfr.vdur: + for ext_intf in vdu.external_interface: + if ext_intf.name == vnffgr_classifier.vnfr_connection_point_ref: + vnffgr_classifier.vm_id = vdu.vim_id + self._log.debug("VIM ID for CP %s in VNFR %s is %s",cp.name,nsr_vnfr.id, + vnfr_cp_ref.connection_point_params.vm_id) + break self._log.info("VNFFGR msg to be sent is %s", vnffgr) return vnffgr @@ -377,8 +377,8 @@ class VnffgRecord(object): while vnfr.operational_status != 'running': self._log.info("Received vnf op status is %s; retrying",vnfr.operational_status) if vnfr.operational_status == 'failed': - self._log.error("Fetching VNFR for %s failed", vnfr.id) - raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" % (self.id, vnfr.id)) + self._log.error("Fetching VNFR for %s failed", vnfr.id) + raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" % (self.id, vnfr.id)) yield from asyncio.sleep(2, loop=self._loop) vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath) self._log.debug("Received VNFR is %s", vnfr) @@ -431,8 +431,8 @@ class VnffgRecord(object): def vnffgr_in_vnffgrm(self): """ Is there a VNFR record in VNFM """ if (self._vnffgr_state == VnffgRecordState.ACTIVE or - self._vnffgr_state == VnffgRecordState.INSTANTIATION_PENDING or - self._vnffgr_state == VnffgRecordState.FAILED): + self._vnffgr_state == VnffgRecordState.INSTANTIATION_PENDING or + self._vnffgr_state == VnffgRecordState.FAILED): return True return False @@ -469,21 +469,21 @@ class VirtualLinkRecord(object): VirtualLinkRecord """ vlr_obj = VirtualLinkRecord( - dts, - log, - loop, - nsr_name, - vld_msg, - cloud_account_name, - om_datacenter, - ip_profile, - nsr_id, - ) + dts, + log, + loop, + nsr_name, + vld_msg, + cloud_account_name, + om_datacenter, + ip_profile, + nsr_id, + ) if restart_mode: res_iter = yield from dts.query_read( - "D,/vlr:vlr-catalog/vlr:vlr", - rwdts.XactFlag.MERGE) + "D,/vlr:vlr-catalog/vlr:vlr", + rwdts.XactFlag.MERGE) for fut in res_iter: response = yield from fut @@ -632,9 +632,9 @@ class VirtualLinkRecord(object): for conn in self.vld_msg.vnfd_connection_point_ref: for vnfr in vnfrs: if (vnfr.vnfd.id == conn.vnfd_id_ref and - vnfr.member_vnf_index == conn.member_vnf_index_ref and - self.cloud_account_name == vnfr.cloud_account_name and - self.om_datacenter_name == vnfr.om_datacenter_name): + vnfr.member_vnf_index == conn.member_vnf_index_ref and + self.cloud_account_name == vnfr.cloud_account_name and + self.om_datacenter_name == vnfr.om_datacenter_name): cp_entry = nsr_vlr.vnfr_connection_point_ref.add() cp_entry.vnfr_id = vnfr.id cp_entry.connection_point = conn.vnfd_connection_point_ref @@ -678,9 +678,9 @@ class VirtualLinkRecord(object): def vlr_in_vns(self): """ Is there a VLR record in VNS """ if (self._state == VlRecordState.ACTIVE or - self._state == VlRecordState.INSTANTIATION_PENDING or - self._state == VlRecordState.TERMINATE_PENDING or - self._state == VlRecordState.FAILED): + self._state == VlRecordState.INSTANTIATION_PENDING or + self._state == VlRecordState.TERMINATE_PENDING or + self._state == VlRecordState.FAILED): return True return False @@ -722,8 +722,8 @@ class VirtualNetworkFunctionRecord(object): @staticmethod @asyncio.coroutine def create_record(dts, log, loop, vnfd, const_vnfd_msg, nsd_id, nsr_name, - cloud_account_name, om_datacenter_name, nsr_id, group_name, group_instance_id, - placement_groups, restart_mode=False): + cloud_account_name, om_datacenter_name, nsr_id, group_name, group_instance_id, + placement_groups, restart_mode=False): """Creates a new VNFR object based on the given data. If restart mode is enabled, then we look for existing records in the @@ -733,25 +733,25 @@ class VirtualNetworkFunctionRecord(object): VirtualNetworkFunctionRecord """ vnfr_obj = VirtualNetworkFunctionRecord( - dts, - log, - loop, - vnfd, - const_vnfd_msg, - nsd_id, - nsr_name, - cloud_account_name, - om_datacenter_name, - nsr_id, - group_name, - group_instance_id, - placement_groups, - restart_mode=restart_mode) + dts, + log, + loop, + vnfd, + const_vnfd_msg, + nsd_id, + nsr_name, + cloud_account_name, + om_datacenter_name, + nsr_id, + group_name, + group_instance_id, + placement_groups, + restart_mode=restart_mode) if restart_mode: res_iter = yield from dts.query_read( - "D,/vnfr:vnfr-catalog/vnfr:vnfr", - rwdts.XactFlag.MERGE) + "D,/vnfr:vnfr-catalog/vnfr:vnfr", + rwdts.XactFlag.MERGE) for fut in res_iter: response = yield from fut @@ -931,34 +931,34 @@ class VirtualNetworkFunctionRecord(object): def configure(self): self.config_store.merge_vnfd_config( - self._nsd_id, - self._vnfd, - self.member_vnf_index, - ) + self._nsd_id, + self._vnfd, + self.member_vnf_index, + ) def create_vnfr_msg(self): """ VNFR message for this VNFR """ vnfd_fields = [ - "short_name", - "vendor", - "description", - "version", - "type_yang", - ] + "short_name", + "vendor", + "description", + "version", + "type_yang", + ] vnfd_copy_dict = {k: v for k, v in self._vnfd.as_dict().items() if k in vnfd_fields} vnfr_dict = { - "id": self.id, - "nsr_id_ref": self._nsr_id, - "name": self.name, - "cloud_account": self._cloud_account_name, - "om_datacenter": self._om_datacenter_name, - "config_status": self.config_status - } + "id": self.id, + "nsr_id_ref": self._nsr_id, + "name": self.name, + "cloud_account": self._cloud_account_name, + "om_datacenter": self._om_datacenter_name, + "config_status": self.config_status + } vnfr_dict.update(vnfd_copy_dict) vnfr = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.from_dict(vnfr_dict) vnfr.vnfd = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vnfd.from_dict(self.vnfd.as_dict(), - ignore_missing_keys=True) + ignore_missing_keys=True) vnfr.member_vnf_index_ref = self.member_vnf_index vnfr.vnf_configuration.from_dict(self._vnfd.vnf_configuration.as_dict()) @@ -980,10 +980,10 @@ class VirtualNetworkFunctionRecord(object): self._log.debug("Send an update to VNFM for VNFR {} with {}". format(self.name, self.vnfr_msg)) yield from self._dts.query_update( - self.xpath, - rwdts.XactFlag.TRACE, - self.vnfr_msg - ) + self.xpath, + rwdts.XactFlag.TRACE, + self.vnfr_msg + ) def get_config_status(self): """Return the config status as YANG ENUM""" @@ -1030,7 +1030,7 @@ class VirtualNetworkFunctionRecord(object): yield from self.update_vnfm() except Exception as e: self._log.error("Exception updating VNFM with new status {} of VNFR {}: {}". - format(status, self.name, e)) + format(status, self.name, e)) self._log.exception(e) def is_configured(self): @@ -1059,9 +1059,9 @@ class VirtualNetworkFunctionRecord(object): for vlr in nsr.vlrs: for vnfd_cp in vlr.vld_msg.vnfd_connection_point_ref: if (vnfd_cp.vnfd_id_ref == self._vnfd.id and - vnfd_cp.vnfd_connection_point_ref == conn.name and - vnfd_cp.member_vnf_index_ref == self.member_vnf_index and - vlr.cloud_account_name == self.cloud_account_name): + vnfd_cp.vnfd_connection_point_ref == conn.name and + vnfd_cp.member_vnf_index_ref == self.member_vnf_index and + vlr.cloud_account_name == self.cloud_account_name): self._log.debug("Found VLR for cp_name:%s and vnf-index:%d", conn.name, self.member_vnf_index) return vlr @@ -1073,13 +1073,13 @@ class VirtualNetworkFunctionRecord(object): cpr.name = conn_p.name cpr.type_yang = conn_p.type_yang if conn_p.has_field('port_security_enabled'): - cpr.port_security_enabled = conn_p.port_security_enabled + cpr.port_security_enabled = conn_p.port_security_enabled vlr_ref = find_vlr_for_cp(conn_p) if vlr_ref is None: msg = "Failed to find VLR for cp = %s" % conn_p.name self._log.debug("%s", msg) -# raise VirtualNetworkFunctionRecordError(msg) + # raise VirtualNetworkFunctionRecordError(msg) continue cpr.vlr_ref = vlr_ref.id @@ -1127,8 +1127,8 @@ class VirtualNetworkFunctionRecord(object): def vnfr_in_vnfm(self): """ Is there a VNFR record in VNFM """ if (self._state == VnfRecordState.ACTIVE or - self._state == VnfRecordState.INSTANTIATION_PENDING or - self._state == VnfRecordState.FAILED): + self._state == VnfRecordState.INSTANTIATION_PENDING or + self._state == VnfRecordState.FAILED): return True return False @@ -1208,7 +1208,7 @@ class NetworkServiceStatus(object): "FAILED": "failed", "VL_INSTANTIATE": "vl_instantiate", "VL_TERMINATE": "vl_terminate", - } + } return state_to_str_map[self._state.name] @property @@ -1405,8 +1405,8 @@ class NetworkServiceRecord(object): def __str__(self): return "NSR(name={}, nsd_id={}, cloud_account={})".format( - self.name, self.nsd_id, self.cloud_account_name - ) + self.name, self.nsd_id, self.cloud_account_name + ) def _get_vnfd(self, vnfd_id, config_xact): """ Fetch vnfd msg for the passed vnfd id """ @@ -1415,10 +1415,10 @@ class NetworkServiceRecord(object): def _get_vnfd_cloud_account(self, vnfd_member_index): """ Fetch Cloud Account for the passed vnfd id """ if self._nsr_cfg_msg.vnf_cloud_account_map: - vim_accounts = [(vnf.cloud_account,vnf.om_datacenter) for vnf in self._nsr_cfg_msg.vnf_cloud_account_map \ - if vnfd_member_index == vnf.member_vnf_index_ref] - if vim_accounts and vim_accounts[0]: - return vim_accounts[0] + vim_accounts = [(vnf.cloud_account,vnf.om_datacenter) for vnf in self._nsr_cfg_msg.vnf_cloud_account_map \ + if vnfd_member_index == vnf.member_vnf_index_ref] + if vim_accounts and vim_accounts[0]: + return vim_accounts[0] return (self.cloud_account_name,self.om_datacenter_name) def _get_constituent_vnfd_msg(self, vnf_index): @@ -1573,7 +1573,7 @@ class NetworkServiceRecord(object): tmp_file = None with tempfile.NamedTemporaryFile(delete=False) as tmp_file: tmp_file.write(yaml.dump(data, default_flow_style=True) - .encode("UTF-8")) + .encode("UTF-8")) self._log.debug("Creating a temp file: {} with input data: {}". format(tmp_file.name, data)) @@ -1646,19 +1646,19 @@ class NetworkServiceRecord(object): err_msg = None if not rc: err_msg = "Failed config for trigger {} using config script '{}'". \ - format(self.scaling_trigger_str(trigger), - config_primitive.user_defined_script) + format(self.scaling_trigger_str(trigger), + config_primitive.user_defined_script) yield from update_config_status(success=rc, err_msg=err_msg) return rc else: err_msg = "Failed config for trigger {} as config script is not specified". \ - format(self.scaling_trigger_str(trigger)) + format(self.scaling_trigger_str(trigger)) yield from update_config_status(success=False, err_msg=err_msg) raise NotImplementedError("Only script based config support for scale group for now: {}". format(group.name)) else: - err_msg = "Failed config for trigger {} as config primitive is not specified".\ - format(self.scaling_trigger_str(trigger)) + err_msg = "Failed config for trigger {} as config primitive is not specified". \ + format(self.scaling_trigger_str(trigger)) yield from update_config_status(success=False, err_msg=err_msg) self._log.error("Config primitive not specified for config action in scale group %s" % (group.name)) @@ -1673,9 +1673,9 @@ class NetworkServiceRecord(object): scaling_group_msg.name, self.id) group_record = scale_group.ScalingGroup( - self._log, - scaling_group_msg - ) + self._log, + scaling_group_msg + ) self._scaling_groups[group_record.name] = group_record @@ -1701,7 +1701,6 @@ class NetworkServiceRecord(object): vnfr = yield from self.create_vnf_record(vnfd_msg, const_vnfd_msg, cloud_account_name, om_datacenter_name, group_name, index) scale_instance.add_vnfr(vnfr) vnfrs.append(vnfr) - return vnfrs @asyncio.coroutine @@ -1722,7 +1721,7 @@ class NetworkServiceRecord(object): format(group.name, index)) scale_instance.operational_status = "failed" else: - yield from self.instantiate_vnfs(vnfrs) + yield from self.instantiate_vnfs(vnfrs, scaleout=True) except Exception as e: self._log.exception("Failed to begin instantiatiation of vnfs for scale group {}: {}". @@ -1803,7 +1802,7 @@ class NetworkServiceRecord(object): if all([state == VnfRecordState.TERMINATED for state in instance_vnf_state_list]): instance.operational_status = "terminated" rc = yield from self.apply_scaling_group_config(NsdYang.ScalingTrigger.POST_SCALE_IN, - group, instance) + group, instance) if rc: self._log.debug("Scale in for group {} and instance {} succeeded". format(group.name, instance.instance_id)) @@ -1847,16 +1846,16 @@ class NetworkServiceRecord(object): VirtualLinkRecord """ vlr = yield from VirtualLinkRecord.create_record( - self._dts, - self._log, - self._loop, - self.name, - vld, - cloud_account, - om_datacenter, - self.resolve_vld_ip_profile(self.nsd_msg, vld), - self.id, - restart_mode=self.restart_mode) + self._dts, + self._log, + self._loop, + self.name, + vld, + cloud_account, + om_datacenter, + self.resolve_vld_ip_profile(self.nsd_msg, vld), + self.id, + restart_mode=self.restart_mode) return vlr @@ -1883,8 +1882,8 @@ class NetworkServiceRecord(object): for vnfc in vld.vnfd_connection_point_ref: cloud_account = vnf_cloud_map.get( - vnfc.member_vnf_index_ref, - (self.cloud_account_name,self.om_datacenter_name)) + vnfc.member_vnf_index_ref, + (self.cloud_account_name,self.om_datacenter_name)) cloud_account_list.append(cloud_account) @@ -1950,7 +1949,7 @@ class NetworkServiceRecord(object): except Exception as e: err_msg = "Error instantiating VL for NSR {} and VLD {}: {}". \ - format(self.id, vld.id, e) + format(self.id, vld.id, e) self._log.error(err_msg) self._log.exception(e) vlr.state = VlRecordState.FAILED @@ -1973,7 +1972,7 @@ class NetworkServiceRecord(object): except Exception as e: err_msg = "Error terminating VL for NSR {} and VLD {}: {}". \ - format(self.id, vld.id, e) + format(self.id, vld.id, e) self._log.error(err_msg) self._log.exception(e) vlr.state = VlRecordState.FAILED @@ -2008,7 +2007,7 @@ class NetworkServiceRecord(object): for group in self.nsd_msg.placement_groups: for member_vnfd in group.member_vnfd: if (member_vnfd.vnfd_id_ref == vnfd_msg.id) and \ - (member_vnfd.member_vnf_index_ref == const_vnfd.member_vnf_index): + (member_vnfd.member_vnf_index_ref == const_vnfd.member_vnf_index): group_info = self.resolve_placement_group_cloud_construct(group) if group_info is None: self._log.error("Could not resolve cloud-construct for placement group: %s", group.name) @@ -2026,26 +2025,25 @@ class NetworkServiceRecord(object): # Fetch the VNFD associated with this VNF placement_groups = self.get_placement_groups(vnfd_msg, const_vnfd) self._log.info("Cloud Account for VNF %d is %s",const_vnfd.member_vnf_index,cloud_account_name) - self._log.info("Launching VNF: %s (Member Index: %s) in NSD plancement Groups: %s, restart mode self.restart_mode %s", + self._log.info("Launching VNF: %s (Member Index: %s) in NSD plancement Groups: %s", vnfd_msg.name, const_vnfd.member_vnf_index, - [ group.name for group in placement_groups], - self.restart_mode) + [ group.name for group in placement_groups]) vnfr = yield from VirtualNetworkFunctionRecord.create_record(self._dts, - self._log, - self._loop, - vnfd_msg, - const_vnfd, - self.nsd_id, - self.name, - cloud_account_name, - om_datacenter_name, - self.id, - group_name, - group_instance_id, - placement_groups, - restart_mode=self.restart_mode, - ) + self._log, + self._loop, + vnfd_msg, + const_vnfd, + self.nsd_id, + self.name, + cloud_account_name, + om_datacenter_name, + self.id, + group_name, + group_instance_id, + placement_groups, + restart_mode=self.restart_mode, + ) if vnfr.id in self._vnfrs: err = "VNF with VNFR id %s already in vnf list" % (vnfr.id,) raise NetworkServiceRecordError(err) @@ -2069,16 +2067,16 @@ class NetworkServiceRecord(object): end_value = param_pool.range.end_value if end_value < start_value: raise NetworkServiceRecordError( - "Parameter pool %s has invalid range (start: {}, end: {})".format( - start_value, end_value - ) - ) + "Parameter pool %s has invalid range (start: {}, end: {})".format( + start_value, end_value + ) + ) self._param_pools[param_pool.name] = config_value_pool.ParameterValuePool( - self._log, - param_pool.name, - range(start_value, end_value) - ) + self._log, + param_pool.name, + range(start_value, end_value) + ) @asyncio.coroutine def fetch_vnfr(self, vnfr_path): @@ -2095,14 +2093,14 @@ class NetworkServiceRecord(object): return vnfr @asyncio.coroutine - def instantiate_vnfs(self, vnfrs): + def instantiate_vnfs(self, vnfrs, scaleout=False): """ This function instantiates VNFs for every VNF in this Network Service """ self._log.debug("Instantiating %u VNFs in NS %s", len(vnfrs), self.id) for vnf in vnfrs: self._log.debug("Instantiating VNF: %s in NS %s", vnf, self.id) - yield from self.nsm_plugin.instantiate_vnf(self, vnf) + yield from self.nsm_plugin.instantiate_vnf(self, vnf,scaleout) @asyncio.coroutine def instantiate_vnffgs(self): @@ -2137,8 +2135,8 @@ class NetworkServiceRecord(object): for i in range(group.min_instance_count): self._log.debug("Instantiating %s default scaling instance %s", group, i) yield from self.create_scale_group_instance( - group.name, i, config_xact, is_default=True - ) + group.name, i, config_xact, is_default=True + ) for group_msg in self._nsr_cfg_msg.scaling_group: if group_msg.scaling_group_name_ref != group.name: @@ -2147,8 +2145,8 @@ class NetworkServiceRecord(object): for instance in group_msg.instance: self._log.debug("Reloading %s scaling instance %s", group_msg, instance.id) yield from self.create_scale_group_instance( - group.name, instance.id, config_xact, is_default=False - ) + group.name, instance.id, config_xact, is_default=False + ) def has_scaling_instances(self): """ Return boolean indicating if the network service has default scaling groups """ @@ -2192,7 +2190,7 @@ class NetworkServiceRecord(object): return( "D,/nsr:ns-instance-opdata" + "/nsr:nsr[nsr:ns-instance-config-ref = '{}']" - ).format(self.id) + ).format(self.id) @staticmethod def xpath_from_nsr(nsr): @@ -2205,7 +2203,7 @@ class NetworkServiceRecord(object): """ Return NSD config xpath.""" return( "C,/nsd:nsd-catalog/nsd:nsd[nsd:id = '{}']" - ).format(self.nsd_id) + ).format(self.nsd_id) @asyncio.coroutine def instantiate(self, config_xact): @@ -2351,9 +2349,12 @@ class NetworkServiceRecord(object): def on_instantiate_done(fut): # If the do_instantiate fails, then publish NSR with failed result - if fut.exception() is not None: - self._log.error("NSR instantiation failed for NSR id %s: %s", self.id, str(fut.exception())) - self._loop.create_task(self.instantiation_failed(failed_reason=str(fut.exception()))) + e = fut.exception() + if e is not None: + import traceback, sys + print(traceback.format_exception(None,e, e.__traceback__), file=sys.stderr, flush=True) + self._log.error("NSR instantiation failed for NSR id %s: %s", self.id, str(e)) + self._loop.create_task(self.instantiation_failed(failed_reason=str(e))) instantiate_task = self._loop.create_task(do_instantiate()) instantiate_task.add_done_callback(on_instantiate_done) @@ -2368,7 +2369,7 @@ class NetworkServiceRecord(object): if self._config_status == NsrYang.ConfigStates.FAILED: self.record_event("config-failed", "NS configuration failed", - evt_details=self._config_status_details) + evt_details=self._config_status_details) yield from self.publish() @@ -2502,7 +2503,7 @@ class NetworkServiceRecord(object): for cfg_prim in self.nsd_msg.service_primitive: cfg_prim = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ServicePrimitive.from_dict( - cfg_prim.as_dict()) + cfg_prim.as_dict()) nsr.service_primitive.append(cfg_prim) for init_cfg in self.nsd_msg.initial_config_primitive: @@ -2601,7 +2602,7 @@ class NetworkServiceRecord(object): break else: self._log.info("VNFFGR %s in NSR %s is still not active; current state is: %s", - vnffgr.id, self.id, vnffgr.state) + vnffgr.id, self.id, vnffgr.state) new_state = curr_state # Update all the scaling group instance operational status to @@ -2679,11 +2680,11 @@ class InputParameterSubstitution(object): continue self.log.debug( - "input-parameter:{} = {}".format( - param.xpath, - param.value, - ) - ) + "input-parameter:{} = {}".format( + param.xpath, + param.value, + ) + ) try: xpath.setxattr(nsd, param.xpath, param.value) @@ -2839,7 +2840,7 @@ class NsdDtsHandler(object): self._log.debug( "Registering for NSD config using xpath: %s", NsdDtsHandler.XPATH, - ) + ) acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply) with self._dts.appconf_group_create(handler=acg_hdl) as acg: @@ -2914,7 +2915,7 @@ class VnfdDtsHandler(object): self._log.debug( "Registering for VNFD config using xpath: %s", VnfdDtsHandler.XPATH, - ) + ) acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply) with self._dts.appconf_group_create(handler=acg_hdl) as acg: # Need a list in scratch to store VNFDs to create/update later @@ -2971,15 +2972,15 @@ class NsrRpcDtsHandler(object): self._log.debug("Attemping NsmTasklet netconf connection.") manager = yield from ncclient.asyncio_manager.asyncio_connect( - loop=self._loop, - host=NsrRpcDtsHandler.NETCONF_IP_ADDRESS, - port=NsrRpcDtsHandler.NETCONF_PORT, - username=NsrRpcDtsHandler.NETCONF_USER, - password=NsrRpcDtsHandler.NETCONF_PW, - allow_agent=False, - look_for_keys=False, - hostkey_verify=False, - ) + loop=self._loop, + host=NsrRpcDtsHandler.NETCONF_IP_ADDRESS, + port=NsrRpcDtsHandler.NETCONF_PORT, + username=NsrRpcDtsHandler.NETCONF_USER, + password=NsrRpcDtsHandler.NETCONF_PW, + allow_agent=False, + look_for_keys=False, + hostkey_verify=False, + ) return manager @@ -2990,7 +2991,7 @@ class NsrRpcDtsHandler(object): yield from asyncio.sleep(5, loop=self._loop) raise NsrInstantiationFailed("Failed to connect to Launchpad within %s seconds" % - timeout_secs) + timeout_secs) def _apply_ns_instance_config(self,payload_dict): #self._log.debug("At apply NS instance config with payload %s",payload_dict) @@ -3007,8 +3008,8 @@ class NsrRpcDtsHandler(object): assert action == rwdts.QueryAction.RPC rpc_ip = msg rpc_op = NsrYang.YangOutput_Nsr_StartNetworkService.from_dict({ - "nsr_id":str(uuid.uuid4()) - }) + "nsr_id":str(uuid.uuid4()) + }) if not ('name' in rpc_ip and 'nsd_ref' in rpc_ip and ('cloud_account' in rpc_ip or 'om_datacenter' in rpc_ip)): self._log.error("Mandatory parameters name or nsd_ref or cloud account not found in start-network-service {}".format(rpc_ip)) @@ -3026,7 +3027,7 @@ class NsrRpcDtsHandler(object): # self._manager = yield from self._connect() self._log.debug("Configuring ns-instance-config with name %s nsd-ref: %s", - rpc_ip.name, rpc_ip.nsd_ref) + rpc_ip.name, rpc_ip.nsd_ref) ns_instance_config_dict = {"id":rpc_op.nsr_id, "admin_status":"ENABLED"} ns_instance_config_copy_dict = {k:v for k, v in rpc_ip.as_dict().items() @@ -3044,7 +3045,7 @@ class NsrRpcDtsHandler(object): #self._log.debug("Sending configure ns-instance-config xml to %s: %s", # netconf_xml, NsrRpcDtsHandler.NETCONF_IP_ADDRESS) self._log.debug("Sending configure ns-instance-config json to %s: %s", - self._nsr_config_url,ns_instance_config) + self._nsr_config_url,ns_instance_config) #response = yield from self._manager.edit_config( # target="running", @@ -3054,7 +3055,7 @@ class NsrRpcDtsHandler(object): None, self._apply_ns_instance_config, payload_dict - ) + ) response.raise_for_status() self._log.debug("Received edit config response: %s", response.json()) @@ -3075,7 +3076,7 @@ class NsrRpcDtsHandler(object): self._ns_regh = group.register(xpath=NsrRpcDtsHandler.EXEC_NSR_CONF_XPATH, handler=hdl_ns, flags=rwdts.Flag.PUBLISHER, - ) + ) class NsrDtsHandler(object): @@ -3248,7 +3249,7 @@ class NsrDtsHandler(object): raise NetworkServiceRecordError(err) self._log.debug("Creating NetworkServiceRecord %s from nsr config %s", - msg.id, msg.as_dict()) + msg.id, msg.as_dict()) nsr = self.nsm.create_nsr(msg, key_pairs=key_pairs, restart_mode=restart_mode) return nsr @@ -3337,9 +3338,9 @@ class NsrDtsHandler(object): xpath = ks_path.to_xpath(RwNsrYang.get_schema()) action = xact_info.query_action self._log.debug( - "Got Nsr prepare callback (xact: %s) (action: %s) (info: %s), %s:%s)", - xact, action, xact_info, xpath, msg - ) + "Got Nsr prepare callback (xact: %s) (action: %s) (info: %s), %s:%s)", + xact, action, xact_info, xpath, msg + ) @asyncio.coroutine def delete_instantiation(ns_id): @@ -3408,18 +3409,18 @@ class NsrDtsHandler(object): acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply) with self._dts.appconf_group_create(handler=acg_hdl) as acg: self._nsr_regh = acg.register(xpath=NsrDtsHandler.NSR_XPATH, - flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE, - on_prepare=on_prepare) + flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE, + on_prepare=on_prepare) self._scale_regh = acg.register( - xpath=NsrDtsHandler.SCALE_INSTANCE_XPATH, - flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY| rwdts.Flag.CACHE, - ) + xpath=NsrDtsHandler.SCALE_INSTANCE_XPATH, + flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY| rwdts.Flag.CACHE, + ) self._key_pair_regh = acg.register( - xpath=NsrDtsHandler.KEY_PAIR_XPATH, - flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE, - ) + xpath=NsrDtsHandler.KEY_PAIR_XPATH, + flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE, + ) class NsrOpDataDtsHandler(object): @@ -3522,7 +3523,7 @@ class VnfrDtsHandler(object): self._log.debug( "Got vnfr on_prepare cb (xact_info: %s, action: %s): %s:%s", xact_info, action, ks_path, msg - ) + ) schema = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema() path_entry = schema.keyspec_to_entry(ks_path) @@ -3620,11 +3621,11 @@ class NsManager(object): self._ro_plugin_selector = ro_plugin_selector self._ncclient = rift.mano.ncclient.NcClient( - host="127.0.0.1", - port=2022, - username="admin", - password="admin", - loop=self._loop) + host="127.0.0.1", + port=2022, + username="admin", + password="admin", + loop=self._loop) self._nsrs = {} self._nsds = {} @@ -3641,6 +3642,7 @@ class NsManager(object): VnfrDtsHandler(dts, log, loop, self), NsdRefCountDtsHandler(dts, log, loop, self), NsrDtsHandler(dts, log, loop, self), + ScalingRpcHandler(log, dts, loop, self.scale_rpc_callback), NsrRpcDtsHandler(dts,log,loop,self), self._vnfd_dts_handler, self.cfgmgr_obj, @@ -3749,6 +3751,74 @@ class NsManager(object): self._loop.create_task(nsr.delete_scale_group_instance(scale_group_name, instance_id)) + def scale_rpc_callback(self, xact, msg, action): + """Callback handler for RPC calls + Args: + xact : Transaction Handler + msg : RPC input + action : Scaling Action + """ + ScalingGroupInstance = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup_Instance + ScalingGroup = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup + + xpath = ('C,/nsr:ns-instance-config/nsr:nsr[nsr:id="{}"]').format( + msg.nsr_id_ref) + instance = ScalingGroupInstance.from_dict({"id": msg.instance_id}) + + @asyncio.coroutine + def get_nsr_scaling_group(): + results = yield from self._dts.query_read(xpath, rwdts.XactFlag.MERGE) + + for result in results: + res = yield from result + nsr_config = res.result + + for scaling_group in nsr_config.scaling_group: + if scaling_group.scaling_group_name_ref == msg.scaling_group_name_ref: + break + else: + scaling_group = nsr_config.scaling_group.add() + scaling_group.scaling_group_name_ref = msg.scaling_group_name_ref + + return (nsr_config, scaling_group) + + @asyncio.coroutine + def update_config(nsr_config): + xml = self._ncclient.convert_to_xml(RwNsrYang, nsr_config) + xml = '{}'.format(xml) + yield from self._ncclient.connect() + yield from self._ncclient.manager.edit_config(target="running", config=xml, default_operation="replace") + + @asyncio.coroutine + def scale_out(): + nsr_config, scaling_group = yield from get_nsr_scaling_group() + scaling_group.instance.append(instance) + yield from update_config(nsr_config) + + @asyncio.coroutine + def scale_in(): + nsr_config, scaling_group = yield from get_nsr_scaling_group() + scaling_group.instance.remove(instance) + yield from update_config(nsr_config) + + if action == ScalingRpcHandler.ACTION.SCALE_OUT: + self._loop.create_task(scale_out()) + else: + self._loop.create_task(scale_in()) + + # Opdata based calls, disabled for now! + # if action == ScalingRpcHandler.ACTION.SCALE_OUT: + # self.scale_nsr_out( + # msg.nsr_id_ref, + # msg.scaling_group_name_ref, + # msg.instance_id, + # xact) + # else: + # self.scale_nsr_in( + # msg.nsr_id_ref, + # msg.scaling_group_name_ref, + # msg.instance_id) + def nsr_update_cfg(self, nsr_id, msg): nsr = self._nsrs[nsr_id] nsr.nsr_cfg_msg= msg @@ -3778,10 +3848,9 @@ class NsManager(object): self._log.error(msg) raise NetworkServiceRecordError(msg) - self._log.info("Create NetworkServiceRecord nsr id %s from nsd_id %s, restart mode %s", + self._log.info("Create NetworkServiceRecord nsr id %s from nsd_id %s", nsr_msg.id, - nsr_msg.nsd.id, - restart_mode) + nsr_msg.nsd.id) nsm_plugin = self._ro_plugin_selector.ro_plugin sdn_account_name = self._cloud_account_handler.get_cloud_account_sdn_name(nsr_msg.cloud_account) @@ -3908,12 +3977,12 @@ class NsManager(object): raise NetworkServiceDescriptorError("NSD already exists-%s", nsd_msg.id) nsd = NetworkServiceDescriptor( - self._dts, - self._log, - self._loop, - nsd_msg, - self - ) + self._dts, + self._log, + self._loop, + nsd_msg, + self + ) self._nsds[nsd_msg.id] = nsd return nsd @@ -4122,6 +4191,91 @@ class NsmRecordsPublisherProxy(object): path = VirtualLinkRecord.vlr_xpath(vlr) return (yield from self._vlr_pub_hdlr.delete(xact, path)) + +class ScalingRpcHandler(mano_dts.DtsHandler): + """ The Network service Monitor DTS handler """ + SCALE_IN_INPUT_XPATH = "I,/nsr:exec-scale-in" + SCALE_IN_OUTPUT_XPATH = "O,/nsr:exec-scale-in" + + SCALE_OUT_INPUT_XPATH = "I,/nsr:exec-scale-out" + SCALE_OUT_OUTPUT_XPATH = "O,/nsr:exec-scale-out" + + ACTION = Enum('ACTION', 'SCALE_IN SCALE_OUT') + + def __init__(self, log, dts, loop, callback=None): + super().__init__(log, dts, loop) + self.callback = callback + self.last_instance_id = defaultdict(int) + + @asyncio.coroutine + def register(self): + + @asyncio.coroutine + def on_scale_in_prepare(xact_info, action, ks_path, msg): + assert action == rwdts.QueryAction.RPC + + try: + if self.callback: + self.callback(xact_info.xact, msg, self.ACTION.SCALE_IN) + + rpc_op = NsrYang.YangOutput_Nsr_ExecScaleIn.from_dict({ + "instance_id": msg.instance_id}) + + xact_info.respond_xpath( + rwdts.XactRspCode.ACK, + self.__class__.SCALE_IN_OUTPUT_XPATH, + rpc_op) + + except Exception as e: + self.log.exception(e) + xact_info.respond_xpath( + rwdts.XactRspCode.NACK, + self.__class__.SCALE_IN_OUTPUT_XPATH) + + @asyncio.coroutine + def on_scale_out_prepare(xact_info, action, ks_path, msg): + assert action == rwdts.QueryAction.RPC + + try: + scaling_group = msg.scaling_group_name_ref + if not msg.instance_id: + last_instance_id = self.last_instance_id[scale_group] + msg.instance_id = last_instance_id + 1 + self.last_instance_id[scale_group] += 1 + + if self.callback: + self.callback(xact_info.xact, msg, self.ACTION.SCALE_OUT) + + rpc_op = NsrYang.YangOutput_Nsr_ExecScaleOut.from_dict({ + "instance_id": msg.instance_id}) + + xact_info.respond_xpath( + rwdts.XactRspCode.ACK, + self.__class__.SCALE_OUT_OUTPUT_XPATH, + rpc_op) + + except Exception as e: + self.log.exception(e) + xact_info.respond_xpath( + rwdts.XactRspCode.NACK, + self.__class__.SCALE_OUT_OUTPUT_XPATH) + + scale_in_hdl = rift.tasklets.DTS.RegistrationHandler( + on_prepare=on_scale_in_prepare) + scale_out_hdl = rift.tasklets.DTS.RegistrationHandler( + on_prepare=on_scale_out_prepare) + + with self.dts.group_create() as group: + group.register( + xpath=self.__class__.SCALE_IN_INPUT_XPATH, + handler=scale_in_hdl, + flags=rwdts.Flag.PUBLISHER) + group.register( + xpath=self.__class__.SCALE_OUT_INPUT_XPATH, + handler=scale_out_hdl, + flags=rwdts.Flag.PUBLISHER) + + class NsmTasklet(rift.tasklets.Tasklet): """ The network service manager tasklet @@ -4193,28 +4347,28 @@ class NsmTasklet(rift.tasklets.Tasklet): self._vnfd_pub_handler = publisher.VnfdPublisher(use_ssl, ssl_cert, ssl_key, self.loop) self._records_publisher_proxy = NsmRecordsPublisherProxy( - self._dts, - self.log, - self.loop, - self._nsr_pub_handler, - self._vnfr_pub_handler, - self._vlr_pub_handler, - ) + self._dts, + self.log, + self.loop, + self._nsr_pub_handler, + self._vnfr_pub_handler, + self._vlr_pub_handler, + ) # Register the NSM to receive the nsm plugin # when cloud account is configured self._ro_plugin_selector = cloud.ROAccountPluginSelector( - self._dts, - self.log, - self.loop, - self._records_publisher_proxy, - ) + self._dts, + self.log, + self.loop, + self._records_publisher_proxy, + ) yield from self._ro_plugin_selector.register() self._cloud_account_handler = cloud.CloudAccountConfigSubscriber( - self._log, - self._dts, - self.log_hdl) + self._log, + self._dts, + self.log_hdl) yield from self._cloud_account_handler.register() @@ -4222,17 +4376,17 @@ class NsmTasklet(rift.tasklets.Tasklet): yield from self._vnffgmgr.register() self._nsm = NsManager( - self._dts, - self.log, - self.loop, - self._nsr_pub_handler, - self._vnfr_pub_handler, - self._vlr_pub_handler, - self._ro_plugin_selector, - self._vnffgmgr, - self._vnfd_pub_handler, - self._cloud_account_handler - ) + self._dts, + self.log, + self.loop, + self._nsr_pub_handler, + self._vnfr_pub_handler, + self._vlr_pub_handler, + self._ro_plugin_selector, + self._vnffgmgr, + self._vnfd_pub_handler, + self._cloud_account_handler + ) yield from self._nsm.register() -- 2.25.1