X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FNBI.git;a=blobdiff_plain;f=osm_nbi%2Fengine.py;h=cb274145e62eade1f40b8713c83bd158c16072b5;hp=c35617b7b832a7e47fa1afdf8399c3d341b75f0a;hb=5758955b7b394517ff5caf5506a4400cdc5aa372;hpb=c94c3df90aa64298a7935a80b221f80f3c043260;ds=sidebyside diff --git a/osm_nbi/engine.py b/osm_nbi/engine.py index c35617b..cb27414 100644 --- a/osm_nbi/engine.py +++ b/osm_nbi/engine.py @@ -1,43 +1,113 @@ # -*- coding: utf-8 -*- -import dbmongo -import dbmemory -import fslocal -import msglocal -import msgkafka -import tarfile -import yaml -import json -import logging -from random import choice as random_choice -from uuid import uuid4 -from hashlib import sha256, md5 -from dbbase import DbException -from fsbase import FsException -from msgbase import MsgException -from http import HTTPStatus -from time import time -from copy import deepcopy +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. -__author__ = "Alfonso Tierno " +import logging +# import yaml +from osm_common import ( + dbmongo, + dbmemory, + fslocal, + fsmongo, + msglocal, + msgkafka, + version as common_version, +) +from osm_common.dbbase import DbException +from osm_common.fsbase import FsException +from osm_common.msgbase import MsgException +from http import HTTPStatus -class EngineException(Exception): +from osm_nbi.authconn_keystone import AuthconnKeystone +from osm_nbi.authconn_internal import AuthconnInternal +from osm_nbi.authconn_tacacs import AuthconnTacacs +from osm_nbi.base_topic import EngineException, versiontuple +from osm_nbi.admin_topics import VimAccountTopic, WimAccountTopic, SdnTopic +from osm_nbi.admin_topics import K8sClusterTopic, K8sRepoTopic, OsmRepoTopic +from osm_nbi.admin_topics import VcaTopic +from osm_nbi.admin_topics import UserTopicAuth, ProjectTopicAuth, RoleTopicAuth +from osm_nbi.descriptor_topics import ( + VnfdTopic, + NsdTopic, + PduTopic, + NstTopic, + VnfPkgOpTopic, +) +from osm_nbi.instance_topics import ( + NsrTopic, + VnfrTopic, + NsLcmOpTopic, + NsiTopic, + NsiLcmOpTopic, +) +from osm_nbi.pmjobs_topics import PmJobsTopic +from osm_nbi.subscription_topics import NslcmSubscriptionsTopic +from base64 import b64encode +from os import urandom # , path +from threading import Lock - def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST): - self.http_code = http_code - Exception.__init__(self, message) +__author__ = "Alfonso Tierno " +min_common_version = "0.1.16" class Engine(object): - - def __init__(self): - self.tokens = {} + map_from_topic_to_class = { + "vnfds": VnfdTopic, + "nsds": NsdTopic, + "nsts": NstTopic, + "pdus": PduTopic, + "nsrs": NsrTopic, + "vnfrs": VnfrTopic, + "nslcmops": NsLcmOpTopic, + "vim_accounts": VimAccountTopic, + "wim_accounts": WimAccountTopic, + "sdns": SdnTopic, + "k8sclusters": K8sClusterTopic, + "vca": VcaTopic, + "k8srepos": K8sRepoTopic, + "osmrepos": OsmRepoTopic, + "users": UserTopicAuth, # Valid for both internal and keystone authentication backends + "projects": ProjectTopicAuth, # Valid for both internal and keystone authentication backends + "roles": RoleTopicAuth, # Valid for both internal and keystone authentication backends + "nsis": NsiTopic, + "nsilcmops": NsiLcmOpTopic, + "vnfpkgops": VnfPkgOpTopic, + "nslcm_subscriptions": NslcmSubscriptionsTopic, + # [NEW_TOPIC]: add an entry here + # "pm_jobs": PmJobsTopic will be added manually because it needs other parameters + } + + map_target_version_to_int = { + "1.0": 1000, + "1.1": 1001, + "1.2": 1002, + # Add new versions here + } + + def __init__(self, authenticator): self.db = None self.fs = None self.msg = None + self.authconn = None self.config = None + # self.operations = None self.logger = logging.getLogger("nbi.engine") + self.map_topic = {} + self.write_lock = None + # self.token_cache = token_cache + self.authenticator = authenticator def start(self, config): """ @@ -46,6 +116,14 @@ class Engine(object): :return: None """ self.config = config + # check right version of common + if versiontuple(common_version) < versiontuple(min_common_version): + raise EngineException( + "Not compatible osm/common version '{}'. Needed '{}' or higher".format( + common_version, min_common_version + ) + ) + try: if not self.db: if config["database"]["driver"] == "mongo": @@ -55,15 +133,24 @@ class Engine(object): self.db = dbmemory.DbMemory() self.db.db_connect(config["database"]) else: - raise EngineException("Invalid configuration param '{}' at '[database]':'driver'".format( - config["database"]["driver"])) + raise EngineException( + "Invalid configuration param '{}' at '[database]':'driver'".format( + config["database"]["driver"] + ) + ) if not self.fs: if config["storage"]["driver"] == "local": self.fs = fslocal.FsLocal() self.fs.fs_connect(config["storage"]) + elif config["storage"]["driver"] == "mongo": + self.fs = fsmongo.FsMongo() + self.fs.fs_connect(config["storage"]) else: - raise EngineException("Invalid configuration param '{}' at '[storage]':'driver'".format( - config["storage"]["driver"])) + raise EngineException( + "Invalid configuration param '{}' at '[storage]':'driver'".format( + config["storage"]["driver"] + ) + ) if not self.msg: if config["message"]["driver"] == "local": self.msg = msglocal.MsgLocal() @@ -72,8 +159,69 @@ class Engine(object): self.msg = msgkafka.MsgKafka() self.msg.connect(config["message"]) else: - raise EngineException("Invalid configuration param '{}' at '[message]':'driver'".format( - config["storage"]["driver"])) + raise EngineException( + "Invalid configuration param '{}' at '[message]':'driver'".format( + config["message"]["driver"] + ) + ) + if not self.authconn: + if config["authentication"]["backend"] == "keystone": + self.authconn = AuthconnKeystone( + config["authentication"], + self.db, + self.authenticator.role_permissions, + ) + elif config["authentication"]["backend"] == "tacacs": + self.authconn = AuthconnTacacs( + config["authentication"], + self.db, + self.authenticator.role_permissions, + ) + else: + self.authconn = AuthconnInternal( + config["authentication"], + self.db, + self.authenticator.role_permissions, + ) + # if not self.operations: + # if "resources_to_operations" in config["rbac"]: + # resources_to_operations_file = config["rbac"]["resources_to_operations"] + # else: + # possible_paths = ( + # __file__[:__file__.rfind("engine.py")] + "resources_to_operations.yml", + # "./resources_to_operations.yml" + # ) + # for config_file in possible_paths: + # if path.isfile(config_file): + # resources_to_operations_file = config_file + # break + # if not resources_to_operations_file: + # raise EngineException("Invalid permission configuration:" + # "resources_to_operations file missing") + # + # with open(resources_to_operations_file, 'r') as f: + # resources_to_operations = yaml.load(f, Loader=yaml.Loader) + # + # self.operations = [] + # + # for _, value in resources_to_operations["resources_to_operations"].items(): + # if value not in self.operations: + # self.operations += [value] + + self.write_lock = Lock() + # create one class per topic + for topic, topic_class in self.map_from_topic_to_class.items(): + # if self.auth and topic_class in (UserTopicAuth, ProjectTopicAuth): + # self.map_topic[topic] = topic_class(self.db, self.fs, self.msg, self.auth) + self.map_topic[topic] = topic_class( + self.db, self.fs, self.msg, self.authconn + ) + + self.map_topic["pm_jobs"] = PmJobsTopic( + self.db, + config["prometheus"].get("host"), + config["prometheus"].get("port"), + ) except (DbException, FsException, MsgException) as e: raise EngineException(str(e), http_code=e.http_code) @@ -83,581 +231,216 @@ class Engine(object): self.db.db_disconnect() if self.fs: self.fs.fs_disconnect() - if self.fs: - self.fs.fs_disconnect() + if self.msg: + self.msg.disconnect() + self.write_lock = None except (DbException, FsException, MsgException) as e: raise EngineException(str(e), http_code=e.http_code) - def authorize(self, token): - try: - if not token: - raise EngineException("Needed a token or Authorization http header", - http_code=HTTPStatus.UNAUTHORIZED) - if token not in self.tokens: - raise EngineException("Invalid token or Authorization http header", - http_code=HTTPStatus.UNAUTHORIZED) - session = self.tokens[token] - now = time() - if session["expires"] < now: - del self.tokens[token] - raise EngineException("Expired Token or Authorization http header", - http_code=HTTPStatus.UNAUTHORIZED) - return session - except EngineException: - if self.config["global"].get("test.user_not_authorized"): - return {"id": "fake-token-id-for-test", - "project_id": self.config["global"].get("test.project_not_authorized", "admin"), - "username": self.config["global"]["test.user_not_authorized"]} - else: - raise - - def new_token(self, session, indata, remote): - now = time() - user_content = None - - # Try using username/password - if indata.get("username"): - user_rows = self.db.get_list("users", {"username": indata.get("username")}) - user_content = None - if user_rows: - user_content = user_rows[0] - salt = user_content["_admin"]["salt"] - shadow_password = sha256(indata.get("password", "").encode('utf-8') + salt.encode('utf-8')).hexdigest() - if shadow_password != user_content["password"]: - user_content = None - if not user_content: - raise EngineException("Invalid username/password", http_code=HTTPStatus.UNAUTHORIZED) - elif session: - user_rows = self.db.get_list("users", {"username": session["username"]}) - if user_rows: - user_content = user_rows[0] - else: - raise EngineException("Invalid token", http_code=HTTPStatus.UNAUTHORIZED) - else: - raise EngineException("Provide credentials: username/password or Authorization Bearer token", - http_code=HTTPStatus.UNAUTHORIZED) - - token_id = ''.join(random_choice('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789') - for _ in range(0, 32)) - if indata.get("project_id"): - project_id = indata.get("project_id") - if project_id not in user_content["projects"]: - raise EngineException("project {} not allowed for this user".format(project_id), - http_code=HTTPStatus.UNAUTHORIZED) - else: - project_id = user_content["projects"][0] - if project_id == "admin": - session_admin = True - else: - project = self.db.get_one("projects", {"_id": project_id}) - session_admin = project.get("admin", False) - new_session = {"issued_at": now, "expires": now+3600, - "_id": token_id, "id": token_id, "project_id": project_id, "username": user_content["username"], - "remote_port": remote.port, "admin": session_admin} - if remote.name: - new_session["remote_host"] = remote.name - elif remote.ip: - new_session["remote_host"] = remote.ip - - self.tokens[token_id] = new_session - return deepcopy(new_session) - - def get_token_list(self, session): - token_list = [] - for token_id, token_value in self.tokens.items(): - if token_value["username"] == session["username"]: - token_list.append(deepcopy(token_value)) - return token_list - - def get_token(self, session, token_id): - token_value = self.tokens.get(token_id) - if not token_value: - raise EngineException("token not found", http_code=HTTPStatus.NOT_FOUND) - if token_value["username"] != session["username"] and not session["admin"]: - raise EngineException("needed admin privileges", http_code=HTTPStatus.UNAUTHORIZED) - return token_value - - def del_token(self, token_id): - try: - del self.tokens[token_id] - return "token '{}' deleted".format(token_id) - except KeyError: - raise EngineException("Token '{}' not found".format(token_id), http_code=HTTPStatus.NOT_FOUND) - - @staticmethod - def _remove_envelop(item, indata=None): - """ - Obtain the useful data removing the envelop. It goes throw the vnfd or nsd catalog and returns the - vnfd or nsd content - :param item: can be vnfds, nsds, users, projects, - :param indata: Content to be inspected - :return: the useful part of indata + def new_item( + self, rollback, session, topic, indata=None, kwargs=None, headers=None + ): """ - clean_indata = indata - if not indata: - return {} - if item == "vnfds": - if clean_indata.get('vnfd:vnfd-catalog'): - clean_indata = clean_indata['vnfd:vnfd-catalog'] - elif clean_indata.get('vnfd-catalog'): - clean_indata = clean_indata['vnfd-catalog'] - if clean_indata.get('vnfd'): - if not isinstance(clean_indata['vnfd'], list) or len(clean_indata['vnfd']) != 1: - raise EngineException("'vnfd' must be a list only one element") - clean_indata = clean_indata['vnfd'][0] - elif item == "nsds": - if clean_indata.get('nsd:nsd-catalog'): - clean_indata = clean_indata['nsd:nsd-catalog'] - elif clean_indata.get('nsd-catalog'): - clean_indata = clean_indata['nsd-catalog'] - if clean_indata.get('nsd'): - if not isinstance(clean_indata['nsd'], list) or len(clean_indata['nsd']) != 1: - raise EngineException("'nsd' must be a list only one element") - clean_indata = clean_indata['nsd'][0] - return clean_indata - - def _validate_new_data(self, session, item, indata): - if item == "users": - if not indata.get("username"): - raise EngineException("missing 'username'", HTTPStatus.UNPROCESSABLE_ENTITY) - if not indata.get("password"): - raise EngineException("missing 'password'", HTTPStatus.UNPROCESSABLE_ENTITY) - if not indata.get("projects"): - raise EngineException("missing 'projects'", HTTPStatus.UNPROCESSABLE_ENTITY) - # check username not exist - if self.db.get_one(item, {"username": indata.get("username")}, fail_on_empty=False, fail_on_more=False): - raise EngineException("username '{}' exist".format(indata["username"]), HTTPStatus.CONFLICT) - elif item == "projects": - if not indata.get("name"): - raise EngineException("missing 'name'") - # check name not exist - if self.db.get_one(item, {"name": indata.get("name")}, fail_on_empty=False, fail_on_more=False): - raise EngineException("name '{}' exist".format(indata["name"]), HTTPStatus.CONFLICT) - elif item == "vnfds" or item == "nsds": - filter = {"id": indata["id"]} - # TODO add admin to filter, validate rights - self._add_read_filter(session, item, filter) - if self.db.get_one(item, filter, fail_on_empty=False): - raise EngineException("{} with id '{}' already exist for this tenant".format(item[:-1], indata["id"]), - HTTPStatus.CONFLICT) - - # TODO validate with pyangbind - elif item == "nsrs": - pass - - def _format_new_data(self, session, item, indata, admin=None): - now = time() - if not "_admin" in indata: - indata["_admin"] = {} - indata["_admin"]["created"] = now - indata["_admin"]["modified"] = now - if item == "users": - _id = indata["username"] - salt = uuid4().hex - indata["_admin"]["salt"] = salt - indata["password"] = sha256(indata["password"].encode('utf-8') + salt.encode('utf-8')).hexdigest() - elif item == "projects": - _id = indata["name"] - else: - _id = None - storage = None - if admin: - _id = admin.get("_id") - storage = admin.get("storage") - if not _id: - _id = str(uuid4()) - if item == "vnfds" or item == "nsds": - if not indata["_admin"].get("projects_read"): - indata["_admin"]["projects_read"] = [session["project_id"]] - if not indata["_admin"].get("projects_write"): - indata["_admin"]["projects_write"] = [session["project_id"]] - if storage: - indata["_admin"]["storage"] = storage - indata["_id"] = _id - - def _new_item_partial(self, session, item, indata, headers): - """ - Used for recieve content by chunks (with a transaction_id header and/or gzip file. It will store and extract - :param session: session - :param item: - :param indata: http body request - :param headers: http request headers - :return: a dict with:: - _id: - storage: : where it is saving - desc: : descriptor: Only present when all the content is received, extracted and read the descriptor - """ - content_range_text = headers.get("Content-Range") - transaction_id = headers.get("Transaction-Id") - filename = headers.get("Content-Filename", "pkg") - # TODO change to Content-Disposition filename https://tools.ietf.org/html/rfc6266 - expected_md5 = headers.get("Content-File-MD5") - compressed = None - if "application/gzip" in headers.get("Content-Type") or "application/x-gzip" in headers.get("Content-Type") or \ - "application/zip" in headers.get("Content-Type"): - compressed = "gzip" - file_pkg = None - error_text = "" - try: - if content_range_text: - content_range = content_range_text.replace("-", " ").replace("/", " ").split() - if content_range[0] != "bytes": # TODO check x= self.map_target_version_to_int["1.2"] + ): + if self.config["authentication"]["backend"] == "internal": + self.db.del_list("roles") + + version_data = { + "_id": "version", + "version_int": 1002, + "version": "1.2", + "date": "2019-06-11", + "description": "set new format for roles_operations", + } + + self.db.set_one("admin", {"_id": "version"}, version_data) + current_version = "1.2" + # TODO add future migrations here + + def init_db(self, target_version="1.0"): + """ + Init database if empty. If not empty it checks that database version and migrates if needed + If empty, it creates a new user admin/admin at 'users' and a new entry at 'version' + :param target_version: check desired database version. Migrate to it if possible or raises exception + :return: None if ok, exception if error or if the version is different. """ - content = self.get_item(session, item, id) - if indata: - indata = self._remove_envelop(item, indata) - # TODO update content with with a deep-update - - # Override descriptor with query string kwargs - if kwargs: - try: - for k, v in kwargs.items(): - update_content = content - kitem_old = None - klist = k.split(".") - for kitem in klist: - if kitem_old is not None: - update_content = update_content[kitem_old] - if isinstance(update_content, dict): - kitem_old = kitem - elif isinstance(update_content, list): - kitem_old = int(kitem) - else: - raise EngineException( - "Invalid query string '{}'. Descriptor is not a list nor dict at '{}'".format(k, kitem)) - update_content[kitem_old] = v - except KeyError: - raise EngineException( - "Invalid query string '{}'. Descriptor does not contain '{}'".format(k, kitem_old)) - except ValueError: - raise EngineException("Invalid query string '{}'. Expected integer index list instead of '{}'".format( - k, kitem)) - except IndexError: - raise EngineException( - "Invalid query string '{}'. Index '{}' out of range".format(k, kitem_old)) - - self._validate_new_data(session, item, content) - # self._format_new_data(session, item, content) - self.db.replace(item, id, content) - return id - - + version_data = self.db.get_one( + "admin", {"_id": "version"}, fail_on_empty=False, fail_on_more=True + ) + # check database status is ok + if version_data and version_data.get("status") != "ENABLED": + raise EngineException( + "Wrong database status '{}'".format(version_data["status"]), + HTTPStatus.INTERNAL_SERVER_ERROR, + ) + + # check version + db_version = None if not version_data else version_data.get("version") + if db_version != target_version: + self.upgrade_db(db_version, target_version) + + return