# limitations under the License.
##
-# import yaml
+from http import HTTPStatus
import logging
-from typing import Any, Dict, Tuple
+from random import choice as random_choice
+from threading import Lock
+from time import time
from traceback import format_exc as traceback_format_exc
-from osm_ng_ro.ns_thread import NsWorker, NsWorkerException, deep_get
-from osm_ng_ro.validation import validate_input, deploy_schema
+from typing import Any, Dict, Tuple, Type
+from uuid import uuid4
+
+from cryptography.hazmat.backends import default_backend as crypto_default_backend
+from cryptography.hazmat.primitives import serialization as crypto_serialization
+from cryptography.hazmat.primitives.asymmetric import rsa
+from jinja2 import (
+ Environment,
+ StrictUndefined,
+ TemplateError,
+ TemplateNotFound,
+ UndefinedError,
+)
from osm_common import (
- dbmongo,
dbmemory,
+ dbmongo,
fslocal,
fsmongo,
- msglocal,
msgkafka,
+ msglocal,
version as common_version,
)
-from osm_common.dbbase import DbException
-from osm_common.fsbase import FsException
+from osm_common.dbbase import DbBase, DbException
+from osm_common.fsbase import FsBase, FsException
from osm_common.msgbase import MsgException
-from http import HTTPStatus
-from uuid import uuid4
-from threading import Lock
-from random import choice as random_choice
-from time import time
-from jinja2 import (
- Environment,
- TemplateError,
- TemplateNotFound,
- StrictUndefined,
- UndefinedError,
-)
-from cryptography.hazmat.primitives import serialization as crypto_serialization
-from cryptography.hazmat.primitives.asymmetric import rsa
-from cryptography.hazmat.backends import default_backend as crypto_default_backend
+from osm_ng_ro.ns_thread import deep_get, NsWorker, NsWorkerException
+from osm_ng_ro.validation import deploy_schema, validate_input
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
min_common_version = "0.1.16"
for target_id in vims_to_unload:
self._unload_vim(target_id)
- def _get_cloud_init(self, where):
- """
- Not used as cloud init content is provided in the http body. This method reads cloud init from a file
- :param where: can be 'vnfr_id:file:file_name' or 'vnfr_id:vdu:vdu_idex'
- :return:
+ @staticmethod
+ def _get_cloud_init(
+ db: Type[DbBase],
+ fs: Type[FsBase],
+ location: str,
+ ) -> str:
+ """This method reads cloud init from a file.
+
+ Note: Not used as cloud init content is provided in the http body.
+
+ Args:
+ db (Type[DbBase]): [description]
+ fs (Type[FsBase]): [description]
+ location (str): can be 'vnfr_id:file:file_name' or 'vnfr_id:vdu:vdu_idex'
+
+ Raises:
+ NsException: [description]
+ NsException: [description]
+
+ Returns:
+ str: [description]
"""
- vnfd_id, _, other = where.partition(":")
+ vnfd_id, _, other = location.partition(":")
_type, _, name = other.partition(":")
- vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
+ vnfd = db.get_one("vnfds", {"_id": vnfd_id})
if _type == "file":
base_folder = vnfd["_admin"]["storage"]
base_folder["folder"], base_folder["pkg-dir"], name
)
- if not self.fs:
+ if not fs:
raise NsException(
"Cannot read file '{}'. Filesystem not loaded, change configuration at storage.driver".format(
cloud_init_file
)
)
- with self.fs.file_open(cloud_init_file, "r") as ci_file:
+ with fs.file_open(cloud_init_file, "r") as ci_file:
cloud_init_content = ci_file.read()
elif _type == "vdu":
cloud_init_content = vnfd["vdu"][int(name)]["cloud-init"]
else:
- raise NsException("Mismatch descriptor for cloud init: {}".format(where))
+ raise NsException("Mismatch descriptor for cloud init: {}".format(location))
return cloud_init_content
- def _parse_jinja2(self, cloud_init_content, params, context):
+ @staticmethod
+ def _parse_jinja2(
+ cloud_init_content: str,
+ params: Dict[str, Any],
+ context: str,
+ ) -> str:
+ """Function that processes the cloud init to replace Jinja2 encoded parameters.
+
+ Args:
+ cloud_init_content (str): [description]
+ params (Dict[str, Any]): [description]
+ context (str): [description]
+
+ Raises:
+ NsException: [description]
+ NsException: [description]
+
+ Returns:
+ str: [description]
+ """
try:
env = Environment(undefined=StrictUndefined)
template = env.from_string(cloud_init_content)
indata: Dict[str, Any],
vim_info: Dict[str, Any],
target_record_id: str,
+ **kwargs: Dict[str, Any],
) -> Dict[str, Any]:
"""Function to process VDU image parameters.
indata: Dict[str, Any],
vim_info: Dict[str, Any],
target_record_id: str,
+ **kwargs: Dict[str, Any],
) -> Dict[str, Any]:
"""[summary]
indata: Dict[str, Any],
vim_info: Dict[str, Any],
target_record_id: str,
+ **kwargs: Dict[str, Any],
) -> Dict[str, Any]:
"""Function to process network parameters.
return extra_dict
+ @staticmethod
+ def _process_vdu_params(
+ target_vdu: Dict[str, Any],
+ indata: Dict[str, Any],
+ vim_info: Dict[str, Any],
+ target_record_id: str,
+ **kwargs: Dict[str, Any],
+ ) -> Dict[str, Any]:
+ """Function to process VDU parameters.
+
+ Args:
+ target_vdu (Dict[str, Any]): [description]
+ indata (Dict[str, Any]): [description]
+ vim_info (Dict[str, Any]): [description]
+ target_record_id (str): [description]
+
+ Returns:
+ Dict[str, Any]: [description]
+ """
+ vnfr_id = kwargs.get("vnfr_id")
+ nsr_id = kwargs.get("nsr_id")
+ vnfr = kwargs.get("vnfr")
+ vdu2cloud_init = kwargs.get("vdu2cloud_init")
+ tasks_by_target_record_id = kwargs.get("tasks_by_target_record_id")
+ logger = kwargs.get("logger")
+ db = kwargs.get("db")
+ fs = kwargs.get("fs")
+ ro_nsr_public_key = kwargs.get("ro_nsr_public_key")
+
+ vnf_preffix = "vnfrs:{}".format(vnfr_id)
+ ns_preffix = "nsrs:{}".format(nsr_id)
+ image_text = ns_preffix + ":image." + target_vdu["ns-image-id"]
+ flavor_text = ns_preffix + ":flavor." + target_vdu["ns-flavor-id"]
+ extra_dict = {"depends_on": [image_text, flavor_text]}
+ net_list = []
+
+ for iface_index, interface in enumerate(target_vdu["interfaces"]):
+ if interface.get("ns-vld-id"):
+ net_text = ns_preffix + ":vld." + interface["ns-vld-id"]
+ elif interface.get("vnf-vld-id"):
+ net_text = vnf_preffix + ":vld." + interface["vnf-vld-id"]
+ else:
+ logger.error(
+ "Interface {} from vdu {} not connected to any vld".format(
+ iface_index, target_vdu["vdu-name"]
+ )
+ )
+
+ continue # interface not connected to any vld
+
+ extra_dict["depends_on"].append(net_text)
+
+ if "port-security-enabled" in interface:
+ interface["port_security"] = interface.pop("port-security-enabled")
+
+ if "port-security-disable-strategy" in interface:
+ interface["port_security_disable_strategy"] = interface.pop(
+ "port-security-disable-strategy"
+ )
+
+ net_item = {
+ x: v
+ for x, v in interface.items()
+ if x
+ in (
+ "name",
+ "vpci",
+ "port_security",
+ "port_security_disable_strategy",
+ "floating_ip",
+ )
+ }
+ net_item["net_id"] = "TASK-" + net_text
+ net_item["type"] = "virtual"
+
+ # TODO mac_address: used for SR-IOV ifaces #TODO for other types
+ # TODO floating_ip: True/False (or it can be None)
+ if interface.get("type") in ("SR-IOV", "PCI-PASSTHROUGH"):
+ # mark the net create task as type data
+ if deep_get(
+ tasks_by_target_record_id,
+ net_text,
+ "params",
+ "net_type",
+ ):
+ tasks_by_target_record_id[net_text]["params"]["net_type"] = "data"
+
+ net_item["use"] = "data"
+ net_item["model"] = interface["type"]
+ net_item["type"] = interface["type"]
+ elif (
+ interface.get("type") == "OM-MGMT"
+ or interface.get("mgmt-interface")
+ or interface.get("mgmt-vnf")
+ ):
+ net_item["use"] = "mgmt"
+ else:
+ # if interface.get("type") in ("VIRTIO", "E1000", "PARAVIRT"):
+ net_item["use"] = "bridge"
+ net_item["model"] = interface.get("type")
+
+ if interface.get("ip-address"):
+ net_item["ip_address"] = interface["ip-address"]
+
+ if interface.get("mac-address"):
+ net_item["mac_address"] = interface["mac-address"]
+
+ net_list.append(net_item)
+
+ if interface.get("mgmt-vnf"):
+ extra_dict["mgmt_vnf_interface"] = iface_index
+ elif interface.get("mgmt-interface"):
+ extra_dict["mgmt_vdu_interface"] = iface_index
+
+ # cloud config
+ cloud_config = {}
+
+ if target_vdu.get("cloud-init"):
+ if target_vdu["cloud-init"] not in vdu2cloud_init:
+ vdu2cloud_init[target_vdu["cloud-init"]] = Ns._get_cloud_init(
+ db=db,
+ fs=fs,
+ location=target_vdu["cloud-init"],
+ )
+
+ cloud_content_ = vdu2cloud_init[target_vdu["cloud-init"]]
+ cloud_config["user-data"] = Ns._parse_jinja2(
+ cloud_init_content=cloud_content_,
+ params=target_vdu.get("additionalParams"),
+ context=target_vdu["cloud-init"],
+ )
+
+ if target_vdu.get("boot-data-drive"):
+ cloud_config["boot-data-drive"] = target_vdu.get("boot-data-drive")
+
+ ssh_keys = []
+
+ if target_vdu.get("ssh-keys"):
+ ssh_keys += target_vdu.get("ssh-keys")
+
+ if target_vdu.get("ssh-access-required"):
+ ssh_keys.append(ro_nsr_public_key)
+
+ if ssh_keys:
+ cloud_config["key-pairs"] = ssh_keys
+
+ disk_list = None
+ if target_vdu.get("virtual-storages"):
+ disk_list = [
+ {"size": disk["size-of-storage"]}
+ for disk in target_vdu["virtual-storages"]
+ if disk.get("type-of-storage")
+ == "persistent-storage:persistent-storage"
+ ]
+
+ extra_dict["params"] = {
+ "name": "{}-{}-{}-{}".format(
+ indata["name"][:16],
+ vnfr["member-vnf-index-ref"][:16],
+ target_vdu["vdu-name"][:32],
+ target_vdu.get("count-index") or 0,
+ ),
+ "description": target_vdu["vdu-name"],
+ "start": True,
+ "image_id": "TASK-" + image_text,
+ "flavor_id": "TASK-" + flavor_text,
+ "net_list": net_list,
+ "cloud_config": cloud_config or None,
+ "disk_list": disk_list,
+ "availability_zone_index": None, # TODO
+ "availability_zone_list": None, # TODO
+ }
+
+ return extra_dict
+
def deploy(self, session, indata, version, nsr_id, *args, **kwargs):
self.logger.debug("ns.deploy nsr_id={} indata={}".format(nsr_id, indata))
validate_input(indata, deploy_schema)
index += 1
- def _process_vdu_params(target_vdu, indata, vim_info, target_record_id):
- nonlocal vnfr_id
- nonlocal nsr_id
- nonlocal vnfr
- nonlocal vdu2cloud_init
- nonlocal tasks_by_target_record_id
-
- vnf_preffix = "vnfrs:{}".format(vnfr_id)
- ns_preffix = "nsrs:{}".format(nsr_id)
- image_text = ns_preffix + ":image." + target_vdu["ns-image-id"]
- flavor_text = ns_preffix + ":flavor." + target_vdu["ns-flavor-id"]
- extra_dict = {"depends_on": [image_text, flavor_text]}
- net_list = []
-
- for iface_index, interface in enumerate(target_vdu["interfaces"]):
- if interface.get("ns-vld-id"):
- net_text = ns_preffix + ":vld." + interface["ns-vld-id"]
- elif interface.get("vnf-vld-id"):
- net_text = vnf_preffix + ":vld." + interface["vnf-vld-id"]
- else:
- self.logger.error(
- "Interface {} from vdu {} not connected to any vld".format(
- iface_index, target_vdu["vdu-name"]
- )
- )
-
- continue # interface not connected to any vld
-
- extra_dict["depends_on"].append(net_text)
-
- if "port-security-enabled" in interface:
- interface["port_security"] = interface.pop(
- "port-security-enabled"
- )
-
- if "port-security-disable-strategy" in interface:
- interface["port_security_disable_strategy"] = interface.pop(
- "port-security-disable-strategy"
- )
-
- net_item = {
- x: v
- for x, v in interface.items()
- if x
- in (
- "name",
- "vpci",
- "port_security",
- "port_security_disable_strategy",
- "floating_ip",
- )
- }
- net_item["net_id"] = "TASK-" + net_text
- net_item["type"] = "virtual"
-
- # TODO mac_address: used for SR-IOV ifaces #TODO for other types
- # TODO floating_ip: True/False (or it can be None)
- if interface.get("type") in ("SR-IOV", "PCI-PASSTHROUGH"):
- # mark the net create task as type data
- if deep_get(
- tasks_by_target_record_id, net_text, "params", "net_type"
- ):
- tasks_by_target_record_id[net_text]["params"][
- "net_type"
- ] = "data"
-
- net_item["use"] = "data"
- net_item["model"] = interface["type"]
- net_item["type"] = interface["type"]
- elif (
- interface.get("type") == "OM-MGMT"
- or interface.get("mgmt-interface")
- or interface.get("mgmt-vnf")
- ):
- net_item["use"] = "mgmt"
- else:
- # if interface.get("type") in ("VIRTIO", "E1000", "PARAVIRT"):
- net_item["use"] = "bridge"
- net_item["model"] = interface.get("type")
-
- if interface.get("ip-address"):
- net_item["ip_address"] = interface["ip-address"]
-
- if interface.get("mac-address"):
- net_item["mac_address"] = interface["mac-address"]
-
- net_list.append(net_item)
-
- if interface.get("mgmt-vnf"):
- extra_dict["mgmt_vnf_interface"] = iface_index
- elif interface.get("mgmt-interface"):
- extra_dict["mgmt_vdu_interface"] = iface_index
-
- # cloud config
- cloud_config = {}
-
- if target_vdu.get("cloud-init"):
- if target_vdu["cloud-init"] not in vdu2cloud_init:
- vdu2cloud_init[target_vdu["cloud-init"]] = self._get_cloud_init(
- target_vdu["cloud-init"]
- )
-
- cloud_content_ = vdu2cloud_init[target_vdu["cloud-init"]]
- cloud_config["user-data"] = self._parse_jinja2(
- cloud_content_,
- target_vdu.get("additionalParams"),
- target_vdu["cloud-init"],
- )
-
- if target_vdu.get("boot-data-drive"):
- cloud_config["boot-data-drive"] = target_vdu.get("boot-data-drive")
-
- ssh_keys = []
-
- if target_vdu.get("ssh-keys"):
- ssh_keys += target_vdu.get("ssh-keys")
-
- if target_vdu.get("ssh-access-required"):
- ssh_keys.append(ro_nsr_public_key)
-
- if ssh_keys:
- cloud_config["key-pairs"] = ssh_keys
-
- disk_list = None
- if target_vdu.get("virtual-storages"):
- disk_list = [
- {"size": disk["size-of-storage"]}
- for disk in target_vdu["virtual-storages"]
- if disk.get("type-of-storage")
- == "persistent-storage:persistent-storage"
- ]
-
- extra_dict["params"] = {
- "name": "{}-{}-{}-{}".format(
- indata["name"][:16],
- vnfr["member-vnf-index-ref"][:16],
- target_vdu["vdu-name"][:32],
- target_vdu.get("count-index") or 0,
- ),
- "description": target_vdu["vdu-name"],
- "start": True,
- "image_id": "TASK-" + image_text,
- "flavor_id": "TASK-" + flavor_text,
- "net_list": net_list,
- "cloud_config": cloud_config or None,
- "disk_list": disk_list,
- "availability_zone_index": None, # TODO
- "availability_zone_list": None, # TODO
- }
-
- return extra_dict
-
def _process_items(
target_list,
existing_list,
item_ = "sdn_net"
target_record_id += ".sdn"
+ kwargs = {}
+ if process_params == Ns._process_vdu_params:
+ kwargs.update(
+ {
+ "vnfr_id": vnfr_id,
+ "nsr_id": nsr_id,
+ "vnfr": vnfr,
+ "vdu2cloud_init": vdu2cloud_init,
+ "tasks_by_target_record_id": tasks_by_target_record_id,
+ "logger": self.logger,
+ "db": self.db,
+ "fs": self.fs,
+ "ro_nsr_public_key": ro_nsr_public_key,
+ }
+ )
+
extra_dict = process_params(
- target_item, indata, target_viminfo, target_record_id
+ target_item,
+ indata,
+ target_viminfo,
+ target_record_id,
+ **kwargs,
)
self._assign_vim(target_vim)
db_update=db_vnfrs_update[vnfr["_id"]],
db_path="vdur",
item="vdu",
- process_params=_process_vdu_params,
+ process_params=Ns._process_vdu_params,
)
for db_task in db_new_tasks: