X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FNBI.git;a=blobdiff_plain;f=osm_nbi%2Fengine.py;h=53bf21989a5942face8a32165e5593061adec695;hp=e55c7ecaba5d0bf5dd9279808b397976bf27998f;hb=56e698aea30098e7cfc0c5e3df9e771a4dd47f64;hpb=0f98af53b320c8244b58d0d8751e28e157949e8e diff --git a/osm_nbi/engine.py b/osm_nbi/engine.py index e55c7ec..53bf219 100644 --- a/osm_nbi/engine.py +++ b/osm_nbi/engine.py @@ -1,23 +1,22 @@ # -*- coding: utf-8 -*- -import dbmongo -import dbmemory -import fslocal -import msglocal -import msgkafka +from osm_common import dbmongo +from osm_common import dbmemory +from osm_common import fslocal +from osm_common import msglocal +from osm_common import msgkafka import tarfile import yaml import json import logging -from random import choice as random_choice from uuid import uuid4 from hashlib import sha256, md5 -from dbbase import DbException -from fsbase import FsException -from msgbase import MsgException +from osm_common.dbbase import DbException, deep_update +from osm_common.fsbase import FsException +from osm_common.msgbase import MsgException from http import HTTPStatus from time import time -from copy import deepcopy +from copy import copy from validation import validate_input, ValidationError __author__ = "Alfonso Tierno " @@ -30,28 +29,16 @@ class EngineException(Exception): Exception.__init__(self, message) -def _deep_update(dict_to_change, dict_reference): +def get_iterable(input): """ - Modifies one dictionary with the information of the other following https://tools.ietf.org/html/rfc7396 - :param dict_to_change: Ends modified - :param dict_reference: reference - :return: none + Returns an iterable, in case input is None it just returns an empty tuple + :param input: + :return: iterable """ + if input is None: + return () + return input - for k in dict_reference: - if dict_reference[k] is None: # None->Anything - if k in dict_to_change: - del dict_to_change[k] - elif not isinstance(dict_reference[k], dict): # NotDict->Anything - dict_to_change[k] = dict_reference[k] - elif k not in dict_to_change: # Dict->Empty - dict_to_change[k] = deepcopy(dict_reference[k]) - _deep_update(dict_to_change[k], dict_reference[k]) - elif isinstance(dict_to_change[k], dict): # Dict->Dict - _deep_update(dict_to_change[k], dict_reference[k]) - else: # Dict->NotDict - dict_to_change[k] = deepcopy(dict_reference[k]) - _deep_update(dict_to_change[k], dict_reference[k]) class Engine(object): @@ -112,102 +99,6 @@ class Engine(object): except (DbException, FsException, MsgException) as e: raise EngineException(str(e), http_code=e.http_code) - def authorize(self, token): - try: - if not token: - raise EngineException("Needed a token or Authorization http header", - http_code=HTTPStatus.UNAUTHORIZED) - if token not in self.tokens: - raise EngineException("Invalid token or Authorization http header", - http_code=HTTPStatus.UNAUTHORIZED) - session = self.tokens[token] - now = time() - if session["expires"] < now: - del self.tokens[token] - raise EngineException("Expired Token or Authorization http header", - http_code=HTTPStatus.UNAUTHORIZED) - return session - except EngineException: - if self.config["global"].get("test.user_not_authorized"): - return {"id": "fake-token-id-for-test", - "project_id": self.config["global"].get("test.project_not_authorized", "admin"), - "username": self.config["global"]["test.user_not_authorized"]} - else: - raise - - def new_token(self, session, indata, remote): - now = time() - user_content = None - - # Try using username/password - if indata.get("username"): - user_rows = self.db.get_list("users", {"username": indata.get("username")}) - user_content = None - if user_rows: - user_content = user_rows[0] - salt = user_content["_admin"]["salt"] - shadow_password = sha256(indata.get("password", "").encode('utf-8') + salt.encode('utf-8')).hexdigest() - if shadow_password != user_content["password"]: - user_content = None - if not user_content: - raise EngineException("Invalid username/password", http_code=HTTPStatus.UNAUTHORIZED) - elif session: - user_rows = self.db.get_list("users", {"username": session["username"]}) - if user_rows: - user_content = user_rows[0] - else: - raise EngineException("Invalid token", http_code=HTTPStatus.UNAUTHORIZED) - else: - raise EngineException("Provide credentials: username/password or Authorization Bearer token", - http_code=HTTPStatus.UNAUTHORIZED) - - token_id = ''.join(random_choice('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789') - for _ in range(0, 32)) - if indata.get("project_id"): - project_id = indata.get("project_id") - if project_id not in user_content["projects"]: - raise EngineException("project {} not allowed for this user".format(project_id), - http_code=HTTPStatus.UNAUTHORIZED) - else: - project_id = user_content["projects"][0] - if project_id == "admin": - session_admin = True - else: - project = self.db.get_one("projects", {"_id": project_id}) - session_admin = project.get("admin", False) - new_session = {"issued_at": now, "expires": now+3600, - "_id": token_id, "id": token_id, "project_id": project_id, "username": user_content["username"], - "remote_port": remote.port, "admin": session_admin} - if remote.name: - new_session["remote_host"] = remote.name - elif remote.ip: - new_session["remote_host"] = remote.ip - - self.tokens[token_id] = new_session - return deepcopy(new_session) - - def get_token_list(self, session): - token_list = [] - for token_id, token_value in self.tokens.items(): - if token_value["username"] == session["username"]: - token_list.append(deepcopy(token_value)) - return token_list - - def get_token(self, session, token_id): - token_value = self.tokens.get(token_id) - if not token_value: - raise EngineException("token not found", http_code=HTTPStatus.NOT_FOUND) - if token_value["username"] != session["username"] and not session["admin"]: - raise EngineException("needed admin privileges", http_code=HTTPStatus.UNAUTHORIZED) - return token_value - - def del_token(self, token_id): - try: - del self.tokens[token_id] - return "token '{}' deleted".format(token_id) - except KeyError: - raise EngineException("Token '{}' not found".format(token_id), http_code=HTTPStatus.NOT_FOUND) - @staticmethod def _remove_envelop(item, indata=None): """ @@ -243,23 +134,92 @@ class Engine(object): clean_indata = clean_indata['userDefinedData'] return clean_indata - def _validate_new_data(self, session, item, indata, id=None): + def _check_project_dependencies(self, project_id): + """ + Check if a project can be deleted + :param session: + :param _id: + :return: + """ + # TODO Is it needed to check descriptors _admin.project_read/project_write?? + _filter = {"projects": project_id} + if self.db.get_list("users", _filter): + raise EngineException("There are users that uses this project", http_code=HTTPStatus.CONFLICT) + + def _check_dependencies_on_descriptor(self, session, item, descriptor_id, _id): + """ + Check that the descriptor to be deleded is not a dependency of others + :param session: client session information + :param item: can be vnfds, nsds + :param descriptor_id: id (provided by client) of descriptor to be deleted + :param _id: internal id of descriptor to be deleted + :return: None or raises exception + """ + if item == "vnfds": + _filter = {"constituent-vnfd.ANYINDEX.vnfd-id-ref": descriptor_id} + if self.get_item_list(session, "nsds", _filter): + raise EngineException("There are nsd that depends on this VNFD", http_code=HTTPStatus.CONFLICT) + if self.get_item_list(session, "vnfrs", {"vnfd-id": _id}): + raise EngineException("There are vnfr that depends on this VNFD", http_code=HTTPStatus.CONFLICT) + elif item == "nsds": + _filter = {"nsdId": _id} + if self.get_item_list(session, "nsrs", _filter): + raise EngineException("There are nsr that depends on this NSD", http_code=HTTPStatus.CONFLICT) + + def _check_descriptor_dependencies(self, session, item, descriptor): + """ + Check that the dependent descriptors exist on a new descriptor or edition + :param session: client session information + :param item: can be nsds, nsrs + :param descriptor: descriptor to be inserted or edit + :return: None or raises exception + """ + if item == "nsds": + if not descriptor.get("constituent-vnfd"): + return + for vnf in descriptor["constituent-vnfd"]: + vnfd_id = vnf["vnfd-id-ref"] + if not self.get_item_list(session, "vnfds", {"id": vnfd_id}): + raise EngineException("Descriptor error at 'constituent-vnfd':'vnfd-id-ref'='{}' references a non " + "existing vnfd".format(vnfd_id), http_code=HTTPStatus.CONFLICT) + elif item == "nsrs": + if not descriptor.get("nsdId"): + return + nsd_id = descriptor["nsdId"] + if not self.get_item_list(session, "nsds", {"id": nsd_id}): + raise EngineException("Descriptor error at nsdId='{}' references a non exist nsd".format(nsd_id), + http_code=HTTPStatus.CONFLICT) + + def _check_edition(self, session, item, indata, id, force=False): + if item == "users": + if indata.get("projects"): + if not session["admin"]: + raise EngineException("Needed admin privileges to edit user projects", HTTPStatus.UNAUTHORIZED) + if indata.get("password"): + # regenerate salt and encrypt password + salt = uuid4().hex + indata["_admin"] = {"salt": salt} + indata["password"] = sha256(indata["password"].encode('utf-8') + salt.encode('utf-8')).hexdigest() + + def _validate_new_data(self, session, item, indata, id=None, force=False): if item == "users": - if not indata.get("username"): - raise EngineException("missing 'username'", HTTPStatus.UNPROCESSABLE_ENTITY) - if not indata.get("password"): - raise EngineException("missing 'password'", HTTPStatus.UNPROCESSABLE_ENTITY) - if not indata.get("projects"): - raise EngineException("missing 'projects'", HTTPStatus.UNPROCESSABLE_ENTITY) - # check username not exist - if self.db.get_one(item, {"username": indata.get("username")}, fail_on_empty=False, fail_on_more=False): - raise EngineException("username '{}' exist".format(indata["username"]), HTTPStatus.CONFLICT) + # check username not exists + if not id and self.db.get_one(item, {"username": indata.get("username")}, fail_on_empty=False, + fail_on_more=False): + raise EngineException("username '{}' exists".format(indata["username"]), HTTPStatus.CONFLICT) + # check projects + if not force: + for p in indata["projects"]: + if p == "admin": + continue + if not self.db.get_one("projects", {"_id": p}, fail_on_empty=False, fail_on_more=False): + raise EngineException("project '{}' does not exists".format(p), HTTPStatus.CONFLICT) elif item == "projects": if not indata.get("name"): raise EngineException("missing 'name'") - # check name not exist - if self.db.get_one(item, {"name": indata.get("name")}, fail_on_empty=False, fail_on_more=False): - raise EngineException("name '{}' exist".format(indata["name"]), HTTPStatus.CONFLICT) + # check name not exists + if not id and self.db.get_one(item, {"name": indata.get("name")}, fail_on_empty=False, fail_on_more=False): + raise EngineException("name '{}' exists".format(indata["name"]), HTTPStatus.CONFLICT) elif item in ("vnfds", "nsds"): filter = {"id": indata["id"]} if id: @@ -267,58 +227,173 @@ class Engine(object): # TODO add admin to filter, validate rights self._add_read_filter(session, item, filter) if self.db.get_one(item, filter, fail_on_empty=False): - raise EngineException("{} with id '{}' already exist for this tenant".format(item[:-1], indata["id"]), + raise EngineException("{} with id '{}' already exists for this tenant".format(item[:-1], indata["id"]), HTTPStatus.CONFLICT) - - # TODO validate with pyangbind + # TODO validate with pyangbind. Load and dumps to convert data types + if item == "nsds": + # transform constituent-vnfd:member-vnf-index to string + if indata.get("constituent-vnfd"): + for constituent_vnfd in indata["constituent-vnfd"]: + if "member-vnf-index" in constituent_vnfd: + constituent_vnfd["member-vnf-index"] = str(constituent_vnfd["member-vnf-index"]) + + if item == "nsds" and not force: + self._check_descriptor_dependencies(session, "nsds", indata) elif item == "userDefinedData": # TODO validate userDefinedData is a keypair values pass elif item == "nsrs": pass - elif item == "vims" or item == "sdns": - if self.db.get_one(item, {"name": indata.get("name")}, fail_on_empty=False, fail_on_more=False): - raise EngineException("name '{}' already exist for {}".format(indata["name"], item), + elif item == "vim_accounts" or item == "sdns": + filter = {"name": indata.get("name")} + if id: + filter["_id.neq"] = id + if self.db.get_one(item, filter, fail_on_empty=False, fail_on_more=False): + raise EngineException("name '{}' already exists for {}".format(indata["name"], item), HTTPStatus.CONFLICT) + def _check_ns_operation(self, session, nsr, operation, indata): + """ + Check that user has enter right parameters for the operation + :param session: + :param operation: it can be: instantiate, terminate, action, TODO: update, heal + :param indata: descriptor with the parameters of the operation + :return: None + """ + vnfds = {} + vim_accounts = [] + nsd = nsr["nsd"] + + def check_valid_vnf_member_index(member_vnf_index): + for vnf in nsd["constituent-vnfd"]: + if member_vnf_index == vnf["member-vnf-index"]: + vnfd_id = vnf["vnfd-id-ref"] + if vnfd_id not in vnfds: + vnfds[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id}) + return vnfds[vnfd_id] + else: + raise EngineException("Invalid parameter member_vnf_index='{}' is not one of the " + "nsd:constituent-vnfd".format(member_vnf_index)) + + def check_valid_vim_account(vim_account): + if vim_account in vim_accounts: + return + try: + self.db.get_one("vim_accounts", {"_id": vim_account}) + except Exception: + raise EngineException("Invalid vimAccountId='{}' not present".format(vim_account)) + vim_accounts.append(vim_account) + + if operation == "action": + # check vnf_member_index + if indata.get("vnf_member_index"): + indata["member_vnf_index"] = indata.pop("vnf_member_index") # for backward compatibility + if not indata.get("member_vnf_index"): + raise EngineException("Missing 'member_vnf_index' parameter") + vnfd = check_valid_vnf_member_index(indata["member_vnf_index"]) + # check primitive + for config_primitive in get_iterable(vnfd.get("vnf-configuration", {}).get("config-primitive")): + if indata["primitive"] == config_primitive["name"]: + # check needed primitive_params are provided + if indata.get("primitive_params"): + in_primitive_params_copy = copy(indata["primitive_params"]) + else: + in_primitive_params_copy = {} + for paramd in get_iterable(config_primitive.get("parameter")): + if paramd["name"] in in_primitive_params_copy: + del in_primitive_params_copy[paramd["name"]] + elif not paramd.get("default-value"): + raise EngineException("Needed parameter {} not provided for primitive '{}'".format( + paramd["name"], indata["primitive"])) + # check no extra primitive params are provided + if in_primitive_params_copy: + raise EngineException("parameter/s '{}' not present at vnfd for primitive '{}'".format( + list(in_primitive_params_copy.keys()), indata["primitive"])) + break + else: + raise EngineException("Invalid primitive '{}' is not present at vnfd".format(indata["primitive"])) + if operation == "scale": + vnfd = check_valid_vnf_member_index(indata["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]) + for scaling_group in get_iterable(vnfd.get("scaling-group-descriptor")): + if indata["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"] == scaling_group["name"]: + break + else: + raise EngineException("Invalid scaleVnfData:scaleByStepData:scaling-group-descriptor '{}' is not " + "present at vnfd:scaling-group-descriptor".format( + indata["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"])) + if operation == "instantiate": + # check vim_account + check_valid_vim_account(indata["vimAccountId"]) + for in_vnf in get_iterable(indata.get("vnf")): + vnfd = check_valid_vnf_member_index(in_vnf["member-vnf-index"]) + if in_vnf.get("vimAccountId"): + check_valid_vim_account(in_vnf["vimAccountId"]) + for in_vdu in get_iterable(in_vnf.get("vdu")): + for vdud in get_iterable(vnfd.get("vdu")): + if vdud["id"] == in_vdu["id"]: + for volume in get_iterable(in_vdu.get("volume")): + for volumed in get_iterable(vdud.get("volumes")): + if volumed["name"] == volume["name"]: + break + else: + raise EngineException("Invalid parameter vnf[member-vnf-index='{}']:vdu[id='{}']:" + "volume:name='{}' is not present at vnfd:vdu:volumes list". + format(in_vnf["member-vnf-index"], in_vdu["id"], + volume["name"])) + break + else: + raise EngineException("Invalid parameter vnf[member-vnf-index='{}']:vdu:id='{}' is not " + "present at vnfd".format(in_vnf["member-vnf-index"], in_vdu["id"])) + + for in_internal_vld in get_iterable(in_vnf.get("internal-vld")): + for internal_vldd in get_iterable(vnfd.get("internal-vld")): + if in_internal_vld["name"] == internal_vldd["name"] or \ + in_internal_vld["name"] == internal_vldd["id"]: + break + else: + raise EngineException("Invalid parameter vnf[member-vnf-index='{}']:internal-vld:name='{}'" + " is not present at vnfd '{}'".format(in_vnf["member-vnf-index"], + in_internal_vld["name"], + vnfd["id"])) + for in_vld in get_iterable(indata.get("vld")): + for vldd in get_iterable(nsd.get("vld")): + if in_vld["name"] == vldd["name"] or in_vld["name"] == vldd["id"]: + break + else: + raise EngineException("Invalid parameter vld:name='{}' is not present at nsd:vld".format( + in_vld["name"])) - def _format_new_data(self, session, item, indata, admin=None): + def _format_new_data(self, session, item, indata): now = time() - if not "_admin" in indata: + if "_admin" not in indata: indata["_admin"] = {} indata["_admin"]["created"] = now indata["_admin"]["modified"] = now if item == "users": - _id = indata["username"] + indata["_id"] = indata["username"] salt = uuid4().hex indata["_admin"]["salt"] = salt indata["password"] = sha256(indata["password"].encode('utf-8') + salt.encode('utf-8')).hexdigest() elif item == "projects": - _id = indata["name"] + indata["_id"] = indata["name"] else: - _id = None - storage = None - if admin: - _id = admin.get("_id") - storage = admin.get("storage") - if not _id: - _id = str(uuid4()) - if item in ("vnfds", "nsds"): + if not indata.get("_id"): + indata["_id"] = str(uuid4()) + if item in ("vnfds", "nsds", "nsrs", "vnfrs"): if not indata["_admin"].get("projects_read"): indata["_admin"]["projects_read"] = [session["project_id"]] if not indata["_admin"].get("projects_write"): indata["_admin"]["projects_write"] = [session["project_id"]] + if item in ("vnfds", "nsds"): indata["_admin"]["onboardingState"] = "CREATED" indata["_admin"]["operationalState"] = "DISABLED" indata["_admin"]["usageSate"] = "NOT_IN_USE" - elif item in ("vims", "sdns"): + if item == "nsrs": + indata["_admin"]["nsState"] = "NOT_INSTANTIATED" + if item in ("vim_accounts", "sdns"): indata["_admin"]["operationalState"] = "PROCESSING" - if storage: - indata["_admin"]["storage"] = storage - indata["_id"] = _id - def upload_content(self, session, item, _id, indata, kwargs, headers): """ Used for receiving content by chunks (with a transaction_id header and/or gzip file. It will store and extract) @@ -331,7 +406,7 @@ class Engine(object): :return: True package has is completely uploaded or False if partial content has been uplodaed. Raise exception on error """ - # Check that _id exist and it is valid + # Check that _id exists and it is valid current_desc = self.get_item(session, item, _id) content_range_text = headers.get("Content-Range") @@ -421,7 +496,8 @@ class Engine(object): storage["pkg-dir"] = tarname_path[0] if len(tarname_path) == 2: if descriptor_file_name: - raise EngineException("Found more than one descriptor file at package descriptor tar.gz") + raise EngineException( + "Found more than one descriptor file at package descriptor tar.gz") descriptor_file_name = tarname if not descriptor_file_name: raise EngineException("Not found any descriptor file at package descriptor tar.gz") @@ -457,140 +533,311 @@ class Engine(object): HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE) except IOError as e: raise EngineException("invalid upload transaction sequence: '{}'".format(e), HTTPStatus.BAD_REQUEST) + except tarfile.ReadError as e: + raise EngineException("invalid file content {}".format(e), HTTPStatus.BAD_REQUEST) except (ValueError, yaml.YAMLError) as e: raise EngineException(error_text + str(e)) finally: if file_pkg: file_pkg.close() - def new_nsr(self, session, ns_request): + def new_nsr(self, rollback, session, ns_request): """ - Creates a new nsr into database + Creates a new nsr into database. It also creates needed vnfrs + :param rollback: list where this method appends created items at database in case a rollback may to be done :param session: contains the used login username and working project :param ns_request: params to be used for the nsr - :return: nsr descriptor to be stored at database and the _id + :return: the _id of nsr descriptor stored at database """ + rollback_index = len(rollback) + step = "" + try: + # look for nsr + step = "getting nsd id='{}' from database".format(ns_request.get("nsdId")) + nsd = self.get_item(session, "nsds", ns_request["nsdId"]) + nsr_id = str(uuid4()) + now = time() + step = "filling nsr from input data" + nsr_descriptor = { + "name": ns_request["nsName"], + "name-ref": ns_request["nsName"], + "short-name": ns_request["nsName"], + "admin-status": "ENABLED", + "nsd": nsd, + "datacenter": ns_request["vimAccountId"], + "resource-orchestrator": "osmopenmano", + "description": ns_request.get("nsDescription", ""), + "constituent-vnfr-ref": [], + + "operational-status": "init", # typedef ns-operational- + "config-status": "init", # typedef config-states + "detailed-status": "scheduled", + + "orchestration-progress": {}, + # {"networks": {"active": 0, "total": 0}, "vms": {"active": 0, "total": 0}}, + + "create-time": now, + "nsd-name-ref": nsd["name"], + "operational-events": [], # "id", "timestamp", "description", "event", + "nsd-ref": nsd["id"], + "nsdId": nsd["_id"], + "instantiate_params": ns_request, + "ns-instance-config-ref": nsr_id, + "id": nsr_id, + "_id": nsr_id, + # "input-parameter": xpath, value, + "ssh-authorized-key": ns_request.get("key-pair-ref"), + } + ns_request["nsr_id"] = nsr_id + + # Create VNFR + needed_vnfds = {} + for member_vnf in nsd["constituent-vnfd"]: + vnfd_id = member_vnf["vnfd-id-ref"] + step = "getting vnfd id='{}' constituent-vnfd='{}' from database".format( + member_vnf["vnfd-id-ref"], member_vnf["member-vnf-index"]) + if vnfd_id not in needed_vnfds: + # Obtain vnfd + vnf_filter = {"id": vnfd_id} + self._add_read_filter(session, "vnfds", vnf_filter) + vnfd = self.db.get_one("vnfds", vnf_filter) + vnfd.pop("_admin") + needed_vnfds[vnfd_id] = vnfd + else: + vnfd = needed_vnfds[vnfd_id] + step = "filling vnfr vnfd-id='{}' constituent-vnfd='{}'".format( + member_vnf["vnfd-id-ref"], member_vnf["member-vnf-index"]) + vnfr_id = str(uuid4()) + vnfr_descriptor = { + "id": vnfr_id, + "_id": vnfr_id, + "nsr-id-ref": nsr_id, + "member-vnf-index-ref": member_vnf["member-vnf-index"], + "created-time": now, + # "vnfd": vnfd, # at OSM model.but removed to avoid data duplication TODO: revise + "vnfd-ref": vnfd_id, + "vnfd-id": vnfd["_id"], # not at OSM model, but useful + "vim-account-id": None, + "vdur": [], + "connection-point": [], + "ip-address": None, # mgmt-interface filled by LCM + } + for cp in vnfd.get("connection-point", ()): + vnf_cp = { + "name": cp["name"], + "connection-point-id": cp.get("id"), + "id": cp.get("id"), + # "ip-address", "mac-address" # filled by LCM + # vim-id # TODO it would be nice having a vim port id + } + vnfr_descriptor["connection-point"].append(vnf_cp) + for vdu in vnfd["vdu"]: + vdur_id = str(uuid4()) + vdur = { + "id": vdur_id, + "vdu-id-ref": vdu["id"], + # TODO "name": "" Name of the VDU in the VIM + "ip-address": None, # mgmt-interface filled by LCM + # "vim-id", "flavor-id", "image-id", "management-ip" # filled by LCM + "internal-connection-point": [], + "interfaces": [], + } + # TODO volumes: name, volume-id + for icp in vdu.get("internal-connection-point", ()): + vdu_icp = { + "id": icp["id"], + "connection-point-id": icp["id"], + "name": icp.get("name"), + # "ip-address", "mac-address" # filled by LCM + # vim-id # TODO it would be nice having a vim port id + } + vdur["internal-connection-point"].append(vdu_icp) + for iface in vdu.get("interface", ()): + vdu_iface = { + "name": iface.get("name"), + # "ip-address", "mac-address" # filled by LCM + # vim-id # TODO it would be nice having a vim port id + } + vdur["interfaces"].append(vdu_iface) + vnfr_descriptor["vdur"].append(vdur) + + step = "creating vnfr vnfd-id='{}' constituent-vnfd='{}' at database".format( + member_vnf["vnfd-id-ref"], member_vnf["member-vnf-index"]) + self._format_new_data(session, "vnfrs", vnfr_descriptor) + self.db.create("vnfrs", vnfr_descriptor) + rollback.insert(0, {"item": "vnfrs", "_id": vnfr_id}) + nsr_descriptor["constituent-vnfr-ref"].append(vnfr_id) + + step = "creating nsr at database" + self._format_new_data(session, "nsrs", nsr_descriptor) + self.db.create("nsrs", nsr_descriptor) + rollback.insert(rollback_index, {"item": "nsrs", "_id": nsr_id}) + return nsr_id + except Exception as e: + raise EngineException("Error {}: {}".format(step, e)) - # look for nsr - nsd = self.get_item(session, "nsds", ns_request["nsdId"]) - _id = str(uuid4()) - nsr_descriptor = { - "name": ns_request["nsName"], - "name-ref": ns_request["nsName"], - "short-name": ns_request["nsName"], - "admin-status": "ENABLED", - "nsd": nsd, - "datacenter": ns_request["vimAccountId"], - "resource-orchestrator": "osmopenmano", - "description": ns_request.get("nsDescription", ""), - "constituent-vnfr-ref": ["TODO datacenter-id, vnfr-id"], - - "operational-status": "init", # typedef ns-operational- - "config-status": "init", # typedef config-states - "detailed-status": "scheduled", - - "orchestration-progress": {}, # {"networks": {"active": 0, "total": 0}, "vms": {"active": 0, "total": 0}}, - - "crete-time": time(), - "nsd-name-ref": nsd["name"], - "operational-events": [], # "id", "timestamp", "description", "event", - "nsd-ref": nsd["id"], - "ns-instance-config-ref": _id, - "id": _id, - - # "input-parameter": xpath, value, - "ssh-authorized-key": ns_request.get("key-pair-ref"), - } - ns_request["nsr_id"] = _id - return nsr_descriptor, _id + @staticmethod + def _update_descriptor(desc, kwargs): + """ + Update descriptor with the kwargs. It contains dot separated keys + :param desc: dictionary to be updated + :param kwargs: plain dictionary to be used for updating. + :return: + """ + if not kwargs: + return + try: + for k, v in kwargs.items(): + update_content = desc + kitem_old = None + klist = k.split(".") + for kitem in klist: + if kitem_old is not None: + update_content = update_content[kitem_old] + if isinstance(update_content, dict): + kitem_old = kitem + elif isinstance(update_content, list): + kitem_old = int(kitem) + else: + raise EngineException( + "Invalid query string '{}'. Descriptor is not a list nor dict at '{}'".format(k, kitem)) + update_content[kitem_old] = v + except KeyError: + raise EngineException( + "Invalid query string '{}'. Descriptor does not contain '{}'".format(k, kitem_old)) + except ValueError: + raise EngineException("Invalid query string '{}'. Expected integer index list instead of '{}'".format( + k, kitem)) + except IndexError: + raise EngineException( + "Invalid query string '{}'. Index '{}' out of range".format(k, kitem_old)) - def new_item(self, session, item, indata={}, kwargs=None, headers={}): + def new_item(self, rollback, session, item, indata={}, kwargs=None, headers={}, force=False): """ Creates a new entry into database. For nsds and vnfds it creates an almost empty DISABLED entry, that must be completed with a call to method upload_content + :param rollback: list where this method appends created items at database in case a rollback may to be done :param session: contains the used login username and working project - :param item: it can be: users, projects, vims, sdns, nsrs, nsds, vnfds + :param item: it can be: users, projects, vim_accounts, sdns, nsrs, nsds, vnfds :param indata: data to be inserted :param kwargs: used to override the indata descriptor :param headers: http request headers - :return: _id, transaction_id: identity of the inserted data. or transaction_id if Content-Range is used + :param force: If True avoid some dependence checks + :return: _id: identity of the inserted data. """ - transaction = None - - item_envelop = item - if item in ("nsds", "vnfds"): - item_envelop = "userDefinedData" - content = self._remove_envelop(item_envelop, indata) + if not session["admin"] and item in ("users", "projects"): + raise EngineException("Needed admin privileges to perform this operation", HTTPStatus.UNAUTHORIZED) - # Override descriptor with query string kwargs - if kwargs: - try: - for k, v in kwargs.items(): - update_content = content - kitem_old = None - klist = k.split(".") - for kitem in klist: - if kitem_old is not None: - update_content = update_content[kitem_old] - if isinstance(update_content, dict): - kitem_old = kitem - elif isinstance(update_content, list): - kitem_old = int(kitem) - else: - raise EngineException( - "Invalid query string '{}'. Descriptor is not a list nor dict at '{}'".format(k, kitem)) - update_content[kitem_old] = v - except KeyError: - raise EngineException( - "Invalid query string '{}'. Descriptor does not contain '{}'".format(k, kitem_old)) - except ValueError: - raise EngineException("Invalid query string '{}'. Expected integer index list instead of '{}'".format( - k, kitem)) - except IndexError: - raise EngineException( - "Invalid query string '{}'. Index '{}' out of range".format(k, kitem_old)) try: + item_envelop = item + if item in ("nsds", "vnfds"): + item_envelop = "userDefinedData" + content = self._remove_envelop(item_envelop, indata) + + # Override descriptor with query string kwargs + self._update_descriptor(content, kwargs) + if not content and item not in ("nsds", "vnfds"): + raise EngineException("Empty payload") + validate_input(content, item, new=True) + + if item == "nsrs": + # in this case the input descriptor is not the data to be stored + return self.new_nsr(rollback, session, ns_request=content) + + self._validate_new_data(session, item_envelop, content, force=force) + if item in ("nsds", "vnfds"): + content = {"_admin": {"userDefinedData": content}} + self._format_new_data(session, item, content) + _id = self.db.create(item, content) + rollback.insert(0, {"item": item, "_id": _id}) + + if item == "vim_accounts": + msg_data = self.db.get_one(item, {"_id": _id}) + msg_data.pop("_admin", None) + self.msg.write("vim_account", "create", msg_data) + elif item == "sdns": + msg_data = self.db.get_one(item, {"_id": _id}) + msg_data.pop("_admin", None) + self.msg.write("sdn", "create", msg_data) + return _id except ValidationError as e: raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY) - if not indata and item not in ("nsds", "vnfds"): - raise EngineException("Empty payload") + def new_nslcmop(self, session, nsInstanceId, operation, params): + now = time() + _id = str(uuid4()) + nslcmop = { + "id": _id, + "_id": _id, + "operationState": "PROCESSING", # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK + "statusEnteredTime": now, + "nsInstanceId": nsInstanceId, + "lcmOperationType": operation, + "startTime": now, + "isAutomaticInvocation": False, + "operationParams": params, + "isCancelPending": False, + "links": { + "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id, + "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsInstanceId, + } + } + return nslcmop - if item == "nsrs": - # in this case the imput descriptor is not the data to be stored - ns_request = content - content, _id = self.new_nsr(session, ns_request) - transaction = {"_id": _id} - - self._validate_new_data(session, item_envelop, content) - if item in ("nsds", "vnfds"): - content = {"_admin": {"userDefinedData": content}} - self._format_new_data(session, item, content, transaction) - _id = self.db.create(item, content) - if item == "nsrs": - self.msg.write("ns", "create", _id) - elif item == "vims": - msg_data = self.db.get_one(item, {"_id": _id}) - msg_data.pop("_admin", None) - self.msg.write("vim_account", "create", msg_data) - elif item == "sdns": - msg_data = self.db.get_one(item, {"_id": _id}) - msg_data.pop("_admin", None) - self.msg.write("sdn", "create", msg_data) - return _id + def ns_operation(self, rollback, session, nsInstanceId, operation, indata, kwargs=None): + """ + Performs a new operation over a ns + :param rollback: list where this method appends created items at database in case a rollback may to be done + :param session: contains the used login username and working project + :param nsInstanceId: _id of the nsr to perform the operation + :param operation: it can be: instantiate, terminate, action, TODO: update, heal + :param indata: descriptor with the parameters of the operation + :param kwargs: used to override the indata descriptor + :return: id of the nslcmops + """ + try: + # Override descriptor with query string kwargs + self._update_descriptor(indata, kwargs) + validate_input(indata, "ns_" + operation, new=True) + # get ns from nsr_id + nsr = self.get_item(session, "nsrs", nsInstanceId) + if not nsr["_admin"].get("nsState") or nsr["_admin"]["nsState"] == "NOT_INSTANTIATED": + if operation == "terminate" and indata.get("autoremove"): + # NSR must be deleted + return self.del_item(session, "nsrs", nsInstanceId) + if operation != "instantiate": + raise EngineException("ns_instance '{}' cannot be '{}' because it is not instantiated".format( + nsInstanceId, operation), HTTPStatus.CONFLICT) + else: + if operation == "instantiate" and not indata.get("force"): + raise EngineException("ns_instance '{}' cannot be '{}' because it is already instantiated".format( + nsInstanceId, operation), HTTPStatus.CONFLICT) + indata["nsInstanceId"] = nsInstanceId + self._check_ns_operation(session, nsr, operation, indata) + nslcmop = self.new_nslcmop(session, nsInstanceId, operation, indata) + self._format_new_data(session, "nslcmops", nslcmop) + _id = self.db.create("nslcmops", nslcmop) + rollback.insert(0, {"item": "nslcmops", "_id": _id}) + indata["_id"] = _id + self.msg.write("ns", operation, nslcmop) + return _id + except ValidationError as e: + raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY) + # except DbException as e: + # raise EngineException("Cannot get ns_instance '{}': {}".format(e), HTTPStatus.NOT_FOUND) def _add_read_filter(self, session, item, filter): - if session["project_id"] == "admin": # allows all + if session["admin"]: # allows all return filter if item == "users": filter["username"] = session["username"] - elif item == "vnfds" or item == "nsds": + elif item in ("vnfds", "nsds", "nsrs", "vnfrs"): filter["_admin.projects_read.cont"] = ["ANY", session["project_id"]] def _add_delete_filter(self, session, item, filter): - if session["project_id"] != "admin" and item in ("users", "projects"): + if not session["admin"] and item in ("users", "projects"): raise EngineException("Only admin users can perform this task", http_code=HTTPStatus.FORBIDDEN) if item == "users": if filter.get("_id") == session["username"] or filter.get("username") == session["username"]: @@ -598,7 +845,7 @@ class Engine(object): elif item == "project": if filter.get("_id") == session["project_id"]: raise EngineException("You cannot delete your own project", http_code=HTTPStatus.CONFLICT) - elif item in ("vnfds", "nsds") and session["project_id"] != "admin": + elif item in ("vnfds", "nsds") and not session["admin"]: filter["_admin.projects_write.cont"] = ["ANY", session["project_id"]] def get_file(self, session, item, _id, path=None, accept_header=None): @@ -624,7 +871,7 @@ class Engine(object): content = self.get_item(session, item, _id) if content["_admin"]["onboardingState"] != "ONBOARDED": raise EngineException("Cannot get content because this resource is not at 'ONBOARDED' state. " - "onboardingState is {}".format(content["_admin"]["onboardingState"]), + "onboardingState is {}".format(content["_admin"]["onboardingState"]), http_code=HTTPStatus.CONFLICT) storage = content["_admin"]["storage"] if path is not None and path != "$DESCRIPTOR": # artifacts @@ -635,8 +882,8 @@ class Engine(object): return folder_content, "text/plain" # TODO manage folders in http else: - return self.fs.file_open((storage['folder'], storage['pkg-dir'], *path), "rb"), \ - "application/octet-stream" + return self.fs.file_open((storage['folder'], storage['pkg-dir'], *path), "rb"),\ + "application/octet-stream" # pkgtype accept ZIP TEXT -> result # manyfiles yes X -> zip @@ -648,12 +895,12 @@ class Engine(object): return self.fs.file_open((storage['folder'], storage['descriptor']), "r"), "text/plain" elif storage.get('pkg-dir') and not accept_zip: raise EngineException("Packages that contains several files need to be retrieved with 'application/zip'" - "Accept header", http_code=HTTPStatus.NOT_ACCEPTABLE) + "Accept header", http_code=HTTPStatus.NOT_ACCEPTABLE) else: if not storage.get('zipfile'): # TODO generate zipfile if not present - raise EngineException("Only allowed 'text/plain' Accept header for this descriptor. To be solved in future versions" - "", http_code=HTTPStatus.NOT_ACCEPTABLE) + raise EngineException("Only allowed 'text/plain' Accept header for this descriptor. To be solved in " + "future versions", http_code=HTTPStatus.NOT_ACCEPTABLE) return self.fs.file_open((storage['folder'], storage['zipfile']), "rb"), "application/zip" def get_item_list(self, session, item, filter={}): @@ -679,7 +926,6 @@ class Engine(object): :param _id: server id of the item :return: dictionary, raise exception if not found. """ - database_item = item filter = {"_id": _id} # TODO add admin to filter, validate rights # TODO transform data for SOL005 URL requests @@ -698,33 +944,52 @@ class Engine(object): self._add_read_filter(session, item, filter) return self.db.del_list(item, filter) - def del_item(self, session, item, _id): + def del_item(self, session, item, _id, force=False): """ - Get complete information on an items + Delete item by its internal id :param session: contains the used login username and working project :param item: it can be: users, projects, vnfds, nsds, ... :param _id: server id of the item - :return: dictionary, raise exception if not found. + :param force: indicates if deletion must be forced in case of conflict + :return: dictionary with deleted item _id. It raises exception if not found. """ # TODO add admin to filter, validate rights # data = self.get_item(item, _id) filter = {"_id": _id} self._add_delete_filter(session, item, filter) + if item in ("vnfds", "nsds") and not force: + descriptor = self.get_item(session, item, _id) + descriptor_id = descriptor.get("id") + if descriptor_id: + self._check_dependencies_on_descriptor(session, item, descriptor_id, _id) + elif item == "projects": + if not force: + self._check_project_dependencies(_id) - if item in ("nsrs", "vims", "sdns"): - desc = self.db.get_one(item, filter) - desc["_admin"]["to_delete"] = True - self.db.replace(item, _id, desc) # TODO change to set_one - if item == "nsrs": - self.msg.write("ns", "delete", _id) - elif item == "vims": + if item == "nsrs": + nsr = self.db.get_one(item, filter) + if nsr["_admin"].get("nsState") == "INSTANTIATED" and not force: + raise EngineException("nsr '{}' cannot be deleted because it is in 'INSTANTIATED' state. " + "Launch 'terminate' operation first; or force deletion".format(_id), + http_code=HTTPStatus.CONFLICT) + v = self.db.del_one(item, {"_id": _id}) + self.db.del_list("nslcmops", {"nsInstanceId": _id}) + self.db.del_list("vnfrs", {"nsr-id-ref": _id}) + self.msg.write("ns", "deleted", {"_id": _id}) + return v + if item in ("vim_accounts", "sdns") and not force: + self.db.set_one(item, {"_id": _id}, {"_admin.to_delete": True}) # TODO change status + if item == "vim_accounts": self.msg.write("vim_account", "delete", {"_id": _id}) elif item == "sdns": self.msg.write("sdn", "delete", {"_id": _id}) return {"deleted": 1} # TODO indicate an offline operation to return 202 ACCEPTED v = self.db.del_one(item, filter) - self.fs.file_delete(_id, ignore_non_exist=True) + if item in ("vnfds", "nsds"): + self.fs.file_delete(_id, ignore_non_exist=True) + if item in ("vim_accounts", "sdns", "vnfds", "nsds"): + self.msg.write(item[:-1], "deleted", {"_id": _id}) return v def prune(self): @@ -736,19 +1001,48 @@ class Engine(object): def create_admin(self): """ - Creates a new user admin/admin into database. Only allowed if database is empty. Useful for initialization - :return: _id identity of the inserted data. + Creates a new user admin/admin into database if database is empty. Useful for initialization + :return: _id identity of the inserted data, or None """ users = self.db.get_one("users", fail_on_empty=False, fail_on_more=False) if users: - raise EngineException("Unauthorized. Database users is not empty", HTTPStatus.UNAUTHORIZED) + return None + # raise EngineException("Unauthorized. Database users is not empty", HTTPStatus.UNAUTHORIZED) indata = {"username": "admin", "password": "admin", "projects": ["admin"]} fake_session = {"project_id": "admin", "username": "admin"} self._format_new_data(fake_session, "users", indata) _id = self.db.create("users", indata) return _id - def _edit_item(self, session, item, id, content, indata={}, kwargs=None): + def init_db(self, target_version='1.0'): + """ + Init database if empty. If not empty it checks that database version is ok. + If empty, it creates a new user admin/admin at 'users' and a new entry at 'version' + :return: None if ok, exception if error or if the version is different. + """ + version = self.db.get_one("version", fail_on_empty=False, fail_on_more=False) + if not version: + # create user admin + self.create_admin() + # create database version + version_data = { + "_id": '1.0', # version text + "version": 1000, # version number + "date": "2018-04-12", # version date + "description": "initial design", # changes in this version + 'status': 'ENABLED' # ENABLED, DISABLED (migration in process), ERROR, + } + self.db.create("version", version_data) + elif version["_id"] != target_version: + # TODO implement migration process + raise EngineException("Wrong database version '{}'. Expected '{}'".format( + version["_id"], target_version), HTTPStatus.INTERNAL_SERVER_ERROR) + elif version["status"] != 'ENABLED': + raise EngineException("Wrong database status '{}'".format( + version["status"]), HTTPStatus.INTERNAL_SERVER_ERROR) + return + + def _edit_item(self, session, item, id, content, indata={}, kwargs=None, force=False): if indata: indata = self._remove_envelop(item, indata) @@ -780,24 +1074,25 @@ class Engine(object): raise EngineException( "Invalid query string '{}'. Index '{}' out of range".format(k, kitem_old)) try: - validate_input(content, item, new=False) + validate_input(indata, item, new=False) except ValidationError as e: raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY) - _deep_update(content, indata) - self._validate_new_data(session, item, content, id) + self._check_edition(session, item, indata, id, force) + deep_update(content, indata) + self._validate_new_data(session, item, content, id=id, force=force) # self._format_new_data(session, item, content) self.db.replace(item, id, content) - if item in ("vims", "sdns"): + if item in ("vim_accounts", "sdns"): indata.pop("_admin", None) indata["_id"] = id - if item == "vims": + if item == "vim_accounts": self.msg.write("vim_account", "edit", indata) elif item == "sdns": self.msg.write("sdn", "edit", indata) return id - def edit_item(self, session, item, _id, indata={}, kwargs=None): + def edit_item(self, session, item, _id, indata={}, kwargs=None, force=False): """ Update an existing entry at database :param session: contains the used login username and working project @@ -805,11 +1100,11 @@ class Engine(object): :param _id: identifier to be updated :param indata: data to be inserted :param kwargs: used to override the indata descriptor + :param force: If True avoid some dependence checks :return: dictionary, raise exception if not found. """ + if not session["admin"] and item == "projects": + raise EngineException("Needed admin privileges to perform this operation", HTTPStatus.UNAUTHORIZED) content = self.get_item(session, item, _id) - return self._edit_item(session, item, _id, content, indata, kwargs) - - - + return self._edit_item(session, item, _id, content, indata, kwargs, force)