X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FNBI.git;a=blobdiff_plain;f=osm_nbi%2Fengine.py;h=f7bc0121b15d93f3104d3fe05531f09738f611a1;hp=872238364a8c847816fdb08e01e8682d7770f163;hb=c5a18892d3b9e5a515c3adab0bafcdc097d9fe28;hpb=ad17766b32a303ba7b1c1f6ca9b6d363ae9d36ed diff --git a/osm_nbi/engine.py b/osm_nbi/engine.py index 8722383..f7bc012 100644 --- a/osm_nbi/engine.py +++ b/osm_nbi/engine.py @@ -1,78 +1,77 @@ # -*- coding: utf-8 -*- -from osm_common import dbmongo -from osm_common import dbmemory -from osm_common import fslocal -from osm_common import msglocal -from osm_common import msgkafka -import tarfile -import yaml -import json +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import logging -from random import choice as random_choice -from uuid import uuid4 -from hashlib import sha256, md5 +import yaml +from osm_common import dbmongo, dbmemory, fslocal, msglocal, msgkafka, version as common_version from osm_common.dbbase import DbException from osm_common.fsbase import FsException from osm_common.msgbase import MsgException from http import HTTPStatus -from time import time -from copy import deepcopy, copy -from validation import validate_input, ValidationError - -__author__ = "Alfonso Tierno " - -class EngineException(Exception): +from authconn_keystone import AuthconnKeystone +from base_topic import EngineException, versiontuple +from admin_topics import UserTopic, ProjectTopic, VimAccountTopic, WimAccountTopic, SdnTopic +from admin_topics import UserTopicAuth, ProjectTopicAuth, RoleTopicAuth +from descriptor_topics import VnfdTopic, NsdTopic, PduTopic, NstTopic +from instance_topics import NsrTopic, VnfrTopic, NsLcmOpTopic, NsiTopic, NsiLcmOpTopic +from pmjobs_topics import PmJobsTopic +from base64 import b64encode +from os import urandom, path +from threading import Lock - def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST): - self.http_code = http_code - Exception.__init__(self, message) - - -def _deep_update(dict_to_change, dict_reference): - """ - Modifies one dictionary with the information of the other following https://tools.ietf.org/html/rfc7396 - :param dict_to_change: Ends modified - :param dict_reference: reference - :return: none - """ - for k in dict_reference: - if dict_reference[k] is None: # None->Anything - if k in dict_to_change: - del dict_to_change[k] - elif not isinstance(dict_reference[k], dict): # NotDict->Anything - dict_to_change[k] = dict_reference[k] - elif k not in dict_to_change: # Dict->Empty - dict_to_change[k] = deepcopy(dict_reference[k]) - _deep_update(dict_to_change[k], dict_reference[k]) - elif isinstance(dict_to_change[k], dict): # Dict->Dict - _deep_update(dict_to_change[k], dict_reference[k]) - else: # Dict->NotDict - dict_to_change[k] = deepcopy(dict_reference[k]) - _deep_update(dict_to_change[k], dict_reference[k]) - - -def get_iterable(input): - """ - Returns an iterable, in case input is None it just returns an empty tuple - :param input: - :return: iterable - """ - if input is None: - return () - return input +__author__ = "Alfonso Tierno " +min_common_version = "0.1.16" class Engine(object): + map_from_topic_to_class = { + "vnfds": VnfdTopic, + "nsds": NsdTopic, + "nsts": NstTopic, + "pdus": PduTopic, + "nsrs": NsrTopic, + "vnfrs": VnfrTopic, + "nslcmops": NsLcmOpTopic, + "vim_accounts": VimAccountTopic, + "wim_accounts": WimAccountTopic, + "sdns": SdnTopic, + "users": UserTopic, + "projects": ProjectTopic, + "nsis": NsiTopic, + "nsilcmops": NsiLcmOpTopic + # [NEW_TOPIC]: add an entry here + # "pm_jobs": PmJobsTopic will be added manually because it needs other parameters + } + + map_target_version_to_int = { + "1.0": 1000, + "1.1": 1001 + # Add new versions here + } def __init__(self): - self.tokens = {} self.db = None self.fs = None self.msg = None + self.auth = None self.config = None + self.operations = None self.logger = logging.getLogger("nbi.engine") + self.map_topic = {} + self.write_lock = None def start(self, config): """ @@ -81,6 +80,11 @@ class Engine(object): :return: None """ self.config = config + # check right version of common + if versiontuple(common_version) < versiontuple(min_common_version): + raise EngineException("Not compatible osm/common version '{}'. Needed '{}' or higher".format( + common_version, min_common_version)) + try: if not self.db: if config["database"]["driver"] == "mongo": @@ -108,7 +112,51 @@ class Engine(object): self.msg.connect(config["message"]) else: raise EngineException("Invalid configuration param '{}' at '[message]':'driver'".format( - config["storage"]["driver"])) + config["message"]["driver"])) + if not self.auth: + if config["authentication"]["backend"] == "keystone": + self.auth = AuthconnKeystone(config["authentication"]) + if not self.operations: + if "resources_to_operations" in config["rbac"]: + resources_to_operations_file = config["rbac"]["resources_to_operations"] + else: + possible_paths = ( + __file__[:__file__.rfind("engine.py")] + "resources_to_operations.yml", + "./resources_to_operations.yml" + ) + for config_file in possible_paths: + if path.isfile(config_file): + resources_to_operations_file = config_file + break + if not resources_to_operations_file: + raise EngineException("Invalid permission configuration: resources_to_operations file missing") + + with open(resources_to_operations_file, 'r') as f: + resources_to_operations = yaml.load(f) + + self.operations = [] + + for _, value in resources_to_operations["resources_to_operations"].items(): + if value not in self.operations: + self.operations += [value] + + if config["authentication"]["backend"] == "keystone": + self.map_from_topic_to_class["users"] = UserTopicAuth + self.map_from_topic_to_class["projects"] = ProjectTopicAuth + self.map_from_topic_to_class["roles"] = RoleTopicAuth + + self.write_lock = Lock() + # create one class per topic + for topic, topic_class in self.map_from_topic_to_class.items(): + if self.auth and topic_class in (UserTopicAuth, ProjectTopicAuth): + self.map_topic[topic] = topic_class(self.db, self.fs, self.msg, self.auth) + elif self.auth and topic_class == RoleTopicAuth: + self.map_topic[topic] = topic_class(self.db, self.fs, self.msg, self.auth, + self.operations) + else: + self.map_topic[topic] = topic_class(self.db, self.fs, self.msg) + + self.map_topic["pm_jobs"] = PmJobsTopic(config["prometheus"].get("host"), config["prometheus"].get("port")) except (DbException, FsException, MsgException) as e: raise EngineException(str(e), http_code=e.http_code) @@ -118,977 +166,140 @@ class Engine(object): self.db.db_disconnect() if self.fs: self.fs.fs_disconnect() - if self.fs: - self.fs.fs_disconnect() + if self.msg: + self.msg.disconnect() + self.write_lock = None except (DbException, FsException, MsgException) as e: raise EngineException(str(e), http_code=e.http_code) - def authorize(self, token): - try: - if not token: - raise EngineException("Needed a token or Authorization http header", - http_code=HTTPStatus.UNAUTHORIZED) - if token not in self.tokens: - raise EngineException("Invalid token or Authorization http header", - http_code=HTTPStatus.UNAUTHORIZED) - session = self.tokens[token] - now = time() - if session["expires"] < now: - del self.tokens[token] - raise EngineException("Expired Token or Authorization http header", - http_code=HTTPStatus.UNAUTHORIZED) - return session - except EngineException: - if self.config["global"].get("test.user_not_authorized"): - return {"id": "fake-token-id-for-test", - "project_id": self.config["global"].get("test.project_not_authorized", "admin"), - "username": self.config["global"]["test.user_not_authorized"]} - else: - raise - - def new_token(self, session, indata, remote): - now = time() - user_content = None - - # Try using username/password - if indata.get("username"): - user_rows = self.db.get_list("users", {"username": indata.get("username")}) - user_content = None - if user_rows: - user_content = user_rows[0] - salt = user_content["_admin"]["salt"] - shadow_password = sha256(indata.get("password", "").encode('utf-8') + salt.encode('utf-8')).hexdigest() - if shadow_password != user_content["password"]: - user_content = None - if not user_content: - raise EngineException("Invalid username/password", http_code=HTTPStatus.UNAUTHORIZED) - elif session: - user_rows = self.db.get_list("users", {"username": session["username"]}) - if user_rows: - user_content = user_rows[0] - else: - raise EngineException("Invalid token", http_code=HTTPStatus.UNAUTHORIZED) - else: - raise EngineException("Provide credentials: username/password or Authorization Bearer token", - http_code=HTTPStatus.UNAUTHORIZED) - - token_id = ''.join(random_choice('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789') - for _ in range(0, 32)) - if indata.get("project_id"): - project_id = indata.get("project_id") - if project_id not in user_content["projects"]: - raise EngineException("project {} not allowed for this user".format(project_id), - http_code=HTTPStatus.UNAUTHORIZED) - else: - project_id = user_content["projects"][0] - if project_id == "admin": - session_admin = True - else: - project = self.db.get_one("projects", {"_id": project_id}) - session_admin = project.get("admin", False) - new_session = {"issued_at": now, "expires": now+3600, - "_id": token_id, "id": token_id, "project_id": project_id, "username": user_content["username"], - "remote_port": remote.port, "admin": session_admin} - if remote.name: - new_session["remote_host"] = remote.name - elif remote.ip: - new_session["remote_host"] = remote.ip - - self.tokens[token_id] = new_session - return deepcopy(new_session) - - def get_token_list(self, session): - token_list = [] - for token_id, token_value in self.tokens.items(): - if token_value["username"] == session["username"]: - token_list.append(deepcopy(token_value)) - return token_list - - def get_token(self, session, token_id): - token_value = self.tokens.get(token_id) - if not token_value: - raise EngineException("token not found", http_code=HTTPStatus.NOT_FOUND) - if token_value["username"] != session["username"] and not session["admin"]: - raise EngineException("needed admin privileges", http_code=HTTPStatus.UNAUTHORIZED) - return token_value - - def del_token(self, token_id): - try: - del self.tokens[token_id] - return "token '{}' deleted".format(token_id) - except KeyError: - raise EngineException("Token '{}' not found".format(token_id), http_code=HTTPStatus.NOT_FOUND) - - @staticmethod - def _remove_envelop(item, indata=None): - """ - Obtain the useful data removing the envelop. It goes throw the vnfd or nsd catalog and returns the - vnfd or nsd content - :param item: can be vnfds, nsds, users, projects, userDefinedData (initial content of a vnfds, nsds - :param indata: Content to be inspected - :return: the useful part of indata - """ - clean_indata = indata - if not indata: - return {} - if item == "vnfds": - if clean_indata.get('vnfd:vnfd-catalog'): - clean_indata = clean_indata['vnfd:vnfd-catalog'] - elif clean_indata.get('vnfd-catalog'): - clean_indata = clean_indata['vnfd-catalog'] - if clean_indata.get('vnfd'): - if not isinstance(clean_indata['vnfd'], list) or len(clean_indata['vnfd']) != 1: - raise EngineException("'vnfd' must be a list only one element") - clean_indata = clean_indata['vnfd'][0] - elif item == "nsds": - if clean_indata.get('nsd:nsd-catalog'): - clean_indata = clean_indata['nsd:nsd-catalog'] - elif clean_indata.get('nsd-catalog'): - clean_indata = clean_indata['nsd-catalog'] - if clean_indata.get('nsd'): - if not isinstance(clean_indata['nsd'], list) or len(clean_indata['nsd']) != 1: - raise EngineException("'nsd' must be a list only one element") - clean_indata = clean_indata['nsd'][0] - elif item == "userDefinedData": - if "userDefinedData" in indata: - clean_indata = clean_indata['userDefinedData'] - return clean_indata - - def _check_dependencies_on_descriptor(self, session, item, descriptor_id, _id): - """ - Check that the descriptor to be deleded is not a dependency of others - :param session: client session information - :param item: can be vnfds, nsds - :param descriptor_id: id (provided by client) of descriptor to be deleted - :param _id: internal id of descriptor to be deleted - :return: None or raises exception - """ - if item == "vnfds": - _filter = {"constituent-vnfd.ANYINDEX.vnfd-id-ref": descriptor_id} - if self.get_item_list(session, "nsds", _filter): - raise EngineException("There are nsd that depends on this VNFD", http_code=HTTPStatus.CONFLICT) - if self.get_item_list(session, "vnfrs", {"vnfd-id": _id}): - raise EngineException("There are vnfr that depends on this VNFD", http_code=HTTPStatus.CONFLICT) - elif item == "nsds": - _filter = {"nsdId": _id} - if self.get_item_list(session, "nsrs", _filter): - raise EngineException("There are nsr that depends on this NSD", http_code=HTTPStatus.CONFLICT) - - def _check_descriptor_dependencies(self, session, item, descriptor): - """ - Check that the dependent descriptors exist on a new descriptor or edition - :param session: client session information - :param item: can be nsds, nsrs - :param descriptor: descriptor to be inserted or edit - :return: None or raises exception - """ - if item == "nsds": - if not descriptor.get("constituent-vnfd"): - return - for vnf in descriptor["constituent-vnfd"]: - vnfd_id = vnf["vnfd-id-ref"] - if not self.get_item_list(session, "vnfds", {"id": vnfd_id}): - raise EngineException("Descriptor error at 'constituent-vnfd':'vnfd-id-ref'='{}' references a non " - "existing vnfd".format(vnfd_id), http_code=HTTPStatus.CONFLICT) - elif item == "nsrs": - if not descriptor.get("nsdId"): - return - nsd_id = descriptor["nsdId"] - if not self.get_item_list(session, "nsds", {"id": nsd_id}): - raise EngineException("Descriptor error at nsdId='{}' references a non exist nsd".format(nsd_id), - http_code=HTTPStatus.CONFLICT) - - def _validate_new_data(self, session, item, indata, id=None, force=False): - if item == "users": - if not indata.get("username"): - raise EngineException("missing 'username'", HTTPStatus.UNPROCESSABLE_ENTITY) - if not indata.get("password"): - raise EngineException("missing 'password'", HTTPStatus.UNPROCESSABLE_ENTITY) - if not indata.get("projects"): - raise EngineException("missing 'projects'", HTTPStatus.UNPROCESSABLE_ENTITY) - # check username not exists - if self.db.get_one(item, {"username": indata.get("username")}, fail_on_empty=False, fail_on_more=False): - raise EngineException("username '{}' exists".format(indata["username"]), HTTPStatus.CONFLICT) - elif item == "projects": - if not indata.get("name"): - raise EngineException("missing 'name'") - # check name not exists - if self.db.get_one(item, {"name": indata.get("name")}, fail_on_empty=False, fail_on_more=False): - raise EngineException("name '{}' exists".format(indata["name"]), HTTPStatus.CONFLICT) - elif item in ("vnfds", "nsds"): - filter = {"id": indata["id"]} - if id: - filter["_id.neq"] = id - # TODO add admin to filter, validate rights - self._add_read_filter(session, item, filter) - if self.db.get_one(item, filter, fail_on_empty=False): - raise EngineException("{} with id '{}' already exists for this tenant".format(item[:-1], indata["id"]), - HTTPStatus.CONFLICT) - # TODO validate with pyangbind. Load and dumps to convert data types - if item == "nsds": - # transform constituent-vnfd:member-vnf-index to string - if indata.get("constituent-vnfd"): - for constituent_vnfd in indata["constituent-vnfd"]: - if "member-vnf-index" in constituent_vnfd: - constituent_vnfd["member-vnf-index"] = str(constituent_vnfd["member-vnf-index"]) - - if item == "nsds" and not force: - self._check_descriptor_dependencies(session, "nsds", indata) - elif item == "userDefinedData": - # TODO validate userDefinedData is a keypair values - pass - - elif item == "nsrs": - pass - elif item == "vim_accounts" or item == "sdns": - filter = {"name": indata.get("name")} - if id: - filter["_id.neq"] = id - if self.db.get_one(item, filter, fail_on_empty=False, fail_on_more=False): - raise EngineException("name '{}' already exists for {}".format(indata["name"], item), - HTTPStatus.CONFLICT) - - def _check_ns_operation(self, session, nsr, operation, indata): - """ - Check that user has enter right parameters for the operation - :param session: - :param operation: it can be: instantiate, terminate, action, TODO: update, heal - :param indata: descriptor with the parameters of the operation - :return: None - """ - vnfds = {} - vim_accounts = [] - nsd = nsr["nsd"] - - def check_valid_vnf_member_index(member_vnf_index): - for vnf in nsd["constituent-vnfd"]: - if member_vnf_index == vnf["member-vnf-index"]: - vnfd_id = vnf["vnfd-id-ref"] - if vnfd_id not in vnfds: - vnfds[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id}) - return vnfds[vnfd_id] - else: - raise EngineException("Invalid parameter member_vnf_index='{}' is not one of the " - "nsd:constituent-vnfd".format(member_vnf_index)) - - def check_valid_vim_account(vim_account): - if vim_account in vim_accounts: - return - try: - self.db.get_one("vim_accounts", {"_id": vim_account}) - except Exception: - raise EngineException("Invalid vimAccountId='{}' not present".format(vim_account)) - vim_accounts.append(vim_account) - - if operation == "action": - # check vnf_member_index - if indata.get("vnf_member_index"): - indata["member_vnf_index"] = indata.pop("vnf_member_index") # for backward compatibility - if not indata.get("member_vnf_index"): - raise EngineException("Missing 'member_vnf_index' parameter") - vnfd = check_valid_vnf_member_index(indata["member_vnf_index"]) - # check primitive - for config_primitive in get_iterable(vnfd.get("vnf-configuration", {}).get("config-primitive")): - if indata["primitive"] == config_primitive["name"]: - # check needed primitive_params are provided - if indata.get("primitive_params"): - in_primitive_params_copy = copy(indata["primitive_params"]) - else: - in_primitive_params_copy = {} - for paramd in get_iterable(config_primitive.get("parameter")): - if paramd["name"] in in_primitive_params_copy: - del in_primitive_params_copy[paramd["name"]] - elif not paramd.get("default-value"): - raise EngineException("Needed parameter {} not provided for primitive '{}'".format( - paramd["name"], indata["primitive"])) - # check no extra primitive params are provided - if in_primitive_params_copy: - raise EngineException("parameter/s '{}' not present at vnfd for primitive '{}'".format( - list(in_primitive_params_copy.keys()), indata["primitive"])) - break - else: - raise EngineException("Invalid primitive '{}' is not present at vnfd".format(indata["primitive"])) - if operation == "scale": - vnfd = check_valid_vnf_member_index(indata["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]) - for scaling_group in get_iterable(vnfd.get("scaling-group-descriptor")): - if indata["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"] == scaling_group["name"]: - break - else: - raise EngineException("Invalid scaleVnfData:scaleByStepData:scaling-group-descriptor '{}' is not " - "present at vnfd:scaling-group-descriptor".format( - indata["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"])) - if operation == "instantiate": - # check vim_account - check_valid_vim_account(indata["vimAccountId"]) - for in_vnf in get_iterable(indata.get("vnf")): - vnfd = check_valid_vnf_member_index(in_vnf["member-vnf-index"]) - if in_vnf.get("vimAccountId"): - check_valid_vim_account(in_vnf["vimAccountId"]) - for in_vdu in get_iterable(in_vnf.get("vdu")): - for vdud in get_iterable(vnfd.get("vdu")): - if vdud["id"] == in_vdu["id"]: - for volume in get_iterable(in_vdu.get("volume")): - for volumed in get_iterable(vdud.get("volumes")): - if volumed["name"] == volume["name"]: - break - else: - raise EngineException("Invalid parameter vnf[member-vnf-index='{}']:vdu[id='{}']:" - "volume:name='{}' is not present at vnfd:vdu:volumes list". - format(in_vnf["member-vnf-index"], in_vdu["id"], - volume["name"])) - break - else: - raise EngineException("Invalid parameter vnf[member-vnf-index='{}']:vdu:id='{}' is not " - "present at vnfd".format(in_vnf["member-vnf-index"], in_vdu["id"])) - - for in_internal_vld in get_iterable(in_vnf.get("internal-vld")): - for internal_vldd in get_iterable(vnfd.get("internal-vld")): - if in_internal_vld["name"] == internal_vldd["name"] or \ - in_internal_vld["name"] == internal_vldd["id"]: - break - else: - raise EngineException("Invalid parameter vnf[member-vnf-index='{}']:internal-vld:name='{}'" - " is not present at vnfd '{}'".format(in_vnf["member-vnf-index"], - in_internal_vld["name"], - vnfd["id"])) - for in_vld in get_iterable(indata.get("vld")): - for vldd in get_iterable(nsd.get("vld")): - if in_vld["name"] == vldd["name"] or in_vld["name"] == vldd["id"]: - break - else: - raise EngineException("Invalid parameter vld:name='{}' is not present at nsd:vld".format( - in_vld["name"])) - - def _format_new_data(self, session, item, indata): - now = time() - if "_admin" not in indata: - indata["_admin"] = {} - indata["_admin"]["created"] = now - indata["_admin"]["modified"] = now - if item == "users": - indata["_id"] = indata["username"] - salt = uuid4().hex - indata["_admin"]["salt"] = salt - indata["password"] = sha256(indata["password"].encode('utf-8') + salt.encode('utf-8')).hexdigest() - elif item == "projects": - indata["_id"] = indata["name"] - else: - if not indata.get("_id"): - indata["_id"] = str(uuid4()) - if item in ("vnfds", "nsds", "nsrs", "vnfrs"): - if not indata["_admin"].get("projects_read"): - indata["_admin"]["projects_read"] = [session["project_id"]] - if not indata["_admin"].get("projects_write"): - indata["_admin"]["projects_write"] = [session["project_id"]] - if item in ("vnfds", "nsds"): - indata["_admin"]["onboardingState"] = "CREATED" - indata["_admin"]["operationalState"] = "DISABLED" - indata["_admin"]["usageSate"] = "NOT_IN_USE" - if item == "nsrs": - indata["_admin"]["nsState"] = "NOT_INSTANTIATED" - if item in ("vim_accounts", "sdns"): - indata["_admin"]["operationalState"] = "PROCESSING" - - def upload_content(self, session, item, _id, indata, kwargs, headers): - """ - Used for receiving content by chunks (with a transaction_id header and/or gzip file. It will store and extract) - :param session: session - :param item: can be nsds or vnfds - :param _id : the nsd,vnfd is already created, this is the id - :param indata: http body request - :param kwargs: user query string to override parameters. NOT USED - :param headers: http request headers - :return: True package has is completely uploaded or False if partial content has been uplodaed. - Raise exception on error - """ - # Check that _id exists and it is valid - current_desc = self.get_item(session, item, _id) - - content_range_text = headers.get("Content-Range") - expected_md5 = headers.get("Content-File-MD5") - compressed = None - content_type = headers.get("Content-Type") - if content_type and "application/gzip" in content_type or "application/x-gzip" in content_type or \ - "application/zip" in content_type: - compressed = "gzip" - filename = headers.get("Content-Filename") - if not filename: - filename = "package.tar.gz" if compressed else "package" - # TODO change to Content-Disposition filename https://tools.ietf.org/html/rfc6266 - file_pkg = None - error_text = "" - try: - if content_range_text: - content_range = content_range_text.replace("-", " ").replace("/", " ").split() - if content_range[0] != "bytes": # TODO check x result - # manyfiles yes X -> zip - # no yes -> error - # onefile yes no -> zip - # X yes -> text - - if accept_text and (not storage.get('pkg-dir') or path == "$DESCRIPTOR"): - return self.fs.file_open((storage['folder'], storage['descriptor']), "r"), "text/plain" - elif storage.get('pkg-dir') and not accept_zip: - raise EngineException("Packages that contains several files need to be retrieved with 'application/zip'" - "Accept header", http_code=HTTPStatus.NOT_ACCEPTABLE) - else: - if not storage.get('zipfile'): - # TODO generate zipfile if not present - raise EngineException("Only allowed 'text/plain' Accept header for this descriptor. To be solved in " - "future versions", http_code=HTTPStatus.NOT_ACCEPTABLE) - return self.fs.file_open((storage['folder'], storage['zipfile']), "rb"), "application/zip" - - def get_item_list(self, session, item, filter={}): + def get_item(self, session, topic, _id): """ - Get a list of items + Get complete information on an item :param session: contains the used login username and working project - :param item: it can be: users, projects, vnfds, nsds, ... - :param filter: filter of data to be applied - :return: The list, it can be empty if no one match the filter. + :param topic: it can be: users, projects, vnfds, nsds, + :param _id: server id of the item + :return: dictionary, raise exception if not found. """ - # TODO add admin to filter, validate rights - # TODO transform data for SOL005 URL requests. Transform filtering - # TODO implement "field-type" query string SOL005 - - self._add_read_filter(session, item, filter) - return self.db.get_list(item, filter) + if topic not in self.map_topic: + raise EngineException("Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR) + return self.map_topic[topic].show(session, _id) - def get_item(self, session, item, _id): + def get_file(self, session, topic, _id, path=None, accept_header=None): """ - Get complete information on an items + Get descriptor package or artifact file content :param session: contains the used login username and working project - :param item: it can be: users, projects, vnfds, nsds, + :param topic: it can be: users, projects, vnfds, nsds, :param _id: server id of the item - :return: dictionary, raise exception if not found. + :param path: artifact path or "$DESCRIPTOR" or None + :param accept_header: Content of Accept header. Must contain applition/zip or/and text/plain + :return: opened file plus Accept format or raises an exception """ - filter = {"_id": _id} - # TODO add admin to filter, validate rights - # TODO transform data for SOL005 URL requests - self._add_read_filter(session, item, filter) - return self.db.get_one(item, filter) + if topic not in self.map_topic: + raise EngineException("Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR) + return self.map_topic[topic].get_file(session, _id, path, accept_header) - def del_item_list(self, session, item, filter={}): + def del_item_list(self, session, topic, _filter=None): """ Delete a list of items :param session: contains the used login username and working project - :param item: it can be: users, projects, vnfds, nsds, ... - :param filter: filter of data to be applied - :return: The deleted list, it can be empty if no one match the filter. + :param topic: it can be: users, projects, vnfds, nsds, ... + :param _filter: filter of data to be applied + :return: The deleted list, it can be empty if no one match the _filter. """ - # TODO add admin to filter, validate rights - self._add_read_filter(session, item, filter) - return self.db.del_list(item, filter) + if topic not in self.map_topic: + raise EngineException("Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR) + with self.write_lock: + return self.map_topic[topic].delete_list(session, _filter) - def del_item(self, session, item, _id, force=False): + def del_item(self, session, topic, _id): """ Delete item by its internal id :param session: contains the used login username and working project - :param item: it can be: users, projects, vnfds, nsds, ... + :param topic: it can be: users, projects, vnfds, nsds, ... :param _id: server id of the item - :param force: indicates if deletion must be forced in case of conflict :return: dictionary with deleted item _id. It raises exception if not found. """ - # TODO add admin to filter, validate rights - # data = self.get_item(item, _id) - filter = {"_id": _id} - self._add_delete_filter(session, item, filter) - if item in ("vnfds", "nsds") and not force: - descriptor = self.get_item(session, item, _id) - descriptor_id = descriptor.get("id") - if descriptor_id: - self._check_dependencies_on_descriptor(session, item, descriptor_id, _id) + if topic not in self.map_topic: + raise EngineException("Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR) + with self.write_lock: + return self.map_topic[topic].delete(session, _id) - if item == "nsrs": - nsr = self.db.get_one(item, filter) - if nsr["_admin"].get("nsState") == "INSTANTIATED" and not force: - raise EngineException("nsr '{}' cannot be deleted because it is in 'INSTANTIATED' state. " - "Launch 'terminate' operation first; or force deletion".format(_id), - http_code=HTTPStatus.CONFLICT) - v = self.db.del_one(item, {"_id": _id}) - self.db.del_list("nslcmops", {"nsInstanceId": _id}) - self.db.del_list("vnfrs", {"nsr-id-ref": _id}) - self.msg.write("ns", "deleted", {"_id": _id}) - return v - if item in ("vim_accounts", "sdns") and not force: - self.db.set_one(item, {"_id": _id}, {"_admin.to_delete": True}) # TODO change status - if item == "vim_accounts": - self.msg.write("vim_account", "delete", {"_id": _id}) - elif item == "sdns": - self.msg.write("sdn", "delete", {"_id": _id}) - return {"deleted": 1} # TODO indicate an offline operation to return 202 ACCEPTED - - v = self.db.del_one(item, filter) - if item in ("vnfds", "nsds"): - self.fs.file_delete(_id, ignore_non_exist=True) - if item in ("vim_accounts", "sdns", "vnfds", "nsds"): - self.msg.write(item[:-1], "deleted", {"_id": _id}) - return v + def edit_item(self, session, topic, _id, indata=None, kwargs=None): + """ + Update an existing entry at database + :param session: contains the used login username and working project + :param topic: it can be: users, projects, vnfds, nsds, ... + :param _id: identifier to be updated + :param indata: data to be inserted + :param kwargs: used to override the indata descriptor + :return: dictionary, raise exception if not found. + """ + if topic not in self.map_topic: + raise EngineException("Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR) + with self.write_lock: + return self.map_topic[topic].edit(session, _id, indata, kwargs) - def prune(self): + def create_admin_project(self): """ - Prune database not needed content - :return: None + Creates a new project 'admin' into database if database is empty. Useful for initialization. + :return: _id identity of the inserted data, or None """ - return self.db.del_list("nsrs", {"_admin.to_delete": True}) - def create_admin(self): + projects = self.db.get_one("projects", fail_on_empty=False, fail_on_more=False) + if projects: + return None + project_desc = {"name": "admin"} + fake_session = {"project_id": "admin", "username": "admin", "admin": True, "force": True, "public": None} + rollback_list = [] + _id = self.map_topic["projects"].new(rollback_list, fake_session, project_desc) + return _id + + def create_admin_user(self): """ Creates a new user admin/admin into database if database is empty. Useful for initialization :return: _id identity of the inserted data, or None @@ -1097,100 +308,87 @@ class Engine(object): if users: return None # raise EngineException("Unauthorized. Database users is not empty", HTTPStatus.UNAUTHORIZED) - indata = {"username": "admin", "password": "admin", "projects": ["admin"]} - fake_session = {"project_id": "admin", "username": "admin"} - self._format_new_data(fake_session, "users", indata) - _id = self.db.create("users", indata) + user_desc = {"username": "admin", "password": "admin", "projects": ["admin"]} + fake_session = {"project_id": "admin", "username": "admin", "admin": True, "force": True, "public": None} + roolback_list = [] + _id = self.map_topic["users"].new(roolback_list, fake_session, user_desc) return _id - def init_db(self, target_version='1.0'): + def create_admin(self): """ - Init database if empty. If not empty it checks that database version is ok. - If empty, it creates a new user admin/admin at 'users' and a new entry at 'version' - :return: None if ok, exception if error or if the version is different. + Creates new 'admin' user and project into database if database is empty. Useful for initialization. + :return: _id identity of the inserted data, or None """ - version = self.db.get_one("version", fail_on_empty=False, fail_on_more=False) - if not version: - # create user admin - self.create_admin() + project_id = self.create_admin_project() + user_id = self.create_admin_user() + if not project_id and not user_id: + return None + else: + return {'project_id': project_id, 'user_id': user_id} + + def upgrade_db(self, current_version, target_version): + if target_version not in self.map_target_version_to_int.keys(): + raise EngineException("Wrong database version '{}'. Expected '{}'" + ". It cannot be up/down-grade".format(current_version, target_version), + http_code=HTTPStatus.INTERNAL_SERVER_ERROR) + + if current_version == target_version: + return + + target_version_int = self.map_target_version_to_int[target_version] + + if not current_version: # create database version + serial = urandom(32) version_data = { - "_id": '1.0', # version text - "version": 1000, # version number - "date": "2018-04-12", # version date - "description": "initial design", # changes in this version - 'status': 'ENABLED' # ENABLED, DISABLED (migration in process), ERROR, + "_id": "version", # Always "version" + "version_int": 1000, # version number + "version": "1.0", # version text + "date": "2018-10-25", # version date + "description": "added serial", # changes in this version + 'status': "ENABLED", # ENABLED, DISABLED (migration in process), ERROR, + 'serial': b64encode(serial) + } + self.db.create("admin", version_data) + self.db.set_secret_key(serial) + current_version = "1.0" + + if current_version == "1.0" and target_version_int >= self.map_target_version_to_int["1.1"]: + self.db.del_list("roles_operations") + + version_data = { + "_id": "version", + "version_int": 1001, + "version": "1.1", + "date": "2019-05-24", + "description": "set new format for roles_operations" } - self.db.create("version", version_data) - elif version["_id"] != target_version: - # TODO implement migration process - raise EngineException("Wrong database version '{}'. Expected '{}'".format( - version["_id"], target_version), HTTPStatus.INTERNAL_SERVER_ERROR) - elif version["status"] != 'ENABLED': - raise EngineException("Wrong database status '{}'".format( - version["status"]), HTTPStatus.INTERNAL_SERVER_ERROR) - return - - def _edit_item(self, session, item, id, content, indata={}, kwargs=None, force=False): - if indata: - indata = self._remove_envelop(item, indata) - - # Override descriptor with query string kwargs - if kwargs: - try: - for k, v in kwargs.items(): - update_content = indata - kitem_old = None - klist = k.split(".") - for kitem in klist: - if kitem_old is not None: - update_content = update_content[kitem_old] - if isinstance(update_content, dict): - kitem_old = kitem - elif isinstance(update_content, list): - kitem_old = int(kitem) - else: - raise EngineException( - "Invalid query string '{}'. Descriptor is not a list nor dict at '{}'".format(k, kitem)) - update_content[kitem_old] = v - except KeyError: - raise EngineException( - "Invalid query string '{}'. Descriptor does not contain '{}'".format(k, kitem_old)) - except ValueError: - raise EngineException("Invalid query string '{}'. Expected integer index list instead of '{}'".format( - k, kitem)) - except IndexError: - raise EngineException( - "Invalid query string '{}'. Index '{}' out of range".format(k, kitem_old)) - try: - validate_input(indata, item, new=False) - except ValidationError as e: - raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY) - _deep_update(content, indata) - self._validate_new_data(session, item, content, id, force) - # self._format_new_data(session, item, content) - self.db.replace(item, id, content) - if item in ("vim_accounts", "sdns"): - indata.pop("_admin", None) - indata["_id"] = id - if item == "vim_accounts": - self.msg.write("vim_account", "edit", indata) - elif item == "sdns": - self.msg.write("sdn", "edit", indata) - return id + self.db.set_one("admin", {"_id": "version"}, version_data) + current_version = "1.1" + # TODO add future migrations here - def edit_item(self, session, item, _id, indata={}, kwargs=None, force=False): + def init_db(self, target_version='1.0'): """ - Update an existing entry at database - :param session: contains the used login username and working project - :param item: it can be: users, projects, vnfds, nsds, ... - :param _id: identifier to be updated - :param indata: data to be inserted - :param kwargs: used to override the indata descriptor - :param force: If True avoid some dependence checks - :return: dictionary, raise exception if not found. + Init database if empty. If not empty it checks that database version and migrates if needed + If empty, it creates a new user admin/admin at 'users' and a new entry at 'version' + :param target_version: check desired database version. Migrate to it if possible or raises exception + :return: None if ok, exception if error or if the version is different. """ - content = self.get_item(session, item, _id) - return self._edit_item(session, item, _id, content, indata, kwargs, force) + version_data = self.db.get_one("admin", {"_id": "version"}, fail_on_empty=False, fail_on_more=True) + # check database status is ok + if version_data and version_data.get("status") != 'ENABLED': + raise EngineException("Wrong database status '{}'".format( + version_data["status"]), HTTPStatus.INTERNAL_SERVER_ERROR) + + # check version + db_version = None if not version_data else version_data.get("version") + if db_version != target_version: + self.upgrade_db(db_version, target_version) + + # create user admin if not exist + if not self.auth: + self.create_admin() + + return