X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FNBI.git;a=blobdiff_plain;f=osm_nbi%2Fengine.py;h=e7bf7c9e9f23eb726e6574e164114b941bf00364;hp=c35617b7b832a7e47fa1afdf8399c3d341b75f0a;hb=refs%2Fchanges%2F79%2F5979%2F1;hpb=c94c3df90aa64298a7935a80b221f80f3c043260 diff --git a/osm_nbi/engine.py b/osm_nbi/engine.py index c35617b..e7bf7c9 100644 --- a/osm_nbi/engine.py +++ b/osm_nbi/engine.py @@ -18,6 +18,7 @@ from msgbase import MsgException from http import HTTPStatus from time import time from copy import deepcopy +from validation import validate_input, ValidationError __author__ = "Alfonso Tierno " @@ -29,6 +30,29 @@ class EngineException(Exception): Exception.__init__(self, message) +def _deep_update(dict_to_change, dict_reference): + """ + Modifies one dictionary with the information of the other following https://tools.ietf.org/html/rfc7396 + :param dict_to_change: Ends modified + :param dict_reference: reference + :return: none + """ + + for k in dict_reference: + if dict_reference[k] is None: # None->Anything + if k in dict_to_change: + del dict_to_change[k] + elif not isinstance(dict_reference[k], dict): # NotDict->Anything + dict_to_change[k] = dict_reference[k] + elif k not in dict_to_change: # Dict->Empty + dict_to_change[k] = deepcopy(dict_reference[k]) + _deep_update(dict_to_change[k], dict_reference[k]) + elif isinstance(dict_to_change[k], dict): # Dict->Dict + _deep_update(dict_to_change[k], dict_reference[k]) + else: # Dict->NotDict + dict_to_change[k] = deepcopy(dict_reference[k]) + _deep_update(dict_to_change[k], dict_reference[k]) + class Engine(object): def __init__(self): @@ -189,7 +213,7 @@ class Engine(object): """ Obtain the useful data removing the envelop. It goes throw the vnfd or nsd catalog and returns the vnfd or nsd content - :param item: can be vnfds, nsds, users, projects, + :param item: can be vnfds, nsds, users, projects, userDefinedData (initial content of a vnfds, nsds :param indata: Content to be inspected :return: the useful part of indata """ @@ -214,9 +238,12 @@ class Engine(object): if not isinstance(clean_indata['nsd'], list) or len(clean_indata['nsd']) != 1: raise EngineException("'nsd' must be a list only one element") clean_indata = clean_indata['nsd'][0] + elif item == "userDefinedData": + if "userDefinedData" in indata: + clean_indata = clean_indata['userDefinedData'] return clean_indata - def _validate_new_data(self, session, item, indata): + def _validate_new_data(self, session, item, indata, id=None): if item == "users": if not indata.get("username"): raise EngineException("missing 'username'", HTTPStatus.UNPROCESSABLE_ENTITY) @@ -233,8 +260,10 @@ class Engine(object): # check name not exist if self.db.get_one(item, {"name": indata.get("name")}, fail_on_empty=False, fail_on_more=False): raise EngineException("name '{}' exist".format(indata["name"]), HTTPStatus.CONFLICT) - elif item == "vnfds" or item == "nsds": + elif item in ("vnfds", "nsds"): filter = {"id": indata["id"]} + if id: + filter["_id.neq"] = id # TODO add admin to filter, validate rights self._add_read_filter(session, item, filter) if self.db.get_one(item, filter, fail_on_empty=False): @@ -242,8 +271,17 @@ class Engine(object): HTTPStatus.CONFLICT) # TODO validate with pyangbind + elif item == "userDefinedData": + # TODO validate userDefinedData is a keypair values + pass + elif item == "nsrs": pass + elif item == "vims" or item == "sdns": + if self.db.get_one(item, {"name": indata.get("name")}, fail_on_empty=False, fail_on_more=False): + raise EngineException("name '{}' already exist for {}".format(indata["name"], item), + HTTPStatus.CONFLICT) + def _format_new_data(self, session, item, indata, admin=None): now = time() @@ -254,7 +292,7 @@ class Engine(object): if item == "users": _id = indata["username"] salt = uuid4().hex - indata["_admin"]["salt"] = salt + indata["_admin"]["salt"] = salt indata["password"] = sha256(indata["password"].encode('utf-8') + salt.encode('utf-8')).hexdigest() elif item == "projects": _id = indata["name"] @@ -266,36 +304,47 @@ class Engine(object): storage = admin.get("storage") if not _id: _id = str(uuid4()) - if item == "vnfds" or item == "nsds": + if item in ("vnfds", "nsds"): if not indata["_admin"].get("projects_read"): indata["_admin"]["projects_read"] = [session["project_id"]] if not indata["_admin"].get("projects_write"): indata["_admin"]["projects_write"] = [session["project_id"]] + indata["_admin"]["onboardingState"] = "CREATED" + indata["_admin"]["operationalState"] = "DISABLED" + indata["_admin"]["usageSate"] = "NOT_IN_USE" + elif item in ("vims", "sdns"): + indata["_admin"]["operationalState"] = "PROCESSING" + if storage: indata["_admin"]["storage"] = storage indata["_id"] = _id - def _new_item_partial(self, session, item, indata, headers): + def upload_content(self, session, item, _id, indata, kwargs, headers): """ - Used for recieve content by chunks (with a transaction_id header and/or gzip file. It will store and extract + Used for receiving content by chunks (with a transaction_id header and/or gzip file. It will store and extract) :param session: session - :param item: + :param item: can be nsds or vnfds + :param _id : the nsd,vnfd is already created, this is the id :param indata: http body request + :param kwargs: user query string to override parameters. NOT USED :param headers: http request headers - :return: a dict with:: - _id: - storage: : where it is saving - desc: : descriptor: Only present when all the content is received, extracted and read the descriptor + :return: True package has is completely uploaded or False if partial content has been uplodaed. + Raise exception on error """ + # Check that _id exist and it is valid + current_desc = self.get_item(session, item, _id) + content_range_text = headers.get("Content-Range") - transaction_id = headers.get("Transaction-Id") - filename = headers.get("Content-Filename", "pkg") - # TODO change to Content-Disposition filename https://tools.ietf.org/html/rfc6266 expected_md5 = headers.get("Content-File-MD5") compressed = None - if "application/gzip" in headers.get("Content-Type") or "application/x-gzip" in headers.get("Content-Type") or \ - "application/zip" in headers.get("Content-Type"): + content_type = headers.get("Content-Type") + if content_type and "application/gzip" in content_type or "application/x-gzip" in content_type or \ + "application/zip" in content_type: compressed = "gzip" + filename = headers.get("Content-Filename") + if not filename: + filename = "package.tar.gz" if compressed else "package" + # TODO change to Content-Disposition filename https://tools.ietf.org/html/rfc6266 file_pkg = None error_text = "" try: @@ -306,41 +355,48 @@ class Engine(object): start = int(content_range[1]) end = int(content_range[2]) + 1 total = int(content_range[3]) - if len(indata) != end-start: - raise EngineException("Mismatch between Content-Range header {}-{} and body length of {}".format( - start, end-1, len(indata)), HTTPStatus.BAD_REQUEST) else: start = 0 - total = end = len(indata) - if not transaction_id: - # generate transaction - transaction_id = str(uuid4()) - self.fs.mkdir(transaction_id) - # control_file = open(self.storage["path"] + transaction_id + "/.osm.yaml", 'wb') - # control = {"received": 0} - elif not self.fs.file_exists(transaction_id): - raise EngineException("invalid Transaction-Id header", HTTPStatus.NOT_FOUND) + + if start: + if not self.fs.file_exists(_id, 'dir'): + raise EngineException("invalid Transaction-Id header", HTTPStatus.NOT_FOUND) else: - pass - # control_file = open(self.storage["path"] + transaction_id + "/.osm.yaml", 'rw') - # control = yaml.load(control_file) - # control_file.seek(0, 0) + self.fs.file_delete(_id, ignore_non_exist=True) + self.fs.mkdir(_id) + storage = self.fs.get_params() - storage["folder"] = transaction_id - storage["file"] = filename + storage["folder"] = _id - file_path = (transaction_id, filename) - if self.fs.file_exists(file_path): + file_path = (_id, filename) + if self.fs.file_exists(file_path, 'file'): file_size = self.fs.file_size(file_path) else: file_size = 0 if file_size != start: - raise EngineException("invalid upload transaction sequence, expected '{}' but received '{}'".format( - file_size, start), HTTPStatus.BAD_REQUEST) + raise EngineException("invalid Content-Range start sequence, expected '{}' but received '{}'".format( + file_size, start), HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE) file_pkg = self.fs.file_open(file_path, 'a+b') - file_pkg.write(indata) - if end != total: - return {"_id": transaction_id, "storage": storage} + if isinstance(indata, dict): + indata_text = yaml.safe_dump(indata, indent=4, default_flow_style=False) + file_pkg.write(indata_text.encode(encoding="utf-8")) + else: + indata_len = 0 + while True: + indata_text = indata.read(4096) + indata_len += len(indata_text) + if not indata_text: + break + file_pkg.write(indata_text) + if content_range_text: + if indata_len != end-start: + raise EngineException("Mismatch between Content-Range header {}-{} and body length of {}".format( + start, end-1, indata_len), HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE) + if end != total: + # TODO update to UPLOADING + return False + + # PACKAGE UPLOADED if expected_md5: file_pkg.seek(0, 0) file_md5 = md5() @@ -352,8 +408,6 @@ class Engine(object): raise EngineException("Error, MD5 mismatch", HTTPStatus.CONFLICT) file_pkg.seek(0, 0) if compressed == "gzip": - # TODO unzip, - storage["tarfile"] = filename tar = tarfile.open(mode='r', fileobj=file_pkg) descriptor_file_name = None for tarinfo in tar: @@ -364,32 +418,43 @@ class Engine(object): if len(tarname_path) == 1 and not tarinfo.isdir(): raise EngineException("All files must be inside a dir for package descriptor tar.gz") if tarname.endswith(".yaml") or tarname.endswith(".json") or tarname.endswith(".yml"): - storage["file"] = tarname_path[0] + storage["pkg-dir"] = tarname_path[0] if len(tarname_path) == 2: if descriptor_file_name: raise EngineException("Found more than one descriptor file at package descriptor tar.gz") descriptor_file_name = tarname if not descriptor_file_name: raise EngineException("Not found any descriptor file at package descriptor tar.gz") - self.fs.file_extract(tar, transaction_id) - with self.fs.file_open((transaction_id, descriptor_file_name), "r") as descriptor_file: + storage["descriptor"] = descriptor_file_name + storage["zipfile"] = filename + self.fs.file_extract(tar, _id) + with self.fs.file_open((_id, descriptor_file_name), "r") as descriptor_file: content = descriptor_file.read() else: content = file_pkg.read() - tarname = "" + storage["descriptor"] = descriptor_file_name = filename - if tarname.endswith(".json"): + if descriptor_file_name.endswith(".json"): error_text = "Invalid json format " indata = json.load(content) else: error_text = "Invalid yaml format " indata = yaml.load(content) - return {"_id": transaction_id, "storage": storage, "desc": indata} + + current_desc["_admin"]["storage"] = storage + current_desc["_admin"]["onboardingState"] = "ONBOARDED" + current_desc["_admin"]["operationalState"] = "ENABLED" + + self._edit_item(session, item, _id, current_desc, indata, kwargs) + # TODO if descriptor has changed because kwargs update content and remove cached zip + # TODO if zip is not present creates one + return True + except EngineException: raise except IndexError: raise EngineException("invalid Content-Range header format. Expected 'bytes start-end/total'", - HTTPStatus.BAD_REQUEST) + HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE) except IOError as e: raise EngineException("invalid upload transaction sequence: '{}'".format(e), HTTPStatus.BAD_REQUEST) except (ValueError, yaml.YAMLError) as e: @@ -441,28 +506,22 @@ class Engine(object): def new_item(self, session, item, indata={}, kwargs=None, headers={}): """ - Creates a new entry into database + Creates a new entry into database. For nsds and vnfds it creates an almost empty DISABLED entry, + that must be completed with a call to method upload_content :param session: contains the used login username and working project - :param item: it can be: users, projects, vnfds, nsds, ... + :param item: it can be: users, projects, vims, sdns, nsrs, nsds, vnfds :param indata: data to be inserted :param kwargs: used to override the indata descriptor :param headers: http request headers :return: _id, transaction_id: identity of the inserted data. or transaction_id if Content-Range is used """ - # TODO validate input. Check not exist at database - # TODO add admin and status transaction = None - if headers.get("Content-Range") or "application/gzip" in headers.get("Content-Type") or \ - "application/x-gzip" in headers.get("Content-Type") or "application/zip" in headers.get("Content-Type"): - if not indata: - raise EngineException("Empty payload") - transaction = self._new_item_partial(session, item, indata, headers) - if "desc" not in transaction: - return transaction["_id"], False - indata = transaction["desc"] - content = self._remove_envelop(item, indata) + item_envelop = item + if item in ("nsds", "vnfds"): + item_envelop = "userDefinedData" + content = self._remove_envelop(item_envelop, indata) # Override descriptor with query string kwargs if kwargs: @@ -491,7 +550,12 @@ class Engine(object): except IndexError: raise EngineException( "Invalid query string '{}'. Index '{}' out of range".format(k, kitem_old)) - if not indata: + try: + validate_input(content, item, new=True) + except ValidationError as e: + raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY) + + if not indata and item not in ("nsds", "vnfds"): raise EngineException("Empty payload") if item == "nsrs": @@ -500,12 +564,22 @@ class Engine(object): content, _id = self.new_nsr(session, ns_request) transaction = {"_id": _id} - self._validate_new_data(session, item, content) + self._validate_new_data(session, item_envelop, content) + if item in ("nsds", "vnfds"): + content = {"_admin": {"userDefinedData": content}} self._format_new_data(session, item, content, transaction) _id = self.db.create(item, content) if item == "nsrs": self.msg.write("ns", "create", _id) - return _id, True + elif item == "vims": + msg_data = self.db.get_one(item, {"_id": _id}) + msg_data.pop("_admin", None) + self.msg.write("vim_account", "create", msg_data) + elif item == "sdns": + msg_data = self.db.get_one(item, {"_id": _id}) + msg_data.pop("_admin", None) + self.msg.write("sdn", "create", msg_data) + return _id def _add_read_filter(self, session, item, filter): if session["project_id"] == "admin": # allows all @@ -527,6 +601,61 @@ class Engine(object): elif item in ("vnfds", "nsds") and session["project_id"] != "admin": filter["_admin.projects_write.cont"] = ["ANY", session["project_id"]] + def get_file(self, session, item, _id, path=None, accept_header=None): + """ + Return the file content of a vnfd or nsd + :param session: contains the used login username and working project + :param item: it can be vnfds or nsds + :param _id: Identity of the vnfd, ndsd + :param path: artifact path or "$DESCRIPTOR" or None + :param accept_header: Content of Accept header. Must contain applition/zip or/and text/plain + :return: opened file or raises an exception + """ + accept_text = accept_zip = False + if accept_header: + if 'text/plain' in accept_header or '*/*' in accept_header: + accept_text = True + if 'application/zip' in accept_header or '*/*' in accept_header: + accept_zip = True + if not accept_text and not accept_zip: + raise EngineException("provide request header 'Accept' with 'application/zip' or 'text/plain'", + http_code=HTTPStatus.NOT_ACCEPTABLE) + + content = self.get_item(session, item, _id) + if content["_admin"]["onboardingState"] != "ONBOARDED": + raise EngineException("Cannot get content because this resource is not at 'ONBOARDED' state. " + "onboardingState is {}".format(content["_admin"]["onboardingState"]), + http_code=HTTPStatus.CONFLICT) + storage = content["_admin"]["storage"] + if path is not None and path != "$DESCRIPTOR": # artifacts + if not storage.get('pkg-dir'): + raise EngineException("Packages does not contains artifacts", http_code=HTTPStatus.BAD_REQUEST) + if self.fs.file_exists((storage['folder'], storage['pkg-dir'], *path), 'dir'): + folder_content = self.fs.dir_ls((storage['folder'], storage['pkg-dir'], *path)) + return folder_content, "text/plain" + # TODO manage folders in http + else: + return self.fs.file_open((storage['folder'], storage['pkg-dir'], *path), "rb"), \ + "application/octet-stream" + + # pkgtype accept ZIP TEXT -> result + # manyfiles yes X -> zip + # no yes -> error + # onefile yes no -> zip + # X yes -> text + + if accept_text and (not storage.get('pkg-dir') or path == "$DESCRIPTOR"): + return self.fs.file_open((storage['folder'], storage['descriptor']), "r"), "text/plain" + elif storage.get('pkg-dir') and not accept_zip: + raise EngineException("Packages that contains several files need to be retrieved with 'application/zip'" + "Accept header", http_code=HTTPStatus.NOT_ACCEPTABLE) + else: + if not storage.get('zipfile'): + # TODO generate zipfile if not present + raise EngineException("Only allowed 'text/plain' Accept header for this descriptor. To be solved in future versions" + "", http_code=HTTPStatus.NOT_ACCEPTABLE) + return self.fs.file_open((storage['folder'], storage['zipfile']), "rb"), "application/zip" + def get_item_list(self, session, item, filter={}): """ Get a list of items @@ -536,6 +665,9 @@ class Engine(object): :return: The list, it can be empty if no one match the filter. """ # TODO add admin to filter, validate rights + # TODO transform data for SOL005 URL requests. Transform filtering + # TODO implement "field-type" query string SOL005 + self._add_read_filter(session, item, filter) return self.db.get_list(item, filter) @@ -543,12 +675,14 @@ class Engine(object): """ Get complete information on an items :param session: contains the used login username and working project - :param item: it can be: users, projects, vnfds, nsds, ... + :param item: it can be: users, projects, vnfds, nsds, :param _id: server id of the item :return: dictionary, raise exception if not found. """ + database_item = item filter = {"_id": _id} # TODO add admin to filter, validate rights + # TODO transform data for SOL005 URL requests self._add_read_filter(session, item, filter) return self.db.get_one(item, filter) @@ -577,17 +711,20 @@ class Engine(object): filter = {"_id": _id} self._add_delete_filter(session, item, filter) - if item == "nsrs": + if item in ("nsrs", "vims", "sdns"): desc = self.db.get_one(item, filter) desc["_admin"]["to_delete"] = True self.db.replace(item, _id, desc) # TODO change to set_one - self.msg.write("ns", "delete", _id) - return {"deleted": 1} + if item == "nsrs": + self.msg.write("ns", "delete", _id) + elif item == "vims": + self.msg.write("vim_account", "delete", {"_id": _id}) + elif item == "sdns": + self.msg.write("sdn", "delete", {"_id": _id}) + return {"deleted": 1} # TODO indicate an offline operation to return 202 ACCEPTED v = self.db.del_one(item, filter) self.fs.file_delete(_id, ignore_non_exist=True) - if item == "nsrs": - self.msg.write("ns", "delete", _id) return v def prune(self): @@ -599,39 +736,56 @@ class Engine(object): def create_admin(self): """ - Creates a new user admin/admin into database. Only allowed if database is empty. Useful for initialization - :return: _id identity of the inserted data. + Creates a new user admin/admin into database if database is empty. Useful for initialization + :return: _id identity of the inserted data, or None """ users = self.db.get_one("users", fail_on_empty=False, fail_on_more=False) if users: - raise EngineException("Unauthorized. Database users is not empty", HTTPStatus.UNAUTHORIZED) + return None + # raise EngineException("Unauthorized. Database users is not empty", HTTPStatus.UNAUTHORIZED) indata = {"username": "admin", "password": "admin", "projects": ["admin"]} fake_session = {"project_id": "admin", "username": "admin"} self._format_new_data(fake_session, "users", indata) _id = self.db.create("users", indata) return _id - def edit_item(self, session, item, id, indata={}, kwargs=None): + def init_db(self, target_version='1.0'): """ - Update an existing entry at database - :param session: contains the used login username and working project - :param item: it can be: users, projects, vnfds, nsds, ... - :param id: identity of entry to be updated - :param indata: data to be inserted - :param kwargs: used to override the indata descriptor - :return: dictionary, raise exception if not found. + Init database if empty. If not empty it checks that database version is ok. + If empty, it creates a new user admin/admin at 'users' and a new entry at 'version' + :return: None if ok, exception if error or if the version is different. """ - - content = self.get_item(session, item, id) + version = self.db.get_one("version", fail_on_empty=False, fail_on_more=False) + if not version: + # create user admin + self.create_admin() + # create database version + version_data = { + "_id": '1.0', # version text + "version": 1000, # version number + "date": "2018-04-12", # version date + "description": "initial design", # changes in this version + 'status': 'ENABLED' # ENABLED, DISABLED (migration in process), ERROR, + } + self.db.create("version", version_data) + elif version["_id"] != target_version: + # TODO implement migration process + raise EngineException("Wrong database version '{}'. Expected '{}'".format( + version["_id"], target_version), HTTPStatus.INTERNAL_SERVER_ERROR) + elif version["status"] != 'ENABLED': + raise EngineException("Wrong database status '{}'".format( + version["status"]), HTTPStatus.INTERNAL_SERVER_ERROR) + return + + def _edit_item(self, session, item, id, content, indata={}, kwargs=None): if indata: indata = self._remove_envelop(item, indata) - # TODO update content with with a deep-update # Override descriptor with query string kwargs if kwargs: try: for k, v in kwargs.items(): - update_content = content + update_content = indata kitem_old = None klist = k.split(".") for kitem in klist: @@ -654,10 +808,37 @@ class Engine(object): except IndexError: raise EngineException( "Invalid query string '{}'. Index '{}' out of range".format(k, kitem_old)) + try: + validate_input(content, item, new=False) + except ValidationError as e: + raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY) - self._validate_new_data(session, item, content) + _deep_update(content, indata) + self._validate_new_data(session, item, content, id) # self._format_new_data(session, item, content) self.db.replace(item, id, content) + if item in ("vims", "sdns"): + indata.pop("_admin", None) + indata["_id"] = id + if item == "vims": + self.msg.write("vim_account", "edit", indata) + elif item == "sdns": + self.msg.write("sdn", "edit", indata) return id + def edit_item(self, session, item, _id, indata={}, kwargs=None): + """ + Update an existing entry at database + :param session: contains the used login username and working project + :param item: it can be: users, projects, vnfds, nsds, ... + :param _id: identifier to be updated + :param indata: data to be inserted + :param kwargs: used to override the indata descriptor + :return: dictionary, raise exception if not found. + """ + + content = self.get_item(session, item, _id) + return self._edit_item(session, item, _id, content, indata, kwargs) + +