X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FNBI.git;a=blobdiff_plain;f=osm_nbi%2Fengine.py;h=219becf8fde9b54376c7d5453e3d20687bd04e44;hp=c35617b7b832a7e47fa1afdf8399c3d341b75f0a;hb=2236d203182b39ae5d20d21a475c95f6a5d5fbc5;hpb=c94c3df90aa64298a7935a80b221f80f3c043260 diff --git a/osm_nbi/engine.py b/osm_nbi/engine.py index c35617b..219becf 100644 --- a/osm_nbi/engine.py +++ b/osm_nbi/engine.py @@ -1,10 +1,10 @@ # -*- coding: utf-8 -*- -import dbmongo -import dbmemory -import fslocal -import msglocal -import msgkafka +from osm_common import dbmongo +from osm_common import dbmemory +from osm_common import fslocal +from osm_common import msglocal +from osm_common import msgkafka import tarfile import yaml import json @@ -12,12 +12,13 @@ import logging from random import choice as random_choice from uuid import uuid4 from hashlib import sha256, md5 -from dbbase import DbException -from fsbase import FsException -from msgbase import MsgException +from osm_common.dbbase import DbException +from osm_common.fsbase import FsException +from osm_common.msgbase import MsgException from http import HTTPStatus from time import time from copy import deepcopy +from validation import validate_input, ValidationError __author__ = "Alfonso Tierno " @@ -29,6 +30,29 @@ class EngineException(Exception): Exception.__init__(self, message) +def _deep_update(dict_to_change, dict_reference): + """ + Modifies one dictionary with the information of the other following https://tools.ietf.org/html/rfc7396 + :param dict_to_change: Ends modified + :param dict_reference: reference + :return: none + """ + for k in dict_reference: + if dict_reference[k] is None: # None->Anything + if k in dict_to_change: + del dict_to_change[k] + elif not isinstance(dict_reference[k], dict): # NotDict->Anything + dict_to_change[k] = dict_reference[k] + elif k not in dict_to_change: # Dict->Empty + dict_to_change[k] = deepcopy(dict_reference[k]) + _deep_update(dict_to_change[k], dict_reference[k]) + elif isinstance(dict_to_change[k], dict): # Dict->Dict + _deep_update(dict_to_change[k], dict_reference[k]) + else: # Dict->NotDict + dict_to_change[k] = deepcopy(dict_reference[k]) + _deep_update(dict_to_change[k], dict_reference[k]) + + class Engine(object): def __init__(self): @@ -189,7 +213,7 @@ class Engine(object): """ Obtain the useful data removing the envelop. It goes throw the vnfd or nsd catalog and returns the vnfd or nsd content - :param item: can be vnfds, nsds, users, projects, + :param item: can be vnfds, nsds, users, projects, userDefinedData (initial content of a vnfds, nsds :param indata: Content to be inspected :return: the useful part of indata """ @@ -214,9 +238,53 @@ class Engine(object): if not isinstance(clean_indata['nsd'], list) or len(clean_indata['nsd']) != 1: raise EngineException("'nsd' must be a list only one element") clean_indata = clean_indata['nsd'][0] + elif item == "userDefinedData": + if "userDefinedData" in indata: + clean_indata = clean_indata['userDefinedData'] return clean_indata - def _validate_new_data(self, session, item, indata): + def _check_dependencies_on_descriptor(self, session, item, descriptor_id): + """ + Check that the descriptor to be deleded is not a dependency of others + :param session: client session information + :param item: can be vnfds, nsds + :param descriptor_id: id of descriptor to be deleted + :return: None or raises exception + """ + if item == "vnfds": + _filter = {"constituent-vnfd.ANYINDEX.vnfd-id-ref": descriptor_id} + if self.get_item_list(session, "nsds", _filter): + raise EngineException("There are nsd that depends on this VNFD", http_code=HTTPStatus.CONFLICT) + elif item == "nsds": + _filter = {"nsdId": descriptor_id} + if self.get_item_list(session, "nsrs", _filter): + raise EngineException("There are nsr that depends on this NSD", http_code=HTTPStatus.CONFLICT) + + def _check_descriptor_dependencies(self, session, item, descriptor): + """ + Check that the dependent descriptors exist on a new descriptor or edition + :param session: client session information + :param item: can be nsds, nsrs + :param descriptor: descriptor to be inserted or edit + :return: None or raises exception + """ + if item == "nsds": + if not descriptor.get("constituent-vnfd"): + return + for vnf in descriptor["constituent-vnfd"]: + vnfd_id = vnf["vnfd-id-ref"] + if not self.get_item_list(session, "vnfds", {"id": vnfd_id}): + raise EngineException("Descriptor error at 'constituent-vnfd':'vnfd-id-ref'='{}' references a non " + "existing vnfd".format(vnfd_id), http_code=HTTPStatus.CONFLICT) + elif item == "nsrs": + if not descriptor.get("nsdId"): + return + nsd_id = descriptor["nsdId"] + if not self.get_item_list(session, "nsds", {"id": nsd_id}): + raise EngineException("Descriptor error at nsdId='{}' references a non exist nsd".format(nsd_id), + http_code=HTTPStatus.CONFLICT) + + def _validate_new_data(self, session, item, indata, id=None, force=False): if item == "users": if not indata.get("username"): raise EngineException("missing 'username'", HTTPStatus.UNPROCESSABLE_ENTITY) @@ -224,78 +292,117 @@ class Engine(object): raise EngineException("missing 'password'", HTTPStatus.UNPROCESSABLE_ENTITY) if not indata.get("projects"): raise EngineException("missing 'projects'", HTTPStatus.UNPROCESSABLE_ENTITY) - # check username not exist + # check username not exists if self.db.get_one(item, {"username": indata.get("username")}, fail_on_empty=False, fail_on_more=False): - raise EngineException("username '{}' exist".format(indata["username"]), HTTPStatus.CONFLICT) + raise EngineException("username '{}' exists".format(indata["username"]), HTTPStatus.CONFLICT) elif item == "projects": if not indata.get("name"): raise EngineException("missing 'name'") - # check name not exist + # check name not exists if self.db.get_one(item, {"name": indata.get("name")}, fail_on_empty=False, fail_on_more=False): - raise EngineException("name '{}' exist".format(indata["name"]), HTTPStatus.CONFLICT) - elif item == "vnfds" or item == "nsds": + raise EngineException("name '{}' exists".format(indata["name"]), HTTPStatus.CONFLICT) + elif item in ("vnfds", "nsds"): filter = {"id": indata["id"]} + if id: + filter["_id.neq"] = id # TODO add admin to filter, validate rights self._add_read_filter(session, item, filter) if self.db.get_one(item, filter, fail_on_empty=False): - raise EngineException("{} with id '{}' already exist for this tenant".format(item[:-1], indata["id"]), + raise EngineException("{} with id '{}' already exists for this tenant".format(item[:-1], indata["id"]), HTTPStatus.CONFLICT) # TODO validate with pyangbind + if item == "nsds" and not force: + self._check_descriptor_dependencies(session, "nsds", indata) + elif item == "userDefinedData": + # TODO validate userDefinedData is a keypair values + pass + elif item == "nsrs": pass + elif item == "vim_accounts" or item == "sdns": + filter = {"name": indata.get("name")} + if id: + filter["_id.neq"] = id + if self.db.get_one(item, filter, fail_on_empty=False, fail_on_more=False): + raise EngineException("name '{}' already exists for {}".format(indata["name"], item), + HTTPStatus.CONFLICT) - def _format_new_data(self, session, item, indata, admin=None): + def _check_ns_operation(self, session, nsr, operation, indata): + """ + Check that user has enter right parameters for the operation + :param session: + :param operation: it can be: instantiate, terminate, action, TODO: update, heal + :param indata: descriptor with the parameters of the operation + :return: None + """ + if operation == "action": + if indata.get("vnf_member_index"): + indata["member_vnf_index"] = indata.pop("vnf_member_index") # for backward compatibility + for vnf in nsr["nsd"]["constituent-vnfd"]: + if indata["member_vnf_index"] == vnf["member-vnf-index"]: + # TODO get vnfd, check primitives + break + else: + raise EngineException("Invalid parameter member_vnf_index='{}' is not one of the nsd " + "constituent-vnfd".format(indata["member_vnf_index"])) + + def _format_new_data(self, session, item, indata): now = time() - if not "_admin" in indata: + if "_admin" not in indata: indata["_admin"] = {} indata["_admin"]["created"] = now indata["_admin"]["modified"] = now if item == "users": - _id = indata["username"] + indata["_id"] = indata["username"] salt = uuid4().hex - indata["_admin"]["salt"] = salt + indata["_admin"]["salt"] = salt indata["password"] = sha256(indata["password"].encode('utf-8') + salt.encode('utf-8')).hexdigest() elif item == "projects": - _id = indata["name"] + indata["_id"] = indata["name"] else: - _id = None - storage = None - if admin: - _id = admin.get("_id") - storage = admin.get("storage") - if not _id: - _id = str(uuid4()) - if item == "vnfds" or item == "nsds": + if not indata.get("_id"): + indata["_id"] = str(uuid4()) + if item in ("vnfds", "nsds", "nsrs", "vnfrs"): if not indata["_admin"].get("projects_read"): indata["_admin"]["projects_read"] = [session["project_id"]] if not indata["_admin"].get("projects_write"): indata["_admin"]["projects_write"] = [session["project_id"]] - if storage: - indata["_admin"]["storage"] = storage - indata["_id"] = _id + if item in ("vnfds", "nsds"): + indata["_admin"]["onboardingState"] = "CREATED" + indata["_admin"]["operationalState"] = "DISABLED" + indata["_admin"]["usageSate"] = "NOT_IN_USE" + if item == "nsrs": + indata["_admin"]["nsState"] = "NOT_INSTANTIATED" + if item in ("vim_accounts", "sdns"): + indata["_admin"]["operationalState"] = "PROCESSING" - def _new_item_partial(self, session, item, indata, headers): + def upload_content(self, session, item, _id, indata, kwargs, headers): """ - Used for recieve content by chunks (with a transaction_id header and/or gzip file. It will store and extract + Used for receiving content by chunks (with a transaction_id header and/or gzip file. It will store and extract) :param session: session - :param item: + :param item: can be nsds or vnfds + :param _id : the nsd,vnfd is already created, this is the id :param indata: http body request + :param kwargs: user query string to override parameters. NOT USED :param headers: http request headers - :return: a dict with:: - _id: - storage: : where it is saving - desc: : descriptor: Only present when all the content is received, extracted and read the descriptor + :return: True package has is completely uploaded or False if partial content has been uplodaed. + Raise exception on error """ + # Check that _id exists and it is valid + current_desc = self.get_item(session, item, _id) + content_range_text = headers.get("Content-Range") - transaction_id = headers.get("Transaction-Id") - filename = headers.get("Content-Filename", "pkg") - # TODO change to Content-Disposition filename https://tools.ietf.org/html/rfc6266 expected_md5 = headers.get("Content-File-MD5") compressed = None - if "application/gzip" in headers.get("Content-Type") or "application/x-gzip" in headers.get("Content-Type") or \ - "application/zip" in headers.get("Content-Type"): + content_type = headers.get("Content-Type") + if content_type and "application/gzip" in content_type or "application/x-gzip" in content_type or \ + "application/zip" in content_type: compressed = "gzip" + filename = headers.get("Content-Filename") + if not filename: + filename = "package.tar.gz" if compressed else "package" + # TODO change to Content-Disposition filename https://tools.ietf.org/html/rfc6266 file_pkg = None error_text = "" try: @@ -306,41 +413,48 @@ class Engine(object): start = int(content_range[1]) end = int(content_range[2]) + 1 total = int(content_range[3]) - if len(indata) != end-start: - raise EngineException("Mismatch between Content-Range header {}-{} and body length of {}".format( - start, end-1, len(indata)), HTTPStatus.BAD_REQUEST) else: start = 0 - total = end = len(indata) - if not transaction_id: - # generate transaction - transaction_id = str(uuid4()) - self.fs.mkdir(transaction_id) - # control_file = open(self.storage["path"] + transaction_id + "/.osm.yaml", 'wb') - # control = {"received": 0} - elif not self.fs.file_exists(transaction_id): - raise EngineException("invalid Transaction-Id header", HTTPStatus.NOT_FOUND) + + if start: + if not self.fs.file_exists(_id, 'dir'): + raise EngineException("invalid Transaction-Id header", HTTPStatus.NOT_FOUND) else: - pass - # control_file = open(self.storage["path"] + transaction_id + "/.osm.yaml", 'rw') - # control = yaml.load(control_file) - # control_file.seek(0, 0) + self.fs.file_delete(_id, ignore_non_exist=True) + self.fs.mkdir(_id) + storage = self.fs.get_params() - storage["folder"] = transaction_id - storage["file"] = filename + storage["folder"] = _id - file_path = (transaction_id, filename) - if self.fs.file_exists(file_path): + file_path = (_id, filename) + if self.fs.file_exists(file_path, 'file'): file_size = self.fs.file_size(file_path) else: file_size = 0 if file_size != start: - raise EngineException("invalid upload transaction sequence, expected '{}' but received '{}'".format( - file_size, start), HTTPStatus.BAD_REQUEST) + raise EngineException("invalid Content-Range start sequence, expected '{}' but received '{}'".format( + file_size, start), HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE) file_pkg = self.fs.file_open(file_path, 'a+b') - file_pkg.write(indata) - if end != total: - return {"_id": transaction_id, "storage": storage} + if isinstance(indata, dict): + indata_text = yaml.safe_dump(indata, indent=4, default_flow_style=False) + file_pkg.write(indata_text.encode(encoding="utf-8")) + else: + indata_len = 0 + while True: + indata_text = indata.read(4096) + indata_len += len(indata_text) + if not indata_text: + break + file_pkg.write(indata_text) + if content_range_text: + if indata_len != end-start: + raise EngineException("Mismatch between Content-Range header {}-{} and body length of {}".format( + start, end-1, indata_len), HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE) + if end != total: + # TODO update to UPLOADING + return False + + # PACKAGE UPLOADED if expected_md5: file_pkg.seek(0, 0) file_md5 = md5() @@ -352,8 +466,6 @@ class Engine(object): raise EngineException("Error, MD5 mismatch", HTTPStatus.CONFLICT) file_pkg.seek(0, 0) if compressed == "gzip": - # TODO unzip, - storage["tarfile"] = filename tar = tarfile.open(mode='r', fileobj=file_pkg) descriptor_file_name = None for tarinfo in tar: @@ -364,34 +476,48 @@ class Engine(object): if len(tarname_path) == 1 and not tarinfo.isdir(): raise EngineException("All files must be inside a dir for package descriptor tar.gz") if tarname.endswith(".yaml") or tarname.endswith(".json") or tarname.endswith(".yml"): - storage["file"] = tarname_path[0] + storage["pkg-dir"] = tarname_path[0] if len(tarname_path) == 2: if descriptor_file_name: - raise EngineException("Found more than one descriptor file at package descriptor tar.gz") + raise EngineException( + "Found more than one descriptor file at package descriptor tar.gz") descriptor_file_name = tarname if not descriptor_file_name: raise EngineException("Not found any descriptor file at package descriptor tar.gz") - self.fs.file_extract(tar, transaction_id) - with self.fs.file_open((transaction_id, descriptor_file_name), "r") as descriptor_file: + storage["descriptor"] = descriptor_file_name + storage["zipfile"] = filename + self.fs.file_extract(tar, _id) + with self.fs.file_open((_id, descriptor_file_name), "r") as descriptor_file: content = descriptor_file.read() else: content = file_pkg.read() - tarname = "" + storage["descriptor"] = descriptor_file_name = filename - if tarname.endswith(".json"): + if descriptor_file_name.endswith(".json"): error_text = "Invalid json format " indata = json.load(content) else: error_text = "Invalid yaml format " indata = yaml.load(content) - return {"_id": transaction_id, "storage": storage, "desc": indata} + + current_desc["_admin"]["storage"] = storage + current_desc["_admin"]["onboardingState"] = "ONBOARDED" + current_desc["_admin"]["operationalState"] = "ENABLED" + + self._edit_item(session, item, _id, current_desc, indata, kwargs) + # TODO if descriptor has changed because kwargs update content and remove cached zip + # TODO if zip is not present creates one + return True + except EngineException: raise except IndexError: raise EngineException("invalid Content-Range header format. Expected 'bytes start-end/total'", - HTTPStatus.BAD_REQUEST) + HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE) except IOError as e: raise EngineException("invalid upload transaction sequence: '{}'".format(e), HTTPStatus.BAD_REQUEST) + except tarfile.ReadError as e: + raise EngineException("invalid file content {}".format(e), HTTPStatus.BAD_REQUEST) except (ValueError, yaml.YAMLError) as e: raise EngineException(error_text + str(e)) finally: @@ -400,119 +526,283 @@ class Engine(object): def new_nsr(self, session, ns_request): """ - Creates a new nsr into database + Creates a new nsr into database. It also creates needed vnfrs :param session: contains the used login username and working project :param ns_request: params to be used for the nsr - :return: nsr descriptor to be stored at database and the _id + :return: the _id of nsr descriptor stored at database """ + rollback = [] + step = "" + try: + # look for nsr + step = "getting nsd id='{}' from database".format(ns_request.get("nsdId")) + nsd = self.get_item(session, "nsds", ns_request["nsdId"]) + nsr_id = str(uuid4()) + now = time() + step = "filling nsr from input data" + nsr_descriptor = { + "name": ns_request["nsName"], + "name-ref": ns_request["nsName"], + "short-name": ns_request["nsName"], + "admin-status": "ENABLED", + "nsd": nsd, + "datacenter": ns_request["vimAccountId"], + "resource-orchestrator": "osmopenmano", + "description": ns_request.get("nsDescription", ""), + "constituent-vnfr-ref": [], - # look for nsr - nsd = self.get_item(session, "nsds", ns_request["nsdId"]) - _id = str(uuid4()) - nsr_descriptor = { - "name": ns_request["nsName"], - "name-ref": ns_request["nsName"], - "short-name": ns_request["nsName"], - "admin-status": "ENABLED", - "nsd": nsd, - "datacenter": ns_request["vimAccountId"], - "resource-orchestrator": "osmopenmano", - "description": ns_request.get("nsDescription", ""), - "constituent-vnfr-ref": ["TODO datacenter-id, vnfr-id"], - - "operational-status": "init", # typedef ns-operational- - "config-status": "init", # typedef config-states - "detailed-status": "scheduled", - - "orchestration-progress": {}, # {"networks": {"active": 0, "total": 0}, "vms": {"active": 0, "total": 0}}, - - "crete-time": time(), - "nsd-name-ref": nsd["name"], - "operational-events": [], # "id", "timestamp", "description", "event", - "nsd-ref": nsd["id"], - "ns-instance-config-ref": _id, - "id": _id, + "operational-status": "init", # typedef ns-operational- + "config-status": "init", # typedef config-states + "detailed-status": "scheduled", - # "input-parameter": xpath, value, - "ssh-authorized-key": ns_request.get("key-pair-ref"), - } - ns_request["nsr_id"] = _id - return nsr_descriptor, _id + "orchestration-progress": {}, + # {"networks": {"active": 0, "total": 0}, "vms": {"active": 0, "total": 0}}, + + "crete-time": now, + "nsd-name-ref": nsd["name"], + "operational-events": [], # "id", "timestamp", "description", "event", + "nsd-ref": nsd["id"], + "instantiate_params": ns_request, + "ns-instance-config-ref": nsr_id, + "id": nsr_id, + "_id": nsr_id, + # "input-parameter": xpath, value, + "ssh-authorized-key": ns_request.get("key-pair-ref"), + } + ns_request["nsr_id"] = nsr_id + + # Create VNFR + needed_vnfds = {} + for member_vnf in nsd["constituent-vnfd"]: + vnfd_id = member_vnf["vnfd-id-ref"] + step = "getting vnfd id='{}' constituent-vnfd='{}' from database".format( + member_vnf["vnfd-id-ref"], member_vnf["member-vnf-index"]) + if vnfd_id not in needed_vnfds: + # Obtain vnfd + vnf_filter = {"id": vnfd_id} + self._add_read_filter(session, "vnfds", vnf_filter) + vnfd = self.db.get_one("vnfds", vnf_filter) + vnfd.pop("_admin") + needed_vnfds[vnfd_id] = vnfd + else: + vnfd = needed_vnfds[vnfd_id] + step = "filling vnfr vnfd-id='{}' constituent-vnfd='{}'".format( + member_vnf["vnfd-id-ref"], member_vnf["member-vnf-index"]) + vnfr_id = str(uuid4()) + vnfr_descriptor = { + "id": vnfr_id, + "_id": vnfr_id, + "nsr-id-ref": nsr_id, + "member-vnf-index-ref": member_vnf["member-vnf-index"], + "created-time": now, + # "vnfd": vnfd, # at OSM model.but removed to avoid data duplication TODO: revise + "vnfd-ref": vnfd_id, + "vnfd-id": vnfr_id, # not at OSM model, but useful + "vim-account-id": None, + "vdur": [], + "connection-point": [], + "ip-address": None, # mgmt-interface filled by LCM + } + for cp in vnfd.get("connection-point", ()): + vnf_cp = { + "name": cp["name"], + "connection-point-id": cp.get("id"), + "id": cp.get("id"), + # "ip-address", "mac-address" # filled by LCM + # vim-id # TODO it would be nice having a vim port id + } + vnfr_descriptor["connection-point"].append(vnf_cp) + for vdu in vnfd["vdu"]: + vdur_id = str(uuid4()) + vdur = { + "id": vdur_id, + "vdu-id-ref": vdu["id"], + "ip-address": None, # mgmt-interface filled by LCM + # "vim-id", "flavor-id", "image-id", "management-ip" # filled by LCM + "internal-connection-point": [], + } + # TODO volumes: name, volume-id + for icp in vdu.get("internal-connection-point", ()): + vdu_icp = { + "id": icp["id"], + "connection-point-id": icp["id"], + "name": icp.get("name"), + # "ip-address", "mac-address" # filled by LCM + # vim-id # TODO it would be nice having a vim port id + } + vdur["internal-connection-point"].append(vdu_icp) + vnfr_descriptor["vdur"].append(vdur) + + step = "creating vnfr vnfd-id='{}' constituent-vnfd='{}' at database".format( + member_vnf["vnfd-id-ref"], member_vnf["member-vnf-index"]) + self._format_new_data(session, "vnfrs", vnfr_descriptor) + self.db.create("vnfrs", vnfr_descriptor) + rollback.append({"session": session, "item": "vnfrs", "_id": vnfr_id, "force": True}) + nsr_descriptor["constituent-vnfr-ref"].append(vnfr_id) - def new_item(self, session, item, indata={}, kwargs=None, headers={}): + step = "creating nsr at database" + self._format_new_data(session, "nsrs", nsr_descriptor) + self.db.create("nsrs", nsr_descriptor) + return nsr_id + except Exception as e: + raise EngineException("Error {}: {}".format(step, e)) + for rollback_item in rollback: + try: + self.engine.del_item(**rollback) + except Exception as e2: + self.logger.error("Rollback Exception {}: {}".format(rollback, e2)) + + @staticmethod + def _update_descriptor(desc, kwargs): """ - Creates a new entry into database + Update descriptor with the kwargs. It contains dot separated keys + :param desc: dictionary to be updated + :param kwargs: plain dictionary to be used for updating. + :return: + """ + if not kwargs: + return + try: + for k, v in kwargs.items(): + update_content = desc + kitem_old = None + klist = k.split(".") + for kitem in klist: + if kitem_old is not None: + update_content = update_content[kitem_old] + if isinstance(update_content, dict): + kitem_old = kitem + elif isinstance(update_content, list): + kitem_old = int(kitem) + else: + raise EngineException( + "Invalid query string '{}'. Descriptor is not a list nor dict at '{}'".format(k, kitem)) + update_content[kitem_old] = v + except KeyError: + raise EngineException( + "Invalid query string '{}'. Descriptor does not contain '{}'".format(k, kitem_old)) + except ValueError: + raise EngineException("Invalid query string '{}'. Expected integer index list instead of '{}'".format( + k, kitem)) + except IndexError: + raise EngineException( + "Invalid query string '{}'. Index '{}' out of range".format(k, kitem_old)) + + def new_item(self, session, item, indata={}, kwargs=None, headers={}, force=False): + """ + Creates a new entry into database. For nsds and vnfds it creates an almost empty DISABLED entry, + that must be completed with a call to method upload_content :param session: contains the used login username and working project - :param item: it can be: users, projects, vnfds, nsds, ... + :param item: it can be: users, projects, vim_accounts, sdns, nsrs, nsds, vnfds :param indata: data to be inserted :param kwargs: used to override the indata descriptor :param headers: http request headers - :return: _id, transaction_id: identity of the inserted data. or transaction_id if Content-Range is used + :param force: If True avoid some dependence checks + :return: _id: identity of the inserted data. """ - # TODO validate input. Check not exist at database - # TODO add admin and status - transaction = None - if headers.get("Content-Range") or "application/gzip" in headers.get("Content-Type") or \ - "application/x-gzip" in headers.get("Content-Type") or "application/zip" in headers.get("Content-Type"): - if not indata: + try: + item_envelop = item + if item in ("nsds", "vnfds"): + item_envelop = "userDefinedData" + content = self._remove_envelop(item_envelop, indata) + + # Override descriptor with query string kwargs + self._update_descriptor(content, kwargs) + if not indata and item not in ("nsds", "vnfds"): raise EngineException("Empty payload") - transaction = self._new_item_partial(session, item, indata, headers) - if "desc" not in transaction: - return transaction["_id"], False - indata = transaction["desc"] - content = self._remove_envelop(item, indata) + validate_input(content, item, new=True) - # Override descriptor with query string kwargs - if kwargs: - try: - for k, v in kwargs.items(): - update_content = content - kitem_old = None - klist = k.split(".") - for kitem in klist: - if kitem_old is not None: - update_content = update_content[kitem_old] - if isinstance(update_content, dict): - kitem_old = kitem - elif isinstance(update_content, list): - kitem_old = int(kitem) - else: - raise EngineException( - "Invalid query string '{}'. Descriptor is not a list nor dict at '{}'".format(k, kitem)) - update_content[kitem_old] = v - except KeyError: - raise EngineException( - "Invalid query string '{}'. Descriptor does not contain '{}'".format(k, kitem_old)) - except ValueError: - raise EngineException("Invalid query string '{}'. Expected integer index list instead of '{}'".format( - k, kitem)) - except IndexError: - raise EngineException( - "Invalid query string '{}'. Index '{}' out of range".format(k, kitem_old)) - if not indata: - raise EngineException("Empty payload") + if item == "nsrs": + # in this case the input descriptor is not the data to be stored + return self.new_nsr(session, ns_request=content) - if item == "nsrs": - # in this case the imput descriptor is not the data to be stored - ns_request = content - content, _id = self.new_nsr(session, ns_request) - transaction = {"_id": _id} - - self._validate_new_data(session, item, content) - self._format_new_data(session, item, content, transaction) - _id = self.db.create(item, content) - if item == "nsrs": - self.msg.write("ns", "create", _id) - return _id, True + self._validate_new_data(session, item_envelop, content, force) + if item in ("nsds", "vnfds"): + content = {"_admin": {"userDefinedData": content}} + self._format_new_data(session, item, content) + _id = self.db.create(item, content) + + if item == "vim_accounts": + msg_data = self.db.get_one(item, {"_id": _id}) + msg_data.pop("_admin", None) + self.msg.write("vim_account", "create", msg_data) + elif item == "sdns": + msg_data = self.db.get_one(item, {"_id": _id}) + msg_data.pop("_admin", None) + self.msg.write("sdn", "create", msg_data) + return _id + except ValidationError as e: + raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY) + + def new_nslcmop(self, session, nsInstanceId, operation, params): + now = time() + _id = str(uuid4()) + nslcmop = { + "id": _id, + "_id": _id, + "operationState": "PROCESSING", # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK + "statusEnteredTime": now, + "nsInstanceId": nsInstanceId, + "lcmOperationType": operation, + "startTime": now, + "isAutomaticInvocation": False, + "operationParams": params, + "isCancelPending": False, + "links": { + "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id, + "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsInstanceId, + } + } + return nslcmop + + def ns_operation(self, session, nsInstanceId, operation, indata, kwargs=None): + """ + Performs a new operation over a ns + :param session: contains the used login username and working project + :param nsInstanceId: _id of the nsr to perform the operation + :param operation: it can be: instantiate, terminate, action, TODO: update, heal + :param indata: descriptor with the parameters of the operation + :param kwargs: used to override the indata descriptor + :return: id of the nslcmops + """ + try: + # Override descriptor with query string kwargs + self._update_descriptor(indata, kwargs) + validate_input(indata, "ns_" + operation, new=True) + # get ns from nsr_id + nsr = self.get_item(session, "nsrs", nsInstanceId) + if not nsr["_admin"].get("nsState") or nsr["_admin"]["nsState"] == "NOT_INSTANTIATED": + if operation == "terminate" and indata.get("autoremove"): + # NSR must be deleted + return self.del_item(session, "nsrs", nsInstanceId) + if operation != "instantiate": + raise EngineException("ns_instance '{}' cannot be '{}' because it is not instantiated".format( + nsInstanceId, operation), HTTPStatus.CONFLICT) + else: + if operation == "instantiate" and not indata.get("force"): + raise EngineException("ns_instance '{}' cannot be '{}' because it is already instantiated".format( + nsInstanceId, operation), HTTPStatus.CONFLICT) + indata["nsInstanceId"] = nsInstanceId + self._check_ns_operation(session, nsr, operation, indata) + nslcmop = self.new_nslcmop(session, nsInstanceId, operation, indata) + self._format_new_data(session, "nslcmops", nslcmop) + _id = self.db.create("nslcmops", nslcmop) + indata["_id"] = _id + self.msg.write("ns", operation, nslcmop) + return _id + except ValidationError as e: + raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY) + # except DbException as e: + # raise EngineException("Cannot get ns_instance '{}': {}".format(e), HTTPStatus.NOT_FOUND) def _add_read_filter(self, session, item, filter): if session["project_id"] == "admin": # allows all return filter if item == "users": filter["username"] = session["username"] - elif item == "vnfds" or item == "nsds": + elif item in ("vnfds", "nsds", "nsrs"): filter["_admin.projects_read.cont"] = ["ANY", session["project_id"]] def _add_delete_filter(self, session, item, filter): @@ -527,6 +817,61 @@ class Engine(object): elif item in ("vnfds", "nsds") and session["project_id"] != "admin": filter["_admin.projects_write.cont"] = ["ANY", session["project_id"]] + def get_file(self, session, item, _id, path=None, accept_header=None): + """ + Return the file content of a vnfd or nsd + :param session: contains the used login username and working project + :param item: it can be vnfds or nsds + :param _id: Identity of the vnfd, ndsd + :param path: artifact path or "$DESCRIPTOR" or None + :param accept_header: Content of Accept header. Must contain applition/zip or/and text/plain + :return: opened file or raises an exception + """ + accept_text = accept_zip = False + if accept_header: + if 'text/plain' in accept_header or '*/*' in accept_header: + accept_text = True + if 'application/zip' in accept_header or '*/*' in accept_header: + accept_zip = True + if not accept_text and not accept_zip: + raise EngineException("provide request header 'Accept' with 'application/zip' or 'text/plain'", + http_code=HTTPStatus.NOT_ACCEPTABLE) + + content = self.get_item(session, item, _id) + if content["_admin"]["onboardingState"] != "ONBOARDED": + raise EngineException("Cannot get content because this resource is not at 'ONBOARDED' state. " + "onboardingState is {}".format(content["_admin"]["onboardingState"]), + http_code=HTTPStatus.CONFLICT) + storage = content["_admin"]["storage"] + if path is not None and path != "$DESCRIPTOR": # artifacts + if not storage.get('pkg-dir'): + raise EngineException("Packages does not contains artifacts", http_code=HTTPStatus.BAD_REQUEST) + if self.fs.file_exists((storage['folder'], storage['pkg-dir'], *path), 'dir'): + folder_content = self.fs.dir_ls((storage['folder'], storage['pkg-dir'], *path)) + return folder_content, "text/plain" + # TODO manage folders in http + else: + return self.fs.file_open((storage['folder'], storage['pkg-dir'], *path), "rb"), \ + "application/octet-stream" + + # pkgtype accept ZIP TEXT -> result + # manyfiles yes X -> zip + # no yes -> error + # onefile yes no -> zip + # X yes -> text + + if accept_text and (not storage.get('pkg-dir') or path == "$DESCRIPTOR"): + return self.fs.file_open((storage['folder'], storage['descriptor']), "r"), "text/plain" + elif storage.get('pkg-dir') and not accept_zip: + raise EngineException("Packages that contains several files need to be retrieved with 'application/zip'" + "Accept header", http_code=HTTPStatus.NOT_ACCEPTABLE) + else: + if not storage.get('zipfile'): + # TODO generate zipfile if not present + raise EngineException("Only allowed 'text/plain' Accept header for this descriptor. To be solved in " + "future versions", http_code=HTTPStatus.NOT_ACCEPTABLE) + return self.fs.file_open((storage['folder'], storage['zipfile']), "rb"), "application/zip" + def get_item_list(self, session, item, filter={}): """ Get a list of items @@ -536,6 +881,9 @@ class Engine(object): :return: The list, it can be empty if no one match the filter. """ # TODO add admin to filter, validate rights + # TODO transform data for SOL005 URL requests. Transform filtering + # TODO implement "field-type" query string SOL005 + self._add_read_filter(session, item, filter) return self.db.get_list(item, filter) @@ -543,12 +891,13 @@ class Engine(object): """ Get complete information on an items :param session: contains the used login username and working project - :param item: it can be: users, projects, vnfds, nsds, ... + :param item: it can be: users, projects, vnfds, nsds, :param _id: server id of the item :return: dictionary, raise exception if not found. """ filter = {"_id": _id} # TODO add admin to filter, validate rights + # TODO transform data for SOL005 URL requests self._add_read_filter(session, item, filter) return self.db.get_one(item, filter) @@ -564,30 +913,47 @@ class Engine(object): self._add_read_filter(session, item, filter) return self.db.del_list(item, filter) - def del_item(self, session, item, _id): + def del_item(self, session, item, _id, force=False): """ - Get complete information on an items + Delete item by its internal id :param session: contains the used login username and working project :param item: it can be: users, projects, vnfds, nsds, ... :param _id: server id of the item - :return: dictionary, raise exception if not found. + :param force: indicates if deletion must be forced in case of conflict + :return: dictionary with deleted item _id. It raises exception if not found. """ # TODO add admin to filter, validate rights # data = self.get_item(item, _id) filter = {"_id": _id} self._add_delete_filter(session, item, filter) + if item in ("vnfds", "nsds") and not force: + descriptor = self.get_item(session, item, _id) + descriptor_id = descriptor["id"] + self._check_dependencies_on_descriptor(session, item, descriptor_id) if item == "nsrs": + nsr = self.db.get_one(item, filter) + if nsr["_admin"]["nsState"] == "INSTANTIATED" and not force: + raise EngineException("nsr '{}' cannot be deleted because it is in 'INSTANTIATED' state. " + "Launch 'terminate' operation first; or force deletion".format(_id), + http_code=HTTPStatus.CONFLICT) + v = self.db.del_one(item, {"_id": _id}) + self.db.del_list("nslcmops", {"nsInstanceId": _id}) + self.db.del_list("vnfrs", {"nsr-id-ref": _id}) + self.msg.write("ns", "deleted", {"_id": _id}) + return v + if item in ("vim_accounts", "sdns"): desc = self.db.get_one(item, filter) desc["_admin"]["to_delete"] = True self.db.replace(item, _id, desc) # TODO change to set_one - self.msg.write("ns", "delete", _id) - return {"deleted": 1} + if item == "vim_accounts": + self.msg.write("vim_account", "delete", {"_id": _id}) + elif item == "sdns": + self.msg.write("sdn", "delete", {"_id": _id}) + return {"deleted": 1} # TODO indicate an offline operation to return 202 ACCEPTED v = self.db.del_one(item, filter) self.fs.file_delete(_id, ignore_non_exist=True) - if item == "nsrs": - self.msg.write("ns", "delete", _id) return v def prune(self): @@ -599,39 +965,56 @@ class Engine(object): def create_admin(self): """ - Creates a new user admin/admin into database. Only allowed if database is empty. Useful for initialization - :return: _id identity of the inserted data. + Creates a new user admin/admin into database if database is empty. Useful for initialization + :return: _id identity of the inserted data, or None """ users = self.db.get_one("users", fail_on_empty=False, fail_on_more=False) if users: - raise EngineException("Unauthorized. Database users is not empty", HTTPStatus.UNAUTHORIZED) + return None + # raise EngineException("Unauthorized. Database users is not empty", HTTPStatus.UNAUTHORIZED) indata = {"username": "admin", "password": "admin", "projects": ["admin"]} fake_session = {"project_id": "admin", "username": "admin"} self._format_new_data(fake_session, "users", indata) _id = self.db.create("users", indata) return _id - def edit_item(self, session, item, id, indata={}, kwargs=None): + def init_db(self, target_version='1.0'): """ - Update an existing entry at database - :param session: contains the used login username and working project - :param item: it can be: users, projects, vnfds, nsds, ... - :param id: identity of entry to be updated - :param indata: data to be inserted - :param kwargs: used to override the indata descriptor - :return: dictionary, raise exception if not found. + Init database if empty. If not empty it checks that database version is ok. + If empty, it creates a new user admin/admin at 'users' and a new entry at 'version' + :return: None if ok, exception if error or if the version is different. """ + version = self.db.get_one("version", fail_on_empty=False, fail_on_more=False) + if not version: + # create user admin + self.create_admin() + # create database version + version_data = { + "_id": '1.0', # version text + "version": 1000, # version number + "date": "2018-04-12", # version date + "description": "initial design", # changes in this version + 'status': 'ENABLED' # ENABLED, DISABLED (migration in process), ERROR, + } + self.db.create("version", version_data) + elif version["_id"] != target_version: + # TODO implement migration process + raise EngineException("Wrong database version '{}'. Expected '{}'".format( + version["_id"], target_version), HTTPStatus.INTERNAL_SERVER_ERROR) + elif version["status"] != 'ENABLED': + raise EngineException("Wrong database status '{}'".format( + version["status"]), HTTPStatus.INTERNAL_SERVER_ERROR) + return - content = self.get_item(session, item, id) + def _edit_item(self, session, item, id, content, indata={}, kwargs=None, force=False): if indata: indata = self._remove_envelop(item, indata) - # TODO update content with with a deep-update # Override descriptor with query string kwargs if kwargs: try: for k, v in kwargs.items(): - update_content = content + update_content = indata kitem_old = None klist = k.split(".") for kitem in klist: @@ -654,10 +1037,35 @@ class Engine(object): except IndexError: raise EngineException( "Invalid query string '{}'. Index '{}' out of range".format(k, kitem_old)) + try: + validate_input(indata, item, new=False) + except ValidationError as e: + raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY) - self._validate_new_data(session, item, content) + _deep_update(content, indata) + self._validate_new_data(session, item, content, id, force) # self._format_new_data(session, item, content) self.db.replace(item, id, content) + if item in ("vim_accounts", "sdns"): + indata.pop("_admin", None) + indata["_id"] = id + if item == "vim_accounts": + self.msg.write("vim_account", "edit", indata) + elif item == "sdns": + self.msg.write("sdn", "edit", indata) return id + def edit_item(self, session, item, _id, indata={}, kwargs=None, force=False): + """ + Update an existing entry at database + :param session: contains the used login username and working project + :param item: it can be: users, projects, vnfds, nsds, ... + :param _id: identifier to be updated + :param indata: data to be inserted + :param kwargs: used to override the indata descriptor + :param force: If True avoid some dependence checks + :return: dictionary, raise exception if not found. + """ + content = self.get_item(session, item, _id) + return self._edit_item(session, item, _id, content, indata, kwargs, force)