k8srepo_edit_schema,
vca_new_schema,
vca_edit_schema,
- paas_new_schema,
- paas_edit_schema,
osmrepo_new_schema,
osmrepo_edit_schema,
validate_input,
"""
super().format_on_new(content, project_id=project_id, make_public=make_public)
content["schema_version"] = schema_version = "1.11"
- self._encrypt_password(content, schema_version)
- self._encrypt_config_fields(content, schema_version)
- content["_admin"]["operationalState"] = "PROCESSING"
- self._insert_create_operation(content)
- return "{}:0".format(content["_id"])
- def _encrypt_password(self, content, schema_version):
+ # encrypt passwords
if content.get(self.password_to_encrypt):
content[self.password_to_encrypt] = self.db.encrypt(
content[self.password_to_encrypt],
schema_version=schema_version,
salt=content["_id"],
)
-
- def _encrypt_config_fields(self, content, schema_version):
config_to_encrypt_keys = self.config_to_encrypt.get(
schema_version
) or self.config_to_encrypt.get("default")
salt=content["_id"],
)
- def _insert_create_operation(self, content):
+ content["_admin"]["operationalState"] = "PROCESSING"
+
# create operation
content["_admin"]["operations"] = [self._create_operation("create")]
content["_admin"]["current_operation"] = None
if content.get("vim_type"):
if content["vim_type"] == "openstack":
compute = {
- "ram": {"total": None, "used": None},
- "vcpus": {"total": None, "used": None},
- "instances": {"total": None, "used": None},
+ "ram": {
+ "total": None,
+ "used": None
+ },
+ "vcpus": {
+ "total": None,
+ "used": None
+ },
+ "instances": {
+ "total": None,
+ "used": None
+ }
}
storage = {
- "volumes": {"total": None, "used": None},
- "snapshots": {"total": None, "used": None},
- "storage": {"total": None, "used": None},
+ "volumes": {
+ "total": None,
+ "used": None
+ },
+ "snapshots": {
+ "total": None,
+ "used": None
+ },
+ "storage": {
+ "total": None,
+ "used": None
+ }
}
network = {
- "networks": {"total": None, "used": None},
- "subnets": {"total": None, "used": None},
- "floating_ips": {"total": None, "used": None},
- }
- content["resources"] = {
- "compute": compute,
- "storage": storage,
- "network": network,
+ "networks": {
+ "total": None,
+ "used": None
+ },
+ "subnets": {
+ "total": None,
+ "used": None
+ },
+ "floating_ips": {
+ "total": None,
+ "used": None
+ }
}
+ content["resources"] = {"compute": compute, "storage": storage, "network": network}
+
+ return "{}:0".format(content["_id"])
def delete(self, session, _id, dry_run=False, not_send_msg=None):
"""
super().check_conflict_on_del(session, _id, db_content)
-class PaasTopic(CommonVimWimSdn):
- topic = "paas"
- topic_msg = "paas"
- schema_new = paas_new_schema
- schema_edit = paas_edit_schema
- multiproject = True
- password_to_encrypt = "secret"
- config_to_encrypt = {}
-
- def format_on_edit(self, final_content, edit_content):
- oid = super().format_on_edit(final_content, edit_content)
- final_content["_admin"]["operationalState"] = "PROCESSING"
- final_content["_admin"]["detailed-status"] = "Editing"
- return oid
-
- def _check_if_used_by_ns(self):
- pass
-
- def check_conflict_on_del(self, session, _id, db_content):
- """
- Check if deletion can be done because of dependencies if it is not force.
- :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
- :param _id: internal _id
- :param db_content: The database content of this item _id
- :return: None if ok or raises EngineException with the conflict
- """
- if session["force"]:
- return
- self._check_if_used_by_ns()
-
- super().check_conflict_on_del(session, _id, db_content)
-
-
class K8sRepoTopic(CommonVimWimSdn):
topic = "k8srepos"
topic_msg = "k8srepo"
if to_add["project"] in (
mapping["project"],
mapping["project_name"],
- ) and to_add["role"] in (mapping["role"], mapping["role_name"]):
+ ) and to_add["role"] in (
+ mapping["role"],
+ mapping["role_name"],
+ ):
if mapping in mappings_to_remove: # do not remove
mappings_to_remove.remove(mapping)
if to_set["project"] in (
mapping["project"],
mapping["project_name"],
- ) and to_set["role"] in (mapping["role"], mapping["role_name"]):
+ ) and to_set["role"] in (
+ mapping["role"],
+ mapping["role_name"],
+ ):
if mapping in mappings_to_remove: # do not remove
mappings_to_remove.remove(mapping)
break # do not add, it is already at user
if to_set["project"] in (
mapping["project"],
mapping["project_name"],
- ) and to_set["role"] in (mapping["role"], mapping["role_name"]):
+ ) and to_set["role"] in (
+ mapping["role"],
+ mapping["role_name"],
+ ):
break
else:
# delete
from osm_nbi.base_topic import EngineException, versiontuple
from osm_nbi.admin_topics import VimAccountTopic, WimAccountTopic, SdnTopic
from osm_nbi.admin_topics import K8sClusterTopic, K8sRepoTopic, OsmRepoTopic
-from osm_nbi.admin_topics import VcaTopic, PaasTopic
+from osm_nbi.admin_topics import VcaTopic
from osm_nbi.admin_topics import UserTopicAuth, ProjectTopicAuth, RoleTopicAuth
from osm_nbi.descriptor_topics import (
VnfdTopic,
"sdns": SdnTopic,
"k8sclusters": K8sClusterTopic,
"vca": VcaTopic,
- "paas": PaasTopic,
"k8srepos": K8sRepoTopic,
"osmrepos": OsmRepoTopic,
"users": UserTopicAuth, # Valid for both internal and keystone authentication backends
:return: The list, it can be empty if no one match the filter_q.
"""
if topic not in self.map_topic:
- raise EngineException(
- "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR
- )
+ raise EngineException("Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR)
return self.map_topic[topic].list(session, filter_q, api_req)
def get_item(self, session, topic, _id, filter_q=None, api_req=False):
:return: dictionary, raise exception if not found.
"""
if topic not in self.map_topic:
- raise EngineException(
- "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR
- )
+ raise EngineException("Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR)
return self.map_topic[topic].show(session, _id, filter_q, api_req)
def get_file(self, session, topic, _id, path=None, accept_header=None):
"<rw_mgmt_ip>",
"<VDU_SCALE_INFO>",
"<ns_config_info>",
+ "<OSM>"
):
continue
if (
return ns_k8s_namespace
- def _add_flavor_to_nsr(
- self, vdu, vnfd, nsr_descriptor, member_vnf_index, revision=None
- ):
+ def _add_flavor_to_nsr(self, vdu, vnfd, nsr_descriptor, member_vnf_index, revision=None):
flavor_data = {}
guest_epa = {}
# Find this vdu compute and storage descriptors
if vsd.get("id") == vdu.get("virtual-storage-desc", [[]])[0]:
vdu_virtual_storage = vsd
# Get this vdu vcpus, memory and storage info for flavor_data
- if vdu_virtual_compute.get("virtual-cpu", {}).get("num-virtual-cpu"):
+ if vdu_virtual_compute.get("virtual-cpu", {}).get(
+ "num-virtual-cpu"
+ ):
flavor_data["vcpu-count"] = vdu_virtual_compute["virtual-cpu"][
"num-virtual-cpu"
]
if vdu_virtual_compute.get("virtual-memory", {}).get("size"):
flavor_data["memory-mb"] = (
- float(vdu_virtual_compute["virtual-memory"]["size"]) * 1024.0
+ float(vdu_virtual_compute["virtual-memory"]["size"])
+ * 1024.0
)
if vdu_virtual_storage.get("size-of-storage"):
- flavor_data["storage-gb"] = vdu_virtual_storage["size-of-storage"]
+ flavor_data["storage-gb"] = vdu_virtual_storage[
+ "size-of-storage"
+ ]
# Get this vdu EPA info for guest_epa
if vdu_virtual_compute.get("virtual-cpu", {}).get("cpu-quota"):
- guest_epa["cpu-quota"] = vdu_virtual_compute["virtual-cpu"]["cpu-quota"]
+ guest_epa["cpu-quota"] = vdu_virtual_compute["virtual-cpu"][
+ "cpu-quota"
+ ]
if vdu_virtual_compute.get("virtual-cpu", {}).get("pinning"):
vcpu_pinning = vdu_virtual_compute["virtual-cpu"]["pinning"]
if vcpu_pinning.get("thread-policy"):
- guest_epa["cpu-thread-pinning-policy"] = vcpu_pinning["thread-policy"]
+ guest_epa["cpu-thread-pinning-policy"] = vcpu_pinning[
+ "thread-policy"
+ ]
if vcpu_pinning.get("policy"):
cpu_policy = (
- "SHARED" if vcpu_pinning["policy"] == "dynamic" else "DEDICATED"
+ "SHARED"
+ if vcpu_pinning["policy"] == "dynamic"
+ else "DEDICATED"
)
guest_epa["cpu-pinning-policy"] = cpu_policy
if vdu_virtual_compute.get("virtual-memory", {}).get("mem-quota"):
- guest_epa["mem-quota"] = vdu_virtual_compute["virtual-memory"]["mem-quota"]
- if vdu_virtual_compute.get("virtual-memory", {}).get("mempage-size"):
- guest_epa["mempage-size"] = vdu_virtual_compute["virtual-memory"][
- "mempage-size"
- ]
- if vdu_virtual_compute.get("virtual-memory", {}).get("numa-node-policy"):
- guest_epa["numa-node-policy"] = vdu_virtual_compute["virtual-memory"][
- "numa-node-policy"
+ guest_epa["mem-quota"] = vdu_virtual_compute["virtual-memory"][
+ "mem-quota"
]
+ if vdu_virtual_compute.get("virtual-memory", {}).get(
+ "mempage-size"
+ ):
+ guest_epa["mempage-size"] = vdu_virtual_compute[
+ "virtual-memory"
+ ]["mempage-size"]
+ if vdu_virtual_compute.get("virtual-memory", {}).get(
+ "numa-node-policy"
+ ):
+ guest_epa["numa-node-policy"] = vdu_virtual_compute[
+ "virtual-memory"
+ ]["numa-node-policy"]
if vdu_virtual_storage.get("disk-io-quota"):
- guest_epa["disk-io-quota"] = vdu_virtual_storage["disk-io-quota"]
+ guest_epa["disk-io-quota"] = vdu_virtual_storage[
+ "disk-io-quota"
+ ]
if guest_epa:
flavor_data["guest-epa"] = guest_epa
revision = revision if revision is not None else 1
- flavor_data["name"] = (
- vdu["id"][:56] + "-" + member_vnf_index + "-" + str(revision) + "-flv"
- )
+ flavor_data["name"] = vdu["id"][:56] + "-" + member_vnf_index + "-" + str(revision) + "-flv"
flavor_data["id"] = str(len(nsr_descriptor["flavor"]))
nsr_descriptor["flavor"].append(flavor_data)
"configurationStatus": None,
"vcaStatus": None,
"nsd": {k: v for k, v in nsd.items()},
- "vimdatacenter": ns_request.get("vimAccountId"),
- "paasdatacenter": ns_request.get("paasAccountId"),
+ "datacenter": ns_request["vimAccountId"],
"resource-orchestrator": "osmopenmano",
"description": ns_request.get("nsDescription", ""),
"constituent-vnfr-ref": [],
"vnfd-ref": vnfd_id,
"vnfd-id": vnfd["_id"], # not at OSM model, but useful
"vim-account-id": None,
- "paas-account-id": None,
"vca-id": None,
"vdur": [],
"connection-point": [],
if "revision" in vnfd:
vnfr_descriptor["revision"] = vnfd["revision"]
+
vnf_k8s_namespace = ns_k8s_namespace
if vnf_params:
if vnf_params.get("k8s-namespace"):
try:
vdu_virtual_storage_descriptors = utils.filter_in_list(
vnfd.get("virtual-storage-desc", []),
- lambda stg_desc: stg_desc["id"] in vdu["virtual-storage-desc"],
+ lambda stg_desc: stg_desc["id"] in vdu["virtual-storage-desc"]
)
except Exception:
vdu_virtual_storage_descriptors = []
"interfaces": [],
"additionalParams": additional_params,
"vdu-name": vdu["name"],
- "virtual-storages": vdu_virtual_storage_descriptors,
+ "virtual-storages": vdu_virtual_storage_descriptors
}
if vdu_params and vdu_params.get("config-units"):
vdur["config-units"] = vdu_params["config-units"]
vdur["alt-image-ids"] = alt_image_ids
revision = revision if revision is not None else 1
- flavor_data_name = (
- vdu["id"][:56] + "-" + vnf_index + "-" + str(revision) + "-flv"
- )
+ flavor_data_name = vdu["id"][:56] + "-" + vnf_index + "-" + str(revision) + "-flv"
nsr_flavor_desc = utils.find_in_list(
nsr_descriptor["flavor"],
lambda flavor: flavor["name"] == flavor_data_name,
:param filter_q: dict: query parameter containing vcaStatus-refresh as true or false
:return: None
"""
- time_now, time_delta = (
- time(),
- time() - ns_instance_content["_admin"]["modified"],
- )
- force_refresh = (
- isinstance(filter_q, dict) and filter_q.get("vcaStatusRefresh") == "true"
- )
+ time_now, time_delta = time(), time() - ns_instance_content["_admin"]["modified"]
+ force_refresh = isinstance(filter_q, dict) and filter_q.get('vcaStatusRefresh') == 'true'
threshold_reached = time_delta > 120
if force_refresh or threshold_reached:
operation, _id = "vca_status_refresh", ns_instance_content["_id"]
ns_instance_content["_admin"]["modified"] = time_now
self.db.set_one(self.topic, {"_id": _id}, ns_instance_content)
nslcmop_desc = NsLcmOpTopic._create_nslcmop(_id, operation, None)
- self.format_on_new(
- nslcmop_desc, session["project_id"], make_public=session["public"]
- )
+ self.format_on_new(nslcmop_desc, session["project_id"], make_public=session["public"])
nslcmop_desc["_admin"].pop("nsState")
self.msg.write("ns", operation, nslcmop_desc)
return
def _check_heal_ns_operation(self, indata, nsr):
return
- def _validate_vnf_in_indata(
- self,
- indata: dict,
- vim_accounts: dict = None,
- paas_accounts: dict = None,
- nsr: dict = None,
- session: dict = None,
- ):
- """Validate the VNF parameters in indata.
-
- Args:
- indata (dict): Input data dictionary
- vim_accounts (dict): VIM accounts dictionary
- paas_accounts (dict) PaaS accounts dictionary
- nsr (dict): Network service record dictionary
- session (dict): Dictionary contains "username", "admin", "force", "public",
- "project_id", "set_project"
- """
- # Map between vnf_member_index to vnf descriptor.
- vnf_member_index_to_vnfd = {}
-
+ def _check_instantiate_ns_operation(self, indata, nsr, session):
+ vnf_member_index_to_vnfd = {} # map between vnf_member_index to vnf descriptor.
+ vim_accounts = []
+ wim_accounts = []
+ nsd = nsr["nsd"]
+ self._check_valid_vim_account(indata["vimAccountId"], vim_accounts, session)
+ self._check_valid_wim_account(indata.get("wimAccountId"), wim_accounts, session)
for in_vnf in get_iterable(indata.get("vnf")):
member_vnf_index = in_vnf["member-vnf-index"]
-
if vnf_member_index_to_vnfd.get(member_vnf_index):
vnfd = vnf_member_index_to_vnfd[member_vnf_index]
-
else:
vnfd = self._get_vnfd_from_vnf_member_index(
member_vnf_index, nsr["_id"]
vnf_member_index_to_vnfd[
member_vnf_index
] = vnfd # add to cache, avoiding a later look for
-
self._check_vnf_instantiation_params(in_vnf, vnfd)
-
if in_vnf.get("vimAccountId"):
self._check_valid_vim_account(
in_vnf["vimAccountId"], vim_accounts, session
)
- elif in_vnf.get("paasAccountId"):
- self._check_valid_paas_account(
- in_vnf["paasAccountId"], paas_accounts, session
- )
-
- def _validate_vld_in_indata(
- self,
- indata: dict,
- session: dict = None,
- nsd: dict = None,
- wim_accounts: dict = None,
- ):
- """Validate the Virtual Link Descriptor parameters in indata.
-
- Args:
- indata (dict): Input data dictionary
- session (dict): Dictionary contains "username", "admin", "force", "public",
- "project_id", "set_project"
- nsd (dict): Network service descriptor dictionary
- wim_accounts (dict): WIM accounts dictionary
-
- Raises:
- EngineException
- """
for in_vld in get_iterable(indata.get("vld")):
self._check_valid_wim_account(
in_vld.get("wimAccountId"), wim_accounts, session
)
)
- def _check_instantiate_ns_operation(
- self, indata: dict, nsr: dict, session: dict
- ) -> None:
- """Validate the parameters for NS instantiate operation
-
- Args:
- indata (dict): Input data dictionary
- nsr (dict): Network service record dictionary
- session (dict): Dictionary contains "username", "admin", "force", "public", "project_id", "set_project"
- """
- vim_accounts, wim_accounts, paas_accounts = [], [], []
- nsd = nsr["nsd"]
- self._check_valid_vim_account(indata.get("vimAccountId"), vim_accounts, session)
- self._check_valid_wim_account(indata.get("wimAccountId"), wim_accounts, session)
- self._check_valid_paas_account(indata.get("paasAccountId"), paas_accounts, session)
-
- self._validate_vnf_in_indata(
- indata,
- vim_accounts=vim_accounts,
- paas_accounts=paas_accounts,
- nsr=nsr,
- session=session,
- )
- self._validate_vld_in_indata(
- indata, session=session, nsd=nsd, wim_accounts=wim_accounts
- )
-
def _get_vnfd_from_vnf_member_index(self, member_vnf_index, nsr_id):
# Obtain vnf descriptor. The vnfr is used to get the vnfd._id used for this member_vnf_index
vnfr = self.db.get_one(
"nsd:constituent-vnfd".format(member_vnf_index)
)
- # Backwards compatibility: if there is no revision, get it from the one and only VNFD entry
+ ## Backwards compatibility: if there is no revision, get it from the one and only VNFD entry
if "revision" in vnfr:
vnfd_revision = vnfr["vnfd-id"] + ":" + str(vnfr["revision"])
- vnfd = self.db.get_one(
- "vnfds_revisions", {"_id": vnfd_revision}, fail_on_empty=False
- )
+ vnfd = self.db.get_one("vnfds_revisions", {"_id": vnfd_revision}, fail_on_empty=False)
else:
- vnfd = self.db.get_one(
- "vnfds", {"_id": vnfr["vnfd-id"]}, fail_on_empty=False
- )
+ vnfd = self.db.get_one("vnfds", {"_id": vnfr["vnfd-id"]}, fail_on_empty=False)
if not vnfd:
raise EngineException(
)
)
- def _check_valid_vim_account(
- self, vim_account: str, vim_accounts: list, session: dict
- ) -> list:
- """Validate the given VIM account whether present in the project or not,
- add the given vim_account to vim_accounts list.
-
- Args:
- vim_account (str): VIM Account to be used
- vim_accounts (list): List of VIM accounts
- session (dict): Contains "username", "admin", "force", "public", "project_id", "set_project"
-
- Raises:
- EngineException
- """
+ def _check_valid_vim_account(self, vim_account, vim_accounts, session):
+ if vim_account in vim_accounts:
+ return
try:
- if vim_account and vim_account not in vim_accounts:
- db_filter = self._get_project_filter(session)
- db_filter["_id"] = vim_account
- self.db.get_one("vim_accounts", db_filter)
- vim_accounts.append(vim_account)
+ db_filter = self._get_project_filter(session)
+ db_filter["_id"] = vim_account
+ self.db.get_one("vim_accounts", db_filter)
except Exception:
raise EngineException(
"Invalid vimAccountId='{}' not present for the project".format(
vim_account
)
)
-
- def _check_valid_paas_account(
- self, paas_account: str, paas_accounts: list, session: dict
- ) -> list:
- """Validate the given PaaS account whether present in the project or not,
- add the given paas_account to paas_accounts list and return.
-
- Args:
- paas_account (str): PaaS Account to be used
- paas_accounts (list): List of PaaS accounts
- session (dict): Contains "username", "admin", "force", "public", "project_id", "set_project"
-
- Raises:
- EngineException
- """
- try:
- if paas_account and paas_account not in paas_accounts:
- db_filter = self._get_project_filter(session)
- db_filter["_id"] = paas_account
- self.db.get_one("paas", db_filter)
- paas_accounts.append(paas_account)
- except Exception:
- raise EngineException(
- "Invalid paasAccountId='{}' not present for the project".format(
- paas_account
- )
- )
+ vim_accounts.append(vim_account)
def _get_vim_account(self, vim_id: str, session):
try:
return self.db.get_one("vim_accounts", db_filter)
except Exception:
raise EngineException(
- "Invalid vimAccountId='{}' not present for the project".format(vim_id)
+ "Invalid vimAccountId='{}' not present for the project".format(
+ vim_id
+ )
)
- def _check_valid_wim_account(
- self, wim_account: str, wim_accounts: list, session: dict
- ) -> list:
- """Validate the given WIM account whether present in the project or not,
- add the given wim_account to wim_accounts list and return.
-
- Args:
- wim_account (str): WIM Account to be used
- wim_accounts (list): List of WIM accounts
- session (dict): Contains "username", "admin", "force", "public", "project_id", "set_project"
-
- Raises:
- EngineException
- """
+ def _check_valid_wim_account(self, wim_account, wim_accounts, session):
+ if not isinstance(wim_account, str):
+ return
+ if wim_account in wim_accounts:
+ return
try:
- if isinstance(wim_account, str) and wim_account not in wim_accounts:
- db_filter = self._get_project_filter(session)
- db_filter["_id"] = wim_account
- self.db.get_one("wim_accounts", db_filter)
- wim_accounts.append(wim_account)
+ db_filter = self._get_project_filter(session)
+ db_filter["_id"] = wim_account
+ self.db.get_one("wim_accounts", db_filter)
except Exception:
raise EngineException(
"Invalid wimAccountId='{}' not present for the project".format(
wim_account
)
)
+ wim_accounts.append(wim_account)
def _look_for_pdu(
self, session, rollback, vnfr, vim_account, vnfr_update, vnfr_update_rollback
"ns-vld-id": NSD vld where this interface is connected.
NOTE: One, and only one between 'vnf-vld-id' and 'ns-vld-id' contains a value. The other will be None
"""
+
ifaces_forcing_vim_network = []
- if not vim_account:
- return ifaces_forcing_vim_network
for vdur_index, vdur in enumerate(get_iterable(vnfr.get("vdur"))):
if not vdur.get("pdu-type"):
continue
"ns-vld-id": NSD vld where this interface is connected.
NOTE: One, and only one between 'vnf-vld-id' and 'ns-vld-id' contains a value. The other will be None
"""
- ifaces_forcing_vim_network = []
-
- if not vim_account:
- return ifaces_forcing_vim_network
+ ifaces_forcing_vim_network = []
if not vnfr.get("kdur"):
return ifaces_forcing_vim_network
for cpd in vlc.get("constituent-cpd-id", ()):
if cpd.get("ip-address"):
step = "Storing ip-address info"
- vld_fixed_ip_connection_point_data.update(
- {
- vlc.get("virtual-link-profile-id")
- + "."
- + cpd.get("constituent-base-element-id"): {
- "vnfd-connection-point-ref": cpd.get(
- "constituent-cpd-id"
- ),
- "ip-address": cpd.get("ip-address"),
- }
- }
- )
+ vld_fixed_ip_connection_point_data.update({vlc.get("virtual-link-profile-id") + '.' + cpd.get("constituent-base-element-id"): {
+ "vnfd-connection-point-ref": cpd.get(
+ "constituent-cpd-id"),
+ "ip-address": cpd.get(
+ "ip-address")}})
# Inserting ip address to vnfr
if len(vld_fixed_ip_connection_point_data) > 0:
vnfrs = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
for item in vld_fixed_ip_connection_point_data.keys():
step = "Filtering vnfrs"
- vnfr = next(
- filter(
- lambda vnfr: vnfr["member-vnf-index-ref"]
- == item.split(".")[1],
- vnfrs,
- ),
- None,
- )
+ vnfr = next(filter(lambda vnfr: vnfr["member-vnf-index-ref"] == item.split('.')[1], vnfrs), None)
if vnfr:
vnfr_update = {}
for vdur_index, vdur in enumerate(vnfr["vdur"]):
for iface_index, iface in enumerate(vdur["interfaces"]):
step = "Looking for matched interface"
if (
- iface.get("external-connection-point-ref")
- == vld_fixed_ip_connection_point_data[item].get(
- "vnfd-connection-point-ref"
- )
- and iface.get("ns-vld-id") == item.split(".")[0]
+ iface.get("external-connection-point-ref")
+ == vld_fixed_ip_connection_point_data[item].get("vnfd-connection-point-ref") and
+ iface.get("ns-vld-id") == item.split('.')[0]
+
):
vnfr_update_text = "vdur.{}.interfaces.{}".format(
vdur_index, iface_index
step = "Storing info in order to update vnfr"
vnfr_update[
vnfr_update_text + ".ip-address"
- ] = increment_ip_mac(
- vld_fixed_ip_connection_point_data[item].get(
- "ip-address"
- ),
- vdur.get("count-index", 0),
- )
+ ] = increment_ip_mac(
+ vld_fixed_ip_connection_point_data[item].get("ip-address"),
+ vdur.get("count-index", 0), )
vnfr_update[vnfr_update_text + ".fixed-ip"] = True
step = "updating vnfr at database"
self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, vnfr_update)
except (
- ValidationError,
- EngineException,
- DbException,
- MsgException,
- FsException,
+ ValidationError,
+ EngineException,
+ DbException,
+ MsgException,
+ FsException,
) as e:
raise type(e)("{} while '{}'".format(e, step), http_code=e.http_code)
- @staticmethod
- def _get_vld_icp_from_instantiation_params(
- vnf_inst_params: dict, vnfr: dict, vnfr_update: dict
- ) -> dict:
- """Get vnf.internal-vld.internal-conection-point instantiation params to update vnfr.vdur.interfaces
- Args:
- vnf_inst_params (dict): VNF instantiation parameters dictionary
- vnfr (dict): VNF record dictionary
- vnfr_update (dict): VNF record update dictionary
-
- Returns:
- vnfr_update (dict): VNF record update dictionary
- """
- for ivld_inst_param in get_iterable(vnf_inst_params.get("internal-vld")):
- for icp_inst_param in get_iterable(
- ivld_inst_param.get("internal-connection-point")
- ):
- # Look for iface
- for vdur_index, vdur in enumerate(vnfr["vdur"]):
- for iface_index, iface in enumerate(vdur["interfaces"]):
- if (
- iface.get("internal-connection-point-ref")
- == icp_inst_param["id-ref"]
- ):
- vnfr_update_text = "vdur.{}.interfaces.{}".format(
- vdur_index, iface_index
- )
- if icp_inst_param.get("ip-address"):
- vnfr_update[
- vnfr_update_text + ".ip-address"
- ] = increment_ip_mac(
- icp_inst_param.get("ip-address"),
- vdur.get("count-index", 0),
- )
- vnfr_update[vnfr_update_text + ".fixed-ip"] = True
- if icp_inst_param.get("mac-address"):
- vnfr_update[
- vnfr_update_text + ".mac-address"
- ] = increment_ip_mac(
- icp_inst_param.get("mac-address"),
- vdur.get("count-index", 0),
- )
- vnfr_update[vnfr_update_text + ".fixed-mac"] = True
- break
- return vnfr_update
+ def _update_vnfrs(self, session, rollback, nsr, indata):
+ # get vnfr
+ nsr_id = nsr["_id"]
+ vnfrs = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
- @staticmethod
- def _get_vdu_interfaces_from_instantiation_params(
- vnfr_update: dict, vnf_inst_params: dict, vnfr: dict
- ) -> dict:
- """Get vnf.vdu.interface instantiation params to update vnfr.vdur.interfaces ip, mac
- Args:
- vnfr_update (dict): VNF record update dictionary
- vnf_inst_params (dict): VNF instantiation parameters dictionary
- vnfr (dict): VNF record dictionary
+ for vnfr in vnfrs:
+ vnfr_update = {}
+ vnfr_update_rollback = {}
+ member_vnf_index = vnfr["member-vnf-index-ref"]
+ # update vim-account-id
- Returns:
- vnfr_update (dict): VNF record update dictionary
- """
- for vdu_inst_param in get_iterable(vnf_inst_params.get("vdu")):
- for vdur_index, vdur in enumerate(vnfr["vdur"]):
- if vdu_inst_param["id"] != vdur["vdu-id-ref"]:
+ vim_account = indata["vimAccountId"]
+ vca_id = self._get_vim_account(vim_account, session).get("vca")
+ # check instantiate parameters
+ for vnf_inst_params in get_iterable(indata.get("vnf")):
+ if vnf_inst_params["member-vnf-index"] != member_vnf_index:
continue
- for iface_inst_param in get_iterable(vdu_inst_param.get("interface")):
- iface_index, _ = next(
- i
- for i in enumerate(vdur["interfaces"])
- if i[1]["name"] == iface_inst_param["name"]
- )
- vnfr_update_text = "vdur.{}.interfaces.{}".format(
- vdur_index, iface_index
- )
- if iface_inst_param.get("ip-address"):
- vnfr_update[
- vnfr_update_text + ".ip-address"
- ] = increment_ip_mac(
- iface_inst_param.get("ip-address"),
- vdur.get("count-index", 0),
- )
- vnfr_update[vnfr_update_text + ".fixed-ip"] = True
- if iface_inst_param.get("mac-address"):
- vnfr_update[
- vnfr_update_text + ".mac-address"
- ] = increment_ip_mac(
- iface_inst_param.get("mac-address"),
- vdur.get("count-index", 0),
- )
- vnfr_update[vnfr_update_text + ".fixed-mac"] = True
- if iface_inst_param.get("floating-ip-required"):
- vnfr_update[vnfr_update_text + ".floating-ip-required"] = True
- return vnfr_update
-
- @staticmethod
- def _get_ip_address_from_instantiation_params(
- indata: dict, member_vnf_index: str, vnfr: dict, vnfr_update: dict
- ) -> dict:
- """Get ip address from instantiation parameters.vld.vnfd-connection-point-ref
- Args:
- indata (dict): Input data dictionary
- member_vnf_index (str): VNF index as an identifier
- vnfr (dict): VNF record dictionary
- vnfr_update (dict): VNF record update dictionary to keep the updates
+ if vnf_inst_params.get("vimAccountId"):
+ vim_account = vnf_inst_params.get("vimAccountId")
+ vca_id = self._get_vim_account(vim_account, session).get("vca")
- Returns:
- vnfr_update (dict): VNF record update dictionary to keep the updates
- """
- for vld_inst_param in get_iterable(indata.get("vld")):
- for vnfcp_inst_param in get_iterable(
- vld_inst_param.get("vnfd-connection-point-ref")
- ):
- if vnfcp_inst_param["member-vnf-index-ref"] != member_vnf_index:
- continue
- # look for iface
- for vdur_index, vdur in enumerate(vnfr["vdur"]):
- for iface_index, iface in enumerate(vdur["interfaces"]):
- if (
- iface.get("external-connection-point-ref")
- == vnfcp_inst_param["vnfd-connection-point-ref"]
+ # get vnf.vdu.interface instantiation params to update vnfr.vdur.interfaces ip, mac
+ for vdu_inst_param in get_iterable(vnf_inst_params.get("vdu")):
+ for vdur_index, vdur in enumerate(vnfr["vdur"]):
+ if vdu_inst_param["id"] != vdur["vdu-id-ref"]:
+ continue
+ for iface_inst_param in get_iterable(
+ vdu_inst_param.get("interface")
):
+ iface_index, _ = next(
+ i
+ for i in enumerate(vdur["interfaces"])
+ if i[1]["name"] == iface_inst_param["name"]
+ )
vnfr_update_text = "vdur.{}.interfaces.{}".format(
vdur_index, iface_index
)
- if vnfcp_inst_param.get("ip-address"):
+ if iface_inst_param.get("ip-address"):
vnfr_update[
vnfr_update_text + ".ip-address"
] = increment_ip_mac(
- vnfcp_inst_param.get("ip-address"),
+ iface_inst_param.get("ip-address"),
vdur.get("count-index", 0),
)
vnfr_update[vnfr_update_text + ".fixed-ip"] = True
- if vnfcp_inst_param.get("mac-address"):
+ if iface_inst_param.get("mac-address"):
vnfr_update[
vnfr_update_text + ".mac-address"
] = increment_ip_mac(
- vnfcp_inst_param.get("mac-address"),
+ iface_inst_param.get("mac-address"),
vdur.get("count-index", 0),
)
vnfr_update[vnfr_update_text + ".fixed-mac"] = True
- break
- return vnfr_update
-
- @staticmethod
- def _update_indata_to_use_concrete_vim_network(
- ifaces_forcing_vim_network: list, indata: dict, member_vnf_index: str
- ) -> dict:
- """Update indata in case pdu forces to use a concrete vim-network-name
- Args:
- ifaces_forcing_vim_network (list): List of interfaces that are connected to an existing VIM network.
- indata (dict): Input data dictionary
- member_vnf_index (str): VNF index as an identifier
- Returns:
- indata (dict): Input data dictionary
- """
- for iface_info in ifaces_forcing_vim_network:
- if iface_info.get("ns-vld-id"):
- if "vld" not in indata:
- indata["vld"] = []
- indata["vld"].append(
- {
- key: iface_info[key]
- for key in ("name", "vim-network-name", "vim-network-id")
- if iface_info.get(key)
- }
- )
-
- elif iface_info.get("vnf-vld-id"):
- if "vnf" not in indata:
- indata["vnf"] = []
- indata["vnf"].append(
- {
- "member-vnf-index": member_vnf_index,
- "internal-vld": [
- {
- key: iface_info[key]
- for key in (
- "name",
- "vim-network-name",
- "vim-network-id",
- )
- if iface_info.get(key)
- }
- ],
- }
- )
- return indata
-
- def _update_vnfrs(
- self, session: dict, rollback: list, nsr: dict, indata: dict
- ) -> None:
- """Update VNF record using indata.
- Args:
- session (dict): Contains "username", "admin", "force", "public", "project_id", "set_project"
- rollback (list): List with the database modifications to rollback if needed
- nsr (dict): NS record to get the related VNF record.
- indata (dict): Input data dictionary
-
- Returns:
- None
-
- Raises:
- None
- """
- # Get vnfr
- nsr_id = nsr["_id"]
- vnfrs = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
-
- for vnfr in vnfrs:
- vnfr_update = {}
- vnfr_update_rollback = {}
- member_vnf_index = vnfr["member-vnf-index-ref"]
-
- # Update vim-account-id
- vim_account = indata.get("vimAccountId")
- paas_account = indata.get("paasAccountId")
- vca_id = None
- if vim_account:
- vca_id = self._get_vim_account(vim_account, session).get("vca")
-
- # Check instantiate parameters
- for vnf_inst_params in get_iterable(indata.get("vnf")):
- if vnf_inst_params["member-vnf-index"] != member_vnf_index:
- continue
- if vnf_inst_params.get("vimAccountId"):
- vim_account = vnf_inst_params["vimAccountId"]
- vca_id = self._get_vim_account(vim_account, session).get("vca")
- elif vnf_inst_params.get("paasAccountId"):
- paas_account = vnf_inst_params["paasAccountId"]
-
- # Get vnf.vdu.interface instantiation params to update vnfr.vdur.interfaces ip, mac
- vnfr_update = (
- NsLcmOpTopic._get_vdu_interfaces_from_instantiation_params(
- vnfr_update, vnf_inst_params, vnfr
- )
- )
-
- # Get vnf.internal-vld.internal-conection-point instantiation params to update vnfr.vdur.interfaces
+ if iface_inst_param.get("floating-ip-required"):
+ vnfr_update[
+ vnfr_update_text + ".floating-ip-required"
+ ] = True
+ # get vnf.internal-vld.internal-conection-point instantiation params to update vnfr.vdur.interfaces
# TODO update vld with the ip-profile
- vnfr_update = NsLcmOpTopic._get_vld_icp_from_instantiation_params(
- vnf_inst_params, vnfr, vnfr_update
- )
-
- # Get ip address from instantiation parameters.vld.vnfd-connection-point-ref
- vnfr_update = NsLcmOpTopic._get_ip_address_from_instantiation_params(
- indata, member_vnf_index, vnfr, vnfr_update
- )
+ for ivld_inst_param in get_iterable(
+ vnf_inst_params.get("internal-vld")
+ ):
+ for icp_inst_param in get_iterable(
+ ivld_inst_param.get("internal-connection-point")
+ ):
+ # look for iface
+ for vdur_index, vdur in enumerate(vnfr["vdur"]):
+ for iface_index, iface in enumerate(vdur["interfaces"]):
+ if (
+ iface.get("internal-connection-point-ref")
+ == icp_inst_param["id-ref"]
+ ):
+ vnfr_update_text = "vdur.{}.interfaces.{}".format(
+ vdur_index, iface_index
+ )
+ if icp_inst_param.get("ip-address"):
+ vnfr_update[
+ vnfr_update_text + ".ip-address"
+ ] = increment_ip_mac(
+ icp_inst_param.get("ip-address"),
+ vdur.get("count-index", 0),
+ )
+ vnfr_update[
+ vnfr_update_text + ".fixed-ip"
+ ] = True
+ if icp_inst_param.get("mac-address"):
+ vnfr_update[
+ vnfr_update_text + ".mac-address"
+ ] = increment_ip_mac(
+ icp_inst_param.get("mac-address"),
+ vdur.get("count-index", 0),
+ )
+ vnfr_update[
+ vnfr_update_text + ".fixed-mac"
+ ] = True
+ break
+ # get ip address from instantiation parameters.vld.vnfd-connection-point-ref
+ for vld_inst_param in get_iterable(indata.get("vld")):
+ for vnfcp_inst_param in get_iterable(
+ vld_inst_param.get("vnfd-connection-point-ref")
+ ):
+ if vnfcp_inst_param["member-vnf-index-ref"] != member_vnf_index:
+ continue
+ # look for iface
+ for vdur_index, vdur in enumerate(vnfr["vdur"]):
+ for iface_index, iface in enumerate(vdur["interfaces"]):
+ if (
+ iface.get("external-connection-point-ref")
+ == vnfcp_inst_param["vnfd-connection-point-ref"]
+ ):
+ vnfr_update_text = "vdur.{}.interfaces.{}".format(
+ vdur_index, iface_index
+ )
+ if vnfcp_inst_param.get("ip-address"):
+ vnfr_update[
+ vnfr_update_text + ".ip-address"
+ ] = increment_ip_mac(
+ vnfcp_inst_param.get("ip-address"),
+ vdur.get("count-index", 0),
+ )
+ vnfr_update[vnfr_update_text + ".fixed-ip"] = True
+ if vnfcp_inst_param.get("mac-address"):
+ vnfr_update[
+ vnfr_update_text + ".mac-address"
+ ] = increment_ip_mac(
+ vnfcp_inst_param.get("mac-address"),
+ vdur.get("count-index", 0),
+ )
+ vnfr_update[vnfr_update_text + ".fixed-mac"] = True
+ break
- if vim_account:
- vnfr_update["vim-account-id"] = vim_account
- vnfr_update_rollback["vim-account-id"] = vnfr.get("vim-account-id")
- elif paas_account:
- vnfr_update["paas-account-id"] = paas_account
- vnfr_update_rollback["paas-account-id"] = vnfr.get("paas-account-id")
+ vnfr_update["vim-account-id"] = vim_account
+ vnfr_update_rollback["vim-account-id"] = vnfr.get("vim-account-id")
if vca_id:
vnfr_update["vca-id"] = vca_id
vnfr_update_rollback["vca-id"] = vnfr.get("vca-id")
- # Get pdu
+ # get pdu
ifaces_forcing_vim_network = self._look_for_pdu(
session, rollback, vnfr, vim_account, vnfr_update, vnfr_update_rollback
)
- # Get kdus
+ # get kdus
ifaces_forcing_vim_network += self._look_for_k8scluster(
session, rollback, vnfr, vim_account, vnfr_update, vnfr_update_rollback
)
-
- # Update vnfr in database
+ # update database vnfr
self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, vnfr_update)
rollback.append(
{
}
)
- # Update indata in case pdu forces to use a concrete vim-network-name
+ # Update indada in case pdu forces to use a concrete vim-network-name
# TODO check if user has already insert a vim-network-name and raises an error
if not ifaces_forcing_vim_network:
continue
- indata = NsLcmOpTopic._update_indata_to_use_concrete_vim_network(
- ifaces_forcing_vim_network, indata, member_vnf_index
- )
+ for iface_info in ifaces_forcing_vim_network:
+ if iface_info.get("ns-vld-id"):
+ if "vld" not in indata:
+ indata["vld"] = []
+ indata["vld"].append(
+ {
+ key: iface_info[key]
+ for key in ("name", "vim-network-name", "vim-network-id")
+ if iface_info.get(key)
+ }
+ )
+
+ elif iface_info.get("vnf-vld-id"):
+ if "vnf" not in indata:
+ indata["vnf"] = []
+ indata["vnf"].append(
+ {
+ "member-vnf-index": member_vnf_index,
+ "internal-vld": [
+ {
+ key: iface_info[key]
+ for key in (
+ "name",
+ "vim-network-name",
+ "vim-network-id",
+ )
+ if iface_info.get(key)
+ }
+ ],
+ }
+ )
@staticmethod
def _create_nslcmop(nsr_id, operation, params):
HTTPStatus.CONFLICT,
)
self._check_ns_operation(session, nsr, operation, indata)
- if indata.get("primitive_params"):
+ if (indata.get("primitive_params")):
indata["primitive_params"] = json.dumps(indata["primitive_params"])
- elif indata.get("additionalParamsForVnf"):
- indata["additionalParamsForVnf"] = json.dumps(
- indata["additionalParamsForVnf"]
- )
+ elif (indata.get("additionalParamsForVnf")):
+ indata["additionalParamsForVnf"] = json.dumps(indata["additionalParamsForVnf"])
if operation == "instantiate":
self._update_vnfrs_from_nsd(nsr)
vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
nsd = self.db.get_one("nsds", {"_id": nsr["nsd-id"]})
ns_request = nsr["instantiate_params"]
- vnfr = self.db.get_one(
- "vnfrs", {"_id": indata["changeVnfPackageData"]["vnfInstanceId"]}
- )
+ vnfr = self.db.get_one("vnfrs", {"_id": indata["changeVnfPackageData"]["vnfInstanceId"]})
latest_vnfd_revision = vnfd["_admin"].get("revision", 1)
vnfr_vnfd_revision = vnfr.get("revision", 1)
if latest_vnfd_revision != vnfr_vnfd_revision:
old_vnfd_id = vnfd_id + ":" + str(vnfr_vnfd_revision)
- old_db_vnfd = self.db.get_one(
- "vnfds_revisions", {"_id": old_vnfd_id}
- )
+ old_db_vnfd = self.db.get_one("vnfds_revisions", {"_id": old_vnfd_id})
old_sw_version = old_db_vnfd.get("software-version", "1.0")
new_sw_version = vnfd.get("software-version", "1.0")
if new_sw_version != old_sw_version:
vnf_index = vnfr["member-vnf-index-ref"]
self.logger.info("nsr {}".format(nsr))
for vdu in vnfd["vdu"]:
- self.nsrtopic._add_flavor_to_nsr(
- vdu, vnfd, nsr, vnf_index, latest_vnfd_revision
- )
+ self.nsrtopic._add_flavor_to_nsr(vdu, vnfd, nsr, vnf_index, latest_vnfd_revision)
sw_image_id = vdu.get("sw-image-desc")
if sw_image_id:
- image_data = self.nsrtopic._get_image_data_from_vnfd(
- vnfd, sw_image_id
- )
+ image_data = self.nsrtopic._get_image_data_from_vnfd(vnfd, sw_image_id)
self.nsrtopic._add_image_to_nsr(nsr, image_data)
for alt_image in vdu.get("alternative-sw-image-desc", ()):
- image_data = self.nsrtopic._get_image_data_from_vnfd(
- vnfd, alt_image
- )
+ image_data = self.nsrtopic._get_image_data_from_vnfd(vnfd, alt_image)
self.nsrtopic._add_image_to_nsr(nsr, image_data)
nsr_update["image"] = nsr["image"]
nsr_update["flavor"] = nsr["flavor"]
self.db.set_one("nsrs", {"_id": nsr["_id"]}, nsr_update)
- ns_k8s_namespace = self.nsrtopic._get_ns_k8s_namespace(
- nsd, ns_request, session
- )
- vnfr_descriptor = (
- self.nsrtopic._create_vnfr_descriptor_from_vnfd(
- nsd,
- vnfd,
- vnfd_id,
- vnf_index,
- nsr,
- ns_request,
- ns_k8s_namespace,
- latest_vnfd_revision,
- )
+ ns_k8s_namespace = self.nsrtopic._get_ns_k8s_namespace(nsd, ns_request, session)
+ vnfr_descriptor = self.nsrtopic._create_vnfr_descriptor_from_vnfd(
+ nsd,
+ vnfd,
+ vnfd_id,
+ vnf_index,
+ nsr,
+ ns_request,
+ ns_k8s_namespace,
+ latest_vnfd_revision,
)
indata["newVdur"] = vnfr_descriptor["vdur"]
nslcmop_desc = self._create_nslcmop(nsInstanceId, operation, indata)
/<id> O O O
/k8sclusters O O
/<id> O O O
- /paas O5 O5
- /<id> O5 O5 O5
/k8srepos O O
/<id> O O
/osmrepos O O
"ROLE_PERMISSION": "vca:id:",
},
},
- "paas": {
- "METHODS": ("GET", "POST"),
- "ROLE_PERMISSION": "paas:",
- "<ID>": {
- "METHODS": ("GET", "DELETE", "PATCH"),
- "ROLE_PERMISSION": "paas:id:",
- },
- },
"k8srepos": {
"METHODS": ("GET", "POST"),
"ROLE_PERMISSION": "k8srepos:",
},
"verticalscale": {
"METHODS": ("POST",),
- "ROLE_PERMISSION": "ns_instances:id:verticalscale:",
- },
+ "ROLE_PERMISSION": "ns_instances:id:verticalscale:"
+ },
},
},
"ns_lcm_op_occs": {
},
"vnflcm": {
"v1": {
- "vnf_instances": {
- "METHODS": ("GET", "POST"),
- "ROLE_PERMISSION": "vnflcm_instances:",
- "<ID>": {
- "METHODS": ("GET", "DELETE"),
- "ROLE_PERMISSION": "vnflcm_instances:id:",
- "scale": {
- "METHODS": ("POST",),
- "ROLE_PERMISSION": "vnflcm_instances:id:scale:",
- },
- "terminate": {
- "METHODS": ("POST",),
- "ROLE_PERMISSION": "vnflcm_instances:id:terminate:",
- },
- "instantiate": {
- "METHODS": ("POST",),
- "ROLE_PERMISSION": "vnflcm_instances:id:instantiate:",
- },
- },
- },
- "vnf_lcm_op_occs": {
- "METHODS": ("GET",),
- "ROLE_PERMISSION": "vnf_instances:opps:",
- "<ID>": {
- "METHODS": ("GET",),
- "ROLE_PERMISSION": "vnf_instances:opps:id:",
- },
- },
- "subscriptions": {
- "METHODS": ("GET", "POST"),
- "ROLE_PERMISSION": "vnflcm_subscriptions:",
- "<ID>": {
- "METHODS": ("GET", "DELETE"),
- "ROLE_PERMISSION": "vnflcm_subscriptions:id:",
- },
- },
+ "vnf_instances": {"METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "vnflcm_instances:",
+ "<ID>": {"METHODS": ("GET", "DELETE"),
+ "ROLE_PERMISSION": "vnflcm_instances:id:",
+ "scale": {"METHODS": ("POST",),
+ "ROLE_PERMISSION": "vnflcm_instances:id:scale:"
+ },
+ "terminate": {"METHODS": ("POST",),
+ "ROLE_PERMISSION": "vnflcm_instances:id:terminate:"
+ },
+ "instantiate": {"METHODS": ("POST",),
+ "ROLE_PERMISSION": "vnflcm_instances:id:instantiate:"
+ },
+ }
+ },
+ "vnf_lcm_op_occs": {"METHODS": ("GET",),
+ "ROLE_PERMISSION": "vnf_instances:opps:",
+ "<ID>": {"METHODS": ("GET",),
+ "ROLE_PERMISSION": "vnf_instances:opps:id:"
+ },
+ },
+ "subscriptions": {"METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "vnflcm_subscriptions:",
+ "<ID>": {"METHODS": ("GET", "DELETE"),
+ "ROLE_PERMISSION": "vnflcm_subscriptions:id:"
+ }
+ },
}
},
"nst": {
},
"nsfm": {
"v1": {
- "alarms": {
- "METHODS": ("GET", "PATCH"),
- "ROLE_PERMISSION": "alarms:",
- "<ID>": {
- "METHODS": ("GET", "PATCH"),
- "ROLE_PERMISSION": "alarms:id:",
- },
- }
+ "alarms": {"METHODS": ("GET", "PATCH"),
+ "ROLE_PERMISSION": "alarms:",
+ "<ID>": {"METHODS": ("GET", "PATCH"),
+ "ROLE_PERMISSION": "alarms:id:",
+ },
+ }
},
},
}
# NS Fault Management
@cherrypy.expose
- def nsfm(
- self,
- version=None,
- topic=None,
- uuid=None,
- project_name=None,
- ns_id=None,
- *args,
- **kwargs
- ):
- if topic == "alarms":
+ def nsfm(self, version=None, topic=None, uuid=None, project_name=None, ns_id=None, *args, **kwargs):
+ if topic == 'alarms':
try:
method = cherrypy.request.method
- role_permission = self._check_valid_url_method(
- method, "nsfm", version, topic, None, None, *args
- )
- query_string_operations = self._extract_query_string_operations(
- kwargs, method
- )
+ role_permission = self._check_valid_url_method(method, "nsfm", version, topic, None, None, *args)
+ query_string_operations = self._extract_query_string_operations(kwargs, method)
- self.authenticator.authorize(
- role_permission, query_string_operations, None
- )
+ self.authenticator.authorize(role_permission, query_string_operations, None)
# to handle get request
- if cherrypy.request.method == "GET":
+ if cherrypy.request.method == 'GET':
# if request is on basis of uuid
- if uuid and uuid != "None":
+ if uuid and uuid != 'None':
try:
alarm = self.engine.db.get_one("alarms", {"uuid": uuid})
- alarm_action = self.engine.db.get_one(
- "alarms_action", {"uuid": uuid}
- )
+ alarm_action = self.engine.db.get_one("alarms_action", {"uuid": uuid})
alarm.update(alarm_action)
- vnf = self.engine.db.get_one(
- "vnfrs", {"nsr-id-ref": alarm["tags"]["ns_id"]}
- )
+ vnf = self.engine.db.get_one("vnfrs", {"nsr-id-ref": alarm["tags"]["ns_id"]})
alarm["vnf-id"] = vnf["_id"]
return self._format_out(str(alarm))
except Exception:
return self._format_out("Please provide valid alarm uuid")
- elif ns_id and ns_id != "None":
+ elif ns_id and ns_id != 'None':
# if request is on basis of ns_id
try:
- alarms = self.engine.db.get_list(
- "alarms", {"tags.ns_id": ns_id}
- )
+ alarms = self.engine.db.get_list("alarms", {"tags.ns_id": ns_id})
for alarm in alarms:
- alarm_action = self.engine.db.get_one(
- "alarms_action", {"uuid": alarm["uuid"]}
- )
+ alarm_action = self.engine.db.get_one("alarms_action", {"uuid": alarm['uuid']})
alarm.update(alarm_action)
return self._format_out(str(alarms))
except Exception:
return self._format_out("Please provide valid ns id")
else:
# to return only alarm which are related to given project
- project = self.engine.db.get_one(
- "projects", {"name": project_name}
- )
- project_id = project.get("_id")
- ns_list = self.engine.db.get_list(
- "nsrs", {"_admin.projects_read": project_id}
- )
+ project = self.engine.db.get_one("projects", {"name": project_name})
+ project_id = project.get('_id')
+ ns_list = self.engine.db.get_list("nsrs", {"_admin.projects_read": project_id})
ns_ids = []
for ns in ns_list:
ns_ids.append(ns.get("_id"))
alarms = self.engine.db.get_list("alarms")
- alarm_list = [
- alarm
- for alarm in alarms
- if alarm["tags"]["ns_id"] in ns_ids
- ]
+ alarm_list = [alarm for alarm in alarms if alarm["tags"]["ns_id"] in ns_ids]
for alrm in alarm_list:
- action = self.engine.db.get_one(
- "alarms_action", {"uuid": alrm.get("uuid")}
- )
+ action = self.engine.db.get_one("alarms_action", {"uuid": alrm.get("uuid")})
alrm.update(action)
return self._format_out(str(alarm_list))
# to handle patch request for alarm update
- elif cherrypy.request.method == "PATCH":
+ elif cherrypy.request.method == 'PATCH':
data = yaml.load(cherrypy.request.body, Loader=yaml.SafeLoader)
try:
# check if uuid is valid
return self._format_out("Please provide valid alarm uuid.")
if data.get("is_enable") is not None:
if data.get("is_enable"):
- alarm_status = "ok"
+ alarm_status = 'ok'
else:
- alarm_status = "disabled"
- self.engine.db.set_one(
- "alarms",
- {"uuid": data.get("uuid")},
- {"alarm_status": alarm_status},
- )
+ alarm_status = 'disabled'
+ self.engine.db.set_one("alarms", {"uuid": data.get("uuid")},
+ {"alarm_status": alarm_status})
else:
- self.engine.db.set_one(
- "alarms",
- {"uuid": data.get("uuid")},
- {"threshold": data.get("threshold")},
- )
+ self.engine.db.set_one("alarms", {"uuid": data.get("uuid")},
+ {"threshold": data.get("threshold")})
return self._format_out("Alarm updated")
except Exception as e:
cherrypy.response.status = e.http_code.value
- if isinstance(
- e,
- (
- NbiException,
- EngineException,
- DbException,
- FsException,
- MsgException,
- AuthException,
- ValidationError,
- AuthconnException,
- ),
- ):
+ if isinstance(e, (NbiException, EngineException, DbException, FsException, MsgException, AuthException,
+ ValidationError, AuthconnException)):
http_code_value = cherrypy.response.status = e.http_code.value
http_code_name = e.http_code.name
cherrypy.log("Exception {}".format(e))
else:
- http_code_value = (
- cherrypy.response.status
- ) = HTTPStatus.BAD_REQUEST.value # INTERNAL_SERVER_ERROR
+ http_code_value = cherrypy.response.status = HTTPStatus.BAD_REQUEST.value # INTERNAL_SERVER_ERROR
cherrypy.log("CRITICAL: Exception {}".format(e), traceback=True)
http_code_name = HTTPStatus.BAD_REQUEST.name
problem_details = {
self._format_login(token_info)
# password expiry check
if self.authenticator.check_password_expiry(outdata):
- outdata = {
- "id": outdata["id"],
- "message": "change_password",
- "user_id": outdata["user_id"],
- }
+ outdata = {"id": outdata["id"],
+ "message": "change_password",
+ "user_id": outdata["user_id"]
+ }
# cherrypy.response.cookie["Authorization"] = outdata["id"]
# cherrypy.response.cookie["Authorization"]['expires'] = 3600
elif method == "DELETE":
filter_q = None
if "vcaStatusRefresh" in kwargs:
filter_q = {"vcaStatusRefresh": kwargs["vcaStatusRefresh"]}
- outdata = self.engine.get_item(
- engine_session, engine_topic, _id, filter_q, True
- )
+ outdata = self.engine.get_item(engine_session, engine_topic, _id, filter_q, True)
elif method == "POST":
cherrypy.response.status = HTTPStatus.CREATED.value
elif topic == "vnf_instances" and item:
indata["lcmOperationType"] = item
indata["vnfInstanceId"] = _id
- _id, _ = self.engine.new_item(
- rollback, engine_session, "vnflcmops", indata, kwargs
- )
- self._set_location_header(
- main_topic, version, "vnf_lcm_op_occs", _id
- )
+ _id, _ = self.engine.new_item(rollback, engine_session, "vnflcmops", indata, kwargs)
+ self._set_location_header(main_topic, version, "vnf_lcm_op_occs", _id)
outdata = {"id": _id}
cherrypy.response.status = HTTPStatus.ACCEPTED.value
else:
"nsName": indata["vnfName"],
"nsDescription": indata["vnfDescription"],
"nsdId": self.__get_nsdid(session, indata["vnfInstanceId"]),
- "vimAccountId": indata.get("vimAccountId"),
- "paasAccountId": indata.get("paasAccountId"),
+ "vimAccountId": indata["vimAccountId"],
"nsr_id": indata["vnfInstanceId"],
"lcmOperationType": indata["lcmOperationType"],
"nsInstanceId": indata["vnfInstanceId"]
"PATCH /admin/v1/vca/<id>": "vca:id:patch"
-################################################################################
-###################################### PAASs ###################################
-################################################################################
-
- "GET /admin/v1/paas": "paas:get"
-
- "POST /admin/v1/paas": "paas:post"
-
- "GET /admin/v1/paas/<id>": "paas:id:get"
-
- "DELETE /admin/v1/paas/<id>": "paas:id:delete"
-
- "PATCH /admin/v1/paas/<id>": "paas:id:patch"
-
################################################################################
################################# K8s Repos ##############################
################################################################################
vca: false
vca:get: true
vca:id:get: true
- # PAAS
- paas: false
- paas:get: true
- paas:id:get: true
# K8s repos
k8srepos: true
# OSM repos
UserTopicAuth,
CommonVimWimSdn,
VcaTopic,
- PaasTopic,
)
from osm_nbi.engine import EngineException
from osm_nbi.authconn import AuthconnNotFoundException
mock_check_conflict_on_del.assert_not_called()
-class TestPaaSTopic(TestCase):
- def setUp(self):
- self.db = Mock(dbbase.DbBase())
- self.fs = Mock(fsbase.FsBase())
- self.msg = Mock(msgbase.MsgBase())
- self.auth = Mock(authconn.Authconn(None, None, None))
- self.paas_topic = PaasTopic(self.db, self.fs, self.msg, self.auth)
-
- def test_format_on_new(self):
- content = {
- "_id": "id",
- "secret": "secret_to_encrypt",
- }
- self.db.encrypt.side_effect = ["encrypted_secret"]
-
- expecte_oid = "id:0"
- expected_num_operations = 1
- oid = self.paas_topic.format_on_new(content)
-
- self.assertEqual(oid, expecte_oid)
- self.assertEqual(content["secret"], "encrypted_secret")
- self.assertEqual(content["_admin"]["operationalState"], "PROCESSING")
- self.assertEqual(content["_admin"]["current_operation"], None)
- self.assertEqual(len(content["_admin"]["operations"]), expected_num_operations)
- self.assertEqual(
- content["_admin"]["operations"][0]["lcmOperationType"], "create"
- )
- self.db.encrypt.assert_called_with(
- "secret_to_encrypt", schema_version="1.11", salt="id"
- )
-
- @patch("osm_nbi.base_topic.BaseTopic._get_project_filter")
- def test_check_conflict_on_new(self, mock_get_project_filter):
- indata = {"name": "new_paas_name"}
- session = {}
- mock_get_project_filter.return_value = {}
- self.db.get_one.return_value = None
- self.paas_topic.check_conflict_on_new(session, indata)
-
- @patch("osm_nbi.base_topic.BaseTopic._get_project_filter")
- def test_check_conflict_on_new_raise_exception(self, mock_get_project_filter):
- indata = {"name": "new_paas_name"}
- session = {}
- mock_get_project_filter.return_value = {}
- self.db.get_one.return_value = ["Found_PaaS"]
- with self.assertRaises(EngineException):
- self.paas_topic.check_conflict_on_new(session, indata)
-
- @patch("osm_nbi.base_topic.BaseTopic._get_project_filter")
- def test_check_conflict_on_edit(self, mock_get_project_filter):
- edit_content = {"name": "new_paas_name"}
- final_content = {}
- session = {"force": None}
- mock_get_project_filter.return_value = {}
- self.db.get_one.return_value = None
- self.paas_topic.check_conflict_on_edit(
- session, final_content, edit_content, "id"
- )
-
- @patch("osm_nbi.base_topic.BaseTopic._get_project_filter")
- def test_check_conflict_on_edit_raise_exception(self, mock_get_project_filter):
- edit_content = {"name": "new_paas_name"}
- final_content = {}
- session = {"force": None}
- mock_get_project_filter.return_value = {}
- self.db.get_one.return_value = ["Found_PaaS"]
- with self.assertRaises(EngineException):
- self.paas_topic.check_conflict_on_edit(
- session, final_content, edit_content, "id"
- )
-
- def test_format_on_edit(self):
- edit_content = {
- "_id": "id",
- "secret": "secret_to_encrypt",
- }
- final_content = {
- "_id": "id",
- "_admin": {"operations": [{"lcmOperationType": "create"}]},
- "schema_version": "1.11",
- }
- self.db.encrypt.side_effect = ["encrypted_secret"]
- expected_oid = "id:1"
- expected_num_operations = 2
- print(self.paas_topic.password_to_encrypt)
- oid = self.paas_topic.format_on_edit(final_content, edit_content)
-
- self.assertEqual(oid, expected_oid)
- self.assertEqual(final_content["secret"], "encrypted_secret")
- self.assertEqual(
- len(final_content["_admin"]["operations"]), expected_num_operations
- )
- self.assertEqual(final_content["_admin"]["operationalState"], "PROCESSING")
- self.assertEqual(final_content["_admin"]["detailed-status"], "Editing")
- self.db.encrypt.assert_called_with(
- "secret_to_encrypt", schema_version="1.11", salt="id"
- )
-
-
class Test_ProjectTopicAuth(TestCase):
@classmethod
def setUpClass(cls):
},
)
content = self.auth.update_user.call_args[0][0]
- self.assertEqual(
- content["old_password"], old_password, "Wrong old password"
- )
+ self.assertEqual(content["old_password"], old_password, "Wrong old password")
self.assertEqual(content["password"], new_pasw, "Wrong user password")
def test_delete_user(self):
vim_user: osm
"""
-db_paas_accounts_text = """
----
-- _id: 3d93ca4e-b007-4d94-bbdc-61911078b864
- name: my_new_paas1
- paas_type: juju
- endpoints:
- - 127.0.0.1
- user: admin
- secret: SCgY5peOGf98oqc00TNxfQ==
- _admin:
- created: 1664533097.4940042
- modified: 1664533116.9955606
- projects_read:
- - 25b5aebf-3da1-49ed-99de-1d2b4a86d6e4
- projects_write:
- - 25b5aebf-3da1-49ed-99de-1d2b4a86d6e4
- operationalState: ENABLED
- operations:
- - lcmOperationType: create
- operationState: COMPLETED
- startTime: 1664533097.4943297
- statusEnteredTime: 1664533097.4943297
- detailed-status: PaaS validated
- operationParams: null
- worker: null
- current_operation: null
- detailed-status: "Connectivity: ok"
- schema_version: 1.11
-- _id: c2538499-4c30-41c0-acd5-80cb92f45555
- name: my_new_paas2
- paas_type: juju
- endpoints:
- - 127.0.0.1
- user: admin
- secret: 5g0yGX86qIhprX86YTMcpg==
- _admin:
- created: 1664533097.4940042
- modified: 1664533116.9955606
- projects_read:
- - 25b5aebf-3da1-49ed-99de-1d2b4a86d6e4
- projects_write:
- - 25b5aebf-3da1-49ed-99de-1d2b4a86d6e4
- operationalState: ENABLED
- operations:
- - lcmOperationType: create
- operationState: COMPLETED
- startTime: 1664533097.4943297
- statusEnteredTime: 1664533097.4943297
- detailed-status: PaaS validated
- operationParams: null
- worker: null
- current_operation: null
- detailed-status: "Connectivity: ok"
- schema_version: 1.11
-"""
-
db_vnfds_text = """
---
- _admin:
from contextlib import contextmanager
import unittest
from time import time
-from unittest.mock import Mock, mock_open
+from unittest.mock import Mock, mock_open # patch, MagicMock
from osm_common.dbbase import DbException
from osm_nbi.engine import EngineException
from osm_common.dbmemory import DbMemory
from osm_nbi.instance_topics import NsLcmOpTopic, NsrTopic
from osm_nbi.tests.test_db_descriptors import (
db_vim_accounts_text,
- db_paas_accounts_text,
db_nsds_text,
db_vnfds_text,
db_nsrs_text,
self.vim = self.db.get_list("vim_accounts")[0]
self.vim_id = self.vim["_id"]
- self.db.create_list(
- "paas", yaml.load(db_paas_accounts_text, Loader=yaml.Loader)
- )
- self.paas_ids = [
- paas_element["_id"] for paas_element in self.db.get_list("paas")
- ]
- self.paas_id = self.paas_ids[0]
- self.vnfrs = {
- vnfr["_id"]: vnfr["member-vnf-index-ref"]
- for vnfr in self.db.get_list("vnfrs")
- }
- self.number_of_vnfr = len(self.vnfrs)
- self.new_session = {
+
+ def test_create_instantiate(self):
+ self.db.set_one = Mock(return_value={"updated": 1})
+ session = {
"force": False,
"admin": False,
"public": False,
"project_id": [self.nsr_project],
"method": "write",
}
-
- def check_operation_params(self, operation_params):
- self.assertEqual(self.nsd_id, operation_params["nsdId"])
- self.assertEqual(self.nsr_id, operation_params["nsInstanceId"])
- self.assertEqual("name", operation_params["nsName"])
-
- def test_create_instantiate_with_vim_account(self):
- self.db.set_one = Mock(return_value={"updated": 1})
indata = {
"nsdId": self.nsd_id,
"nsInstanceId": self.nsr_id,
headers = {}
nslcmop_id, _ = self.nslcmop_topic.new(
- rollback,
- self.new_session,
- indata=deepcopy(indata),
- kwargs=None,
- headers=headers,
+ rollback, session, indata=deepcopy(indata), kwargs=None, headers=headers
)
# check nslcmop is created at database
"rollback mismatch with created/set items at database",
)
- operation_params = created_nslcmop["operationParams"]
- self.check_operation_params(operation_params)
- self.assertEqual(self.vim_id, operation_params["vimAccountId"])
- self.assertNotIn("paasAccountId", operation_params)
-
- self.assertEqual(len(self.db.set_one.call_args_list), self.number_of_vnfr)
-
- for call in self.db.set_one.call_args_list:
- topic, vnfr_id, update = call[0]
- self.assertEqual(topic, "vnfrs")
- self.assertIn(vnfr_id["_id"], self.vnfrs)
- self.assertEqual(self.vim_id, update["vim-account-id"])
- self.assertNotIn("paas-account-id", update)
-
# test parameters with error
bad_id = "88d90b0c-faff-4b9f-bccd-aaaaaaaaaaaa"
test_set = (
with self.assertRaises(expect_exc, msg=message) as e:
self.nslcmop_topic.new(
rollback,
- self.new_session,
+ session,
indata=deepcopy(indata),
kwargs=kwargs_,
headers=headers,
"Expected '{}' at exception text".format(expect_text),
)
- def test_create_instantiate_with_paas_account(self):
- self.db.set_one = Mock(return_value={"updated": 1})
- indata = {
- "nsdId": self.nsd_id,
- "nsInstanceId": self.nsr_id,
- "nsName": "name",
- "paasAccountId": self.paas_id,
- "lcmOperationType": "instantiate",
- }
-
- nslcmop_id, _ = self.nslcmop_topic.new(
- [], self.new_session, indata=deepcopy(indata), kwargs=None, headers={}
- )
-
- # check nslcmop is created at database
- self.assertEqual(
- self.db.create.call_count,
- 1,
- "database create not called, or called more than once",
- )
- _call = self.db.create.call_args_list[0]
- self.assertEqual(
- _call[0][0], "nslcmops", "must be create a nslcmops entry at database"
- )
-
- created_nslcmop = _call[0][1]
- self.assertEqual(
- nslcmop_id,
- created_nslcmop["_id"],
- "mismatch between return id and database '_id'",
- )
-
- operation_params = created_nslcmop["operationParams"]
- self.check_operation_params(operation_params)
- self.assertEqual(self.paas_id, operation_params["paasAccountId"])
- self.assertNotIn("vimAccountId", operation_params)
-
- self.assertEqual(len(self.db.set_one.call_args_list), self.number_of_vnfr)
-
- for call in self.db.set_one.call_args_list:
- topic, vnfr_id, update = call[0]
- self.assertEqual(topic, "vnfrs")
- self.assertIn(vnfr_id["_id"], self.vnfrs)
- self.assertEqual(self.paas_id, update["paas-account-id"])
- self.assertNotIn("vim-account-id", update)
-
- def test_create_instantiate_invalid_paas_account_raises_exception(self):
- self.db.set_one = Mock(return_value={"updated": 1})
- invalid_paas_id = "88d90b0c-faff-4b9f-bccd-017f33985984"
- indata = {
- "nsdId": self.nsd_id,
- "nsInstanceId": self.nsr_id,
- "nsName": "name",
- "paasAccountId": invalid_paas_id,
- "lcmOperationType": "instantiate",
- }
-
- with self.assertRaises(EngineException):
- nslcmop_id, _ = self.nslcmop_topic.new(
- [], self.new_session, indata=deepcopy(indata), kwargs=None, headers={}
- )
- self.db.set_one.assert_not_called()
-
- def test_create_instantiate_with_paas_account_in_vnf(self):
- self.db.set_one = Mock(return_value={"updated": 1})
- indata = {
- "nsdId": self.nsd_id,
- "nsInstanceId": self.nsr_id,
- "nsName": "name",
- "paasAccountId": self.paas_ids[0],
- "lcmOperationType": "instantiate",
- "vnf": [{"member-vnf-index": "1", "paasAccountId": self.paas_ids[1]}],
- }
-
- nslcmop_id, _ = self.nslcmop_topic.new(
- [], self.new_session, indata=deepcopy(indata), kwargs=None, headers={}
- )
-
- # check nslcmop is created at database
- self.assertEqual(
- self.db.create.call_count,
- 1,
- "database create not called, or called more than once",
- )
- _call = self.db.create.call_args_list[0]
- self.assertEqual(
- _call[0][0], "nslcmops", "must be create a nslcmops entry at database"
- )
-
- created_nslcmop = _call[0][1]
- self.assertEqual(
- nslcmop_id,
- created_nslcmop["_id"],
- "mismatch between return id and database '_id'",
- )
-
- operation_params = created_nslcmop["operationParams"]
- self.check_operation_params(operation_params)
- self.assertEqual(self.paas_id, operation_params["paasAccountId"])
- self.assertNotIn("vimAccountId", operation_params)
- expected_paas_id = ""
-
- self.assertEqual(len(self.db.set_one.call_args_list), self.number_of_vnfr)
- for call in self.db.set_one.call_args_list:
- topic, vnfr_id, update = call[0]
- self.assertEqual(topic, "vnfrs")
- self.assertIn(vnfr_id["_id"], self.vnfrs)
- vnf_index_ref = self.vnfrs[vnfr_id["_id"]]
- if vnf_index_ref == "1":
- expected_paas_id = self.paas_ids[1]
- elif vnf_index_ref == "2":
- expected_paas_id = self.paas_ids[0]
- else:
- assert False
- self.assertEqual(expected_paas_id, update["paas-account-id"])
- self.assertNotIn("vim-account-id", update)
-
- def test_create_instantiate_invalid_paas_account_in_vnf_raises_exception(self):
- self.db.set_one = Mock(return_value={"updated": 1})
- invalid_paas_id = "88d90b0c-faff-4b9f-bccd-017f33985984"
- indata = {
- "nsdId": self.nsd_id,
- "nsInstanceId": self.nsr_id,
- "nsName": "name",
- "paasAccountId": self.paas_ids[0],
- "lcmOperationType": "instantiate",
- "vnf": [{"member-vnf-index": "1", "paasAccountId": invalid_paas_id}],
- }
-
- with self.assertRaises(EngineException):
- self.nslcmop_topic.new(
- [], self.new_session, indata=deepcopy(indata), kwargs=None, headers={}
- )
- self.db.set_one.assert_not_called()
-
def test_check_ns_operation_action(self):
nsrs = self.db.get_list("nsrs")[0]
session = {}
vnfr_id = self.db.get_list("vnfrs")[0]["_id"]
session = {}
self.db.set_one(
- "nsrs", {"_id": self.nsr_id}, {"_admin.nsState": "INSTANTIATED"}
+ "nsrs",
+ {"_id": self.nsr_id},
+ {"_admin.nsState": "INSTANTIATED"},
)
indata = {
"lcmOperationType": "update",
"updateType": "REMOVE_VNF",
"nsInstanceId": self.nsr_id,
- "removeVnfInstanceId": vnfr_id,
+ "removeVnfInstanceId": vnfr_id
}
session = {
)
def test_migrate(self):
+ vnfr_id = self.db.get_list("vnfrs")[0]["_id"]
session = {}
self.db.set_one(
- "nsrs", {"_id": self.nsr_id}, {"_admin.nsState": "INSTANTIATED"}
+ "nsrs",
+ {"_id": self.nsr_id},
+ {"_admin.nsState": "INSTANTIATED"},
)
session = {
"force": False,
indata = {
"lcmOperationType": "migrate",
"nsInstanceId": self.nsr_id,
- "migrateToHost": "sample02",
- "vdu": {"vduCountIndex": 0, "vduId": "mgmtVM"},
- "vnfInstanceId": "9e8006df-cdfa-4f63-bf6a-fce860d71c1f",
+ "migrateToHost":"sample02",
+ "vdu": {
+ "vduCountIndex": 0,
+ "vduId": "mgmtVM"
+ },
+ "vnfInstanceId": "9e8006df-cdfa-4f63-bf6a-fce860d71c1f"
}
nslcmop_id, _ = self.nslcmop_topic.new(
rollback, session, indata, kwargs=None, headers=headers
indata = {
"lcmOperationType": "migrate",
"nsInstanceId": self.nsr_id,
- "vnfInstanceId": "9e8006df-cdfa-4f63-bf6a-fce860d71c1f",
+ "vnfInstanceId": "9e8006df-cdfa-4f63-bf6a-fce860d71c1f"
}
nslcmop_id, _ = self.nslcmop_topic.new(
rollback, session, indata, kwargs=None, headers=headers
indata = {
"lcmOperationType": "migrate",
"nsInstanceId": self.nsr_id,
- "migrateToHost": "sample02",
- "vdu": {"vduCountIndex": 0},
- "vnfInstanceId": "9e8006df-cdfa-4f63-bf6a-fce860d71c1f",
+ "migrateToHost":"sample02",
+ "vdu": {
+ "vduCountIndex": 0
+ },
+ "vnfInstanceId": "9e8006df-cdfa-4f63-bf6a-fce860d71c1f"
}
with self.assertRaises(Exception) as e:
nslcmop_id, _ = self.nslcmop_topic.new(
- rollback, session, indata, kwargs=None, headers=headers
- )
- self.assertTrue(
- "Format error at 'vdu' ''vduId' is a required property'"
- in str(e.exception)
+ rollback, session, indata, kwargs=None, headers=headers
)
+ self.assertTrue("Format error at 'vdu' ''vduId' is a required property'" in str(e.exception))
class TestNsLcmOpTopicWithMock(unittest.TestCase):
test_vnfr = yaml.load(db_vnfrs_text, Loader=yaml.Loader)[0]
test_vnfd = yaml.load(db_vnfds_text, Loader=yaml.Loader)
self.db.get_one.side_effect = [test_vnfr, test_vnfd]
- self.nslcmop_topic._get_vnfd_from_vnf_member_index("1", test_vnfr["_id"])
- self.assertEqual(
- self.db.get_one.call_args_list[0][0][0],
- "vnfrs",
- "Incorrect first DB lookup",
- )
- self.assertEqual(
- self.db.get_one.call_args_list[1][0][0],
- "vnfds",
- "Incorrect second DB lookup",
- )
+ vnfr = self.nslcmop_topic._get_vnfd_from_vnf_member_index("1", test_vnfr['_id'])
+ self.assertEqual(self.db.get_one.call_args_list[0][0][0], 'vnfrs', "Incorrect first DB lookup")
+ self.assertEqual(self.db.get_one.call_args_list[1][0][0], 'vnfds', "Incorrect second DB lookup")
def test_get_vnfd_from_vnf_member_no_revision(self):
test_vnfr = yaml.load(db_vnfrs_text, Loader=yaml.Loader)[0]
- test_vnfr["revision"] = 3
+ test_vnfr['revision'] = 3
test_vnfd = yaml.load(db_vnfds_text, Loader=yaml.Loader)
self.db.get_one.side_effect = [test_vnfr, test_vnfd]
- self.nslcmop_topic._get_vnfd_from_vnf_member_index("1", test_vnfr["_id"])
- self.assertEqual(
- self.db.get_one.call_args_list[0][0][0],
- "vnfrs",
- "Incorrect first DB lookup",
- )
- self.assertEqual(
- self.db.get_one.call_args_list[1][0][0],
- "vnfds_revisions",
- "Incorrect second DB lookup",
- )
+ vnfr = self.nslcmop_topic._get_vnfd_from_vnf_member_index("1", test_vnfr['_id'])
+ self.assertEqual(self.db.get_one.call_args_list[0][0][0], 'vnfrs', "Incorrect first DB lookup")
+ self.assertEqual(self.db.get_one.call_args_list[1][0][0], 'vnfds_revisions', "Incorrect second DB lookup")
@contextmanager
def assertNotRaises(self, exception_type):
"VNF instance: 88d90b0c-faff-4b9f-bccd-017f33985984",
)
- with self.subTest(
- i=5, t="Ns update REMOVE_VNF request validated with no exception"
- ):
+ with self.subTest(i=5, t="Ns update REMOVE_VNF request validated with no exception"):
test_vnfr = yaml.load(db_vnfrs_text, Loader=yaml.Loader)
test_vnfr[0]["revision"] = 2
test_nsr = yaml.load(db_nsrs_text, Loader=yaml.Loader)
with self.assertNotRaises(EngineException):
self.nslcmop_topic._check_ns_operation(session, nsrs, "update", indata)
-
class TestNsrTopic(unittest.TestCase):
def setUp(self):
self.db = DbMemory()
self.vim = self.db.get_list("vim_accounts")[0]
self.vim_id = self.vim["_id"]
- self.paas_id = "3d93ca4e-b007-4d94-bbdc-61911078b864"
-
- def stock_db_creations(self, created_vnfrs, created_nsrs):
- for _call in self.db.create.call_args_list:
- assert len(_call[0]) >= 2, "called db.create with few parameters"
- created_item = _call[0][1]
- if _call[0][0] == "vnfrs":
- created_vnfrs.append(created_item)
- elif _call[0][0] == "nsrs":
- created_nsrs.append(created_item)
- else:
- assert False, "created an unknown record {} at database".format(
- _call[0][0]
- )
- self.check_admin_field_in_db_call(created_item)
-
- def check_admin_field_in_db_call(self, created_item):
- self.assertTrue(
- created_item["_admin"].get("projects_read"),
- "Database record must contain '_amdin.projects_read'",
- )
- self.assertIn(
- "created",
- created_item["_admin"],
- "Database record must contain '_admin.created'",
- )
- self.assertTrue(
- created_item["_admin"]["nsState"] == "NOT_INSTANTIATED",
- "Database record must contain '_admin.nstate=NOT INSTANTIATE'",
- )
-
- def check_vnfr_content(self, vnfr, nsr_id):
- self.assertEqual(vnfr["vim-account-id"], None)
- self.assertEqual(vnfr["paas-account-id"], None)
- self.assertIn(
- "member-vnf-index-ref",
- vnfr,
- "Created item must contain member-vnf-index-ref section",
- )
- self.assertEqual(
- nsr_id, vnfr["nsr-id-ref"], "bad reference id from vnfr to nsr"
- )
- self.check_vdur_interfaces(vnfr)
-
- def check_vdur_interfaces(self, vnfr):
- self.assertEqual(
- vnfr["vdur"][0]["interfaces"][0]["position"],
- 1,
- "vdur first interface position does not match",
- )
- self.assertEqual(
- vnfr["vdur"][0]["interfaces"][1]["position"],
- 2,
- "vdur second interface position does not match",
- )
-
- def check_vnfrs(self, created_vnfrs, nsrs_id):
- self.assertEqual(
- len(created_vnfrs), 2, "created a mismatch number of vnfr at database"
- )
- for vnfr in created_vnfrs:
- self.check_vnfr_content(vnfr, nsrs_id)
-
- def check_vnfrs_ref_in_nsr(self, nsrs, created_vnfrs):
- self.assertEqual(len(nsrs["constituent-vnfr-ref"]), len(created_vnfrs))
- self.assertNotEqual(created_vnfrs[0]["_id"], created_vnfrs[1]["_id"])
- for vnfr in created_vnfrs:
- vnfr_id = vnfr["_id"]
- self.assertIn(vnfr_id, nsrs["constituent-vnfr-ref"])
-
- def check_nsrs(self, created_nsrs, created_vnfrs, vim_id, paas_id):
- self.assertEqual(
- len(created_nsrs), 1, "Only one nsrs must be created at database"
- )
- nsrs = created_nsrs[0]
- self.assertEqual(nsrs["vimdatacenter"], vim_id)
- self.assertEqual(nsrs["paasdatacenter"], paas_id)
- self.check_vnfrs_ref_in_nsr(nsrs, created_vnfrs)
-
- def check_created_nsrs_and_vnfrs(
- self, created_nsrs, created_vnfrs, rollback, vim_id, paas_id
- ):
-
- nsrs_id = created_nsrs[0]["_id"]
-
- self.assertEqual(
- len(rollback),
- len(created_vnfrs) + 1,
- "rollback mismatch with created items at database",
- )
-
- self.check_nsrs(created_nsrs, created_vnfrs, vim_id, paas_id)
- self.check_vnfrs(created_vnfrs, nsrs_id)
-
- def test_create_with_vim_account(self):
+ def test_create(self):
session = {
"force": False,
"admin": False,
# check vnfrs and nsrs created in whatever order
created_vnfrs = []
created_nsrs = []
- self.stock_db_creations(created_vnfrs, created_nsrs)
- self.check_created_nsrs_and_vnfrs(
- created_nsrs, created_vnfrs, rollback, self.vim_id, None
+ nsr_id = None
+ for _call in self.db.create.call_args_list:
+ assert len(_call[0]) >= 2, "called db.create with few parameters"
+ created_item = _call[0][1]
+ if _call[0][0] == "vnfrs":
+ created_vnfrs.append(created_item)
+ self.assertIn(
+ "member-vnf-index-ref",
+ created_item,
+ "Created item must contain member-vnf-index-ref section",
+ )
+ if nsr_id:
+ self.assertEqual(
+ nsr_id,
+ created_item["nsr-id-ref"],
+ "bad reference id from vnfr to nsr",
+ )
+ else:
+ nsr_id = created_item["nsr-id-ref"]
+
+ elif _call[0][0] == "nsrs":
+ created_nsrs.append(created_item)
+ if nsr_id:
+ self.assertEqual(
+ nsr_id, created_item["_id"], "bad reference id from vnfr to nsr"
+ )
+ else:
+ nsr_id = created_item["_id"]
+ else:
+ assert True, "created an unknown record {} at database".format(
+ _call[0][0]
+ )
+
+ self.assertTrue(
+ created_item["_admin"].get("projects_read"),
+ "Database record must contain '_amdin.projects_read'",
+ )
+ self.assertIn(
+ "created",
+ created_item["_admin"],
+ "Database record must contain '_admin.created'",
+ )
+ self.assertTrue(
+ created_item["_admin"]["nsState"] == "NOT_INSTANTIATED",
+ "Database record must contain '_admin.nstate=NOT INSTANTIATE'",
+ )
+
+ self.assertEqual(
+ len(created_vnfrs), 2, "created a mismatch number of vnfr at database"
+ )
+
+ self.assertEqual(
+ created_vnfrs[0]["vdur"][0]["interfaces"][0]["position"],
+ 1,
+ "vdur first interface position does not match",
+ )
+
+ self.assertEqual(
+ created_vnfrs[0]["vdur"][0]["interfaces"][1]["position"],
+ 2,
+ "vdur second interface position does not match",
+ )
+
+ self.assertEqual(
+ len(created_nsrs), 1, "Only one nsrs must be created at database"
+ )
+ self.assertEqual(
+ len(rollback),
+ len(created_vnfrs) + 1,
+ "rollback mismatch with created items at database",
)
- def test_create_with_vim_account_raise_exception(self):
# test parameters with error
bad_id = "88d90b0c-faff-4b9f-bccd-aaaaaaaaaaaa"
- session = {
- "force": False,
- "admin": False,
- "public": False,
- "project_id": [self.nsd_project],
- "method": "write",
- }
- indata = {
- "nsdId": self.nsd_id,
- "nsName": "name",
- "vimAccountId": self.vim_id,
- "additionalParamsForVnf": [
- {
- "member-vnf-index": "hackfest_vnf1",
- "additionalParams": {"touch_filename": "file"},
- },
- {
- "member-vnf-index": "hackfest_vnf2",
- "additionalParams": {"touch_filename": "file"},
- },
- ],
- }
- rollback = []
- headers = {}
test_set = (
# TODO add "nsd"
(
self.assertTrue(e.exception.http_code == expect_code)
if expect_text_list:
for expect_text in expect_text_list:
- self.assertIn(
- expect_text,
- str(e.exception).lower(),
- "Expected '{}' at exception text".format(expect_text),
- )
-
- def test_create_with_paas_account(self):
- session = {
- "force": False,
- "admin": False,
- "public": False,
- "project_id": [self.nsd_project],
- "method": "write",
- }
- indata = {
- "nsdId": self.nsd_id,
- "nsName": "name",
- "paasAccountId": self.paas_id,
- "additionalParamsForVnf": [
- {
- "member-vnf-index": "hackfest_vnf1",
- "additionalParams": {"touch_filename": "file"},
- },
- {
- "member-vnf-index": "hackfest_vnf2",
- "additionalParams": {"touch_filename": "file"},
- },
- ],
- }
- rollback = []
- headers = {}
-
- self.nsr_topic.new(
- rollback, session, indata=indata, kwargs=None, headers=headers
- )
-
- created_vnfrs = []
- created_nsrs = []
- self.stock_db_creations(created_vnfrs, created_nsrs)
- self.check_created_nsrs_and_vnfrs(
- created_nsrs, created_vnfrs, rollback, None, self.paas_id
- )
+ self.assertIn(expect_text, str(e.exception).lower(),
+ "Expected '{}' at exception text".format(expect_text))
def test_show_instance(self):
- session = {
- "force": False,
- "admin": False,
- "public": False,
- "project_id": [self.nsd_project],
- "method": "write",
- }
+ session = {"force": False, "admin": False, "public": False, "project_id": [self.nsd_project], "method": "write"}
filter_q = {}
for refresh_status in ("true", "false"):
self.db.create_list("nsrs", yaml.load(db_nsrs_text, Loader=yaml.Loader))
actual_nsr = self.db.get_list("nsrs")[0]
nsr_id = actual_nsr["_id"]
- filter_q["vcaStatus-refresh"] = refresh_status
+ filter_q['vcaStatus-refresh'] = refresh_status
expected_nsr = self.nsr_topic.show(session, nsr_id, filter_q=filter_q)
self.nsr_topic.delete(session, nsr_id)
actual_nsr.pop("_admin")
expected_nsr.pop("_admin")
- self.assertEqual(
- expected_nsr, actual_nsr, "Database nsr and show() nsr do not match."
- )
+ self.assertEqual(expected_nsr, actual_nsr, "Database nsr and show() nsr do not match.")
def test_vca_status_refresh(self):
- session = {
- "force": False,
- "admin": False,
- "public": False,
- "project_id": [self.nsd_project],
- "method": "write",
- }
- filter_q = {"vcaStatus-refresh": "true"}
+ session = {"force": False, "admin": False, "public": False, "project_id": [self.nsd_project], "method": "write"}
+ filter_q = {'vcaStatus-refresh': 'true'}
time_delta = 120
self.db.create_list("nsrs", yaml.load(db_nsrs_text, Loader=yaml.Loader))
nsr = self.db.get_list("nsrs")[0]
# When vcaStatus-refresh is true
- filter_q["vcaStatus-refresh"] = "true"
+ filter_q['vcaStatus-refresh'] = "true"
self.nsr_topic.vca_status_refresh(session, nsr, filter_q)
msg_args = self.msg.write.call_args[0]
self.assertEqual(msg_args[1], "vca_status_refresh", "Wrong message action")
self.assertGreater(nsr["_admin"]["modified"], time() - time_delta)
# When vcaStatus-refresh is false but modified time is within threshold
- filter_q["vcaStatus-refresh"] = "false"
+ filter_q['vcaStatus-refresh'] = "false"
time_now = time()
nsr["_admin"]["modified"] = time_now
self.nsr_topic.vca_status_refresh(session, nsr, filter_q)
msg_args = self.msg.write.call_args[1]
self.assertEqual(msg_args, {}, "Message should not be sent.")
- self.assertEqual(
- nsr["_admin"]["modified"], time_now, "Modified time should not be changed."
- )
+ self.assertEqual(nsr["_admin"]["modified"], time_now, "Modified time should not be changed.")
# When vcaStatus-refresh is false but modified time is less than threshold
- filter_q["vcaStatus-refresh"] = "false"
- nsr["_admin"]["modified"] = time() - (2 * time_delta)
+ filter_q['vcaStatus-refresh'] = "false"
+ nsr["_admin"]["modified"] = time() - (2*time_delta)
self.nsr_topic.vca_status_refresh(session, nsr, filter_q)
msg_args = self.msg.write.call_args[0]
self.assertEqual(msg_args[1], "vca_status_refresh", "Wrong message action")
self.nsr_topic.delete(session, nsr["_id"])
- self.assertGreater(
- nsr["_admin"]["modified"],
- time() - time_delta,
- "Modified time is not changed.",
- )
+ self.assertGreater(nsr["_admin"]["modified"], time() - time_delta, "Modified time is not changed.")
def test_delete_ns(self):
self.db.create_list("nsrs", yaml.load(db_nsrs_text, Loader=yaml.Loader))
"nsName": name_schema,
"nsDescription": {"oneOf": [description_schema, null_schema]},
"nsdId": id_schema,
- "vimAccountId": {"oneOf": [id_schema, null_schema]},
- "paasAccountId": {"oneOf": [id_schema, null_schema]},
+ "vimAccountId": id_schema,
"wimAccountId": {"oneOf": [id_schema, bool_schema, null_schema]},
"placement-engine": string_schema,
"placement-constraints": object_schema,
"type": "object",
"properties": {
"member-vnf-index": name_schema,
- "vimAccountId": {"oneOf": [id_schema, null_schema]},
- "paasAccountId": {"oneOf": [id_schema, null_schema]},
+ "vimAccountId": id_schema,
"vdu": {
"type": "array",
"minItems": 1,
},
},
},
- "required": ["nsName", "nsdId"],
+ "required": ["nsName", "nsdId", "vimAccountId"],
"additionalProperties": False,
}
"nsInstanceId": id_schema,
"timeout_ns_update": integer1_schema,
"updateType": {
- "enum": [
- "CHANGE_VNFPKG",
- "REMOVE_VNF",
- "MODIFY_VNF_INFORMATION",
- "OPERATE_VNF",
- ]
+ "enum": ["CHANGE_VNFPKG", "REMOVE_VNF", "MODIFY_VNF_INFORMATION", "OPERATE_VNF"]
},
"modifyVnfInfoData": {
"type": "object",
},
"required": ["vdu_id", "count-index"],
"additionalProperties": False,
- },
+ }
},
"required": ["vnfInstanceId", "changeStateTo"],
- },
+ }
},
"required": ["updateType"],
"additionalProperties": False,
"migrateToHost": string_schema,
"vdu": {
"type": "object",
- "properties": {
- "vduId": name_schema,
- "vduCountIndex": integer0_schema,
- },
- "required": ["vduId"],
- "additionalProperties": False,
+ "properties": {
+ "vduId": name_schema,
+ "vduCountIndex": integer0_schema,
+ },
+ "required": ["vduId"],
+ "additionalProperties": False,
},
},
"required": ["vnfInstanceId"],
- "additionalProperties": False,
+ "additionalProperties": False
}
ns_heal = {
"virtualMemory": integer1_schema,
"sizeOfStorage": integer0_schema,
"numVirtualCpu": integer1_schema,
- },
+ },
+ }
},
- },
"required": ["vnfInstanceId", "additionalParams"],
"additionalProperties": False,
+ }
},
- },
"required": ["lcmOperationType", "verticalScale", "nsInstanceId"],
"additionalProperties": False,
}
"additionalProperties": False,
}
-# PAAS
-paas_types = {"enum": ["juju"]}
-paas_new_schema = {
- "title": "paas creation input schema",
- "$schema": "http://json-schema.org/draft-04/schema#",
- "type": "object",
- "properties": {
- "schema_version": schema_version,
- "schema_type": schema_type,
- "name": name_schema,
- "paas_type": paas_types,
- "description": description_schema,
- "endpoints": description_list_schema,
- "user": string_schema,
- "secret": passwd_schema,
- "config": object_schema,
- },
- "required": [
- "name",
- "paas_type",
- "endpoints",
- "user",
- "secret",
- ],
- "additionalProperties": False,
-}
-paas_edit_schema = {
- "title": "paas edition input schema",
- "$schema": "http://json-schema.org/draft-04/schema#",
- "type": "object",
- "properties": {
- "name": name_schema,
- "paas_type": paas_types,
- "description": description_schema,
- "endpoints": description_list_schema,
- "user": string_schema,
- "secret": passwd_schema,
- "config": object_schema,
- },
- "additionalProperties": False,
-}
-
# K8s Repos
k8srepo_types = {"enum": ["helm-chart", "juju-bundle"]}
k8srepo_properties = {
"sdn_controllers",
"k8sclusters",
"vca",
- "paas",
"k8srepos",
"osmrepos",
"ns_subscriptions",
"enum": [
"VnfIdentifierCreationNotification",
"VnfLcmOperationOccurrenceNotification",
- "VnfIdentifierDeletionNotification",
- ]
- },
+ "VnfIdentifierDeletionNotification"
+ ]
+ }
},
"operationTypes": {
"type": "array",
"items": {
"enum": [
- "INSTANTIATE",
- "SCALE",
- "SCALE_TO_LEVEL",
- "CHANGE_FLAVOUR",
- "TERMINATE",
- "HEAL",
- "OPERATE",
- "CHANGE_EXT_CONN",
- "MODIFY_INFO",
- "CREATE_SNAPSHOT",
- "REVERT_TO_SNAPSHOT",
- "CHANGE_VNFPKG",
- ]
- },
+ "INSTANTIATE", "SCALE", "SCALE_TO_LEVEL", "CHANGE_FLAVOUR", "TERMINATE",
+ "HEAL", "OPERATE", "CHANGE_EXT_CONN", "MODIFY_INFO", "CREATE_SNAPSHOT",
+ "REVERT_TO_SNAPSHOT", "CHANGE_VNFPKG"
+ ]
+ }
},
"operationStates": {
"type": "array",
"items": {
"enum": [
- "STARTING",
- "PROCESSING",
- "COMPLETED",
- "FAILED_TEMP",
- "FAILED",
- "ROLLING_BACK",
- "ROLLED_BACK",
- ]
- },
- },
+ "STARTING", "PROCESSING", "COMPLETED", "FAILED_TEMP", "FAILED",
+ "ROLLING_BACK", "ROLLED_BACK"
+ ]
+ }
+ }
},
- "required": ["VnfInstanceSubscriptionFilter", "notificationTypes"],
-}
+ "required": ["VnfInstanceSubscriptionFilter", "notificationTypes"]
+ }
vnf_subscription = {
"title": "vnf subscription input schema",
"properties": {
"filter": vnflcmsub_schema,
"CallbackUri": description_schema,
- "authentication": authentication_schema,
+ "authentication": authentication_schema
},
- "required": ["filter", "CallbackUri"],
+ "required": ["filter", "CallbackUri"]
}