X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=NG-RO%2Fosm_ng_ro%2Fns.py;h=d187dd8469f2c22940df6d660204ab45aa03a8d1;hb=refs%2Fchanges%2F83%2F11483%2F7;hp=2db33e497ee204a8957c87e733fe0bdda9052380;hpb=f29a91f8d792936bf9c264f3e6075784b58ebc68;p=osm%2FRO.git diff --git a/NG-RO/osm_ng_ro/ns.py b/NG-RO/osm_ng_ro/ns.py index 2db33e49..d187dd84 100644 --- a/NG-RO/osm_ng_ro/ns.py +++ b/NG-RO/osm_ng_ro/ns.py @@ -16,38 +16,39 @@ # limitations under the License. ## -# import yaml +from http import HTTPStatus import logging +from random import choice as random_choice +from threading import Lock +from time import time from traceback import format_exc as traceback_format_exc -from osm_ng_ro.ns_thread import NsWorker, NsWorkerException, deep_get -from osm_ng_ro.validation import validate_input, deploy_schema +from typing import Any, Dict, Tuple, Type +from uuid import uuid4 + +from cryptography.hazmat.backends import default_backend as crypto_default_backend +from cryptography.hazmat.primitives import serialization as crypto_serialization +from cryptography.hazmat.primitives.asymmetric import rsa +from jinja2 import ( + Environment, + StrictUndefined, + TemplateError, + TemplateNotFound, + UndefinedError, +) from osm_common import ( - dbmongo, dbmemory, + dbmongo, fslocal, fsmongo, - msglocal, msgkafka, + msglocal, version as common_version, ) -from osm_common.dbbase import DbException -from osm_common.fsbase import FsException +from osm_common.dbbase import DbBase, DbException +from osm_common.fsbase import FsBase, FsException from osm_common.msgbase import MsgException -from http import HTTPStatus -from uuid import uuid4 -from threading import Lock -from random import choice as random_choice -from time import time -from jinja2 import ( - Environment, - TemplateError, - TemplateNotFound, - StrictUndefined, - UndefinedError, -) -from cryptography.hazmat.primitives import serialization as crypto_serialization -from cryptography.hazmat.primitives.asymmetric import rsa -from cryptography.hazmat.backends import default_backend as crypto_default_backend +from osm_ng_ro.ns_thread import deep_get, NsWorker, NsWorkerException +from osm_ng_ro.validation import deploy_schema, validate_input __author__ = "Alfonso Tierno " min_common_version = "0.1.16" @@ -294,15 +295,31 @@ class Ns(object): for target_id in vims_to_unload: self._unload_vim(target_id) - def _get_cloud_init(self, where): - """ - Not used as cloud init content is provided in the http body. This method reads cloud init from a file - :param where: can be 'vnfr_id:file:file_name' or 'vnfr_id:vdu:vdu_idex' - :return: + @staticmethod + def _get_cloud_init( + db: Type[DbBase], + fs: Type[FsBase], + location: str, + ) -> str: + """This method reads cloud init from a file. + + Note: Not used as cloud init content is provided in the http body. + + Args: + db (Type[DbBase]): [description] + fs (Type[FsBase]): [description] + location (str): can be 'vnfr_id:file:file_name' or 'vnfr_id:vdu:vdu_idex' + + Raises: + NsException: [description] + NsException: [description] + + Returns: + str: [description] """ - vnfd_id, _, other = where.partition(":") + vnfd_id, _, other = location.partition(":") _type, _, name = other.partition(":") - vnfd = self.db.get_one("vnfds", {"_id": vnfd_id}) + vnfd = db.get_one("vnfds", {"_id": vnfd_id}) if _type == "file": base_folder = vnfd["_admin"]["storage"] @@ -310,23 +327,42 @@ class Ns(object): base_folder["folder"], base_folder["pkg-dir"], name ) - if not self.fs: + if not fs: raise NsException( "Cannot read file '{}'. Filesystem not loaded, change configuration at storage.driver".format( cloud_init_file ) ) - with self.fs.file_open(cloud_init_file, "r") as ci_file: + with fs.file_open(cloud_init_file, "r") as ci_file: cloud_init_content = ci_file.read() elif _type == "vdu": cloud_init_content = vnfd["vdu"][int(name)]["cloud-init"] else: - raise NsException("Mismatch descriptor for cloud init: {}".format(where)) + raise NsException("Mismatch descriptor for cloud init: {}".format(location)) return cloud_init_content - def _parse_jinja2(self, cloud_init_content, params, context): + @staticmethod + def _parse_jinja2( + cloud_init_content: str, + params: Dict[str, Any], + context: str, + ) -> str: + """Function that processes the cloud init to replace Jinja2 encoded parameters. + + Args: + cloud_init_content (str): [description] + params (Dict[str, Any]): [description] + context (str): [description] + + Raises: + NsException: [description] + NsException: [description] + + Returns: + str: [description] + """ try: env = Environment(undefined=StrictUndefined) template = env.from_string(cloud_init_content) @@ -385,6 +421,666 @@ class Ns(object): return db_content + @staticmethod + def _create_task( + deployment_info: Dict[str, Any], + target_id: str, + item: str, + action: str, + target_record: str, + target_record_id: str, + extra_dict: Dict[str, Any] = None, + ) -> Dict[str, Any]: + """Function to create task dict from deployment information. + + Args: + deployment_info (Dict[str, Any]): [description] + target_id (str): [description] + item (str): [description] + action (str): [description] + target_record (str): [description] + target_record_id (str): [description] + extra_dict (Dict[str, Any], optional): [description]. Defaults to None. + + Returns: + Dict[str, Any]: [description] + """ + task = { + "target_id": target_id, # it will be removed before pushing at database + "action_id": deployment_info.get("action_id"), + "nsr_id": deployment_info.get("nsr_id"), + "task_id": f"{deployment_info.get('action_id')}:{deployment_info.get('task_index')}", + "status": "SCHEDULED", + "action": action, + "item": item, + "target_record": target_record, + "target_record_id": target_record_id, + } + + if extra_dict: + task.update(extra_dict) # params, find_params, depends_on + + deployment_info["task_index"] = deployment_info.get("task_index", 0) + 1 + + return task + + @staticmethod + def _create_ro_task( + target_id: str, + task: Dict[str, Any], + ) -> Dict[str, Any]: + """Function to create an RO task from task information. + + Args: + target_id (str): [description] + task (Dict[str, Any]): [description] + + Returns: + Dict[str, Any]: [description] + """ + now = time() + + _id = task.get("task_id") + db_ro_task = { + "_id": _id, + "locked_by": None, + "locked_at": 0.0, + "target_id": target_id, + "vim_info": { + "created": False, + "created_items": None, + "vim_id": None, + "vim_name": None, + "vim_status": None, + "vim_details": None, + "refresh_at": None, + }, + "modified_at": now, + "created_at": now, + "to_check_at": now, + "tasks": [task], + } + + return db_ro_task + + @staticmethod + def _process_image_params( + target_image: Dict[str, Any], + indata: Dict[str, Any], + vim_info: Dict[str, Any], + target_record_id: str, + **kwargs: Dict[str, Any], + ) -> Dict[str, Any]: + """Function to process VDU image parameters. + + Args: + target_image (Dict[str, Any]): [description] + indata (Dict[str, Any]): [description] + vim_info (Dict[str, Any]): [description] + target_record_id (str): [description] + + Returns: + Dict[str, Any]: [description] + """ + find_params = {} + + if target_image.get("image"): + find_params["filter_dict"] = {"name": target_image.get("image")} + + if target_image.get("vim_image_id"): + find_params["filter_dict"] = {"id": target_image.get("vim_image_id")} + + if target_image.get("image_checksum"): + find_params["filter_dict"] = { + "checksum": target_image.get("image_checksum") + } + + return {"find_params": find_params} + + @staticmethod + def _get_resource_allocation_params( + quota_descriptor: Dict[str, Any], + ) -> Dict[str, Any]: + """Read the quota_descriptor from vnfd and fetch the resource allocation properties from the + descriptor object. + + Args: + quota_descriptor (Dict[str, Any]): cpu/mem/vif/disk-io quota descriptor + + Returns: + Dict[str, Any]: quota params for limit, reserve, shares from the descriptor object + """ + quota = {} + + if quota_descriptor.get("limit"): + quota["limit"] = int(quota_descriptor["limit"]) + + if quota_descriptor.get("reserve"): + quota["reserve"] = int(quota_descriptor["reserve"]) + + if quota_descriptor.get("shares"): + quota["shares"] = int(quota_descriptor["shares"]) + + return quota + + @staticmethod + def _process_guest_epa_quota_params( + guest_epa_quota: Dict[str, Any], + epa_vcpu_set: bool, + ) -> Dict[str, Any]: + """Function to extract the guest epa quota parameters. + + Args: + guest_epa_quota (Dict[str, Any]): [description] + epa_vcpu_set (bool): [description] + + Returns: + Dict[str, Any]: [description] + """ + result = {} + + if guest_epa_quota.get("cpu-quota") and not epa_vcpu_set: + cpuquota = Ns._get_resource_allocation_params( + guest_epa_quota.get("cpu-quota") + ) + + if cpuquota: + result["cpu-quota"] = cpuquota + + if guest_epa_quota.get("mem-quota"): + vduquota = Ns._get_resource_allocation_params( + guest_epa_quota.get("mem-quota") + ) + + if vduquota: + result["mem-quota"] = vduquota + + if guest_epa_quota.get("disk-io-quota"): + diskioquota = Ns._get_resource_allocation_params( + guest_epa_quota.get("disk-io-quota") + ) + + if diskioquota: + result["disk-io-quota"] = diskioquota + + if guest_epa_quota.get("vif-quota"): + vifquota = Ns._get_resource_allocation_params( + guest_epa_quota.get("vif-quota") + ) + + if vifquota: + result["vif-quota"] = vifquota + + return result + + @staticmethod + def _process_guest_epa_numa_params( + guest_epa_quota: Dict[str, Any], + ) -> Tuple[Dict[str, Any], bool]: + """[summary] + + Args: + guest_epa_quota (Dict[str, Any]): [description] + + Returns: + Tuple[Dict[str, Any], bool]: [description] + """ + numa = {} + epa_vcpu_set = False + + if guest_epa_quota.get("numa-node-policy"): + numa_node_policy = guest_epa_quota.get("numa-node-policy") + + if numa_node_policy.get("node"): + numa_node = numa_node_policy["node"][0] + + if numa_node.get("num-cores"): + numa["cores"] = numa_node["num-cores"] + epa_vcpu_set = True + + paired_threads = numa_node.get("paired-threads", {}) + if paired_threads.get("num-paired-threads"): + numa["paired-threads"] = int( + numa_node["paired-threads"]["num-paired-threads"] + ) + epa_vcpu_set = True + + if paired_threads.get("paired-thread-ids"): + numa["paired-threads-id"] = [] + + for pair in paired_threads["paired-thread-ids"]: + numa["paired-threads-id"].append( + ( + str(pair["thread-a"]), + str(pair["thread-b"]), + ) + ) + + if numa_node.get("num-threads"): + numa["threads"] = int(numa_node["num-threads"]) + epa_vcpu_set = True + + if numa_node.get("memory-mb"): + numa["memory"] = max(int(int(numa_node["memory-mb"]) / 1024), 1) + + return numa, epa_vcpu_set + + @staticmethod + def _process_guest_epa_cpu_pinning_params( + guest_epa_quota: Dict[str, Any], + vcpu_count: int, + epa_vcpu_set: bool, + ) -> Tuple[Dict[str, Any], bool]: + """[summary] + + Args: + guest_epa_quota (Dict[str, Any]): [description] + vcpu_count (int): [description] + epa_vcpu_set (bool): [description] + + Returns: + Tuple[Dict[str, Any], bool]: [description] + """ + numa = {} + local_epa_vcpu_set = epa_vcpu_set + + if ( + guest_epa_quota.get("cpu-pinning-policy") == "DEDICATED" + and not epa_vcpu_set + ): + numa[ + "cores" + if guest_epa_quota.get("cpu-thread-pinning-policy") != "PREFER" + else "threads" + ] = max(vcpu_count, 1) + local_epa_vcpu_set = True + + return numa, local_epa_vcpu_set + + @staticmethod + def _process_epa_params( + target_flavor: Dict[str, Any], + ) -> Dict[str, Any]: + """[summary] + + Args: + target_flavor (Dict[str, Any]): [description] + + Returns: + Dict[str, Any]: [description] + """ + extended = {} + numa = {} + + if target_flavor.get("guest-epa"): + guest_epa = target_flavor["guest-epa"] + + numa, epa_vcpu_set = Ns._process_guest_epa_numa_params( + guest_epa_quota=guest_epa + ) + + if guest_epa.get("mempage-size"): + extended["mempage-size"] = guest_epa.get("mempage-size") + + tmp_numa, epa_vcpu_set = Ns._process_guest_epa_cpu_pinning_params( + guest_epa_quota=guest_epa, + vcpu_count=int(target_flavor.get("vcpu-count", 1)), + epa_vcpu_set=epa_vcpu_set, + ) + numa.update(tmp_numa) + + extended.update( + Ns._process_guest_epa_quota_params( + guest_epa_quota=guest_epa, + epa_vcpu_set=epa_vcpu_set, + ) + ) + + if numa: + extended["numas"] = [numa] + + return extended + + @staticmethod + def _process_flavor_params( + target_flavor: Dict[str, Any], + indata: Dict[str, Any], + vim_info: Dict[str, Any], + target_record_id: str, + **kwargs: Dict[str, Any], + ) -> Dict[str, Any]: + """[summary] + + Args: + target_flavor (Dict[str, Any]): [description] + indata (Dict[str, Any]): [description] + vim_info (Dict[str, Any]): [description] + target_record_id (str): [description] + + Returns: + Dict[str, Any]: [description] + """ + flavor_data = { + "disk": int(target_flavor["storage-gb"]), + "ram": int(target_flavor["memory-mb"]), + "vcpus": int(target_flavor["vcpu-count"]), + } + + target_vdur = {} + for vnf in indata.get("vnf", []): + for vdur in vnf.get("vdur", []): + if vdur.get("ns-flavor-id") == target_flavor["id"]: + target_vdur = vdur + + for storage in target_vdur.get("virtual-storages", []): + if ( + storage.get("type-of-storage") + == "etsi-nfv-descriptors:ephemeral-storage" + ): + flavor_data["ephemeral"] = int(storage.get("size-of-storage", 0)) + elif storage.get("type-of-storage") == "etsi-nfv-descriptors:swap-storage": + flavor_data["swap"] = int(storage.get("size-of-storage", 0)) + + extended = Ns._process_epa_params(target_flavor) + if extended: + flavor_data["extended"] = extended + + extra_dict = {"find_params": {"flavor_data": flavor_data}} + flavor_data_name = flavor_data.copy() + flavor_data_name["name"] = target_flavor["name"] + extra_dict["params"] = {"flavor_data": flavor_data_name} + + return extra_dict + + @staticmethod + def _ip_profile_to_ro( + ip_profile: Dict[str, Any], + ) -> Dict[str, Any]: + """[summary] + + Args: + ip_profile (Dict[str, Any]): [description] + + Returns: + Dict[str, Any]: [description] + """ + if not ip_profile: + return None + + ro_ip_profile = { + "ip_version": "IPv4" + if "v4" in ip_profile.get("ip-version", "ipv4") + else "IPv6", + "subnet_address": ip_profile.get("subnet-address"), + "gateway_address": ip_profile.get("gateway-address"), + "dhcp_enabled": ip_profile.get("dhcp-params", {}).get("enabled", False), + "dhcp_start_address": ip_profile.get("dhcp-params", {}).get( + "start-address", None + ), + "dhcp_count": ip_profile.get("dhcp-params", {}).get("count", None), + } + + if ip_profile.get("dns-server"): + ro_ip_profile["dns_address"] = ";".join( + [v["address"] for v in ip_profile["dns-server"] if v.get("address")] + ) + + if ip_profile.get("security-group"): + ro_ip_profile["security_group"] = ip_profile["security-group"] + + return ro_ip_profile + + @staticmethod + def _process_net_params( + target_vld: Dict[str, Any], + indata: Dict[str, Any], + vim_info: Dict[str, Any], + target_record_id: str, + **kwargs: Dict[str, Any], + ) -> Dict[str, Any]: + """Function to process network parameters. + + Args: + target_vld (Dict[str, Any]): [description] + indata (Dict[str, Any]): [description] + vim_info (Dict[str, Any]): [description] + target_record_id (str): [description] + + Returns: + Dict[str, Any]: [description] + """ + extra_dict = {} + + if vim_info.get("sdn"): + # vnf_preffix = "vnfrs:{}".format(vnfr_id) + # ns_preffix = "nsrs:{}".format(nsr_id) + # remove the ending ".sdn + vld_target_record_id, _, _ = target_record_id.rpartition(".") + extra_dict["params"] = { + k: vim_info[k] + for k in ("sdn-ports", "target_vim", "vlds", "type") + if vim_info.get(k) + } + + # TODO needed to add target_id in the dependency. + if vim_info.get("target_vim"): + extra_dict["depends_on"] = [ + f"{vim_info.get('target_vim')} {vld_target_record_id}" + ] + + return extra_dict + + if vim_info.get("vim_network_name"): + extra_dict["find_params"] = { + "filter_dict": { + "name": vim_info.get("vim_network_name"), + }, + } + elif vim_info.get("vim_network_id"): + extra_dict["find_params"] = { + "filter_dict": { + "id": vim_info.get("vim_network_id"), + }, + } + elif target_vld.get("mgmt-network"): + extra_dict["find_params"] = { + "mgmt": True, + "name": target_vld["id"], + } + else: + # create + extra_dict["params"] = { + "net_name": ( + f"{indata.get('name')[:16]}-{target_vld.get('name', target_vld.get('id'))[:16]}" + ), + "ip_profile": Ns._ip_profile_to_ro(vim_info.get("ip_profile")), + "provider_network_profile": vim_info.get("provider_network"), + } + + if not target_vld.get("underlay"): + extra_dict["params"]["net_type"] = "bridge" + else: + extra_dict["params"]["net_type"] = ( + "ptp" if target_vld.get("type") == "ELINE" else "data" + ) + + return extra_dict + + @staticmethod + def _process_vdu_params( + target_vdu: Dict[str, Any], + indata: Dict[str, Any], + vim_info: Dict[str, Any], + target_record_id: str, + **kwargs: Dict[str, Any], + ) -> Dict[str, Any]: + """Function to process VDU parameters. + + Args: + target_vdu (Dict[str, Any]): [description] + indata (Dict[str, Any]): [description] + vim_info (Dict[str, Any]): [description] + target_record_id (str): [description] + + Returns: + Dict[str, Any]: [description] + """ + vnfr_id = kwargs.get("vnfr_id") + nsr_id = kwargs.get("nsr_id") + vnfr = kwargs.get("vnfr") + vdu2cloud_init = kwargs.get("vdu2cloud_init") + tasks_by_target_record_id = kwargs.get("tasks_by_target_record_id") + logger = kwargs.get("logger") + db = kwargs.get("db") + fs = kwargs.get("fs") + ro_nsr_public_key = kwargs.get("ro_nsr_public_key") + + vnf_preffix = "vnfrs:{}".format(vnfr_id) + ns_preffix = "nsrs:{}".format(nsr_id) + image_text = ns_preffix + ":image." + target_vdu["ns-image-id"] + flavor_text = ns_preffix + ":flavor." + target_vdu["ns-flavor-id"] + extra_dict = {"depends_on": [image_text, flavor_text]} + net_list = [] + + for iface_index, interface in enumerate(target_vdu["interfaces"]): + if interface.get("ns-vld-id"): + net_text = ns_preffix + ":vld." + interface["ns-vld-id"] + elif interface.get("vnf-vld-id"): + net_text = vnf_preffix + ":vld." + interface["vnf-vld-id"] + else: + logger.error( + "Interface {} from vdu {} not connected to any vld".format( + iface_index, target_vdu["vdu-name"] + ) + ) + + continue # interface not connected to any vld + + extra_dict["depends_on"].append(net_text) + + if "port-security-enabled" in interface: + interface["port_security"] = interface.pop("port-security-enabled") + + if "port-security-disable-strategy" in interface: + interface["port_security_disable_strategy"] = interface.pop( + "port-security-disable-strategy" + ) + + net_item = { + x: v + for x, v in interface.items() + if x + in ( + "name", + "vpci", + "port_security", + "port_security_disable_strategy", + "floating_ip", + ) + } + net_item["net_id"] = "TASK-" + net_text + net_item["type"] = "virtual" + + # TODO mac_address: used for SR-IOV ifaces #TODO for other types + # TODO floating_ip: True/False (or it can be None) + if interface.get("type") in ("SR-IOV", "PCI-PASSTHROUGH"): + # mark the net create task as type data + if deep_get( + tasks_by_target_record_id, + net_text, + "params", + "net_type", + ): + tasks_by_target_record_id[net_text]["params"]["net_type"] = "data" + + net_item["use"] = "data" + net_item["model"] = interface["type"] + net_item["type"] = interface["type"] + elif ( + interface.get("type") == "OM-MGMT" + or interface.get("mgmt-interface") + or interface.get("mgmt-vnf") + ): + net_item["use"] = "mgmt" + else: + # if interface.get("type") in ("VIRTIO", "E1000", "PARAVIRT"): + net_item["use"] = "bridge" + net_item["model"] = interface.get("type") + + if interface.get("ip-address"): + net_item["ip_address"] = interface["ip-address"] + + if interface.get("mac-address"): + net_item["mac_address"] = interface["mac-address"] + + net_list.append(net_item) + + if interface.get("mgmt-vnf"): + extra_dict["mgmt_vnf_interface"] = iface_index + elif interface.get("mgmt-interface"): + extra_dict["mgmt_vdu_interface"] = iface_index + + # cloud config + cloud_config = {} + + if target_vdu.get("cloud-init"): + if target_vdu["cloud-init"] not in vdu2cloud_init: + vdu2cloud_init[target_vdu["cloud-init"]] = Ns._get_cloud_init( + db=db, + fs=fs, + location=target_vdu["cloud-init"], + ) + + cloud_content_ = vdu2cloud_init[target_vdu["cloud-init"]] + cloud_config["user-data"] = Ns._parse_jinja2( + cloud_init_content=cloud_content_, + params=target_vdu.get("additionalParams"), + context=target_vdu["cloud-init"], + ) + + if target_vdu.get("boot-data-drive"): + cloud_config["boot-data-drive"] = target_vdu.get("boot-data-drive") + + ssh_keys = [] + + if target_vdu.get("ssh-keys"): + ssh_keys += target_vdu.get("ssh-keys") + + if target_vdu.get("ssh-access-required"): + ssh_keys.append(ro_nsr_public_key) + + if ssh_keys: + cloud_config["key-pairs"] = ssh_keys + + disk_list = None + if target_vdu.get("virtual-storages"): + disk_list = [ + {"size": disk["size-of-storage"]} + for disk in target_vdu["virtual-storages"] + if disk.get("type-of-storage") + == "persistent-storage:persistent-storage" + ] + + extra_dict["params"] = { + "name": "{}-{}-{}-{}".format( + indata["name"][:16], + vnfr["member-vnf-index-ref"][:16], + target_vdu["vdu-name"][:32], + target_vdu.get("count-index") or 0, + ), + "description": target_vdu["vdu-name"], + "start": True, + "image_id": "TASK-" + image_text, + "flavor_id": "TASK-" + flavor_text, + "net_list": net_list, + "cloud_config": cloud_config or None, + "disk_list": disk_list, + "availability_zone_index": None, # TODO + "availability_zone_list": None, # TODO + } + + return extra_dict + def deploy(self, session, indata, version, nsr_id, *args, **kwargs): self.logger.debug("ns.deploy nsr_id={} indata={}".format(nsr_id, indata)) validate_input(indata, deploy_schema) @@ -441,453 +1137,6 @@ class Ns(object): index += 1 - def _create_task( - target_id, - item, - action, - target_record, - target_record_id, - extra_dict=None, - ): - nonlocal task_index - nonlocal action_id - nonlocal nsr_id - - task = { - "target_id": target_id, # it will be removed before pushing at database - "action_id": action_id, - "nsr_id": nsr_id, - "task_id": "{}:{}".format(action_id, task_index), - "status": "SCHEDULED", - "action": action, - "item": item, - "target_record": target_record, - "target_record_id": target_record_id, - } - - if extra_dict: - task.update(extra_dict) # params, find_params, depends_on - - task_index += 1 - - return task - - def _create_ro_task(target_id, task): - nonlocal action_id - nonlocal task_index - nonlocal now - - _id = task["task_id"] - db_ro_task = { - "_id": _id, - "locked_by": None, - "locked_at": 0.0, - "target_id": target_id, - "vim_info": { - "created": False, - "created_items": None, - "vim_id": None, - "vim_name": None, - "vim_status": None, - "vim_details": None, - "refresh_at": None, - }, - "modified_at": now, - "created_at": now, - "to_check_at": now, - "tasks": [task], - } - - return db_ro_task - - def _process_image_params(target_image, vim_info, target_record_id): - find_params = {} - - if target_image.get("image"): - find_params["filter_dict"] = {"name": target_image.get("image")} - - if target_image.get("vim_image_id"): - find_params["filter_dict"] = { - "id": target_image.get("vim_image_id") - } - - if target_image.get("image_checksum"): - find_params["filter_dict"] = { - "checksum": target_image.get("image_checksum") - } - - return {"find_params": find_params} - - def _process_flavor_params(target_flavor, vim_info, target_record_id): - def _get_resource_allocation_params(quota_descriptor): - """ - read the quota_descriptor from vnfd and fetch the resource allocation properties from the - descriptor object - :param quota_descriptor: cpu/mem/vif/disk-io quota descriptor - :return: quota params for limit, reserve, shares from the descriptor object - """ - quota = {} - - if quota_descriptor.get("limit"): - quota["limit"] = int(quota_descriptor["limit"]) - - if quota_descriptor.get("reserve"): - quota["reserve"] = int(quota_descriptor["reserve"]) - - if quota_descriptor.get("shares"): - quota["shares"] = int(quota_descriptor["shares"]) - - return quota - - flavor_data = { - "disk": int(target_flavor["storage-gb"]), - "ram": int(target_flavor["memory-mb"]), - "vcpus": int(target_flavor["vcpu-count"]), - } - numa = {} - extended = {} - - if target_flavor.get("guest-epa"): - extended = {} - epa_vcpu_set = False - - if target_flavor["guest-epa"].get("numa-node-policy"): - numa_node_policy = target_flavor["guest-epa"].get( - "numa-node-policy" - ) - - if numa_node_policy.get("node"): - numa_node = numa_node_policy["node"][0] - - if numa_node.get("num-cores"): - numa["cores"] = numa_node["num-cores"] - epa_vcpu_set = True - - if numa_node.get("paired-threads"): - if numa_node["paired-threads"].get( - "num-paired-threads" - ): - numa["paired-threads"] = int( - numa_node["paired-threads"][ - "num-paired-threads" - ] - ) - epa_vcpu_set = True - - if len( - numa_node["paired-threads"].get("paired-thread-ids") - ): - numa["paired-threads-id"] = [] - - for pair in numa_node["paired-threads"][ - "paired-thread-ids" - ]: - numa["paired-threads-id"].append( - ( - str(pair["thread-a"]), - str(pair["thread-b"]), - ) - ) - - if numa_node.get("num-threads"): - numa["threads"] = int(numa_node["num-threads"]) - epa_vcpu_set = True - - if numa_node.get("memory-mb"): - numa["memory"] = max( - int(numa_node["memory-mb"] / 1024), 1 - ) - - if target_flavor["guest-epa"].get("mempage-size"): - extended["mempage-size"] = target_flavor["guest-epa"].get( - "mempage-size" - ) - - if ( - target_flavor["guest-epa"].get("cpu-pinning-policy") - and not epa_vcpu_set - ): - if ( - target_flavor["guest-epa"]["cpu-pinning-policy"] - == "DEDICATED" - ): - if ( - target_flavor["guest-epa"].get( - "cpu-thread-pinning-policy" - ) - and target_flavor["guest-epa"][ - "cpu-thread-pinning-policy" - ] - != "PREFER" - ): - numa["cores"] = max(flavor_data["vcpus"], 1) - else: - numa["threads"] = max(flavor_data["vcpus"], 1) - - epa_vcpu_set = True - - if target_flavor["guest-epa"].get("cpu-quota") and not epa_vcpu_set: - cpuquota = _get_resource_allocation_params( - target_flavor["guest-epa"].get("cpu-quota") - ) - - if cpuquota: - extended["cpu-quota"] = cpuquota - - if target_flavor["guest-epa"].get("mem-quota"): - vduquota = _get_resource_allocation_params( - target_flavor["guest-epa"].get("mem-quota") - ) - - if vduquota: - extended["mem-quota"] = vduquota - - if target_flavor["guest-epa"].get("disk-io-quota"): - diskioquota = _get_resource_allocation_params( - target_flavor["guest-epa"].get("disk-io-quota") - ) - - if diskioquota: - extended["disk-io-quota"] = diskioquota - - if target_flavor["guest-epa"].get("vif-quota"): - vifquota = _get_resource_allocation_params( - target_flavor["guest-epa"].get("vif-quota") - ) - - if vifquota: - extended["vif-quota"] = vifquota - - if numa: - extended["numas"] = [numa] - - if extended: - flavor_data["extended"] = extended - - extra_dict = {"find_params": {"flavor_data": flavor_data}} - flavor_data_name = flavor_data.copy() - flavor_data_name["name"] = target_flavor["name"] - extra_dict["params"] = {"flavor_data": flavor_data_name} - - return extra_dict - - def _ip_profile_2_ro(ip_profile): - if not ip_profile: - return None - - ro_ip_profile = { - "ip_version": "IPv4" - if "v4" in ip_profile.get("ip-version", "ipv4") - else "IPv6", - "subnet_address": ip_profile.get("subnet-address"), - "gateway_address": ip_profile.get("gateway-address"), - "dhcp_enabled": ip_profile.get("dhcp-params", {}).get( - "enabled", False - ), - "dhcp_start_address": ip_profile.get("dhcp-params", {}).get( - "start-address", None - ), - "dhcp_count": ip_profile.get("dhcp-params", {}).get( - "count", None - ), - } - - if ip_profile.get("dns-server"): - ro_ip_profile["dns_address"] = ";".join( - [v["address"] for v in ip_profile["dns-server"]] - ) - - if ip_profile.get("security-group"): - ro_ip_profile["security_group"] = ip_profile["security-group"] - - return ro_ip_profile - - def _process_net_params(target_vld, vim_info, target_record_id): - nonlocal indata - extra_dict = {} - - if vim_info.get("sdn"): - # vnf_preffix = "vnfrs:{}".format(vnfr_id) - # ns_preffix = "nsrs:{}".format(nsr_id) - # remove the ending ".sdn - vld_target_record_id, _, _ = target_record_id.rpartition(".") - extra_dict["params"] = { - k: vim_info[k] - for k in ("sdn-ports", "target_vim", "vlds", "type") - if vim_info.get(k) - } - - # TODO needed to add target_id in the dependency. - if vim_info.get("target_vim"): - extra_dict["depends_on"] = [ - vim_info.get("target_vim") + " " + vld_target_record_id - ] - - return extra_dict - - if vim_info.get("vim_network_name"): - extra_dict["find_params"] = { - "filter_dict": {"name": vim_info.get("vim_network_name")} - } - elif vim_info.get("vim_network_id"): - extra_dict["find_params"] = { - "filter_dict": {"id": vim_info.get("vim_network_id")} - } - elif target_vld.get("mgmt-network"): - extra_dict["find_params"] = {"mgmt": True, "name": target_vld["id"]} - else: - # create - extra_dict["params"] = { - "net_name": "{}-{}".format( - indata["name"][:16], - target_vld.get("name", target_vld["id"])[:16], - ), - "ip_profile": _ip_profile_2_ro(vim_info.get("ip_profile")), - "provider_network_profile": vim_info.get("provider_network"), - } - - if not target_vld.get("underlay"): - extra_dict["params"]["net_type"] = "bridge" - else: - extra_dict["params"]["net_type"] = ( - "ptp" if target_vld.get("type") == "ELINE" else "data" - ) - - return extra_dict - - def _process_vdu_params(target_vdu, vim_info, target_record_id): - nonlocal vnfr_id - nonlocal nsr_id - nonlocal indata - nonlocal vnfr - nonlocal vdu2cloud_init - nonlocal tasks_by_target_record_id - - vnf_preffix = "vnfrs:{}".format(vnfr_id) - ns_preffix = "nsrs:{}".format(nsr_id) - image_text = ns_preffix + ":image." + target_vdu["ns-image-id"] - flavor_text = ns_preffix + ":flavor." + target_vdu["ns-flavor-id"] - extra_dict = {"depends_on": [image_text, flavor_text]} - net_list = [] - - for iface_index, interface in enumerate(target_vdu["interfaces"]): - if interface.get("ns-vld-id"): - net_text = ns_preffix + ":vld." + interface["ns-vld-id"] - elif interface.get("vnf-vld-id"): - net_text = vnf_preffix + ":vld." + interface["vnf-vld-id"] - else: - self.logger.error( - "Interface {} from vdu {} not connected to any vld".format( - iface_index, target_vdu["vdu-name"] - ) - ) - - continue # interface not connected to any vld - - extra_dict["depends_on"].append(net_text) - net_item = { - x: v - for x, v in interface.items() - if x - in ( - "name", - "vpci", - "port_security", - "port_security_disable_strategy", - "floating_ip", - ) - } - net_item["net_id"] = "TASK-" + net_text - net_item["type"] = "virtual" - - # TODO mac_address: used for SR-IOV ifaces #TODO for other types - # TODO floating_ip: True/False (or it can be None) - if interface.get("type") in ("SR-IOV", "PCI-PASSTHROUGH"): - # mark the net create task as type data - if deep_get( - tasks_by_target_record_id, net_text, "params", "net_type" - ): - tasks_by_target_record_id[net_text]["params"][ - "net_type" - ] = "data" - - net_item["use"] = "data" - net_item["model"] = interface["type"] - net_item["type"] = interface["type"] - elif ( - interface.get("type") == "OM-MGMT" - or interface.get("mgmt-interface") - or interface.get("mgmt-vnf") - ): - net_item["use"] = "mgmt" - else: - # if interface.get("type") in ("VIRTIO", "E1000", "PARAVIRT"): - net_item["use"] = "bridge" - net_item["model"] = interface.get("type") - - if interface.get("ip-address"): - net_item["ip_address"] = interface["ip-address"] - - if interface.get("mac-address"): - net_item["mac_address"] = interface["mac-address"] - - net_list.append(net_item) - - if interface.get("mgmt-vnf"): - extra_dict["mgmt_vnf_interface"] = iface_index - elif interface.get("mgmt-interface"): - extra_dict["mgmt_vdu_interface"] = iface_index - - # cloud config - cloud_config = {} - - if target_vdu.get("cloud-init"): - if target_vdu["cloud-init"] not in vdu2cloud_init: - vdu2cloud_init[target_vdu["cloud-init"]] = self._get_cloud_init( - target_vdu["cloud-init"] - ) - - cloud_content_ = vdu2cloud_init[target_vdu["cloud-init"]] - cloud_config["user-data"] = self._parse_jinja2( - cloud_content_, - target_vdu.get("additionalParams"), - target_vdu["cloud-init"], - ) - - if target_vdu.get("boot-data-drive"): - cloud_config["boot-data-drive"] = target_vdu.get("boot-data-drive") - - ssh_keys = [] - - if target_vdu.get("ssh-keys"): - ssh_keys += target_vdu.get("ssh-keys") - - if target_vdu.get("ssh-access-required"): - ssh_keys.append(ro_nsr_public_key) - - if ssh_keys: - cloud_config["key-pairs"] = ssh_keys - - extra_dict["params"] = { - "name": "{}-{}-{}-{}".format( - indata["name"][:16], - vnfr["member-vnf-index-ref"][:16], - target_vdu["vdu-name"][:32], - target_vdu.get("count-index") or 0, - ), - "description": target_vdu["vdu-name"], - "start": True, - "image_id": "TASK-" + image_text, - "flavor_id": "TASK-" + flavor_text, - "net_list": net_list, - "cloud_config": cloud_config or None, - "disk_list": None, # TODO - "availability_zone_index": None, # TODO - "availability_zone_list": None, # TODO - } - - return extra_dict - def _process_items( target_list, existing_list, @@ -899,7 +1148,10 @@ class Ns(object): ): nonlocal db_new_tasks nonlocal tasks_by_target_record_id + nonlocal action_id + nonlocal nsr_id nonlocal task_index + nonlocal indata # ensure all the target_list elements has an "id". If not assign the index as id for target_index, tl in enumerate(target_list): @@ -938,15 +1190,23 @@ class Ns(object): item_ = "sdn_net" target_record_id += ".sdn" - task = _create_task( - target_vim, - item_, - "DELETE", - target_record="{}.{}.vim_info.{}".format( - db_record, item_index, target_vim - ), + deployment_info = { + "action_id": action_id, + "nsr_id": nsr_id, + "task_index": task_index, + } + + task = Ns._create_task( + deployment_info=deployment_info, + target_id=target_vim, + item=item_, + action="DELETE", + target_record=f"{db_record}.{item_index}.vim_info.{target_vim}", target_record_id=target_record_id, ) + + task_index = deployment_info.get("task_index") + tasks_by_target_record_id[target_record_id] = task db_new_tasks.append(task) # TODO delete @@ -988,20 +1248,49 @@ class Ns(object): item_ = "sdn_net" target_record_id += ".sdn" + kwargs = {} + if process_params == Ns._process_vdu_params: + kwargs.update( + { + "vnfr_id": vnfr_id, + "nsr_id": nsr_id, + "vnfr": vnfr, + "vdu2cloud_init": vdu2cloud_init, + "tasks_by_target_record_id": tasks_by_target_record_id, + "logger": self.logger, + "db": self.db, + "fs": self.fs, + "ro_nsr_public_key": ro_nsr_public_key, + } + ) + extra_dict = process_params( - target_item, target_viminfo, target_record_id + target_item, + indata, + target_viminfo, + target_record_id, + **kwargs, ) self._assign_vim(target_vim) - task = _create_task( - target_vim, - item_, - "CREATE", - target_record="{}.{}.vim_info.{}".format( - db_record, item_index, target_vim - ), + + deployment_info = { + "action_id": action_id, + "nsr_id": nsr_id, + "task_index": task_index, + } + + task = Ns._create_task( + deployment_info=deployment_info, + target_id=target_vim, + item=item_, + action="CREATE", + target_record=f"{db_record}.{item_index}.vim_info.{target_vim}", target_record_id=target_record_id, extra_dict=extra_dict, ) + + task_index = deployment_info.get("task_index") + tasks_by_target_record_id[target_record_id] = task db_new_tasks.append(task) @@ -1012,6 +1301,8 @@ class Ns(object): def _process_action(indata): nonlocal db_new_tasks + nonlocal action_id + nonlocal nsr_id nonlocal task_index nonlocal db_vnfrs nonlocal db_ro_nsr @@ -1067,14 +1358,25 @@ class Ns(object): ], }, } - task = _create_task( - target_vim, - "vdu", - "EXEC", + + deployment_info = { + "action_id": action_id, + "nsr_id": nsr_id, + "task_index": task_index, + } + + task = Ns._create_task( + deployment_info=deployment_info, + target_id=target_vim, + item="vdu", + action="EXEC", target_record=target_record, target_record_id=None, extra_dict=extra_dict, ) + + task_index = deployment_info.get("task_index") + db_new_tasks.append(task) with self.write_lock: @@ -1091,7 +1393,7 @@ class Ns(object): db_update=db_nsr_update, db_path="vld", item="net", - process_params=_process_net_params, + process_params=Ns._process_net_params, ) step = "process NS images" @@ -1102,7 +1404,7 @@ class Ns(object): db_update=db_nsr_update, db_path="image", item="image", - process_params=_process_image_params, + process_params=Ns._process_image_params, ) step = "process NS flavors" @@ -1113,7 +1415,7 @@ class Ns(object): db_update=db_nsr_update, db_path="flavor", item="flavor", - process_params=_process_flavor_params, + process_params=Ns._process_flavor_params, ) # VNF.vld @@ -1136,7 +1438,7 @@ class Ns(object): db_update=db_vnfrs_update[vnfr["_id"]], db_path="vld", item="net", - process_params=_process_net_params, + process_params=Ns._process_net_params, ) target_list = target_vnf.get("vdur") if target_vnf else None @@ -1148,7 +1450,7 @@ class Ns(object): db_update=db_vnfrs_update[vnfr["_id"]], db_path="vdur", item="vdu", - process_params=_process_vdu_params, + process_params=Ns._process_vdu_params, ) for db_task in db_new_tasks: @@ -1181,7 +1483,7 @@ class Ns(object): ): # Create a ro_task step = "Updating database, Creating ro_tasks" - db_ro_task = _create_ro_task(target_id, db_task) + db_ro_task = Ns._create_ro_task(target_id, db_task) nb_ro_tasks += 1 self.db.create("ro_tasks", db_ro_task)