# See the License for the specific language governing permissions and
# limitations under the License.
-set -e
echo "Launching tox"
-tox --parallel=auto
-
+TOX_PARALLEL_NO_SPINNER=1 tox --parallel=auto
if content.get("vim_type"):
if content["vim_type"] == "openstack":
compute = {
- "ram": {
- "total": None,
- "used": None
- },
- "vcpus": {
- "total": None,
- "used": None
- },
- "instances": {
- "total": None,
- "used": None
- }
+ "ram": {"total": None, "used": None},
+ "vcpus": {"total": None, "used": None},
+ "instances": {"total": None, "used": None},
}
storage = {
- "volumes": {
- "total": None,
- "used": None
- },
- "snapshots": {
- "total": None,
- "used": None
- },
- "storage": {
- "total": None,
- "used": None
- }
+ "volumes": {"total": None, "used": None},
+ "snapshots": {"total": None, "used": None},
+ "storage": {"total": None, "used": None},
}
network = {
- "networks": {
- "total": None,
- "used": None
- },
- "subnets": {
- "total": None,
- "used": None
- },
- "floating_ips": {
- "total": None,
- "used": None
- }
+ "networks": {"total": None, "used": None},
+ "subnets": {"total": None, "used": None},
+ "floating_ips": {"total": None, "used": None},
+ }
+ content["resources"] = {
+ "compute": compute,
+ "storage": storage,
+ "network": network,
}
- content["resources"] = {"compute": compute, "storage": storage, "network": network}
return "{}:0".format(content["_id"])
elif auth_list[0].lower() == "basic":
user_passwd64 = auth_list[-1]
if not token:
- if cherrypy.session.get("Authorization"):
+ if cherrypy.session.get("Authorization"): # pylint: disable=E1101
# 2. Try using session before request a new token. If not, basic authentication will generate
- token = cherrypy.session.get("Authorization")
+ token = cherrypy.session.get( # pylint: disable=E1101
+ "Authorization"
+ )
if token == "logout":
token = None # force Unauthorized response to insert user password again
elif user_passwd64 and cherrypy.request.config.get(
except Exception:
pass
outdata = self.new_token(
- None, {"username": user, "password": passwd}
+ None, {"username": user, "password": passwd}, None
)
token = outdata["_id"]
- cherrypy.session["Authorization"] = token
+ cherrypy.session["Authorization"] = token # pylint: disable=E1101
if not token:
raise AuthException(
return token_info
except AuthException as e:
if not isinstance(e, AuthExceptionUnauthorized):
- if cherrypy.session.get("Authorization"):
- del cherrypy.session["Authorization"]
+ if cherrypy.session.get("Authorization"): # pylint: disable=E1101
+ del cherrypy.session["Authorization"] # pylint: disable=E1101
cherrypy.response.headers[
"WWW-Authenticate"
] = 'Bearer realm="{}"'.format(e)
:param outdata: user token information
"""
user_content = None
- detail = {}
present_time = time()
user = outdata["username"]
if self.config["authentication"].get("pwd_expiry_check"):
:param filter_q: dictionary to filter user list by name (username is also admited) and/or _id
:return: returns a list of users.
"""
+ return list() # Default return value so that the method get_user passes pylint
def get_user(self, _id, fail=True):
"""
import logging
import re
-from osm_nbi.authconn import Authconn, AuthException, AuthconnConflictException # , AuthconnOperationException
+from osm_nbi.authconn import (
+ Authconn,
+ AuthException,
+ AuthconnConflictException,
+) # , AuthconnOperationException
from osm_common.dbbase import DbException
from osm_nbi.base_topic import BaseTopic
from osm_nbi.validation import is_valid_uuid
)
if old_pwd:
salt = user_data["_admin"]["salt"]
- shadow_password = sha256(old_pwd.encode('utf-8') + salt.encode('utf-8')).hexdigest()
+ shadow_password = sha256(
+ old_pwd.encode("utf-8") + salt.encode("utf-8")
+ ).hexdigest()
if shadow_password != user_data["password"]:
raise AuthconnConflictException(
- "Incorrect password",
- http_code=HTTPStatus.CONFLICT
+ "Incorrect password", http_code=HTTPStatus.CONFLICT
)
BaseTopic.format_on_edit(user_data, user_info)
# User Name
self.http_code = http_code
super(Exception, self).__init__(message)
+
class NBIBadArgumentsException(Exception):
"""
Bad argument values exception
self.bad_args = bad_args
def __str__(self):
- return "{}, Bad arguments: {}".format(
- self.message, self.bad_args
- )
+ return "{}, Bad arguments: {}".format(self.message, self.bad_args)
+
def deep_get(target_dict, key_list):
"""
return target_dict
-def detect_descriptor_usage(
- descriptor: dict, db_collection: str, db: object
-) -> bool:
+def detect_descriptor_usage(descriptor: dict, db_collection: str, db: object) -> bool:
"""Detect the descriptor usage state.
Args:
}
if db_collection not in search_dict:
- raise NBIBadArgumentsException("db_collection should be equal to vnfds or nsds", "db_collection")
+ raise NBIBadArgumentsException(
+ "db_collection should be equal to vnfds or nsds", "db_collection"
+ )
record_list = db.get_list(
search_dict[db_collection][0],
return True
except (DbException, KeyError, NBIBadArgumentsException) as error:
- raise EngineException(f"Error occured while detecting the descriptor usage: {error}")
+ raise EngineException(
+ f"Error occured while detecting the descriptor usage: {error}"
+ )
def update_descriptor_usage_state(
"_admin.usageState": "IN_USE",
}
- db.set_one(db_collection, {"_id": descriptor["_id"]}, update_dict=descriptor_update)
+ db.set_one(
+ db_collection, {"_id": descriptor["_id"]}, update_dict=descriptor_update
+ )
except (DbException, KeyError, NBIBadArgumentsException) as error:
- raise EngineException(f"Error occured while updating the descriptor usage state: {error}")
+ raise EngineException(
+ f"Error occured while updating the descriptor usage state: {error}"
+ )
def get_iterable(input_var):
class DescriptorTopic(BaseTopic):
def __init__(self, db, fs, msg, auth):
-
BaseTopic.__init__(self, db, fs, msg, auth)
+ def _validate_input_new(self, indata, storage_params, force=False):
+ return indata
+
def check_conflict_on_edit(self, session, final_content, edit_content, _id):
final_content = super().check_conflict_on_edit(
session, final_content, edit_content, _id
if self.db.get_one(self.topic, _filter, fail_on_empty=False):
raise EngineException(
"{} with id '{}' already exists for this project".format(
- self.topic[:-1], final_content["id"]
+ (str(self.topic))[:-1], final_content["id"]
),
HTTPStatus.CONFLICT,
)
self.fs.file_delete(_id + ":" + str(revision), ignore_non_exist=True)
revision = revision - 1
-
@staticmethod
def get_one_by_id(db, session, topic, id):
# find owned by this project
# Avoid override in this case as the target is userDefinedData, but not vnfd,nsd descriptors
# indata = DescriptorTopic._validate_input_new(self, indata, project_id=session["force"])
- content = {"_admin": {
- "userDefinedData": indata,
- "revision": 0
- }}
+ content = {"_admin": {"userDefinedData": indata, "revision": 0}}
self.format_on_new(
content, session["project_id"], make_public=session["public"]
or "application/x-gzip" in content_type
):
compressed = "gzip"
- if (
- content_type
- and "application/zip" in content_type
- ):
+ if content_type and "application/zip" in content_type:
compressed = "zip"
filename = headers.get("Content-Filename")
if not filename and compressed:
)
if (
- (
- zipfilename.endswith(".yaml")
- or zipfilename.endswith(".json")
- or zipfilename.endswith(".yml")
- ) and (
- zipfilename.find("/") < 0
- or zipfilename.find("Definitions") >= 0
- )
+ zipfilename.endswith(".yaml")
+ or zipfilename.endswith(".json")
+ or zipfilename.endswith(".yml")
+ ) and (
+ zipfilename.find("/") < 0
+ or zipfilename.find("Definitions") >= 0
):
storage["pkg-dir"] = ""
if descriptor_file_name:
proposed_revision_path,
)
except Exception as e:
- shutil.rmtree(self.fs.path + current_revision_path, ignore_errors=True)
- shutil.rmtree(self.fs.path + proposed_revision_path, ignore_errors=True)
+ shutil.rmtree(
+ self.fs.path + current_revision_path, ignore_errors=True
+ )
+ shutil.rmtree(
+ self.fs.path + proposed_revision_path, ignore_errors=True
+ )
# Only delete the new revision. We need to keep the original version in place
# as it has not been changed.
self.fs.file_delete(proposed_revision_path, ignore_non_exist=True)
raise e
-
indata = self._remove_envelop(indata)
# Override descriptor with query string kwargs
# Copy the revision to the active package name by its original id
shutil.rmtree(self.fs.path + current_revision_path, ignore_errors=True)
- os.rename(self.fs.path + proposed_revision_path, self.fs.path + current_revision_path)
+ os.rename(
+ self.fs.path + proposed_revision_path,
+ self.fs.path + current_revision_path,
+ )
self.fs.file_delete(current_revision_path, ignore_non_exist=True)
self.fs.mkdir(current_revision_path)
self.fs.reverse_sync(from_path=current_revision_path)
descriptor_id,
descriptor_file_name,
old_descriptor_directory,
- new_descriptor_directory
+ new_descriptor_directory,
):
# Example:
# raise EngineException(
# )
pass
+
class VnfdTopic(DescriptorTopic):
topic = "vnfds"
topic_msg = "vnfd"
return False
elif not storage_params.get("pkg-dir"):
if self.fs.file_exists("{}_".format(storage_params["folder"]), "dir"):
- f = "{}_/{}".format(
- storage_params["folder"], folder
- )
+ f = "{}_/{}".format(storage_params["folder"], folder)
else:
- f = "{}/{}".format(
- storage_params["folder"], folder
- )
+ f = "{}/{}".format(storage_params["folder"], folder)
if file:
return self.fs.file_exists("{}/{}".format(f, file), "file")
else:
"""
super().delete_extra(session, _id, db_content, not_send_msg)
self.db.del_list("vnfpkgops", {"vnfPkgId": _id})
- self.db.del_list(self.topic+"_revisions", {"_id": {"$regex": _id}})
+ self.db.del_list(self.topic + "_revisions", {"_id": {"$regex": _id}})
def sol005_projection(self, data):
data["onboardingState"] = data["_admin"]["onboardingState"]
"""
for df in vnfd.get("df", {}):
for policy in ["scaling-aspect", "healing-aspect"]:
- if (df.get(policy, {})):
+ if df.get(policy, {}):
df.pop(policy)
for vdu in vnfd.get("vdu", {}):
for alarm_policy in ["alarm", "monitoring-parameter"]:
- if (vdu.get(alarm_policy, {})):
+ if vdu.get(alarm_policy, {}):
vdu.pop(alarm_policy)
return vnfd
:raises: FsException in case of error while deleting associated storage
"""
super().delete_extra(session, _id, db_content, not_send_msg)
- self.db.del_list(self.topic+"_revisions", { "_id": { "$regex": _id}})
+ self.db.del_list(self.topic + "_revisions", {"_id": {"$regex": _id}})
@staticmethod
def extract_day12_primitives(nsd: dict) -> dict:
:return: The list, it can be empty if no one match the filter_q.
"""
if topic not in self.map_topic:
- raise EngineException("Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR)
+ raise EngineException(
+ "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR
+ )
return self.map_topic[topic].list(session, filter_q, api_req)
def get_item(self, session, topic, _id, filter_q=None, api_req=False):
:return: dictionary, raise exception if not found.
"""
if topic not in self.map_topic:
- raise EngineException("Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR)
+ raise EngineException(
+ "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR
+ )
return self.map_topic[topic].show(session, _id, filter_q, api_req)
def get_file(self, session, topic, _id, path=None, accept_header=None):
if "Location" in response.headers:
body += '<a href="{}"> show </a>'.format(response.headers["Location"])
else:
- _id = request.path_info[request.path_info.rfind("/") + 1:]
+ _id = request.path_info[request.path_info.rfind("/") + 1 :]
body += (
'<a href="/osm/{}?METHOD=DELETE"> '
'<img src="/osm/static/delete.png" height="25" width="25"> </a>'
- ).format(
- request.path_info
- )
+ ).format(request.path_info)
if request.path_info.startswith(
"/nslcm/v1/ns_instances_content/"
) or request.path_info.startswith("/nslcm/v1/ns_instances/"):
def __init__(self, db, fs, msg, auth):
BaseTopic.__init__(self, db, fs, msg, auth)
- def _check_descriptor_dependencies(self, session, descriptor):
- """
- Check that the dependent descriptors exist on a new descriptor or edition
- :param session: client session information
- :param descriptor: descriptor to be inserted or edit
- :return: None or raises exception
- """
- if not descriptor.get("nsdId"):
- return
- nsd_id = descriptor["nsdId"]
- if not self.get_item_list(session, "nsds", {"id": nsd_id}):
- raise EngineException(
- "Descriptor error at nsdId='{}' references a non exist nsd".format(
- nsd_id
- ),
- http_code=HTTPStatus.CONFLICT,
- )
-
@staticmethod
def format_on_new(content, project_id=None, make_public=False):
BaseTopic.format_on_new(content, project_id=project_id, make_public=make_public)
for config in df["lcm-operations-configuration"][
"operate-vnf-op-config"
].get("day1-2", []):
- # Verify the target object (VNF|NS|VDU|KDU) where we need to populate
+ # Verify the target object (VNF|NS|VDU|KDU) where we need to populate
# the params with the additional ones given by the user
if config.get("id") == selector:
for primitive in get_iterable(
"<rw_mgmt_ip>",
"<VDU_SCALE_INFO>",
"<ns_config_info>",
- "<OSM>"
+ "<OSM>",
):
continue
if (
EngineException, ValidationError, DbException, FsException, MsgException.
Note: Exceptions are not captured on purpose. They should be captured at called
"""
+ step = "checking quotas" # first step must be defined outside try
try:
- step = "checking quotas"
self.check_quota(session)
step = "validating input parameters"
return ns_k8s_namespace
- def _add_flavor_to_nsr(self, vdu, vnfd, nsr_descriptor, member_vnf_index, revision=None):
+ def _add_flavor_to_nsr(
+ self, vdu, vnfd, nsr_descriptor, member_vnf_index, revision=None
+ ):
flavor_data = {}
guest_epa = {}
# Find this vdu compute and storage descriptors
if vsd.get("id") == vdu.get("virtual-storage-desc", [[]])[0]:
vdu_virtual_storage = vsd
# Get this vdu vcpus, memory and storage info for flavor_data
- if vdu_virtual_compute.get("virtual-cpu", {}).get(
- "num-virtual-cpu"
- ):
+ if vdu_virtual_compute.get("virtual-cpu", {}).get("num-virtual-cpu"):
flavor_data["vcpu-count"] = vdu_virtual_compute["virtual-cpu"][
"num-virtual-cpu"
]
if vdu_virtual_compute.get("virtual-memory", {}).get("size"):
flavor_data["memory-mb"] = (
- float(vdu_virtual_compute["virtual-memory"]["size"])
- * 1024.0
+ float(vdu_virtual_compute["virtual-memory"]["size"]) * 1024.0
)
if vdu_virtual_storage.get("size-of-storage"):
- flavor_data["storage-gb"] = vdu_virtual_storage[
- "size-of-storage"
- ]
+ flavor_data["storage-gb"] = vdu_virtual_storage["size-of-storage"]
# Get this vdu EPA info for guest_epa
if vdu_virtual_compute.get("virtual-cpu", {}).get("cpu-quota"):
- guest_epa["cpu-quota"] = vdu_virtual_compute["virtual-cpu"][
- "cpu-quota"
- ]
+ guest_epa["cpu-quota"] = vdu_virtual_compute["virtual-cpu"]["cpu-quota"]
if vdu_virtual_compute.get("virtual-cpu", {}).get("pinning"):
vcpu_pinning = vdu_virtual_compute["virtual-cpu"]["pinning"]
if vcpu_pinning.get("thread-policy"):
- guest_epa["cpu-thread-pinning-policy"] = vcpu_pinning[
- "thread-policy"
- ]
+ guest_epa["cpu-thread-pinning-policy"] = vcpu_pinning["thread-policy"]
if vcpu_pinning.get("policy"):
cpu_policy = (
- "SHARED"
- if vcpu_pinning["policy"] == "dynamic"
- else "DEDICATED"
+ "SHARED" if vcpu_pinning["policy"] == "dynamic" else "DEDICATED"
)
guest_epa["cpu-pinning-policy"] = cpu_policy
if vdu_virtual_compute.get("virtual-memory", {}).get("mem-quota"):
- guest_epa["mem-quota"] = vdu_virtual_compute["virtual-memory"][
- "mem-quota"
+ guest_epa["mem-quota"] = vdu_virtual_compute["virtual-memory"]["mem-quota"]
+ if vdu_virtual_compute.get("virtual-memory", {}).get("mempage-size"):
+ guest_epa["mempage-size"] = vdu_virtual_compute["virtual-memory"][
+ "mempage-size"
]
- if vdu_virtual_compute.get("virtual-memory", {}).get(
- "mempage-size"
- ):
- guest_epa["mempage-size"] = vdu_virtual_compute[
- "virtual-memory"
- ]["mempage-size"]
- if vdu_virtual_compute.get("virtual-memory", {}).get(
- "numa-node-policy"
- ):
- guest_epa["numa-node-policy"] = vdu_virtual_compute[
- "virtual-memory"
- ]["numa-node-policy"]
- if vdu_virtual_storage.get("disk-io-quota"):
- guest_epa["disk-io-quota"] = vdu_virtual_storage[
- "disk-io-quota"
+ if vdu_virtual_compute.get("virtual-memory", {}).get("numa-node-policy"):
+ guest_epa["numa-node-policy"] = vdu_virtual_compute["virtual-memory"][
+ "numa-node-policy"
]
+ if vdu_virtual_storage.get("disk-io-quota"):
+ guest_epa["disk-io-quota"] = vdu_virtual_storage["disk-io-quota"]
if guest_epa:
flavor_data["guest-epa"] = guest_epa
revision = revision if revision is not None else 1
- flavor_data["name"] = vdu["id"][:56] + "-" + member_vnf_index + "-" + str(revision) + "-flv"
+ flavor_data["name"] = (
+ vdu["id"][:56] + "-" + member_vnf_index + "-" + str(revision) + "-flv"
+ )
flavor_data["id"] = str(len(nsr_descriptor["flavor"]))
nsr_descriptor["flavor"].append(flavor_data)
if "revision" in vnfd:
vnfr_descriptor["revision"] = vnfd["revision"]
-
vnf_k8s_namespace = ns_k8s_namespace
if vnf_params:
if vnf_params.get("k8s-namespace"):
try:
vdu_virtual_storage_descriptors = utils.filter_in_list(
vnfd.get("virtual-storage-desc", []),
- lambda stg_desc: stg_desc["id"] in vdu["virtual-storage-desc"]
+ lambda stg_desc: stg_desc["id"] in vdu["virtual-storage-desc"],
)
except Exception:
vdu_virtual_storage_descriptors = []
"interfaces": [],
"additionalParams": additional_params,
"vdu-name": vdu["name"],
- "virtual-storages": vdu_virtual_storage_descriptors
+ "virtual-storages": vdu_virtual_storage_descriptors,
}
if vdu_params and vdu_params.get("config-units"):
vdur["config-units"] = vdu_params["config-units"]
vdur["alt-image-ids"] = alt_image_ids
revision = revision if revision is not None else 1
- flavor_data_name = vdu["id"][:56] + "-" + vnf_index + "-" + str(revision) + "-flv"
+ flavor_data_name = (
+ vdu["id"][:56] + "-" + vnf_index + "-" + str(revision) + "-flv"
+ )
nsr_flavor_desc = utils.find_in_list(
nsr_descriptor["flavor"],
lambda flavor: flavor["name"] == flavor_data_name,
:param filter_q: dict: query parameter containing vcaStatus-refresh as true or false
:return: None
"""
- time_now, time_delta = time(), time() - ns_instance_content["_admin"]["modified"]
- force_refresh = isinstance(filter_q, dict) and filter_q.get('vcaStatusRefresh') == 'true'
+ time_now, time_delta = (
+ time(),
+ time() - ns_instance_content["_admin"]["modified"],
+ )
+ force_refresh = (
+ isinstance(filter_q, dict) and filter_q.get("vcaStatusRefresh") == "true"
+ )
threshold_reached = time_delta > 120
if force_refresh or threshold_reached:
operation, _id = "vca_status_refresh", ns_instance_content["_id"]
ns_instance_content["_admin"]["modified"] = time_now
self.db.set_one(self.topic, {"_id": _id}, ns_instance_content)
nslcmop_desc = NsLcmOpTopic._create_nslcmop(_id, operation, None)
- self.format_on_new(nslcmop_desc, session["project_id"], make_public=session["public"])
+ self.format_on_new(
+ nslcmop_desc, session["project_id"], make_public=session["public"]
+ )
nslcmop_desc["_admin"].pop("nsState")
self.msg.write("ns", operation, nslcmop_desc)
return
"nsd:constituent-vnfd".format(member_vnf_index)
)
- ## Backwards compatibility: if there is no revision, get it from the one and only VNFD entry
+ # Backwards compatibility: if there is no revision, get it from the one and only VNFD entry
if "revision" in vnfr:
vnfd_revision = vnfr["vnfd-id"] + ":" + str(vnfr["revision"])
- vnfd = self.db.get_one("vnfds_revisions", {"_id": vnfd_revision}, fail_on_empty=False)
+ vnfd = self.db.get_one(
+ "vnfds_revisions", {"_id": vnfd_revision}, fail_on_empty=False
+ )
else:
- vnfd = self.db.get_one("vnfds", {"_id": vnfr["vnfd-id"]}, fail_on_empty=False)
+ vnfd = self.db.get_one(
+ "vnfds", {"_id": vnfr["vnfd-id"]}, fail_on_empty=False
+ )
if not vnfd:
raise EngineException(
return self.db.get_one("vim_accounts", db_filter)
except Exception:
raise EngineException(
- "Invalid vimAccountId='{}' not present for the project".format(
- vim_id
- )
+ "Invalid vimAccountId='{}' not present for the project".format(vim_id)
)
def _check_valid_wim_account(self, wim_account, wim_accounts, session):
return ifaces_forcing_vim_network
def _update_vnfrs_from_nsd(self, nsr):
+ step = "Getting vnf_profiles from nsd" # first step must be defined outside try
try:
nsr_id = nsr["_id"]
nsd = nsr["nsd"]
- step = "Getting vnf_profiles from nsd"
vnf_profiles = nsd.get("df", [{}])[0].get("vnf-profile", ())
vld_fixed_ip_connection_point_data = {}
for cpd in vlc.get("constituent-cpd-id", ()):
if cpd.get("ip-address"):
step = "Storing ip-address info"
- vld_fixed_ip_connection_point_data.update({vlc.get("virtual-link-profile-id") + '.' + cpd.get("constituent-base-element-id"): {
- "vnfd-connection-point-ref": cpd.get(
- "constituent-cpd-id"),
- "ip-address": cpd.get(
- "ip-address")}})
+ vld_fixed_ip_connection_point_data.update(
+ {
+ vlc.get("virtual-link-profile-id")
+ + "."
+ + cpd.get("constituent-base-element-id"): {
+ "vnfd-connection-point-ref": cpd.get(
+ "constituent-cpd-id"
+ ),
+ "ip-address": cpd.get("ip-address"),
+ }
+ }
+ )
# Inserting ip address to vnfr
if len(vld_fixed_ip_connection_point_data) > 0:
vnfrs = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
for item in vld_fixed_ip_connection_point_data.keys():
step = "Filtering vnfrs"
- vnfr = next(filter(lambda vnfr: vnfr["member-vnf-index-ref"] == item.split('.')[1], vnfrs), None)
+ vnfr = next(
+ filter(
+ lambda vnfr: vnfr["member-vnf-index-ref"]
+ == item.split(".")[1],
+ vnfrs,
+ ),
+ None,
+ )
if vnfr:
vnfr_update = {}
for vdur_index, vdur in enumerate(vnfr["vdur"]):
for iface_index, iface in enumerate(vdur["interfaces"]):
step = "Looking for matched interface"
if (
- iface.get("external-connection-point-ref")
- == vld_fixed_ip_connection_point_data[item].get("vnfd-connection-point-ref") and
- iface.get("ns-vld-id") == item.split('.')[0]
-
+ iface.get("external-connection-point-ref")
+ == vld_fixed_ip_connection_point_data[item].get(
+ "vnfd-connection-point-ref"
+ )
+ and iface.get("ns-vld-id") == item.split(".")[0]
):
vnfr_update_text = "vdur.{}.interfaces.{}".format(
vdur_index, iface_index
step = "Storing info in order to update vnfr"
vnfr_update[
vnfr_update_text + ".ip-address"
- ] = increment_ip_mac(
- vld_fixed_ip_connection_point_data[item].get("ip-address"),
- vdur.get("count-index", 0), )
+ ] = increment_ip_mac(
+ vld_fixed_ip_connection_point_data[item].get(
+ "ip-address"
+ ),
+ vdur.get("count-index", 0),
+ )
vnfr_update[vnfr_update_text + ".fixed-ip"] = True
step = "updating vnfr at database"
self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, vnfr_update)
except (
- ValidationError,
- EngineException,
- DbException,
- MsgException,
- FsException,
+ ValidationError,
+ EngineException,
+ DbException,
+ MsgException,
+ FsException,
) as e:
raise type(e)("{} while '{}'".format(e, step), http_code=e.http_code)
HTTPStatus.CONFLICT,
)
self._check_ns_operation(session, nsr, operation, indata)
- if (indata.get("primitive_params")):
+ if indata.get("primitive_params"):
indata["primitive_params"] = json.dumps(indata["primitive_params"])
- elif (indata.get("additionalParamsForVnf")):
- indata["additionalParamsForVnf"] = json.dumps(indata["additionalParamsForVnf"])
+ elif indata.get("additionalParamsForVnf"):
+ indata["additionalParamsForVnf"] = json.dumps(
+ indata["additionalParamsForVnf"]
+ )
if operation == "instantiate":
self._update_vnfrs_from_nsd(nsr)
vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
nsd = self.db.get_one("nsds", {"_id": nsr["nsd-id"]})
ns_request = nsr["instantiate_params"]
- vnfr = self.db.get_one("vnfrs", {"_id": indata["changeVnfPackageData"]["vnfInstanceId"]})
+ vnfr = self.db.get_one(
+ "vnfrs", {"_id": indata["changeVnfPackageData"]["vnfInstanceId"]}
+ )
latest_vnfd_revision = vnfd["_admin"].get("revision", 1)
vnfr_vnfd_revision = vnfr.get("revision", 1)
if latest_vnfd_revision != vnfr_vnfd_revision:
old_vnfd_id = vnfd_id + ":" + str(vnfr_vnfd_revision)
- old_db_vnfd = self.db.get_one("vnfds_revisions", {"_id": old_vnfd_id})
+ old_db_vnfd = self.db.get_one(
+ "vnfds_revisions", {"_id": old_vnfd_id}
+ )
old_sw_version = old_db_vnfd.get("software-version", "1.0")
new_sw_version = vnfd.get("software-version", "1.0")
if new_sw_version != old_sw_version:
vnf_index = vnfr["member-vnf-index-ref"]
self.logger.info("nsr {}".format(nsr))
for vdu in vnfd["vdu"]:
- self.nsrtopic._add_flavor_to_nsr(vdu, vnfd, nsr, vnf_index, latest_vnfd_revision)
+ self.nsrtopic._add_flavor_to_nsr(
+ vdu, vnfd, nsr, vnf_index, latest_vnfd_revision
+ )
sw_image_id = vdu.get("sw-image-desc")
if sw_image_id:
- image_data = self.nsrtopic._get_image_data_from_vnfd(vnfd, sw_image_id)
+ image_data = self.nsrtopic._get_image_data_from_vnfd(
+ vnfd, sw_image_id
+ )
self.nsrtopic._add_image_to_nsr(nsr, image_data)
for alt_image in vdu.get("alternative-sw-image-desc", ()):
- image_data = self.nsrtopic._get_image_data_from_vnfd(vnfd, alt_image)
+ image_data = self.nsrtopic._get_image_data_from_vnfd(
+ vnfd, alt_image
+ )
self.nsrtopic._add_image_to_nsr(nsr, image_data)
nsr_update["image"] = nsr["image"]
nsr_update["flavor"] = nsr["flavor"]
self.db.set_one("nsrs", {"_id": nsr["_id"]}, nsr_update)
- ns_k8s_namespace = self.nsrtopic._get_ns_k8s_namespace(nsd, ns_request, session)
- vnfr_descriptor = self.nsrtopic._create_vnfr_descriptor_from_vnfd(
- nsd,
- vnfd,
- vnfd_id,
- vnf_index,
- nsr,
- ns_request,
- ns_k8s_namespace,
- latest_vnfd_revision,
+ ns_k8s_namespace = self.nsrtopic._get_ns_k8s_namespace(
+ nsd, ns_request, session
+ )
+ vnfr_descriptor = (
+ self.nsrtopic._create_vnfr_descriptor_from_vnfd(
+ nsd,
+ vnfd,
+ vnfd_id,
+ vnf_index,
+ nsr,
+ ns_request,
+ ns_k8s_namespace,
+ latest_vnfd_revision,
+ )
)
indata["newVdur"] = vnfr_descriptor["vdur"]
nslcmop_desc = self._create_nslcmop(nsInstanceId, operation, indata)
additional_params[k] = "!!yaml " + safe_dump(v)
return additional_params
- def _check_descriptor_dependencies(self, session, descriptor):
- """
- Check that the dependent descriptors exist on a new descriptor or edition
- :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
- :param descriptor: descriptor to be inserted or edit
- :return: None or raises exception
- """
- if not descriptor.get("nst-ref"):
- return
- nstd_id = descriptor["nst-ref"]
- if not self.get_item_list(session, "nsts", {"id": nstd_id}):
- raise EngineException(
- "Descriptor error at nst-ref='{}' references a non exist nstd".format(
- nstd_id
- ),
- http_code=HTTPStatus.CONFLICT,
- )
-
def check_conflict_on_del(self, session, _id, db_content):
"""
Check that NSI is not instantiated
:return: the _id of nsi descriptor created at database
"""
+ step = "checking quotas" # first step must be defined outside try
try:
- step = "checking quotas"
self.check_quota(session)
step = ""
self.db.create("nsis", nsi_descriptor)
rollback.append({"topic": "nsis", "_id": nsi_id})
return nsi_id, None
+ except ValidationError as e:
+ raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
except Exception as e: # TODO remove try Except, it is captured at nbi.py
self.logger.exception(
"Exception {} at NsiTopic.new()".format(e), exc_info=True
)
raise EngineException("Error {}: {}".format(step, e))
- except ValidationError as e:
- raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
def edit(self, session, _id, indata=None, kwargs=None, content=None):
raise EngineException(
},
"verticalscale": {
"METHODS": ("POST",),
- "ROLE_PERMISSION": "ns_instances:id:verticalscale:"
- },
+ "ROLE_PERMISSION": "ns_instances:id:verticalscale:",
+ },
},
},
"ns_lcm_op_occs": {
},
"vnflcm": {
"v1": {
- "vnf_instances": {"METHODS": ("GET", "POST"),
- "ROLE_PERMISSION": "vnflcm_instances:",
- "<ID>": {"METHODS": ("GET", "DELETE"),
- "ROLE_PERMISSION": "vnflcm_instances:id:",
- "scale": {"METHODS": ("POST",),
- "ROLE_PERMISSION": "vnflcm_instances:id:scale:"
- },
- "terminate": {"METHODS": ("POST",),
- "ROLE_PERMISSION": "vnflcm_instances:id:terminate:"
- },
- "instantiate": {"METHODS": ("POST",),
- "ROLE_PERMISSION": "vnflcm_instances:id:instantiate:"
- },
- }
- },
- "vnf_lcm_op_occs": {"METHODS": ("GET",),
- "ROLE_PERMISSION": "vnf_instances:opps:",
- "<ID>": {"METHODS": ("GET",),
- "ROLE_PERMISSION": "vnf_instances:opps:id:"
- },
- },
- "subscriptions": {"METHODS": ("GET", "POST"),
- "ROLE_PERMISSION": "vnflcm_subscriptions:",
- "<ID>": {"METHODS": ("GET", "DELETE"),
- "ROLE_PERMISSION": "vnflcm_subscriptions:id:"
- }
- },
+ "vnf_instances": {
+ "METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "vnflcm_instances:",
+ "<ID>": {
+ "METHODS": ("GET", "DELETE"),
+ "ROLE_PERMISSION": "vnflcm_instances:id:",
+ "scale": {
+ "METHODS": ("POST",),
+ "ROLE_PERMISSION": "vnflcm_instances:id:scale:",
+ },
+ "terminate": {
+ "METHODS": ("POST",),
+ "ROLE_PERMISSION": "vnflcm_instances:id:terminate:",
+ },
+ "instantiate": {
+ "METHODS": ("POST",),
+ "ROLE_PERMISSION": "vnflcm_instances:id:instantiate:",
+ },
+ },
+ },
+ "vnf_lcm_op_occs": {
+ "METHODS": ("GET",),
+ "ROLE_PERMISSION": "vnf_instances:opps:",
+ "<ID>": {
+ "METHODS": ("GET",),
+ "ROLE_PERMISSION": "vnf_instances:opps:id:",
+ },
+ },
+ "subscriptions": {
+ "METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "vnflcm_subscriptions:",
+ "<ID>": {
+ "METHODS": ("GET", "DELETE"),
+ "ROLE_PERMISSION": "vnflcm_subscriptions:id:",
+ },
+ },
}
},
"nst": {
},
"nsfm": {
"v1": {
- "alarms": {"METHODS": ("GET", "PATCH"),
- "ROLE_PERMISSION": "alarms:",
- "<ID>": {"METHODS": ("GET", "PATCH"),
- "ROLE_PERMISSION": "alarms:id:",
- },
- }
+ "alarms": {
+ "METHODS": ("GET", "PATCH"),
+ "ROLE_PERMISSION": "alarms:",
+ "<ID>": {
+ "METHODS": ("GET", "PATCH"),
+ "ROLE_PERMISSION": "alarms:id:",
+ },
+ }
},
},
}
self.engine = Engine(self.authenticator)
def _format_in(self, kwargs):
+ error_text = "" # error_text must be initialized outside try
try:
indata = None
if cherrypy.request.body.length:
# NS Fault Management
@cherrypy.expose
- def nsfm(self, version=None, topic=None, uuid=None, project_name=None, ns_id=None, *args, **kwargs):
- if topic == 'alarms':
+ def nsfm(
+ self,
+ version=None,
+ topic=None,
+ uuid=None,
+ project_name=None,
+ ns_id=None,
+ *args,
+ **kwargs
+ ):
+ if topic == "alarms":
try:
method = cherrypy.request.method
- role_permission = self._check_valid_url_method(method, "nsfm", version, topic, None, None, *args)
- query_string_operations = self._extract_query_string_operations(kwargs, method)
+ role_permission = self._check_valid_url_method(
+ method, "nsfm", version, topic, None, None, *args
+ )
+ query_string_operations = self._extract_query_string_operations(
+ kwargs, method
+ )
- self.authenticator.authorize(role_permission, query_string_operations, None)
+ self.authenticator.authorize(
+ role_permission, query_string_operations, None
+ )
# to handle get request
- if cherrypy.request.method == 'GET':
+ if cherrypy.request.method == "GET":
# if request is on basis of uuid
- if uuid and uuid != 'None':
+ if uuid and uuid != "None":
try:
alarm = self.engine.db.get_one("alarms", {"uuid": uuid})
- alarm_action = self.engine.db.get_one("alarms_action", {"uuid": uuid})
+ alarm_action = self.engine.db.get_one(
+ "alarms_action", {"uuid": uuid}
+ )
alarm.update(alarm_action)
- vnf = self.engine.db.get_one("vnfrs", {"nsr-id-ref": alarm["tags"]["ns_id"]})
+ vnf = self.engine.db.get_one(
+ "vnfrs", {"nsr-id-ref": alarm["tags"]["ns_id"]}
+ )
alarm["vnf-id"] = vnf["_id"]
return self._format_out(str(alarm))
except Exception:
return self._format_out("Please provide valid alarm uuid")
- elif ns_id and ns_id != 'None':
+ elif ns_id and ns_id != "None":
# if request is on basis of ns_id
try:
- alarms = self.engine.db.get_list("alarms", {"tags.ns_id": ns_id})
+ alarms = self.engine.db.get_list(
+ "alarms", {"tags.ns_id": ns_id}
+ )
for alarm in alarms:
- alarm_action = self.engine.db.get_one("alarms_action", {"uuid": alarm['uuid']})
+ alarm_action = self.engine.db.get_one(
+ "alarms_action", {"uuid": alarm["uuid"]}
+ )
alarm.update(alarm_action)
return self._format_out(str(alarms))
except Exception:
return self._format_out("Please provide valid ns id")
else:
# to return only alarm which are related to given project
- project = self.engine.db.get_one("projects", {"name": project_name})
- project_id = project.get('_id')
- ns_list = self.engine.db.get_list("nsrs", {"_admin.projects_read": project_id})
+ project = self.engine.db.get_one(
+ "projects", {"name": project_name}
+ )
+ project_id = project.get("_id")
+ ns_list = self.engine.db.get_list(
+ "nsrs", {"_admin.projects_read": project_id}
+ )
ns_ids = []
for ns in ns_list:
ns_ids.append(ns.get("_id"))
alarms = self.engine.db.get_list("alarms")
- alarm_list = [alarm for alarm in alarms if alarm["tags"]["ns_id"] in ns_ids]
+ alarm_list = [
+ alarm
+ for alarm in alarms
+ if alarm["tags"]["ns_id"] in ns_ids
+ ]
for alrm in alarm_list:
- action = self.engine.db.get_one("alarms_action", {"uuid": alrm.get("uuid")})
+ action = self.engine.db.get_one(
+ "alarms_action", {"uuid": alrm.get("uuid")}
+ )
alrm.update(action)
return self._format_out(str(alarm_list))
# to handle patch request for alarm update
- elif cherrypy.request.method == 'PATCH':
+ elif cherrypy.request.method == "PATCH":
data = yaml.load(cherrypy.request.body, Loader=yaml.SafeLoader)
try:
# check if uuid is valid
return self._format_out("Please provide valid alarm uuid.")
if data.get("is_enable") is not None:
if data.get("is_enable"):
- alarm_status = 'ok'
+ alarm_status = "ok"
else:
- alarm_status = 'disabled'
- self.engine.db.set_one("alarms", {"uuid": data.get("uuid")},
- {"alarm_status": alarm_status})
+ alarm_status = "disabled"
+ self.engine.db.set_one(
+ "alarms",
+ {"uuid": data.get("uuid")},
+ {"alarm_status": alarm_status},
+ )
else:
- self.engine.db.set_one("alarms", {"uuid": data.get("uuid")},
- {"threshold": data.get("threshold")})
+ self.engine.db.set_one(
+ "alarms",
+ {"uuid": data.get("uuid")},
+ {"threshold": data.get("threshold")},
+ )
return self._format_out("Alarm updated")
except Exception as e:
- cherrypy.response.status = e.http_code.value
- if isinstance(e, (NbiException, EngineException, DbException, FsException, MsgException, AuthException,
- ValidationError, AuthconnException)):
+ if isinstance(
+ e,
+ (
+ NbiException,
+ EngineException,
+ DbException,
+ FsException,
+ MsgException,
+ AuthException,
+ ValidationError,
+ AuthconnException,
+ ),
+ ):
http_code_value = cherrypy.response.status = e.http_code.value
http_code_name = e.http_code.name
cherrypy.log("Exception {}".format(e))
else:
- http_code_value = cherrypy.response.status = HTTPStatus.BAD_REQUEST.value # INTERNAL_SERVER_ERROR
+ http_code_value = (
+ cherrypy.response.status
+ ) = HTTPStatus.BAD_REQUEST.value # INTERNAL_SERVER_ERROR
cherrypy.log("CRITICAL: Exception {}".format(e), traceback=True)
http_code_name = HTTPStatus.BAD_REQUEST.name
problem_details = {
outdata = token_info = self.authenticator.new_token(
token_info, indata, cherrypy.request.remote
)
- cherrypy.session["Authorization"] = outdata["_id"]
+ cherrypy.session["Authorization"] = outdata["_id"] # pylint: disable=E1101
self._set_location_header("admin", "v1", "tokens", outdata["_id"])
# for logging
self._format_login(token_info)
# password expiry check
if self.authenticator.check_password_expiry(outdata):
- outdata = {"id": outdata["id"],
- "message": "change_password",
- "user_id": outdata["user_id"]
- }
+ outdata = {
+ "id": outdata["id"],
+ "message": "change_password",
+ "user_id": outdata["user_id"],
+ }
# cherrypy.response.cookie["Authorization"] = outdata["id"]
# cherrypy.response.cookie["Authorization"]['expires'] = 3600
elif method == "DELETE":
token_id = token_info["_id"]
outdata = self.authenticator.del_token(token_id)
token_info = None
- cherrypy.session["Authorization"] = "logout"
+ cherrypy.session["Authorization"] = "logout" # pylint: disable=E1101
# cherrypy.response.cookie["Authorization"] = token_id
# cherrypy.response.cookie["Authorization"]['expires'] = 0
else:
elif args and args[0] == "init":
try:
# self.engine.load_dbase(cherrypy.request.app.config)
- self.engine.create_admin()
+ pid = self.authenticator.create_admin_project()
+ self.authenticator.create_admin_user(pid)
return "Done. User 'admin', password 'admin' created"
except Exception:
cherrypy.response.status = HTTPStatus.FORBIDDEN.value
+ " headers: {}\n".format(cherrypy.request.headers)
+ " path_info: {}\n".format(cherrypy.request.path_info)
+ " query_string: {}\n".format(cherrypy.request.query_string)
- + " session: {}\n".format(cherrypy.session)
+ + " session: {}\n".format(cherrypy.session) # pylint: disable=E1101
+ " cookie: {}\n".format(cherrypy.request.cookie)
+ " method: {}\n".format(cherrypy.request.method)
- + " session: {}\n".format(cherrypy.session.get("fieldname"))
+ + " session: {}\n".format(
+ cherrypy.session.get("fieldname") # pylint: disable=E1101
+ )
+ " body:\n"
)
return_text += " length: {}\n".format(cherrypy.request.body.length)
filter_q = None
if "vcaStatusRefresh" in kwargs:
filter_q = {"vcaStatusRefresh": kwargs["vcaStatusRefresh"]}
- outdata = self.engine.get_item(engine_session, engine_topic, _id, filter_q, True)
+ outdata = self.engine.get_item(
+ engine_session, engine_topic, _id, filter_q, True
+ )
elif method == "POST":
cherrypy.response.status = HTTPStatus.CREATED.value
elif topic == "vnf_instances" and item:
indata["lcmOperationType"] = item
indata["vnfInstanceId"] = _id
- _id, _ = self.engine.new_item(rollback, engine_session, "vnflcmops", indata, kwargs)
- self._set_location_header(main_topic, version, "vnf_lcm_op_occs", _id)
+ _id, _ = self.engine.new_item(
+ rollback, engine_session, "vnflcmops", indata, kwargs
+ )
+ self._set_location_header(
+ main_topic, version, "vnf_lcm_op_occs", _id
+ )
outdata = {"id": _id}
cherrypy.response.status = HTTPStatus.ACCEPTED.value
else:
self.engine.db.del_list(
rollback_item["topic"],
rollback_item["filter"],
- fail_on_empty=False,
)
else:
self.engine.db.del_one(
except ValueError as e:
cherrypy.log.error("Ignoring environ '{}': " + str(e))
except Exception as e:
- cherrypy.log.warn("skipping environ '{}' on exception '{}'".format(k, e))
+ cherrypy.log(
+ "WARNING: skipping environ '{}' on exception '{}'".format(k, e)
+ )
if update_dict:
cherrypy.config.update(update_dict)
"NsLcmOperationOccurrenceNotification",
"NsChangeNotification",
"NsIdentifierCreationNotification",
- "NsIdentifierDeletionNotification"
+ "NsIdentifierDeletionNotification",
]
filter_q = {
"identifier": [nsd_id, ns_instance_id],
"operationStates": ["ANY"],
"operationTypes": ["ANY"],
- "notificationType": notification_type
- }
+ "notificationType": notification_type,
+ }
if op_state:
filter_q["operationStates"].append(op_state)
if command:
"changedExtConnectivity",
"modificationsTriggeredByVnfPkgChange",
"error",
- "_links"
+ "_links",
},
"VnfIdentifierCreationNotification": {
"id",
"subscriptionId",
"timeStamp",
"vnfInstanceId",
- "_links"
+ "_links",
},
"VnfIdentifierDeletionNotification": {
"id",
"subscriptionId",
"timeStamp",
"vnfInstanceId",
- "_links"
+ "_links",
},
}
"""
return self.response_models
- def _format_vnflcm_subscribers(self, subscribers: list, event_details: dict) -> list:
+ def _format_vnflcm_subscribers(
+ self, subscribers: list, event_details: dict
+ ) -> list:
"""
Formats the raw event details from kafka message and subscriber details.
:param subscribers: A list of subscribers whom the event needs to be notified.
subscriber.update(event_details["params"])
return subscribers
- def get_subscribers(self, vnfd_id: str, vnf_instance_id: str, command: str, op_state: str,
- event_details: dict) -> list:
+ def get_subscribers(
+ self,
+ vnfd_id: str,
+ vnf_instance_id: str,
+ command: str,
+ op_state: str,
+ event_details: dict,
+ ) -> list:
"""
Queries database and returns list of subscribers.
:param vnfd_id: Vnfd id of a VNF whose lifecycle has changed. (instantiated, scaled, terminated. etc)
notification_type = [
"VnfIdentifierCreationNotification",
"VnfLcmOperationOccurrenceNotification",
- "VnfIdentifierDeletionNotification"
+ "VnfIdentifierDeletionNotification",
]
filter_q = {
"identifier": [vnfd_id, vnf_instance_id],
"operationStates": ["ANY"],
"operationTypes": ["ANY"],
- "notificationType": notification_type
+ "notificationType": notification_type,
}
if op_state:
filter_q["operationStates"].append(op_state)
class BaseMethod:
-
def __init__(self):
"""
Constructor of the base method
class VnfLcmOp2NsLcmOp:
-
def __init__(self, db, fs, msg, auth):
"""
Constructor of Vnf lcm op to Ns lcm op
class NewVnfLcmOp(BaseMethod):
-
def __init__(self, db, fs, msg, auth):
"""
Constructor of new Vnf Lcm Op
:return: id of nsd id
"""
nsr = self.nsrtopic.show(session, vnf_instance_id)
- return nsr['nsd']['_id']
+ return nsr["nsd"]["_id"]
def __get_formatted_indata(self, session, indata):
"""
"vimAccountId": indata["vimAccountId"],
"nsr_id": indata["vnfInstanceId"],
"lcmOperationType": indata["lcmOperationType"],
- "nsInstanceId": indata["vnfInstanceId"]
+ "nsInstanceId": indata["vnfInstanceId"],
}
elif indata["lcmOperationType"] == "terminate":
formatted_indata = {
"lcmOperationType": indata["lcmOperationType"],
- "nsInstanceId": indata["vnfInstanceId"]
+ "nsInstanceId": indata["vnfInstanceId"],
}
elif indata["lcmOperationType"] == "scale":
formatted_indata = {
"scaleVnfType": indata["type"],
"scaleByStepData": {
"scaling-group-descriptor": indata["aspectId"],
- "member-vnf-index": indata["additionalParams"]["member-vnf-index"]
- }
- }
+ "member-vnf-index": indata["additionalParams"][
+ "member-vnf-index"
+ ],
+ },
+ },
}
elif indata["lcmOperationType"] == "action":
formatted_indata = {
"nsInstanceId": indata["vnfInstanceId"],
"member_vnf_index": indata["member_vnf_index"],
"primitive": indata["primitive"],
- "primitive_params": indata["primitive_params"]
+ "primitive_params": indata["primitive_params"],
}
return formatted_indata
nslcmop_rec = self.nslcmoptopic.show(session, op_id)
operation_status = nslcmop_rec["operationState"]
vnfr = self.vnfrtopic.show(session, vnfInstanceId)
- links = {"self": "/osm/vnflcm/v1/vnf_lcm_op_occs/" + op_id,
- "vnfInstance": "/osm/vnflcm/v1/vnf_instances/" + vnfInstanceId}
- params = {"vnfdId": vnfr["vnfd-ref"],
- "vnfInstanceId": vnfInstanceId,
- "operationState": operation_status,
- "vnfLcmOpOccId": op_id,
- "_links": links
- }
+ links = {
+ "self": "/osm/vnflcm/v1/vnf_lcm_op_occs/" + op_id,
+ "vnfInstance": "/osm/vnflcm/v1/vnf_instances/" + vnfInstanceId,
+ }
+ params = {
+ "vnfdId": vnfr["vnfd-ref"],
+ "vnfInstanceId": vnfInstanceId,
+ "operationState": operation_status,
+ "vnfLcmOpOccId": op_id,
+ "_links": links,
+ }
self.msg.write("vnf", operation, params)
return None
class ListVnfLcmOp(BaseMethod):
-
def __init__(self, db, fs, msg, auth):
"""
Constructor call for listing vnf lcm operations
for record in records:
ns_id = record.get("nsInstanceId")
nsr = self.nsrtopic.show(session, ns_id)
- vnfInstance_id = nsr['constituent-vnfr-ref'][0]
+ vnfInstance_id = nsr["constituent-vnfr-ref"][0]
outdata = sol003_projection(record, vnfInstance_id)
list.append(outdata)
return list
class ShowVnfLcmOp(BaseMethod):
-
def __init__(self, db, fs, msg, auth):
"""
Constructor call for showing vnf lcm operation
record = self.nslcmoptopic.show(session, _id, api_req)
ns_id = record.get("nsInstanceId")
nsr = self.nsrtopic.show(session, ns_id)
- vnfinstance_id = nsr['constituent-vnfr-ref'][0]
+ vnfinstance_id = nsr["constituent-vnfr-ref"][0]
outdata = sol003_projection(record, vnfinstance_id)
return outdata
class VnfInstances2NsInstances:
-
def __init__(self, db, fs, msg, auth):
"""
Constructor of Vnf Instances to Ns Instances
"virtual-link-connectivity": [
{
"constituent-cpd-id": [
- {"constituent-base-element-id": 1,
- "constituent-cpd-id": "eth0-ext"}
+ {
+ "constituent-base-element-id": 1,
+ "constituent-cpd-id": "eth0-ext",
+ }
],
"virtual-link-profile-id": "mgmtnet",
}
],
- "vnfd-id": "cirros_vnfd"
+ "vnfd-id": "cirros_vnfd",
}
],
}
formatted_indata = deepcopy(indata)
formatted_indata["nsdId"] = nsd_id
formatted_indata["nsName"] = indata["vnfInstanceName"] + "-ns"
- for invalid_key in ("vnfdId", "vnfInstanceName", "vnfInstanceDescription", "additionalParams"):
+ for invalid_key in (
+ "vnfdId",
+ "vnfInstanceName",
+ "vnfInstanceDescription",
+ "additionalParams",
+ ):
formatted_indata.pop(invalid_key)
return formatted_indata
_id, *others = self.nsdtopic.new(rollback, session, {}, None, headers)
new_nsd = deepcopy(NewVnfInstance.sample_nsd)
vnf_content = {
- "id":"default-df",
- "vnf-profile": [
+ "id": "default-df",
+ "vnf-profile": [
{
- "id": "1",
- "virtual-link-connectivity": [
- {
- "constituent-cpd-id": [
+ "id": "1",
+ "virtual-link-connectivity": [
{
- "constituent-base-element-id": "1",
- "constituent-cpd-id": indata["additionalParams"]["constituent-cpd-id"]
+ "constituent-cpd-id": [
+ {
+ "constituent-base-element-id": "1",
+ "constituent-cpd-id": indata["additionalParams"][
+ "constituent-cpd-id"
+ ],
+ }
+ ],
+ "virtual-link-profile-id": indata["additionalParams"][
+ "virtual-link-profile-id"
+ ],
}
- ],
- "virtual-link-profile-id": indata["additionalParams"]["virtual-link-profile-id"]
- }
- ],
- "vnfd-id": indata["vnfdId"]
+ ],
+ "vnfd-id": indata["vnfdId"],
}
- ]
+ ],
}
new_nsd["nsd"]["nsd"][0] = {
"description": indata["vnfInstanceDescription"],
"id": indata["vnfdId"] + "-ns",
"name": indata["vnfdId"] + "-ns",
"version": "1.0",
- "df": [vnf_content, ],
+ "df": [
+ vnf_content,
+ ],
"virtual-link-desc": indata["additionalParams"]["virtual-link-desc"],
- "vnfd-id": [indata["vnfdId"]]
+ "vnfd-id": [indata["vnfdId"]],
}
return _id, new_nsd
"""
return self.nsrtopic.new(rollback, session, indata, kwargs, headers)
- def __action_pre_processing(self, rollback, session, indata=None, kwargs=None, headers=None):
+ def __action_pre_processing(
+ self, rollback, session, indata=None, kwargs=None, headers=None
+ ):
"""
Pre process for creating new vnf instance
:param rollback: list to append the created items at database in case a rollback must be done
nsd_id, nsd = self.__create_nsd(rollback, session, indata, kwargs, headers)
self.nsdtopic.upload_content(session, nsd_id, nsd, kwargs, headers)
formatted_indata = NewVnfInstance.__get_formatted_indata(indata, nsd_id)
- nsr_id, _ = self.__create_nsr(rollback, session, formatted_indata, kwargs, headers)
+ nsr_id, _ = self.__create_nsr(
+ rollback, session, formatted_indata, kwargs, headers
+ )
nsr = self.nsrtopic.show(session, nsr_id)
- vnfr_id = nsr['constituent-vnfr-ref'][0]
+ vnfr_id = nsr["constituent-vnfr-ref"][0]
if vnfr_id:
links = {"vnfInstance": "/osm/vnflcm/v1/vnf_instances/" + vnfr_id}
indata["vnfInstanceId"] = vnfr_id
class ListVnfInstance(BaseMethod):
-
def __init__(self, db, fs, msg, auth):
"""
Constructor call for listing vnfs
class ShowVnfInstance(BaseMethod):
-
def __init__(self, db, fs, msg, auth):
"""
Constructor call for showing vnf lcm operation
class DeleteVnfInstance(BaseMethod):
-
def __init__(self, db, fs, msg, auth):
"""
Constructor call for deleting vnf
vnfr = self.vnfrtopic.show(session, vnfInstanceId)
ns_id = vnfr.get("nsr-id-ref")
nsr = self.nsrtopic.show(session, ns_id)
- nsd_to_del = nsr['nsd']['_id']
+ nsd_to_del = nsr["nsd"]["_id"]
links = {"vnfInstance": "/osm/vnflcm/v1/vnf_instances/" + _id}
- params = {"vnfdId": vnfr["vnfd-ref"],
- "vnfInstanceId": _id,
- "_links": links}
+ params = {"vnfdId": vnfr["vnfd-ref"], "vnfInstanceId": _id, "_links": links}
self.msg.write("vnf", "delete", params)
self.nsrtopic.delete(session, ns_id, dry_run, not_send_msg)
return self.nsdtopic.delete(session, nsd_to_del, dry_run, not_send_msg)
from osm_nbi.subscription_topics import CommonSubscriptions
from osm_nbi.validation import vnf_subscription
+
class VnflcmSubscriptionsTopic(CommonSubscriptions):
schema_new = vnf_subscription
+
def _subscription_mapper(self, _id, data, table):
"""
Performs data transformation on subscription request
formatted_data = []
formed_data = {
"reference": data.get("_id"),
- "CallbackUri": data.get("CallbackUri")
+ "CallbackUri": data.get("CallbackUri"),
}
if data.get("authentication"):
formed_data.update({"authentication": data.get("authentication")})
formatted_data.append(update_dict)
elif elem == "VnfLcmOperationOccurrenceNotification":
if "operationTypes" in data["filter"].keys():
- update_dict["operationTypes"] = data["filter"]["operationTypes"]
+ update_dict["operationTypes"] = data["filter"][
+ "operationTypes"
+ ]
else:
update_dict["operationTypes"] = "ANY"
if "operationStates" in data["filter"].keys():
- update_dict["operationStates"] = data["filter"]["operationStates"]
+ update_dict["operationStates"] = data["filter"][
+ "operationStates"
+ ]
else:
update_dict["operationStates"] = "ANY"
formatted_data.append(update_dict)
topic = "subscriptions"
topic_msg = None
+ def _subscription_mapper(self, _id, data, table):
+ """
+ Performs data transformation on subscription request
+ :param data: data to be trasformed
+ :param table: table in which transformed data are inserted
+ """
+ pass
+
def format_subscription(self, subs_data):
"""
Brings lexicographical order for list items at any nested level. For subscriptions max level of nesting is 4.
else:
op_state = params["operationState"]
event_details = {
- "topic": topic,
- "command": command.upper(),
- "params": params,
- }
+ "topic": topic,
+ "command": command.upper(),
+ "params": params,
+ }
subscribers = self.vnflcm.get_subscribers(
- vnfd_id,
- vnf_instance_id,
- command.upper(),
- op_state,
- event_details
- )
+ vnfd_id,
+ vnf_instance_id,
+ command.upper(),
+ op_state,
+ event_details,
+ )
if subscribers:
asyncio.ensure_future(
- self.vnflcm.send_notifications(
- subscribers, loop=self.loop
- ),
- loop=self.loop
- )
+ self.vnflcm.send_notifications(subscribers, loop=self.loop),
+ loop=self.loop,
+ )
elif topic == "nsi":
if command == "terminated" and params["operationState"] in (
"COMPLETED",
location = r.headers.get("Location")
if location:
- _id = location[location.rfind("/") + 1:]
+ _id = location[location.rfind("/") + 1 :]
if _id:
self.last_id = str(_id)
if not pooling:
)
vim_data = (
"{{schema_version: '1.0', name: '{}', vim_type: {}, vim_url: '{}',"
- "vim_tenant_name: '{}', " "vim_user: {}, vim_password: {}"
+ "vim_tenant_name: '{}', "
+ "vim_user: {}, vim_password: {}"
).format(
vim_name,
os.environ.get("OSMNBITEST_VIM_TYPE", "openstack"),
"vnfd_2vdu_set_ip_mac.yaml",
)
self.nsd_filename = "scenario_2vdu_set_ip_mac.yaml"
- self.descriptor_url = (
- "https://osm.etsi.org/gitweb/?p=osm/RO.git;a=blob_plain;f=test/RO_tests/v3_2vdu_set_ip_mac/"
- )
+ self.descriptor_url = "https://osm.etsi.org/gitweb/?p=osm/RO.git;a=blob_plain;f=test/RO_tests/v3_2vdu_set_ip_mac/"
self.commands = {
"1": [
"ls -lrt",
},
)
content = self.auth.update_user.call_args[0][0]
- self.assertEqual(content["old_password"], old_password, "Wrong old password")
+ self.assertEqual(
+ content["old_password"], old_password, "Wrong old password"
+ )
self.assertEqual(content["password"], new_pasw, "Wrong user password")
def test_delete_user(self):
)
test_vnfd["_id"] = did
test_vnfd["extra-property"] = 0
- self.db.get_one.side_effect = lambda table, filter, fail_on_empty=None, fail_on_more=None: {
- "_id": did,
- "_admin": deepcopy(db_vnfd_content["_admin"]),
- }
+ self.db.get_one.side_effect = (
+ lambda table, filter, fail_on_empty=None, fail_on_more=None: {
+ "_id": did,
+ "_admin": deepcopy(db_vnfd_content["_admin"]),
+ }
+ )
with self.assertRaises(
EngineException, msg="Accepted VNFD with an additional property"
{"ns-configuration": {"config-primitive": [{"name": "del-user"}]}}
)
-
with self.assertNotRaises(EngineException):
self.topic._validate_descriptor_changes(
old_nsd["_id"], descriptor_name, "/tmp", "/tmp:1"
from contextlib import contextmanager
import unittest
from time import time
-from unittest.mock import Mock, mock_open # patch, MagicMock
+from unittest.mock import Mock, mock_open # patch, MagicMock
from osm_common.dbbase import DbException
from osm_nbi.engine import EngineException
from osm_common.dbmemory import DbMemory
"lcmOperationType": "update",
"updateType": "REMOVE_VNF",
"nsInstanceId": self.nsr_id,
- "removeVnfInstanceId": vnfr_id
+ "removeVnfInstanceId": vnfr_id,
}
session = {
)
def test_migrate(self):
- vnfr_id = self.db.get_list("vnfrs")[0]["_id"]
+ _ = self.db.get_list("vnfrs")[0]["_id"]
session = {}
self.db.set_one(
"nsrs",
indata = {
"lcmOperationType": "migrate",
"nsInstanceId": self.nsr_id,
- "migrateToHost":"sample02",
- "vdu": {
- "vduCountIndex": 0,
- "vduId": "mgmtVM"
- },
- "vnfInstanceId": "9e8006df-cdfa-4f63-bf6a-fce860d71c1f"
+ "migrateToHost": "sample02",
+ "vdu": {"vduCountIndex": 0, "vduId": "mgmtVM"},
+ "vnfInstanceId": "9e8006df-cdfa-4f63-bf6a-fce860d71c1f",
}
nslcmop_id, _ = self.nslcmop_topic.new(
rollback, session, indata, kwargs=None, headers=headers
indata = {
"lcmOperationType": "migrate",
"nsInstanceId": self.nsr_id,
- "vnfInstanceId": "9e8006df-cdfa-4f63-bf6a-fce860d71c1f"
+ "vnfInstanceId": "9e8006df-cdfa-4f63-bf6a-fce860d71c1f",
}
nslcmop_id, _ = self.nslcmop_topic.new(
rollback, session, indata, kwargs=None, headers=headers
indata = {
"lcmOperationType": "migrate",
"nsInstanceId": self.nsr_id,
- "migrateToHost":"sample02",
- "vdu": {
- "vduCountIndex": 0
- },
- "vnfInstanceId": "9e8006df-cdfa-4f63-bf6a-fce860d71c1f"
+ "migrateToHost": "sample02",
+ "vdu": {"vduCountIndex": 0},
+ "vnfInstanceId": "9e8006df-cdfa-4f63-bf6a-fce860d71c1f",
}
with self.assertRaises(Exception) as e:
nslcmop_id, _ = self.nslcmop_topic.new(
- rollback, session, indata, kwargs=None, headers=headers
+ rollback, session, indata, kwargs=None, headers=headers
+ )
+ self.assertTrue(
+ "Format error at 'vdu' ''vduId' is a required property'"
+ in str(e.exception)
)
- self.assertTrue("Format error at 'vdu' ''vduId' is a required property'" in str(e.exception))
class TestNsLcmOpTopicWithMock(unittest.TestCase):
test_vnfr = yaml.load(db_vnfrs_text, Loader=yaml.Loader)[0]
test_vnfd = yaml.load(db_vnfds_text, Loader=yaml.Loader)
self.db.get_one.side_effect = [test_vnfr, test_vnfd]
- vnfr = self.nslcmop_topic._get_vnfd_from_vnf_member_index("1", test_vnfr['_id'])
- self.assertEqual(self.db.get_one.call_args_list[0][0][0], 'vnfrs', "Incorrect first DB lookup")
- self.assertEqual(self.db.get_one.call_args_list[1][0][0], 'vnfds', "Incorrect second DB lookup")
+ _ = self.nslcmop_topic._get_vnfd_from_vnf_member_index("1", test_vnfr["_id"])
+ self.assertEqual(
+ self.db.get_one.call_args_list[0][0][0],
+ "vnfrs",
+ "Incorrect first DB lookup",
+ )
+ self.assertEqual(
+ self.db.get_one.call_args_list[1][0][0],
+ "vnfds",
+ "Incorrect second DB lookup",
+ )
def test_get_vnfd_from_vnf_member_no_revision(self):
test_vnfr = yaml.load(db_vnfrs_text, Loader=yaml.Loader)[0]
- test_vnfr['revision'] = 3
+ test_vnfr["revision"] = 3
test_vnfd = yaml.load(db_vnfds_text, Loader=yaml.Loader)
self.db.get_one.side_effect = [test_vnfr, test_vnfd]
- vnfr = self.nslcmop_topic._get_vnfd_from_vnf_member_index("1", test_vnfr['_id'])
- self.assertEqual(self.db.get_one.call_args_list[0][0][0], 'vnfrs', "Incorrect first DB lookup")
- self.assertEqual(self.db.get_one.call_args_list[1][0][0], 'vnfds_revisions', "Incorrect second DB lookup")
+ _ = self.nslcmop_topic._get_vnfd_from_vnf_member_index("1", test_vnfr["_id"])
+ self.assertEqual(
+ self.db.get_one.call_args_list[0][0][0],
+ "vnfrs",
+ "Incorrect first DB lookup",
+ )
+ self.assertEqual(
+ self.db.get_one.call_args_list[1][0][0],
+ "vnfds_revisions",
+ "Incorrect second DB lookup",
+ )
@contextmanager
def assertNotRaises(self, exception_type):
"VNF instance: 88d90b0c-faff-4b9f-bccd-017f33985984",
)
- with self.subTest(i=5, t="Ns update REMOVE_VNF request validated with no exception"):
+ with self.subTest(
+ i=5, t="Ns update REMOVE_VNF request validated with no exception"
+ ):
test_vnfr = yaml.load(db_vnfrs_text, Loader=yaml.Loader)
test_vnfr[0]["revision"] = 2
test_nsr = yaml.load(db_nsrs_text, Loader=yaml.Loader)
with self.assertNotRaises(EngineException):
self.nslcmop_topic._check_ns_operation(session, nsrs, "update", indata)
+
class TestNsrTopic(unittest.TestCase):
def setUp(self):
self.db = DbMemory()
self.assertTrue(e.exception.http_code == expect_code)
if expect_text_list:
for expect_text in expect_text_list:
- self.assertIn(expect_text, str(e.exception).lower(),
- "Expected '{}' at exception text".format(expect_text))
+ self.assertIn(
+ expect_text,
+ str(e.exception).lower(),
+ "Expected '{}' at exception text".format(expect_text),
+ )
def test_show_instance(self):
- session = {"force": False, "admin": False, "public": False, "project_id": [self.nsd_project], "method": "write"}
+ session = {
+ "force": False,
+ "admin": False,
+ "public": False,
+ "project_id": [self.nsd_project],
+ "method": "write",
+ }
filter_q = {}
for refresh_status in ("true", "false"):
self.db.create_list("nsrs", yaml.load(db_nsrs_text, Loader=yaml.Loader))
actual_nsr = self.db.get_list("nsrs")[0]
nsr_id = actual_nsr["_id"]
- filter_q['vcaStatus-refresh'] = refresh_status
+ filter_q["vcaStatus-refresh"] = refresh_status
expected_nsr = self.nsr_topic.show(session, nsr_id, filter_q=filter_q)
self.nsr_topic.delete(session, nsr_id)
actual_nsr.pop("_admin")
expected_nsr.pop("_admin")
- self.assertEqual(expected_nsr, actual_nsr, "Database nsr and show() nsr do not match.")
+ self.assertEqual(
+ expected_nsr, actual_nsr, "Database nsr and show() nsr do not match."
+ )
def test_vca_status_refresh(self):
- session = {"force": False, "admin": False, "public": False, "project_id": [self.nsd_project], "method": "write"}
- filter_q = {'vcaStatus-refresh': 'true'}
+ session = {
+ "force": False,
+ "admin": False,
+ "public": False,
+ "project_id": [self.nsd_project],
+ "method": "write",
+ }
+ filter_q = {"vcaStatus-refresh": "true"}
time_delta = 120
self.db.create_list("nsrs", yaml.load(db_nsrs_text, Loader=yaml.Loader))
nsr = self.db.get_list("nsrs")[0]
# When vcaStatus-refresh is true
- filter_q['vcaStatus-refresh'] = "true"
+ filter_q["vcaStatus-refresh"] = "true"
self.nsr_topic.vca_status_refresh(session, nsr, filter_q)
msg_args = self.msg.write.call_args[0]
self.assertEqual(msg_args[1], "vca_status_refresh", "Wrong message action")
self.assertGreater(nsr["_admin"]["modified"], time() - time_delta)
# When vcaStatus-refresh is false but modified time is within threshold
- filter_q['vcaStatus-refresh'] = "false"
+ filter_q["vcaStatus-refresh"] = "false"
time_now = time()
nsr["_admin"]["modified"] = time_now
self.nsr_topic.vca_status_refresh(session, nsr, filter_q)
msg_args = self.msg.write.call_args[1]
self.assertEqual(msg_args, {}, "Message should not be sent.")
- self.assertEqual(nsr["_admin"]["modified"], time_now, "Modified time should not be changed.")
+ self.assertEqual(
+ nsr["_admin"]["modified"], time_now, "Modified time should not be changed."
+ )
# When vcaStatus-refresh is false but modified time is less than threshold
- filter_q['vcaStatus-refresh'] = "false"
- nsr["_admin"]["modified"] = time() - (2*time_delta)
+ filter_q["vcaStatus-refresh"] = "false"
+ nsr["_admin"]["modified"] = time() - (2 * time_delta)
self.nsr_topic.vca_status_refresh(session, nsr, filter_q)
msg_args = self.msg.write.call_args[0]
self.assertEqual(msg_args[1], "vca_status_refresh", "Wrong message action")
self.nsr_topic.delete(session, nsr["_id"])
- self.assertGreater(nsr["_admin"]["modified"], time() - time_delta, "Modified time is not changed.")
+ self.assertGreater(
+ nsr["_admin"]["modified"],
+ time() - time_delta,
+ "Modified time is not changed.",
+ )
def test_delete_ns(self):
self.db.create_list("nsrs", yaml.load(db_nsrs_text, Loader=yaml.Loader))
db_nsds_text,
db_nsrs_text,
db_vnfrs_text,
- db_nslcmops_text
+ db_nslcmops_text,
)
import yaml
"vnfInstanceDescription": "vnf instance description",
"vimAccountId": self.vim_id,
"additionalParams": {
- "virtual-link-desc": [
- {
- "id": "mgmt-net",
- "mgmt-network": True
- }
- ],
+ "virtual-link-desc": [{"id": "mgmt-net", "mgmt-network": True}],
"constituent-cpd-id": "vnf-cp0-ext",
- "virtual-link-profile-id": "mgmt-net"
- }
+ "virtual-link-profile-id": "mgmt-net",
+ },
}
rollback = []
- headers = {}
self.fs.path = ""
self.fs.get_params.return_value = {}
self.fs.file_exists.return_value = False
self.fs.file_open.side_effect = lambda path, mode: open(
- "/tmp/" + str(uuid4()), "a+b"
- )
+ "/tmp/" + str(uuid4()), "a+b"
+ )
vnfr_id, _ = self.vnfinstances.new(
rollback, session, indata, {}, headers={"Content-Type": []}
)
vnfr = self.db.get_one("vnfrs")
self.assertEqual(
- vnfr_id,
- vnfr["id"],
- "Mismatch between return id and database id"
- )
+ vnfr_id, vnfr["id"], "Mismatch between return id and database id"
+ )
self.assertEqual(
- "NOT_INSTANTIATED",
- vnfr["_admin"]["nsState"],
- "Database record must contain 'nsState' NOT_INSTANTIATED"
- )
+ "NOT_INSTANTIATED",
+ vnfr["_admin"]["nsState"],
+ "Database record must contain 'nsState' NOT_INSTANTIATED",
+ )
self.assertEqual(
- self.vnfd_id,
- vnfr["vnfd-ref"],
- "vnfr record is not properly created for the given vnfd")
+ self.vnfd_id,
+ vnfr["vnfd-ref"],
+ "vnfr record is not properly created for the given vnfd",
+ )
def test_show_vnfinstance(self):
session = {
self.assertEqual(
actual_vnfr["_id"],
expected_vnfr["_id"],
- "Mismatch between return vnfr Id and database vnfr Id"
+ "Mismatch between return vnfr Id and database vnfr Id",
)
def test_delete_vnfinstance(self):
"vnfName": "vnf_instance_name",
"vnfDescription": "vnf instance description",
"vnfId": self.vnfd_id,
- "vimAccountId": self.vim_id
+ "vimAccountId": self.vim_id,
}
rollback = []
headers = {}
vnfr_id = vnfr["_id"]
vnflcmop = self.vnflcmop_topic.show(session, id, filter_q)
_id = vnflcmop["vnfInstanceId"]
- self.assertEqual(_id, vnfr_id, "Mismatch between vnflcmop's vnfInstanceId and database vnfr's id")
+ self.assertEqual(
+ _id,
+ vnfr_id,
+ "Mismatch between vnflcmop's vnfInstanceId and database vnfr's id",
+ )
}
ipv6_schema = {
"type": "string",
- "pattern": "(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|:((:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|([0-9a-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))",
+ "pattern": "(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|:((:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|([0-9a-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))", # noqa: W605
}
ip_prefix_schema = {
"type": "string",
"nsInstanceId": id_schema,
"timeout_ns_update": integer1_schema,
"updateType": {
- "enum": ["CHANGE_VNFPKG", "REMOVE_VNF", "MODIFY_VNF_INFORMATION", "OPERATE_VNF"]
+ "enum": [
+ "CHANGE_VNFPKG",
+ "REMOVE_VNF",
+ "MODIFY_VNF_INFORMATION",
+ "OPERATE_VNF",
+ ]
},
"modifyVnfInfoData": {
"type": "object",
},
"required": ["vdu_id", "count-index"],
"additionalProperties": False,
- }
+ },
},
"required": ["vnfInstanceId", "changeStateTo"],
- }
+ },
},
"required": ["updateType"],
"additionalProperties": False,
"migrateToHost": string_schema,
"vdu": {
"type": "object",
- "properties": {
- "vduId": name_schema,
- "vduCountIndex": integer0_schema,
- },
- "required": ["vduId"],
- "additionalProperties": False,
+ "properties": {
+ "vduId": name_schema,
+ "vduCountIndex": integer0_schema,
+ },
+ "required": ["vduId"],
+ "additionalProperties": False,
},
},
"required": ["vnfInstanceId"],
- "additionalProperties": False
+ "additionalProperties": False,
}
ns_heal = {
"virtualMemory": integer1_schema,
"sizeOfStorage": integer0_schema,
"numVirtualCpu": integer1_schema,
- },
- }
+ },
},
+ },
"required": ["vnfInstanceId", "additionalParams"],
"additionalProperties": False,
- }
},
+ },
"required": ["lcmOperationType", "verticalScale", "nsInstanceId"],
"additionalProperties": False,
}
"enum": [
"VnfIdentifierCreationNotification",
"VnfLcmOperationOccurrenceNotification",
- "VnfIdentifierDeletionNotification"
- ]
- }
+ "VnfIdentifierDeletionNotification",
+ ]
+ },
},
"operationTypes": {
"type": "array",
"items": {
"enum": [
- "INSTANTIATE", "SCALE", "SCALE_TO_LEVEL", "CHANGE_FLAVOUR", "TERMINATE",
- "HEAL", "OPERATE", "CHANGE_EXT_CONN", "MODIFY_INFO", "CREATE_SNAPSHOT",
- "REVERT_TO_SNAPSHOT", "CHANGE_VNFPKG"
- ]
- }
+ "INSTANTIATE",
+ "SCALE",
+ "SCALE_TO_LEVEL",
+ "CHANGE_FLAVOUR",
+ "TERMINATE",
+ "HEAL",
+ "OPERATE",
+ "CHANGE_EXT_CONN",
+ "MODIFY_INFO",
+ "CREATE_SNAPSHOT",
+ "REVERT_TO_SNAPSHOT",
+ "CHANGE_VNFPKG",
+ ]
+ },
},
"operationStates": {
"type": "array",
"items": {
"enum": [
- "STARTING", "PROCESSING", "COMPLETED", "FAILED_TEMP", "FAILED",
- "ROLLING_BACK", "ROLLED_BACK"
- ]
- }
- }
+ "STARTING",
+ "PROCESSING",
+ "COMPLETED",
+ "FAILED_TEMP",
+ "FAILED",
+ "ROLLING_BACK",
+ "ROLLED_BACK",
+ ]
+ },
+ },
},
- "required": ["VnfInstanceSubscriptionFilter", "notificationTypes"]
- }
+ "required": ["VnfInstanceSubscriptionFilter", "notificationTypes"],
+}
vnf_subscription = {
"title": "vnf subscription input schema",
"properties": {
"filter": vnflcmsub_schema,
"CallbackUri": description_schema,
- "authentication": authentication_schema
+ "authentication": authentication_schema,
},
- "required": ["filter", "CallbackUri"]
+ "required": ["filter", "CallbackUri"],
}
class VnfInstances(BaseTopic):
-
def __init__(self, db, fs, msg, auth):
"""
Constructor call for vnf instance topic
:param headers: http request headers
:return: the _id of vnf instance created at database. Or an exception.
"""
- return self.vnfinstances2nsinstances.new(rollback, session, indata, kwargs, headers)
+ return self.vnfinstances2nsinstances.new(
+ rollback, session, indata, kwargs, headers
+ )
def list(self, session, filter_q=None, api_req=False):
"""
class VnfLcmOpTopic(BaseTopic):
-
def __init__(self, db, fs, msg, auth):
"""
Constructor call for vnf lcm op topic
deps = black
skip_install = true
commands =
- - black --check --diff osm_nbi/
- - black --check --diff setup.py
+ black --check --diff osm_nbi/
+ black --check --diff setup.py
#######################################################################################
[testenv:flake8]
deps = flake8
commands =
- - flake8 osm_nbi/ setup.py
+ flake8 osm_nbi/ setup.py
#######################################################################################
-r{toxinidir}/requirements-test.txt
pylint
commands =
- - pylint -E osm_nbi
+ pylint -E osm_nbi
#######################################################################################
W503,
E123,
E125,
+ E203,
E226,
E241,
E501