X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FNBI.git;a=blobdiff_plain;f=osm_nbi%2Fengine.py;fp=osm_nbi%2Fengine.py;h=c35617b7b832a7e47fa1afdf8399c3d341b75f0a;hp=0000000000000000000000000000000000000000;hb=c94c3df90aa64298a7935a80b221f80f3c043260;hpb=22ed16460edb54806e9b957be18cbafb2f63b54d diff --git a/osm_nbi/engine.py b/osm_nbi/engine.py new file mode 100644 index 0000000..c35617b --- /dev/null +++ b/osm_nbi/engine.py @@ -0,0 +1,663 @@ +# -*- coding: utf-8 -*- + +import dbmongo +import dbmemory +import fslocal +import msglocal +import msgkafka +import tarfile +import yaml +import json +import logging +from random import choice as random_choice +from uuid import uuid4 +from hashlib import sha256, md5 +from dbbase import DbException +from fsbase import FsException +from msgbase import MsgException +from http import HTTPStatus +from time import time +from copy import deepcopy + +__author__ = "Alfonso Tierno " + + +class EngineException(Exception): + + def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST): + self.http_code = http_code + Exception.__init__(self, message) + + +class Engine(object): + + def __init__(self): + self.tokens = {} + self.db = None + self.fs = None + self.msg = None + self.config = None + self.logger = logging.getLogger("nbi.engine") + + def start(self, config): + """ + Connect to database, filesystem storage, and messaging + :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', + :return: None + """ + self.config = config + try: + if not self.db: + if config["database"]["driver"] == "mongo": + self.db = dbmongo.DbMongo() + self.db.db_connect(config["database"]) + elif config["database"]["driver"] == "memory": + self.db = dbmemory.DbMemory() + self.db.db_connect(config["database"]) + else: + raise EngineException("Invalid configuration param '{}' at '[database]':'driver'".format( + config["database"]["driver"])) + if not self.fs: + if config["storage"]["driver"] == "local": + self.fs = fslocal.FsLocal() + self.fs.fs_connect(config["storage"]) + else: + raise EngineException("Invalid configuration param '{}' at '[storage]':'driver'".format( + config["storage"]["driver"])) + if not self.msg: + if config["message"]["driver"] == "local": + self.msg = msglocal.MsgLocal() + self.msg.connect(config["message"]) + elif config["message"]["driver"] == "kafka": + self.msg = msgkafka.MsgKafka() + self.msg.connect(config["message"]) + else: + raise EngineException("Invalid configuration param '{}' at '[message]':'driver'".format( + config["storage"]["driver"])) + except (DbException, FsException, MsgException) as e: + raise EngineException(str(e), http_code=e.http_code) + + def stop(self): + try: + if self.db: + self.db.db_disconnect() + if self.fs: + self.fs.fs_disconnect() + if self.fs: + self.fs.fs_disconnect() + except (DbException, FsException, MsgException) as e: + raise EngineException(str(e), http_code=e.http_code) + + def authorize(self, token): + try: + if not token: + raise EngineException("Needed a token or Authorization http header", + http_code=HTTPStatus.UNAUTHORIZED) + if token not in self.tokens: + raise EngineException("Invalid token or Authorization http header", + http_code=HTTPStatus.UNAUTHORIZED) + session = self.tokens[token] + now = time() + if session["expires"] < now: + del self.tokens[token] + raise EngineException("Expired Token or Authorization http header", + http_code=HTTPStatus.UNAUTHORIZED) + return session + except EngineException: + if self.config["global"].get("test.user_not_authorized"): + return {"id": "fake-token-id-for-test", + "project_id": self.config["global"].get("test.project_not_authorized", "admin"), + "username": self.config["global"]["test.user_not_authorized"]} + else: + raise + + def new_token(self, session, indata, remote): + now = time() + user_content = None + + # Try using username/password + if indata.get("username"): + user_rows = self.db.get_list("users", {"username": indata.get("username")}) + user_content = None + if user_rows: + user_content = user_rows[0] + salt = user_content["_admin"]["salt"] + shadow_password = sha256(indata.get("password", "").encode('utf-8') + salt.encode('utf-8')).hexdigest() + if shadow_password != user_content["password"]: + user_content = None + if not user_content: + raise EngineException("Invalid username/password", http_code=HTTPStatus.UNAUTHORIZED) + elif session: + user_rows = self.db.get_list("users", {"username": session["username"]}) + if user_rows: + user_content = user_rows[0] + else: + raise EngineException("Invalid token", http_code=HTTPStatus.UNAUTHORIZED) + else: + raise EngineException("Provide credentials: username/password or Authorization Bearer token", + http_code=HTTPStatus.UNAUTHORIZED) + + token_id = ''.join(random_choice('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789') + for _ in range(0, 32)) + if indata.get("project_id"): + project_id = indata.get("project_id") + if project_id not in user_content["projects"]: + raise EngineException("project {} not allowed for this user".format(project_id), + http_code=HTTPStatus.UNAUTHORIZED) + else: + project_id = user_content["projects"][0] + if project_id == "admin": + session_admin = True + else: + project = self.db.get_one("projects", {"_id": project_id}) + session_admin = project.get("admin", False) + new_session = {"issued_at": now, "expires": now+3600, + "_id": token_id, "id": token_id, "project_id": project_id, "username": user_content["username"], + "remote_port": remote.port, "admin": session_admin} + if remote.name: + new_session["remote_host"] = remote.name + elif remote.ip: + new_session["remote_host"] = remote.ip + + self.tokens[token_id] = new_session + return deepcopy(new_session) + + def get_token_list(self, session): + token_list = [] + for token_id, token_value in self.tokens.items(): + if token_value["username"] == session["username"]: + token_list.append(deepcopy(token_value)) + return token_list + + def get_token(self, session, token_id): + token_value = self.tokens.get(token_id) + if not token_value: + raise EngineException("token not found", http_code=HTTPStatus.NOT_FOUND) + if token_value["username"] != session["username"] and not session["admin"]: + raise EngineException("needed admin privileges", http_code=HTTPStatus.UNAUTHORIZED) + return token_value + + def del_token(self, token_id): + try: + del self.tokens[token_id] + return "token '{}' deleted".format(token_id) + except KeyError: + raise EngineException("Token '{}' not found".format(token_id), http_code=HTTPStatus.NOT_FOUND) + + @staticmethod + def _remove_envelop(item, indata=None): + """ + Obtain the useful data removing the envelop. It goes throw the vnfd or nsd catalog and returns the + vnfd or nsd content + :param item: can be vnfds, nsds, users, projects, + :param indata: Content to be inspected + :return: the useful part of indata + """ + clean_indata = indata + if not indata: + return {} + if item == "vnfds": + if clean_indata.get('vnfd:vnfd-catalog'): + clean_indata = clean_indata['vnfd:vnfd-catalog'] + elif clean_indata.get('vnfd-catalog'): + clean_indata = clean_indata['vnfd-catalog'] + if clean_indata.get('vnfd'): + if not isinstance(clean_indata['vnfd'], list) or len(clean_indata['vnfd']) != 1: + raise EngineException("'vnfd' must be a list only one element") + clean_indata = clean_indata['vnfd'][0] + elif item == "nsds": + if clean_indata.get('nsd:nsd-catalog'): + clean_indata = clean_indata['nsd:nsd-catalog'] + elif clean_indata.get('nsd-catalog'): + clean_indata = clean_indata['nsd-catalog'] + if clean_indata.get('nsd'): + if not isinstance(clean_indata['nsd'], list) or len(clean_indata['nsd']) != 1: + raise EngineException("'nsd' must be a list only one element") + clean_indata = clean_indata['nsd'][0] + return clean_indata + + def _validate_new_data(self, session, item, indata): + if item == "users": + if not indata.get("username"): + raise EngineException("missing 'username'", HTTPStatus.UNPROCESSABLE_ENTITY) + if not indata.get("password"): + raise EngineException("missing 'password'", HTTPStatus.UNPROCESSABLE_ENTITY) + if not indata.get("projects"): + raise EngineException("missing 'projects'", HTTPStatus.UNPROCESSABLE_ENTITY) + # check username not exist + if self.db.get_one(item, {"username": indata.get("username")}, fail_on_empty=False, fail_on_more=False): + raise EngineException("username '{}' exist".format(indata["username"]), HTTPStatus.CONFLICT) + elif item == "projects": + if not indata.get("name"): + raise EngineException("missing 'name'") + # check name not exist + if self.db.get_one(item, {"name": indata.get("name")}, fail_on_empty=False, fail_on_more=False): + raise EngineException("name '{}' exist".format(indata["name"]), HTTPStatus.CONFLICT) + elif item == "vnfds" or item == "nsds": + filter = {"id": indata["id"]} + # TODO add admin to filter, validate rights + self._add_read_filter(session, item, filter) + if self.db.get_one(item, filter, fail_on_empty=False): + raise EngineException("{} with id '{}' already exist for this tenant".format(item[:-1], indata["id"]), + HTTPStatus.CONFLICT) + + # TODO validate with pyangbind + elif item == "nsrs": + pass + + def _format_new_data(self, session, item, indata, admin=None): + now = time() + if not "_admin" in indata: + indata["_admin"] = {} + indata["_admin"]["created"] = now + indata["_admin"]["modified"] = now + if item == "users": + _id = indata["username"] + salt = uuid4().hex + indata["_admin"]["salt"] = salt + indata["password"] = sha256(indata["password"].encode('utf-8') + salt.encode('utf-8')).hexdigest() + elif item == "projects": + _id = indata["name"] + else: + _id = None + storage = None + if admin: + _id = admin.get("_id") + storage = admin.get("storage") + if not _id: + _id = str(uuid4()) + if item == "vnfds" or item == "nsds": + if not indata["_admin"].get("projects_read"): + indata["_admin"]["projects_read"] = [session["project_id"]] + if not indata["_admin"].get("projects_write"): + indata["_admin"]["projects_write"] = [session["project_id"]] + if storage: + indata["_admin"]["storage"] = storage + indata["_id"] = _id + + def _new_item_partial(self, session, item, indata, headers): + """ + Used for recieve content by chunks (with a transaction_id header and/or gzip file. It will store and extract + :param session: session + :param item: + :param indata: http body request + :param headers: http request headers + :return: a dict with:: + _id: + storage: : where it is saving + desc: : descriptor: Only present when all the content is received, extracted and read the descriptor + """ + content_range_text = headers.get("Content-Range") + transaction_id = headers.get("Transaction-Id") + filename = headers.get("Content-Filename", "pkg") + # TODO change to Content-Disposition filename https://tools.ietf.org/html/rfc6266 + expected_md5 = headers.get("Content-File-MD5") + compressed = None + if "application/gzip" in headers.get("Content-Type") or "application/x-gzip" in headers.get("Content-Type") or \ + "application/zip" in headers.get("Content-Type"): + compressed = "gzip" + file_pkg = None + error_text = "" + try: + if content_range_text: + content_range = content_range_text.replace("-", " ").replace("/", " ").split() + if content_range[0] != "bytes": # TODO check x