From: tierno Date: Fri, 24 Apr 2020 14:02:51 +0000 (+0000) Subject: Feature 7184 New Generation RO X-Git-Tag: v8.0.0rc2~3 X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=refs%2Fchanges%2F14%2F9214%2F2;p=osm%2FRO.git Feature 7184 New Generation RO Generates the package python3-osm-ng-ro for a new RO server One or other server is controlled by env OSMRO_NG Change-Id: I1b563006eeb008d05b37d5116f9741dc4f12a9ba Signed-off-by: tierno --- diff --git a/NG-RO/MANIFEST.in b/NG-RO/MANIFEST.in new file mode 100644 index 00000000..b774a75e --- /dev/null +++ b/NG-RO/MANIFEST.in @@ -0,0 +1,17 @@ +## +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +## + +recursive-include osm_ng_ro *.py *.sh *.cfg *.yml +recursive-include osm_ng_ro/html_public * diff --git a/NG-RO/Makefile b/NG-RO/Makefile new file mode 100644 index 00000000..ee09e5cd --- /dev/null +++ b/NG-RO/Makefile @@ -0,0 +1,24 @@ +# Copyright 2020 Telefonica S.A. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +all: clean package + +clean: + rm -rf dist deb_dist osm_ng_ro-*.tar.gz osm_ng_ro.egg-info .eggs .temp-tox + +package: + python3 setup.py --command-packages=stdeb.command sdist_dsc + cp debian/python3-osm-ng-ro.postinst deb_dist/osm-ng-ro*/debian + cd deb_dist/osm-ng-ro*/ && dpkg-buildpackage -rfakeroot -uc -us diff --git a/NG-RO/debian/python3-osm-ng-ro.postinst b/NG-RO/debian/python3-osm-ng-ro.postinst new file mode 100755 index 00000000..09b59d2e --- /dev/null +++ b/NG-RO/debian/python3-osm-ng-ro.postinst @@ -0,0 +1,25 @@ +#!/bin/bash + +## +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# For those usages not covered by the Apache License, Version 2.0 please +# contact with: OSM_TECH@list.etsi.org +## + +echo "POST INSTALL OSM-RO-NG" +echo "Installing python dependencies via pip..." + +# python3 -m pip install -U pip +python3 -m pip install cherrypy==18.1.2 + diff --git a/NG-RO/osm_ng_ro/__init__.py b/NG-RO/osm_ng_ro/__init__.py new file mode 100644 index 00000000..d2ac4c4b --- /dev/null +++ b/NG-RO/osm_ng_ro/__init__.py @@ -0,0 +1,23 @@ +## +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +## + +version = '8.0.1.post0' +version_date = '2020-06-29' + +# Obtain installed package version. Ignore if error, e.g. pkg_resources not installed +try: + from pkg_resources import get_distribution + version = get_distribution("osm_ng_ro").version +except Exception: + pass diff --git a/NG-RO/osm_ng_ro/html_out.py b/NG-RO/osm_ng_ro/html_out.py new file mode 100644 index 00000000..40594006 --- /dev/null +++ b/NG-RO/osm_ng_ro/html_out.py @@ -0,0 +1,182 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Contains html text in variables to make and html response +""" + +import yaml +from http import HTTPStatus +from html import escape as html_escape + +__author__ = "Alfonso Tierno " + +html_start = """ + + + + +Welcome to OSM + + + +""" + +html_body = """ +

{item}

+""" + +html_end = """ + + +""" + +html_body_error = "

Error
{}

" + + +html_auth2 = """ + + + + + OSM Login + + +
+
+

+
+
+
+

{error}

+
+
+

Sign in to OSM

+
+ + + + +
Username
Password
+
+
+ +
+
+ + + +""" + + +html_nslcmop_body = """ +nslcm operations +VNFRS +
+

+ +

+
+""" + +html_nsilcmop_body = """ +nsilcm operations +
+

+ +

+
+""" + + +def format(data, request, response, toke_info): + """ + Format a nice html response, depending on the data + :param data: + :param request: cherrypy request + :param response: cherrypy response + :return: string with teh html response + """ + response.headers["Content-Type"] = 'text/html' + if response.status == HTTPStatus.UNAUTHORIZED.value: + if response.headers.get("WWW-Authenticate") and request.config.get("auth.allow_basic_authentication"): + response.headers["WWW-Authenticate"] = "Basic" + response.headers["WWW-Authenticate"][6:] + return + else: + return html_auth2.format(error=data) + if request.path_info in ("/version", "/system"): + return "
" + yaml.safe_dump(data, explicit_start=False, indent=4, default_flow_style=False) + "
" + body = html_body.format(item=request.path_info) + if response.status and response.status > 202: + body += html_body_error.format(yaml.safe_dump(data, explicit_start=True, indent=4, default_flow_style=False)) + elif isinstance(data, (list, tuple)): + # if request.path_info == "/ns/v1/deploy": + # body += html_upload_body.format(request.path_info + "_content", "VNFD") + # elif request.path_info == "/nsd/v1/ns_descriptors": + # body += html_upload_body.format(request.path_info + "_content", "NSD") + # elif request.path_info == "/nst/v1/nst_templates": + # body += html_upload_body.format(request.path_info + "_content", "NSTD") + for k in data: + if isinstance(k, dict): + data_id = k.pop("_id", None) + elif isinstance(k, str): + data_id = k + if request.path_info == "/ns/v1/deploy": + body += '

{id}: {t}

' \ + .format(url=request.path_info, id=data_id, t=html_escape(str(k))) + else: + body += '

{id}: {t}

'.format(url=request.path_info, id=data_id, + t=html_escape(str(k))) + elif isinstance(data, dict): + if "Location" in response.headers: + body += ' show '.format(response.headers["Location"]) + else: + body += ' '\ + .format(request.path_info[:request.path_info.rfind("/")]) + if request.path_info.startswith("/nslcm/v1/ns_instances_content/") or \ + request.path_info.startswith("/nslcm/v1/ns_instances/"): + _id = request.path_info[request.path_info.rfind("/")+1:] + body += html_nslcmop_body.format(id=_id) + elif request.path_info.startswith("/nsilcm/v1/netslice_instances_content/") or \ + request.path_info.startswith("/nsilcm/v1/netslice_instances/"): + _id = request.path_info[request.path_info.rfind("/")+1:] + body += html_nsilcmop_body.format(id=_id) + body += "
" + html_escape(yaml.safe_dump(data, explicit_start=True, indent=4, default_flow_style=False)) + \
+                "
" + elif data is None: + if request.method == "DELETE" or "METHOD=DELETE" in request.query_string: + body += "
 deleted 
" + else: + body = html_escape(str(data)) + user_text = " " + if toke_info: + if toke_info.get("username"): + user_text += "user: {}".format(toke_info.get("username")) + if toke_info.get("project_id"): + user_text += ", project: {}".format(toke_info.get("project_name")) + return html_start.format(user_text) + body + html_end + # yaml.safe_dump(data, explicit_start=True, indent=4, default_flow_style=False) + # tags=False, + # encoding='utf-8', allow_unicode=True) diff --git a/NG-RO/osm_ng_ro/html_public/OSM-logo.png b/NG-RO/osm_ng_ro/html_public/OSM-logo.png new file mode 100644 index 00000000..7de447ca Binary files /dev/null and b/NG-RO/osm_ng_ro/html_public/OSM-logo.png differ diff --git a/NG-RO/osm_ng_ro/html_public/delete.png b/NG-RO/osm_ng_ro/html_public/delete.png new file mode 100644 index 00000000..d8fc8e98 Binary files /dev/null and b/NG-RO/osm_ng_ro/html_public/delete.png differ diff --git a/NG-RO/osm_ng_ro/html_public/login.js b/NG-RO/osm_ng_ro/html_public/login.js new file mode 100755 index 00000000..20c4c976 --- /dev/null +++ b/NG-RO/osm_ng_ro/html_public/login.js @@ -0,0 +1,30 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + var login_form = document.getElementById('login_form'); + var f_user = document.getElementById('f_user'); + var f_pass = document.getElementById('f_pass'); + f_user.onkeydown = function(e) { + if (e.keyCode == 13) { + f_pass.focus(); + return false; + } + } + f_pass.onkeydown = function(e) { + if (e.keyCode == 13) { + login_form.submit(); + return false; + } + } + f_user.focus(); + diff --git a/NG-RO/osm_ng_ro/html_public/style.css b/NG-RO/osm_ng_ro/html_public/style.css new file mode 100644 index 00000000..da3c2960 --- /dev/null +++ b/NG-RO/osm_ng_ro/html_public/style.css @@ -0,0 +1,39 @@ +/* +Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U. +Licensed under the Apache License, Version 2.0 (the "License"); you may +not use this file except in compliance with the License. You may obtain +a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +License for the specific language governing permissions and limitations +under the License. +*/ + +#osm_header{ + display: block; + position: relative; + top: 0px; + left: 160px; + margin-bottom: -60px; + width: 140px; + padding-left: 17px; + } +#osm_topmenu { + background: none; + position: relative; + top: 0px; + left: 10px; + margin-right: 10px; +} +#osm_error_message { + padding: 5px; + margin: 2em; + width: 200em; + color: red; + font-weight: bold; +} + diff --git a/NG-RO/osm_ng_ro/ns.py b/NG-RO/osm_ng_ro/ns.py new file mode 100644 index 00000000..eda6c487 --- /dev/null +++ b/NG-RO/osm_ng_ro/ns.py @@ -0,0 +1,806 @@ +# -*- coding: utf-8 -*- + +## +# Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +## + +import logging +# import yaml +from traceback import format_exc as traceback_format_exc +from osm_ng_ro.ns_thread import NsWorker +from osm_ng_ro.validation import validate_input, deploy_schema +from osm_common import dbmongo, dbmemory, fslocal, fsmongo, msglocal, msgkafka, version as common_version +from osm_common.dbbase import DbException +from osm_common.fsbase import FsException +from osm_common.msgbase import MsgException +from http import HTTPStatus +from uuid import uuid4 +from threading import Lock +from random import choice as random_choice +from time import time +from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError +from cryptography.hazmat.primitives import serialization as crypto_serialization +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.hazmat.backends import default_backend as crypto_default_backend + +__author__ = "Alfonso Tierno " +min_common_version = "0.1.16" + + +class NsException(Exception): + + def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST): + self.http_code = http_code + super(Exception, self).__init__(message) + + +def get_process_id(): + """ + Obtain a unique ID for this process. If running from inside docker, it will get docker ID. If not it + will provide a random one + :return: Obtained ID + """ + # Try getting docker id. If fails, get pid + try: + with open("/proc/self/cgroup", "r") as f: + text_id_ = f.readline() + _, _, text_id = text_id_.rpartition("/") + text_id = text_id.replace("\n", "")[:12] + if text_id: + return text_id + except Exception: + pass + # Return a random id + return "".join(random_choice("0123456789abcdef") for _ in range(12)) + + +def versiontuple(v): + """utility for compare dot separate versions. Fills with zeros to proper number comparison""" + filled = [] + for point in v.split("."): + filled.append(point.zfill(8)) + return tuple(filled) + + +class Ns(object): + + def __init__(self): + self.db = None + self.fs = None + self.msg = None + self.config = None + # self.operations = None + self.logger = logging.getLogger("ro.ns") + self.map_topic = {} + self.write_lock = None + self.assignment = {} + self.next_worker = 0 + self.plugins = {} + self.workers = [] + + def init_db(self, target_version): + pass + + def start(self, config): + """ + Connect to database, filesystem storage, and messaging + :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', + :param config: Configuration of db, storage, etc + :return: None + """ + self.config = config + self.config["process_id"] = get_process_id() # used for HA identity + # check right version of common + if versiontuple(common_version) < versiontuple(min_common_version): + raise NsException("Not compatible osm/common version '{}'. Needed '{}' or higher".format( + common_version, min_common_version)) + + try: + if not self.db: + if config["database"]["driver"] == "mongo": + self.db = dbmongo.DbMongo() + self.db.db_connect(config["database"]) + elif config["database"]["driver"] == "memory": + self.db = dbmemory.DbMemory() + self.db.db_connect(config["database"]) + else: + raise NsException("Invalid configuration param '{}' at '[database]':'driver'".format( + config["database"]["driver"])) + if not self.fs: + if config["storage"]["driver"] == "local": + self.fs = fslocal.FsLocal() + self.fs.fs_connect(config["storage"]) + elif config["storage"]["driver"] == "mongo": + self.fs = fsmongo.FsMongo() + self.fs.fs_connect(config["storage"]) + else: + raise NsException("Invalid configuration param '{}' at '[storage]':'driver'".format( + config["storage"]["driver"])) + if not self.msg: + if config["message"]["driver"] == "local": + self.msg = msglocal.MsgLocal() + self.msg.connect(config["message"]) + elif config["message"]["driver"] == "kafka": + self.msg = msgkafka.MsgKafka() + self.msg.connect(config["message"]) + else: + raise NsException("Invalid configuration param '{}' at '[message]':'driver'".format( + config["message"]["driver"])) + + # TODO load workers to deal with exising database tasks + + self.write_lock = Lock() + except (DbException, FsException, MsgException) as e: + raise NsException(str(e), http_code=e.http_code) + + def stop(self): + try: + if self.db: + self.db.db_disconnect() + if self.fs: + self.fs.fs_disconnect() + if self.msg: + self.msg.disconnect() + self.write_lock = None + except (DbException, FsException, MsgException) as e: + raise NsException(str(e), http_code=e.http_code) + for worker in self.workers: + worker.insert_task(("terminate",)) + + def _create_worker(self, vim_account_id): + # TODO make use of the limit self.config["global"]["server.ns_threads"] + worker_id = next((i for i in range(len(self.workers)) if not self.workers[i].is_alive()), None) + if worker_id is None: + worker_id = len(self.workers) + self.workers.append(NsWorker(worker_id, self.config, self.plugins, self.db)) + self.workers[worker_id].start() + self.workers[worker_id].insert_task(("load_vim", vim_account_id)) + return worker_id + + def _assign_vim(self, vim_account_id): + if vim_account_id not in self.assignment: + self.assignment[vim_account_id] = self._create_worker(vim_account_id) + + def _get_cloud_init(self, where): + """ + + :param where: can be 'vnfr_id:file:file_name' or 'vnfr_id:vdu:vdu_idex' + :return: + """ + vnfd_id, _, other = where.partition(":") + _type, _, name = other.partition(":") + vnfd = self.db.get_one("vnfds", {"_id": vnfd_id}) + if _type == "file": + base_folder = vnfd["_admin"]["storage"] + cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"], name) + with self.fs.file_open(cloud_init_file, "r") as ci_file: + cloud_init_content = ci_file.read() + elif _type == "vdu": + cloud_init_content = vnfd["vdu"][int(name)]["cloud-init"] + else: + raise NsException("Mismatch descriptor for cloud init: {}".format(where)) + return cloud_init_content + + def _parse_jinja2(self, cloud_init_content, params, context): + try: + env = Environment() + ast = env.parse(cloud_init_content) + mandatory_vars = meta.find_undeclared_variables(ast) + if mandatory_vars: + for var in mandatory_vars: + if not params or var not in params: + raise NsException( + "Variable '{}' defined at vnfd='{}' must be provided in the instantiation parameters" + "inside the 'additionalParamsForVnf' block".format(var, context)) + template = Template(cloud_init_content) + return template.render(params or {}) + + except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e: + raise NsException("Error parsing Jinja2 to cloud-init content at vnfd='{}': {}".format(context, e)) + + def _create_db_ro_nsrs(self, nsr_id, now): + try: + key = rsa.generate_private_key( + backend=crypto_default_backend(), + public_exponent=65537, + key_size=2048 + ) + private_key = key.private_bytes( + crypto_serialization.Encoding.PEM, + crypto_serialization.PrivateFormat.PKCS8, + crypto_serialization.NoEncryption()) + public_key = key.public_key().public_bytes( + crypto_serialization.Encoding.OpenSSH, + crypto_serialization.PublicFormat.OpenSSH + ) + private_key = private_key.decode('utf8') + public_key = public_key.decode('utf8') + except Exception as e: + raise NsException("Cannot create ssh-keys: {}".format(e)) + + schema_version = "1.1" + private_key_encrypted = self.db.encrypt(private_key, schema_version=schema_version, salt=nsr_id) + db_content = { + "_id": nsr_id, + "_admin": { + "created": now, + "modified": now, + "schema_version": schema_version + }, + "public_key": public_key, + "private_key": private_key_encrypted, + "actions": [], + } + self.db.create("ro_nsrs", db_content) + return db_content + + def deploy(self, session, indata, version, nsr_id, *args, **kwargs): + print("ns.deploy session={} indata={} version={} nsr_id={}".format(session, indata, version, nsr_id)) + validate_input(indata, deploy_schema) + action_id = indata.get("action_id", str(uuid4())) + task_index = 0 + # get current deployment + db_nsr = None + # db_nslcmop = None + db_nsr_update = {} # update operation on nsrs + db_vnfrs_update = {} + # db_nslcmop_update = {} # update operation on nslcmops + db_vnfrs = {} # vnf's info indexed by _id + vdu2cloud_init = {} + step = '' + logging_text = "Task deploy nsr_id={} action_id={} ".format(nsr_id, action_id) + self.logger.debug(logging_text + "Enter") + try: + step = "Getting ns and vnfr record from db" + # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id}) + db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) + db_ro_tasks = [] + db_new_tasks = [] + # read from db: vnf's of this ns + step = "Getting vnfrs from db" + db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}) + if not db_vnfrs_list: + raise NsException("Cannot obtain associated VNF for ns") + for vnfr in db_vnfrs_list: + db_vnfrs[vnfr["_id"]] = vnfr + db_vnfrs_update[vnfr["_id"]] = {} + now = time() + db_ro_nsr = self.db.get_one("ro_nsrs", {"_id": nsr_id}, fail_on_empty=False) + if not db_ro_nsr: + db_ro_nsr = self._create_db_ro_nsrs(nsr_id, now) + ro_nsr_public_key = db_ro_nsr["public_key"] + + # check that action_id is not in the list of actions. Suffixed with :index + if action_id in db_ro_nsr["actions"]: + index = 1 + while True: + new_action_id = "{}:{}".format(action_id, index) + if new_action_id not in db_ro_nsr["actions"]: + action_id = new_action_id + self.logger.debug(logging_text + "Changing action_id in use to {}".format(action_id)) + break + index += 1 + + def _create_task(item, action, target_record, target_record_id, extra_dict=None): + nonlocal task_index + nonlocal action_id + nonlocal nsr_id + + task = { + "action_id": action_id, + "nsr_id": nsr_id, + "task_id": "{}:{}".format(action_id, task_index), + "status": "SCHEDULED", + "action": action, + "item": item, + "target_record": target_record, + "target_record_id": target_record_id, + } + if extra_dict: + task.update(extra_dict) # params, find_params, depends_on + task_index += 1 + return task + + def _create_ro_task(vim_account_id, item, action, target_record, target_record_id, extra_dict=None): + nonlocal action_id + nonlocal task_index + nonlocal now + + _id = action_id + ":" + str(task_index) + db_ro_task = { + "_id": _id, + "locked_by": None, + "locked_at": 0.0, + "target_id": "vim:" + vim_account_id, + "vim_info": { + "created": False, + "created_items": None, + "vim_id": None, + "vim_name": None, + "vim_status": None, + "vim_details": None, + "refresh_at": None, + }, + "modified_at": now, + "created_at": now, + "to_check_at": now, + "tasks": [_create_task(item, action, target_record, target_record_id, extra_dict)], + } + return db_ro_task + + def _process_image_params(target_image, vim_info): + find_params = {} + if target_image.get("image"): + find_params["filter_dict"] = {"name": target_image.get("image")} + if target_image.get("vim_image_id"): + find_params["filter_dict"] = {"id": target_image.get("vim_image_id")} + if target_image.get("image_checksum"): + find_params["filter_dict"] = {"checksum": target_image.get("image_checksum")} + return {"find_params": find_params} + + def _process_flavor_params(target_flavor, vim_info): + + def _get_resource_allocation_params(quota_descriptor): + """ + read the quota_descriptor from vnfd and fetch the resource allocation properties from the + descriptor object + :param quota_descriptor: cpu/mem/vif/disk-io quota descriptor + :return: quota params for limit, reserve, shares from the descriptor object + """ + quota = {} + if quota_descriptor.get("limit"): + quota["limit"] = int(quota_descriptor["limit"]) + if quota_descriptor.get("reserve"): + quota["reserve"] = int(quota_descriptor["reserve"]) + if quota_descriptor.get("shares"): + quota["shares"] = int(quota_descriptor["shares"]) + return quota + + flavor_data = { + "disk": int(target_flavor["storage-gb"]), + # "ram": max(int(target_flavor["memory-mb"]) // 1024, 1), + # ^ TODO manage at vim_connectors MB instead of GB + "ram": int(target_flavor["memory-mb"]), + "vcpus": target_flavor["vcpu-count"], + } + if target_flavor.get("guest-epa"): + extended = {} + numa = {} + epa_vcpu_set = False + if target_flavor["guest-epa"].get("numa-node-policy"): + numa_node_policy = target_flavor["guest-epa"].get("numa-node-policy") + if numa_node_policy.get("node"): + numa_node = numa_node_policy["node"][0] + if numa_node.get("num-cores"): + numa["cores"] = numa_node["num-cores"] + epa_vcpu_set = True + if numa_node.get("paired-threads"): + if numa_node["paired-threads"].get("num-paired-threads"): + numa["paired-threads"] = int(numa_node["paired-threads"]["num-paired-threads"]) + epa_vcpu_set = True + if len(numa_node["paired-threads"].get("paired-thread-ids")): + numa["paired-threads-id"] = [] + for pair in numa_node["paired-threads"]["paired-thread-ids"]: + numa["paired-threads-id"].append( + (str(pair["thread-a"]), str(pair["thread-b"])) + ) + if numa_node.get("num-threads"): + numa["threads"] = int(numa_node["num-threads"]) + epa_vcpu_set = True + if numa_node.get("memory-mb"): + numa["memory"] = max(int(numa_node["memory-mb"] / 1024), 1) + if target_flavor["guest-epa"].get("mempage-size"): + extended["mempage-size"] = target_flavor["guest-epa"].get("mempage-size") + if target_flavor["guest-epa"].get("cpu-pinning-policy") and not epa_vcpu_set: + if target_flavor["guest-epa"]["cpu-pinning-policy"] == "DEDICATED": + if target_flavor["guest-epa"].get("cpu-thread-pinning-policy") and \ + target_flavor["guest-epa"]["cpu-thread-pinning-policy"] != "PREFER": + numa["cores"] = max(flavor_data["vcpus"], 1) + else: + numa["threads"] = max(flavor_data["vcpus"], 1) + epa_vcpu_set = True + if target_flavor["guest-epa"].get("cpu-quota") and not epa_vcpu_set: + cpuquota = _get_resource_allocation_params(target_flavor["guest-epa"].get("cpu-quota")) + if cpuquota: + extended["cpu-quota"] = cpuquota + if target_flavor["guest-epa"].get("mem-quota"): + vduquota = _get_resource_allocation_params(target_flavor["guest-epa"].get("mem-quota")) + if vduquota: + extended["mem-quota"] = vduquota + if target_flavor["guest-epa"].get("disk-io-quota"): + diskioquota = _get_resource_allocation_params(target_flavor["guest-epa"].get("disk-io-quota")) + if diskioquota: + extended["disk-io-quota"] = diskioquota + if target_flavor["guest-epa"].get("vif-quota"): + vifquota = _get_resource_allocation_params(target_flavor["guest-epa"].get("vif-quota")) + if vifquota: + extended["vif-quota"] = vifquota + if numa: + extended["numas"] = [numa] + if extended: + flavor_data["extended"] = extended + + extra_dict = {"find_params": {"flavor_data": flavor_data}} + flavor_data_name = flavor_data.copy() + flavor_data_name["name"] = target_flavor["name"] + extra_dict["params"] = {"flavor_data": flavor_data_name} + return extra_dict + + def _process_net_params(target_vld, vim_info): + nonlocal indata + extra_dict = {} + if vim_info.get("vim_network_name"): + extra_dict["find_params"] = {"filter_dict": {"name": vim_info.get("vim_network_name")}} + elif vim_info.get("vim_network_id"): + extra_dict["find_params"] = {"filter_dict": {"id": vim_info.get("vim_network_id")}} + elif target_vld.get("mgmt-network"): + extra_dict["find_params"] = {"mgmt": True, "name": target_vld["id"]} + else: + # create + extra_dict["params"] = { + "net_name": "{}-{}".format(indata["name"][:16], target_vld.get("name", target_vld["id"])[:16]), + "ip_profile": vim_info.get('ip_profile'), + "provider_network_profile": vim_info.get('provider_network'), + } + if not target_vld.get("underlay"): + extra_dict["params"]["net_type"] = "bridge" + else: + extra_dict["params"]["net_type"] = "ptp" if target_vld.get("type") == "ELINE" else "data" + return extra_dict + + def _process_vdu_params(target_vdu, vim_info): + nonlocal vnfr_id + nonlocal nsr_id + nonlocal indata + nonlocal vnfr + nonlocal vdu2cloud_init + vnf_preffix = "vnfrs:{}".format(vnfr_id) + ns_preffix = "nsrs:{}".format(nsr_id) + image_text = ns_preffix + ":image." + target_vdu["ns-image-id"] + flavor_text = ns_preffix + ":flavor." + target_vdu["ns-flavor-id"] + extra_dict = {"depends_on": [image_text, flavor_text]} + net_list = [] + for iface_index, interface in enumerate(target_vdu["interfaces"]): + if interface.get("ns-vld-id"): + net_text = ns_preffix + ":vld." + interface["ns-vld-id"] + else: + net_text = vnf_preffix + ":vld." + interface["vnf-vld-id"] + extra_dict["depends_on"].append(net_text) + net_item = { + "name": interface["name"], + "net_id": "TASK-" + net_text, + "vpci": interface.get("vpci"), + "type": "virtual", + # TODO mac_address: used for SR-IOV ifaces #TODO for other types + # TODO floating_ip: True/False (or it can be None) + } + if interface.get("type") in ("SR-IOV", "PCI-PASSTHROUGH"): + net_item["use"] = "data" + net_item["model"] = interface["type"] + net_item["type"] = interface["type"] + elif interface.get("type") == "OM-MGMT" or interface.get("mgmt-interface") or \ + interface.get("mgmt-vnf"): + net_item["use"] = "mgmt" + else: # if interface.get("type") in ("VIRTIO", "E1000", "PARAVIRT"): + net_item["use"] = "bridge" + net_item["model"] = interface.get("type") + net_list.append(net_item) + if interface.get("mgmt-vnf"): + extra_dict["mgmt_vnf_interface"] = iface_index + elif interface.get("mgmt-interface"): + extra_dict["mgmt_vdu_interface"] = iface_index + + # cloud config + cloud_config = {} + if target_vdu.get("cloud-init"): + if target_vdu["cloud-init"] not in vdu2cloud_init: + vdu2cloud_init[target_vdu["cloud-init"]] = self._get_cloud_init(target_vdu["cloud-init"]) + cloud_content_ = vdu2cloud_init[target_vdu["cloud-init"]] + cloud_config["user-data"] = self._parse_jinja2(cloud_content_, target_vdu.get("additionalParams"), + target_vdu["cloud-init"]) + if target_vdu.get("boot-data-drive"): + cloud_config["boot-data-drive"] = target_vdu.get("boot-data-drive") + ssh_keys = [] + if target_vdu.get("ssh-keys"): + ssh_keys += target_vdu.get("ssh-keys") + if target_vdu.get("ssh-access-required"): + ssh_keys.append(ro_nsr_public_key) + if ssh_keys: + cloud_config["key-pairs"] = ssh_keys + + extra_dict["params"] = { + "name": "{}-{}-{}-{}".format(indata["name"][:16], vnfr["member-vnf-index-ref"][:16], + target_vdu["vdu-name"][:32], target_vdu.get("count-index") or 0), + "description": target_vdu["vdu-name"], + "start": True, + "image_id": "TASK-" + image_text, + "flavor_id": "TASK-" + flavor_text, + "net_list": net_list, + "cloud_config": cloud_config or None, + "disk_list": None, # TODO + "availability_zone_index": None, # TODO + "availability_zone_list": None, # TODO + } + return extra_dict + + def _process_items(target_list, existing_list, db_record, db_update, db_path, item, process_params): + nonlocal db_ro_tasks + nonlocal db_new_tasks + nonlocal task_index + + # ensure all the target_list elements has an "id". If not assign the index + for target_index, tl in enumerate(target_list): + if tl and not tl.get("id"): + tl["id"] = str(target_index) + + # step 1 networks to be deleted/updated + for vld_index, existing_vld in enumerate(existing_list): + target_vld = next((vld for vld in target_list if vld["id"] == existing_vld["id"]), None) + for existing_vim_index, existing_vim_info in enumerate(existing_vld.get("vim_info", ())): + if not existing_vim_info: + continue + if target_vld: + target_viminfo = next((target_viminfo for target_viminfo in target_vld.get("vim_info", ()) + if existing_vim_info["vim_account_id"] == target_viminfo[ + "vim_account_id"]), None) + else: + target_viminfo = None + if not target_viminfo: + # must be deleted + self._assign_vim(existing_vim_info["vim_account_id"]) + db_new_tasks.append(_create_task( + item, "DELETE", + target_record="{}.{}.vim_info.{}".format(db_record, vld_index, existing_vim_index), + target_record_id="{}.{}".format(db_record, existing_vld["id"]))) + # TODO delete + # TODO check one by one the vims to be created/deleted + + # step 2 networks to be created + for target_vld in target_list: + vld_index = -1 + for vld_index, existing_vld in enumerate(existing_list): + if existing_vld["id"] == target_vld["id"]: + break + else: + vld_index += 1 + db_update[db_path + ".{}".format(vld_index)] = target_vld + existing_list.append(target_vld) + existing_vld = None + + for vim_index, vim_info in enumerate(target_vld["vim_info"]): + existing_viminfo = None + if existing_vld: + existing_viminfo = next( + (existing_viminfo for existing_viminfo in existing_vld.get("vim_info", ()) + if vim_info["vim_account_id"] == existing_viminfo["vim_account_id"]), None) + # TODO check if different. Delete and create??? + # TODO delete if not exist + if existing_viminfo: + continue + + extra_dict = process_params(target_vld, vim_info) + + self._assign_vim(vim_info["vim_account_id"]) + db_ro_tasks.append(_create_ro_task( + vim_info["vim_account_id"], item, "CREATE", + target_record="{}.{}.vim_info.{}".format(db_record, vld_index, vim_index), + target_record_id="{}.{}".format(db_record, target_vld["id"]), + extra_dict=extra_dict)) + + db_update[db_path + ".{}".format(vld_index)] = target_vld + + def _process_action(indata): + nonlocal db_ro_tasks + nonlocal db_new_tasks + nonlocal task_index + nonlocal db_vnfrs + nonlocal db_ro_nsr + + if indata["action"] == "inject_ssh_key": + key = indata.get("key") + user = indata.get("user") + password = indata.get("password") + for vnf in indata.get("vnf", ()): + if vnf.get("_id") not in db_vnfrs: + raise NsException("Invalid vnf={}".format(vnf["_id"])) + db_vnfr = db_vnfrs[vnf["_id"]] + for target_vdu in vnf.get("vdur", ()): + vdu_index, vdur = next((i_v for i_v in enumerate(db_vnfr["vdur"]) if + i_v[1]["id"] == target_vdu["id"]), (None, None)) + if not vdur: + raise NsException("Invalid vdu vnf={}.{}".format(vnf["_id"], target_vdu["id"])) + vim_info = vdur["vim_info"][0] + self._assign_vim(vim_info["vim_account_id"]) + target_record = "vnfrs:{}:vdur.{}.ssh_keys".format(vnf["_id"], vdu_index) + extra_dict = { + "depends_on": ["vnfrs:{}:vdur.{}".format(vnf["_id"], vdur["id"])], + "params": { + "ip_address": vdur.gt("ip_address"), + "user": user, + "key": key, + "password": password, + "private_key": db_ro_nsr["private_key"], + "salt": db_ro_nsr["_id"], + "schema_version": db_ro_nsr["_admin"]["schema_version"] + } + } + db_ro_tasks.append(_create_ro_task(vim_info["vim_account_id"], "vdu", "EXEC", + target_record=target_record, + target_record_id=None, + extra_dict=extra_dict)) + + with self.write_lock: + if indata.get("action"): + _process_action(indata) + else: + # compute network differences + # NS.vld + step = "process NS VLDs" + _process_items(target_list=indata["ns"]["vld"] or [], existing_list=db_nsr.get("vld") or [], + db_record="nsrs:{}:vld".format(nsr_id), db_update=db_nsr_update, + db_path="vld", item="net", process_params=_process_net_params) + + step = "process NS images" + _process_items(target_list=indata["image"] or [], existing_list=db_nsr.get("image") or [], + db_record="nsrs:{}:image".format(nsr_id), + db_update=db_nsr_update, db_path="image", item="image", + process_params=_process_image_params) + + step = "process NS flavors" + _process_items(target_list=indata["flavor"] or [], existing_list=db_nsr.get("flavor") or [], + db_record="nsrs:{}:flavor".format(nsr_id), + db_update=db_nsr_update, db_path="flavor", item="flavor", + process_params=_process_flavor_params) + + # VNF.vld + for vnfr_id, vnfr in db_vnfrs.items(): + # vnfr_id need to be set as global variable for among others nested method _process_vdu_params + step = "process VNF={} VLDs".format(vnfr_id) + target_vnf = next((vnf for vnf in indata.get("vnf", ()) if vnf["_id"] == vnfr_id), None) + target_list = target_vnf.get("vld") if target_vnf else None + _process_items(target_list=target_list or [], existing_list=vnfr.get("vld") or [], + db_record="vnfrs:{}:vld".format(vnfr_id), db_update=db_vnfrs_update[vnfr["_id"]], + db_path="vld", item="net", process_params=_process_net_params) + + target_list = target_vnf.get("vdur") if target_vnf else None + step = "process VNF={} VDUs".format(vnfr_id) + _process_items(target_list=target_list or [], existing_list=vnfr.get("vdur") or [], + db_record="vnfrs:{}:vdur".format(vnfr_id), + db_update=db_vnfrs_update[vnfr["_id"]], db_path="vdur", item="vdu", + process_params=_process_vdu_params) + + step = "Updating database, Creating ro_tasks" + if db_ro_tasks: + self.db.create_list("ro_tasks", db_ro_tasks) + step = "Updating database, Appending tasks to ro_tasks" + for task in db_new_tasks: + if not self.db.set_one("ro_tasks", q_filter={"tasks.target_record": task["target_record"]}, + update_dict={"to_check_at": now, "modified_at": now}, + push={"tasks": task}, fail_on_empty=False): + self.logger.error(logging_text + "Cannot find task for target_record={}". + format(task["target_record"])) + # TODO something else appart from logging? + step = "Updating database, nsrs" + if db_nsr_update: + self.db.set_one("nsrs", {"_id": nsr_id}, db_nsr_update) + for vnfr_id, db_vnfr_update in db_vnfrs_update.items(): + if db_vnfr_update: + step = "Updating database, vnfrs={}".format(vnfr_id) + self.db.set_one("vnfrs", {"_id": vnfr_id}, db_vnfr_update) + + self.logger.debug(logging_text + "Exit") + return {"status": "ok", "nsr_id": nsr_id, "action_id": action_id}, action_id, True + + except Exception as e: + if isinstance(e, (DbException, NsException)): + self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e)) + else: + e = traceback_format_exc() + self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(step, e), exc_info=True) + raise NsException(e) + + def delete(self, session, indata, version, nsr_id, *args, **kwargs): + print("ns.delete session={} indata={} version={} nsr_id={}".format(session, indata, version, nsr_id)) + # TODO del when ALL "tasks.nsr_id" are None of nsr_id + # self.db.del_list({"_id": ro_task["_id"], "tasks.nsr_id.ne": nsr_id}) + retries = 5 + for retry in range(retries): + with self.write_lock: + ro_tasks = self.db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id}) + if not ro_tasks: + break + now = time() + conflict = False + for ro_task in ro_tasks: + db_update = {} + to_delete = True + for index, task in enumerate(ro_task["tasks"]): + if not task: + pass + elif task["nsr_id"] == nsr_id: + db_update["tasks.{}".format(index)] = None + else: + to_delete = False # used by other nsr, cannot be deleted + # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed + if to_delete: + if not self.db.del_one("ro_tasks", + q_filter={"_id": ro_task["_id"], "modified_at": ro_task["modified_at"]}, + fail_on_empty=False): + conflict = True + elif db_update: + db_update["modified_at"] = now + if not self.db.set_one("ro_tasks", + q_filter={"_id": ro_task["_id"], "modified_at": ro_task["modified_at"]}, + update_dict=db_update, + fail_on_empty=False): + conflict = True + if not conflict: + break + else: + raise NsException("Exceeded {} retries".format(retries)) + + return None, None, True + + def status(self, session, indata, version, nsr_id, action_id, *args, **kwargs): + print("ns.status session={} indata={} version={} nsr_id={}, action_id={}".format(session, indata, version, + nsr_id, action_id)) + task_list = [] + done = 0 + total = 0 + ro_tasks = self.db.get_list("ro_tasks", {"tasks.action_id": action_id}) + global_status = "DONE" + details = [] + for ro_task in ro_tasks: + for task in ro_task["tasks"]: + if task["action_id"] == action_id: + task_list.append(task) + total += 1 + if task["status"] == "FAILED": + global_status = "FAILED" + details.append(ro_task.get("vim_details", '')) + elif task["status"] in ("SCHEDULED", "BUILD"): + if global_status != "FAILED": + global_status = "BUILD" + else: + done += 1 + return_data = { + "status": global_status, + "details": ". ".join(details) if details else "progress {}/{}".format(done, total), + "nsr_id": nsr_id, + "action_id": action_id, + "tasks": task_list + } + return return_data, None, True + + def cancel(self, session, indata, version, nsr_id, action_id, *args, **kwargs): + print("ns.cancel session={} indata={} version={} nsr_id={}, action_id={}".format(session, indata, version, + nsr_id, action_id)) + return None, None, True + + def get_deploy(self, session, indata, version, nsr_id, action_id, *args, **kwargs): + nsrs = self.db.get_list("nsrs", {}) + return_data = [] + for ns in nsrs: + return_data.append({"_id": ns["_id"], "name": ns["name"]}) + return return_data, None, True + + def get_actions(self, session, indata, version, nsr_id, action_id, *args, **kwargs): + ro_tasks = self.db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id}) + return_data = [] + for ro_task in ro_tasks: + for task in ro_task["tasks"]: + if task["action_id"] not in return_data: + return_data.append(task["action_id"]) + return return_data, None, True diff --git a/NG-RO/osm_ng_ro/ns_thread.py b/NG-RO/osm_ng_ro/ns_thread.py new file mode 100644 index 00000000..0b96c536 --- /dev/null +++ b/NG-RO/osm_ng_ro/ns_thread.py @@ -0,0 +1,919 @@ +# -*- coding: utf-8 -*- + +## +# Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +## + +"""" +This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM. +The tasks are stored at database in table ro_tasks +A single ro_task refers to a VIM element (flavor, image, network, ...). +A ro_task can contain several 'tasks', each one with a target, where to store the results +""" + +import threading +import time +import queue +import logging +from pkg_resources import iter_entry_points +# from osm_common import dbmongo, dbmemory, fslocal, fsmongo, msglocal, msgkafka, version as common_version +from osm_common.dbbase import DbException +# from osm_common.fsbase import FsException +# from osm_common.msgbase import MsgException +from osm_ro_plugin.vim_dummy import VimDummyConnector +from osm_ro_plugin import vimconn +from copy import deepcopy +from unittest.mock import Mock + +__author__ = "Alfonso Tierno" +__date__ = "$28-Sep-2017 12:07:15$" + + +def deep_get(target_dict, *args, **kwargs): + """ + Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None + Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None + :param target_dict: dictionary to be read + :param args: list of keys to read from target_dict + :param kwargs: only can contain default=value to return if key is not present in the nested dictionary + :return: The wanted value if exist, None or default otherwise + """ + for key in args: + if not isinstance(target_dict, dict) or key not in target_dict: + return kwargs.get("default") + target_dict = target_dict[key] + return target_dict + + +class NsWorkerException(Exception): + pass + + +class FailingConnector: + def __init__(self, error_msg): + self.error_msg = error_msg + for method in dir(vimconn.VimConnector): + if method[0] != "_": + setattr(self, method, Mock(side_effect=vimconn.VimConnException(error_msg))) + + +class NsWorkerExceptionNotFound(NsWorkerException): + pass + + +class NsWorker(threading.Thread): + REFRESH_BUILD = 5 # 5 seconds + REFRESH_ACTIVE = 60 # 1 minute + REFRESH_ERROR = 600 + REFRESH_IMAGE = 3600 * 10 + REFRESH_DELETE = 3600 * 10 + QUEUE_SIZE = 2000 + # TODO delete assigment_lock = Lock() + terminate = False + # TODO delete assignment = {} + MAX_TIME_LOCKED = 3600 + + def __init__(self, worker, config, plugins, db): + """Init a thread. + Arguments: + 'id' number of thead + 'name' name of thread + 'host','user': host ip or name to manage and user + 'db', 'db_lock': database class and lock to use it in exclusion + """ + threading.Thread.__init__(self) + self.config = config + self.plugins = plugins + self.plugin_name = "unknown" + self.logger = logging.getLogger('ro.worker{}'.format("worker")) + self.worker_id = worker + self.task_queue = queue.Queue(self.QUEUE_SIZE) + self.my_vims = {} # targetvim: vimplugin class + self.db_vims = {} # targetvim: vim information from database + self.vim_targets = [] # targetvim list + self.my_id = config["process_id"] + ":" + str(worker) + self.db = db + self.item2create = { + "net": self.new_net, + "vdu": self.new_vm, + "image": self.new_image, + "flavor": self.new_flavor, + } + self.item2refresh = { + "net": self.refresh_net, + "vdu": self.refresh_vm, + "image": self.refresh_ok, + "flavor": self.refresh_ok, + } + self.item2delete = { + "net": self.del_net, + "vdu": self.del_vm, + "image": self.delete_ok, + "flavor": self.del_flavor, + } + self.item2action = { + "vdu": self.exec_vm, + } + self.time_last_task_processed = None + + def insert_task(self, task): + try: + self.task_queue.put(task, False) + return None + except queue.Full: + raise NsWorkerException("timeout inserting a task") + + def terminate(self): + self.insert_task("exit") + + def del_task(self, task): + with self.task_lock: + if task["status"] == "SCHEDULED": + task["status"] = "SUPERSEDED" + return True + else: # task["status"] == "processing" + self.task_lock.release() + return False + + def _load_plugin(self, name, type="vim"): + # type can be vim or sdn + if "rovim_dummy" not in self.plugins: + self.plugins["rovim_dummy"] = VimDummyConnector + if name in self.plugins: + return self.plugins[name] + try: + for v in iter_entry_points('osm_ro{}.plugins'.format(type), name): + self.plugins[name] = v.load() + except Exception as e: + self.logger.critical("Cannot load osm_{}: {}".format(name, e)) + if name: + self.plugins[name] = FailingConnector("Cannot load osm_{}: {}".format(name, e)) + if name and name not in self.plugins: + error_text = "Cannot load a module for {t} type '{n}'. The plugin 'osm_{n}' has not been" \ + " registered".format(t=type, n=name) + self.logger.critical(error_text) + self.plugins[name] = FailingConnector(error_text) + + return self.plugins[name] + + def _load_vim(self, vim_account_id): + target_id = "vim:" + vim_account_id + plugin_name = "" + vim = None + try: + step = "Getting vim={} from db".format(vim_account_id) + vim = self.db.get_one("vim_accounts", {"_id": vim_account_id}) + + # if deep_get(vim, "config", "sdn-controller"): + # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"]) + # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]}) + + step = "Decrypt password" + schema_version = vim.get("schema_version") + self.db.encrypt_decrypt_fields(vim, "decrypt", fields=('password', 'secret'), + schema_version=schema_version, salt=vim_account_id) + + step = "Load plugin 'rovim_{}'".format(vim.get("vim_type")) + plugin_name = "rovim_" + vim["vim_type"] + vim_module_conn = self._load_plugin(plugin_name) + self.my_vims[target_id] = vim_module_conn( + uuid=vim['_id'], name=vim['name'], + tenant_id=vim.get('vim_tenant_id'), tenant_name=vim.get('vim_tenant_name'), + url=vim['vim_url'], url_admin=None, + user=vim['vim_user'], passwd=vim['vim_password'], + config=vim.get('config'), persistent_info={} + ) + self.vim_targets.append(target_id) + self.db_vims[target_id] = vim + self.error_status = None + self.logger.info("Vim Connector loaded for vim_account={}, plugin={}".format( + vim_account_id, plugin_name)) + except Exception as e: + self.logger.error("Cannot load vimconnector for vim_account={} plugin={}: {} {}".format( + vim_account_id, plugin_name, step, e)) + self.db_vims[target_id] = vim or {} + self.my_vims[target_id] = FailingConnector(str(e)) + self.error_status = "Error loading vimconnector: {}".format(e) + + def _get_db_task(self): + """ + Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions + :return: None + """ + now = time.time() + if not self.time_last_task_processed: + self.time_last_task_processed = now + try: + while True: + locked = self.db.set_one( + "ro_tasks", + q_filter={"target_id": self.vim_targets, + "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'], + "locked_at.lt": now - self.MAX_TIME_LOCKED, + "to_check_at.lt": self.time_last_task_processed}, + update_dict={"locked_by": self.my_id, "locked_at": now}, + fail_on_empty=False) + if locked: + # read and return + ro_task = self.db.get_one( + "ro_tasks", + q_filter={"target_id": self.vim_targets, + "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'], + "locked_at": now}) + return ro_task + if self.time_last_task_processed == now: + self.time_last_task_processed = None + return None + else: + self.time_last_task_processed = now + # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now) + + except DbException as e: + self.logger.error("Database exception at _get_db_task: {}".format(e)) + except Exception as e: + self.logger.critical("Unexpected exception at _get_db_task: {}".format(e), exc_info=True) + return None + + def _delete_task(self, ro_task, task_index, task_depends, db_update): + """ + Determine if this task need to be done or superseded + :return: None + """ + my_task = ro_task["tasks"][task_index] + task_id = my_task["task_id"] + needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get("created_items", False) + if my_task["status"] == "FAILED": + return None, None # TODO need to be retry?? + try: + for index, task in enumerate(ro_task["tasks"]): + if index == task_index: + continue # own task + if my_task["target_record"] == task["target_record"] and task["action"] == "CREATE": + # set to finished + db_update["tasks.{}.status".format(index)] = task["status"] = "FINISHED" + elif task["action"] == "CREATE" and task["status"] not in ("FINISHED", "SUPERSEDED"): + needed_delete = False + if needed_delete: + return self.item2delete[my_task["item"]](ro_task, task_index) + else: + return "SUPERSEDED", None + except Exception as e: + if not isinstance(e, NsWorkerException): + self.logger.critical("Unexpected exception at _delete_task task={}: {}".format(task_id, e), + exc_info=True) + return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e)} + + def _create_task(self, ro_task, task_index, task_depends, db_update): + """ + Determine if this task need to be created + :return: None + """ + my_task = ro_task["tasks"][task_index] + task_id = my_task["task_id"] + task_status = None + if my_task["status"] == "FAILED": + return None, None # TODO need to be retry?? + elif my_task["status"] == "SCHEDULED": + # check if already created by another task + for index, task in enumerate(ro_task["tasks"]): + if index == task_index: + continue # own task + if task["action"] == "CREATE" and task["status"] not in ("SCHEDULED", "FINISHED", "SUPERSEDED"): + return task["status"], "COPY_VIM_INFO" + + try: + task_status, ro_vim_item_update = self.item2create[my_task["item"]](ro_task, task_index, task_depends) + # TODO update other CREATE tasks + except Exception as e: + if not isinstance(e, NsWorkerException): + self.logger.error("Error executing task={}: {}".format(task_id, e), exc_info=True) + task_status = "FAILED" + ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)} + # TODO update ro_vim_item_update + return task_status, ro_vim_item_update + else: + return None, None + + def _get_dependency(self, task_id, ro_task=None, target_id=None): + if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"): + ro_task_dependency = self.db.get_one( + "ro_tasks", + q_filter={"target_id": target_id, + "tasks.target_record_id": task_id + }, + fail_on_empty=False) + if ro_task_dependency: + for task_index, task in enumerate(ro_task_dependency["tasks"]): + if task["target_record_id"] == task_id: + return ro_task_dependency, task_index + + else: + if ro_task: + for task_index, task in enumerate(ro_task["tasks"]): + if task["task_id"] == task_id: + return ro_task, task_index + ro_task_dependency = self.db.get_one( + "ro_tasks", + q_filter={"tasks.ANYINDEX.task_id": task_id, + "tasks.ANYINDEX.target_record.ne": None + }, + fail_on_empty=False) + if ro_task_dependency: + for task_index, task in ro_task_dependency["tasks"]: + if task["task_id"] == task_id: + return ro_task_dependency, task_index + raise NsWorkerException("Cannot get depending task {}".format(task_id)) + + def _proccess_pending_tasks(self, ro_task): + ro_task_id = ro_task["_id"] + now = time.time() + next_check_at = now + (24*60*60) # one day + db_ro_task_update = {} + + def _update_refresh(new_status): + # compute next_refresh + nonlocal task + nonlocal next_check_at + nonlocal db_ro_task_update + nonlocal ro_task + + next_refresh = time.time() + if task["item"] in ("image", "flavor"): + next_refresh += self.REFRESH_IMAGE + elif new_status == "BUILD": + next_refresh += self.REFRESH_BUILD + elif new_status == "DONE": + next_refresh += self.REFRESH_ACTIVE + else: + next_refresh += self.REFRESH_ERROR + next_check_at = min(next_check_at, next_refresh) + db_ro_task_update["vim_info.refresh_at"] = next_refresh + ro_task["vim_info"]["refresh_at"] = next_refresh + + try: + # 0 get task_status_create + task_status_create = None + task_create = next((t for t in ro_task["tasks"] if t["action"] == "CREATE" and + t["status"] in ("BUILD", "DONE")), None) + if task_create: + task_status_create = task_create["status"] + # 1. look for SCHEDULED or if CREATE also DONE,BUILD + for task_action in ("DELETE", "CREATE", "EXEC"): + db_vim_update = None + for task_index, task in enumerate(ro_task["tasks"]): + target_update = None + if (task_action in ("DELETE", "EXEC") and task["status"] != "SCHEDULED") or\ + task["action"] != task_action or \ + (task_action == "CREATE" and task["status"] in ("FINISHED", "SUPERSEDED")): + continue + task_path = "tasks.{}.status".format(task_index) + try: + if task["status"] == "SCHEDULED": + task_depends = {} + # check if tasks that this depends on have been completed + dependency_not_completed = False + for dependency_task_id in (task.get("depends_on") or ()): + dependency_ro_task, dependency_task_index = \ + self._get_dependency(dependency_task_id, target_id=ro_task["target_id"]) + dependency_task = dependency_ro_task["tasks"][dependency_task_index] + if dependency_task["status"] == "SCHEDULED": + dependency_not_completed = True + next_check_at = min(next_check_at, dependency_ro_task["to_check_at"]) + break + elif dependency_task["status"] == "FAILED": + error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format( + task["action"], task["item"], dependency_task["action"], + dependency_task["item"], dependency_task_id, + dependency_ro_task["vim_info"].get("vim_details")) + self.logger.error("task={} {}".format(task["task_id"], error_text)) + raise NsWorkerException(error_text) + + task_depends[dependency_task_id] = dependency_ro_task["vim_info"]["vim_id"] + task_depends["TASK-{}".format(dependency_task_id)] = \ + dependency_ro_task["vim_info"]["vim_id"] + if dependency_not_completed: + # TODO set at vim_info.vim_details that it is waiting + continue + + if task["action"] == "DELETE": + new_status, db_vim_info_update = self._delete_task(ro_task, task_index, + task_depends, db_ro_task_update) + new_status = "FINISHED" if new_status == "DONE" else new_status + # ^with FINISHED instead of DONE it will not be refreshing + if new_status in ("FINISHED", "SUPERSEDED"): + target_update = "DELETE" + elif task["action"] == "EXEC": + self.item2action[task["item"]](ro_task, task_index, task_depends, db_ro_task_update) + new_status = "FINISHED" if new_status == "DONE" else new_status + # ^with FINISHED instead of DONE it will not be refreshing + if new_status in ("FINISHED", "SUPERSEDED"): + target_update = "DELETE" + elif task["action"] == "CREATE": + if task["status"] == "SCHEDULED": + if task_status_create: + new_status = task_status_create + target_update = "COPY_VIM_INFO" + else: + new_status, db_vim_info_update = \ + self.item2create[task["item"]](ro_task, task_index, task_depends) + # self._create_task(ro_task, task_index, task_depends, db_ro_task_update) + _update_refresh(new_status) + else: + if ro_task["vim_info"]["refresh_at"] and now > ro_task["vim_info"]["refresh_at"]: + new_status, db_vim_info_update = self.item2refresh[task["item"]](ro_task) + _update_refresh(new_status) + except Exception as e: + new_status = "FAILED" + db_vim_info_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)} + if not isinstance(e, (NsWorkerException, vimconn.VimConnException)): + self.logger.error("Unexpected exception at _delete_task task={}: {}". + format(task["task_id"], e), exc_info=True) + + try: + if db_vim_info_update: + db_vim_update = db_vim_info_update.copy() + db_ro_task_update.update({"vim_info." + k: v for k, v in db_vim_info_update.items()}) + ro_task["vim_info"].update(db_vim_info_update) + + if new_status: + if task_action == "CREATE": + task_status_create = new_status + db_ro_task_update[task_path] = new_status + if target_update or db_vim_update: + + if target_update == "DELETE": + self._update_target(task, None) + elif target_update == "COPY_VIM_INFO": + self._update_target(task, ro_task["vim_info"]) + else: + self._update_target(task, db_vim_update) + + except Exception as e: + self.logger.error("Unexpected exception at _update_target task={}: {}". + format(task["task_id"], e), exc_info=True) + + # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified, + # outside this task (by ro_nbi) do not update it + db_ro_task_update["locked_by"] = None + # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked + db_ro_task_update["locked_at"] = int(now - self.MAX_TIME_LOCKED) + db_ro_task_update["to_check_at"] = next_check_at + if not self.db.set_one("ro_tasks", + update_dict=db_ro_task_update, + q_filter={"_id": ro_task["_id"], "to_check_at": ro_task["to_check_at"]}, + fail_on_empty=False): + del db_ro_task_update["to_check_at"] + self.db.set_one("ro_tasks", + q_filter={"_id": ro_task["_id"]}, + update_dict=db_ro_task_update, + fail_on_empty=True) + except DbException as e: + self.logger.error("ro_task={} Error updating database {}".format(ro_task_id, e)) + except Exception as e: + self.logger.error("Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True) + + def _update_target(self, task, ro_vim_item_update): + try: + table, _id, path = task["target_record"].split(":") + if ro_vim_item_update: + update_dict = {path + "." + k: v for k, v in ro_vim_item_update.items() if k in + ('vim_id', 'vim_details', 'vim_name', 'vim_status', 'interfaces')} + if ro_vim_item_update.get("interfaces"): + path_vdu = path[:path.rfind(".")] + path_vdu = path_vdu[:path_vdu.rfind(".")] + path_interfaces = path_vdu + ".interfaces" + for i, iface in enumerate(ro_vim_item_update.get("interfaces")): + if iface: + update_dict.update({path_interfaces + ".{}.".format(i) + k: v for k, v in iface.items() if + k in ('ip_address', 'mac_address', 'vlan', 'compute_node', 'pci')}) + if iface.get("mgmt_vnf_interface") and iface.get("ip_address"): + update_dict["ip-address"] = iface.get("ip_address").split(";")[0] + if iface.get("mgmt_vdu_interface") and iface.get("ip_address"): + update_dict[path_vdu + ".ip-address"] = iface.get("ip_address").split(";")[0] + + self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict) + else: + self.db.set_one(table, q_filter={"_id": _id}, update_dict=None, + unset={path: None}) + except DbException as e: + self.logger.error("Cannot update database '{}': '{}'".format(task["target_record"], e)) + + def new_image(self, ro_task, task_index, task_depends): + task = ro_task["tasks"][task_index] + task_id = task["task_id"] + created = False + created_items = {} + target_vim = self.my_vims[ro_task["target_id"]] + try: + # FIND + if task.get("find_params"): + vim_images = target_vim.get_image_list(**task["find_params"]) + if not vim_images: + raise NsWorkerExceptionNotFound("Image not found with this criteria: '{}'".format( + task["find_params"])) + elif len(vim_images) > 1: + raise NsWorkerException( + "More than one network found with this criteria: '{}'".format(task["find_params"])) + else: + vim_image_id = vim_images[0]["id"] + + ro_vim_item_update = {"vim_id": vim_image_id, + "vim_status": "DONE", + "created": created, + "created_items": created_items, + "vim_details": None} + self.logger.debug( + "task={} {} new-image={} created={}".format(task_id, ro_task["target_id"], vim_image_id, created)) + return "DONE", ro_vim_item_update + except (NsWorkerException, vimconn.VimConnException) as e: + self.logger.error("task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)) + ro_vim_item_update = {"vim_status": "VIM_ERROR", + "created": created, + "vim_details": str(e)} + return "FAILED", ro_vim_item_update + + def del_flavor(self, ro_task, task_index): + task = ro_task["tasks"][task_index] + task_id = task["task_id"] + flavor_vim_id = ro_task["vim_info"]["vim_id"] + ro_vim_item_update_ok = {"vim_status": "DELETED", + "created": False, + "vim_details": "DELETED", + "vim_id": None} + try: + if flavor_vim_id: + target_vim = self.my_vims[ro_task["target_id"]] + target_vim.delete_flavor(flavor_vim_id) + + except vimconn.VimConnNotFoundException: + ro_vim_item_update_ok["vim_details"] = "already deleted" + + except vimconn.VimConnException as e: + self.logger.error("ro_task={} vim={} del-flavor={}: {}".format( + ro_task["_id"], ro_task["target_id"], flavor_vim_id, e)) + ro_vim_item_update = {"vim_status": "VIM_ERROR", + "vim_details": "Error while deleting: {}".format(e)} + return "FAILED", ro_vim_item_update + + self.logger.debug("task={} {} del-flavor={} {}".format( + task_id, ro_task["target_id"], flavor_vim_id, ro_vim_item_update_ok.get("vim_details", ""))) + return "DONE", ro_vim_item_update_ok + + def refresh_ok(self, ro_task): + """skip calling VIM to get image status. Assumes ok""" + if ro_task["vim_info"]["vim_status"] == "VIM_ERROR": + return "FAILED", {} + return "DONE", {} + + def delete_ok(self, ro_task): + """skip calling VIM to delete image status. Assumes ok""" + return "DONE", {} + + def new_flavor(self, ro_task, task_index, task_depends): + task = ro_task["tasks"][task_index] + task_id = task["task_id"] + created = False + created_items = {} + target_vim = self.my_vims[ro_task["target_id"]] + try: + # FIND + vim_flavor_id = None + if task.get("find_params"): + try: + flavor_data = task["find_params"]["flavor_data"] + vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data) + except vimconn.VimConnNotFoundException: + pass + + if not vim_flavor_id and task.get("params"): + # CREATE + flavor_data = task["params"]["flavor_data"] + vim_flavor_id = target_vim.new_flavor(flavor_data) + created = True + + ro_vim_item_update = {"vim_id": vim_flavor_id, + "vim_status": "DONE", + "created": created, + "created_items": created_items, + "vim_details": None} + self.logger.debug( + "task={} {} new-flavor={} created={}".format(task_id, ro_task["target_id"], vim_flavor_id, created)) + return "DONE", ro_vim_item_update + except (vimconn.VimConnException, NsWorkerException) as e: + self.logger.error("task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)) + ro_vim_item_update = {"vim_status": "VIM_ERROR", + "created": created, + "vim_details": str(e)} + return "FAILED", ro_vim_item_update + + def new_net(self, ro_task, task_index, task_depends): + vim_net_id = None + task = ro_task["tasks"][task_index] + task_id = task["task_id"] + created = False + created_items = {} + target_vim = self.my_vims[ro_task["target_id"]] + try: + # FIND + if task.get("find_params"): + # if management, get configuration of VIM + if task["find_params"].get("filter_dict"): + vim_filter = task["find_params"]["filter_dict"] + elif task["find_params"].get("mgmt"): # mamagement network + if deep_get(self.db_vims[ro_task["target_id"]], "config", "management_network_id"): + vim_filter = {"id": self.db_vims[ro_task["target_id"]]["config"]["management_network_id"]} + elif deep_get(self.db_vims[ro_task["target_id"]], "config", "management_network_name"): + vim_filter = {"name": self.db_vims[ro_task["target_id"]]["config"]["management_network_name"]} + else: + vim_filter = {"name": task["find_params"]["name"]} + else: + raise NsWorkerExceptionNotFound("Invalid find_params for new_net {}".format(task["find_params"])) + + vim_nets = target_vim.get_network_list(vim_filter) + if not vim_nets and not task.get("params"): + raise NsWorkerExceptionNotFound("Network not found with this criteria: '{}'".format( + task.get("find_params"))) + elif len(vim_nets) > 1: + raise NsWorkerException( + "More than one network found with this criteria: '{}'".format(task["find_params"])) + if vim_nets: + vim_net_id = vim_nets[0]["id"] + else: + # CREATE + params = task["params"] + vim_net_id, created_items = target_vim.new_network(**params) + created = True + + ro_vim_item_update = {"vim_id": vim_net_id, + "vim_status": "BUILD", + "created": created, + "created_items": created_items, + "vim_details": None} + self.logger.debug( + "task={} {} new-net={} created={}".format(task_id, ro_task["target_id"], vim_net_id, created)) + return "BUILD", ro_vim_item_update + except (vimconn.VimConnException, NsWorkerException) as e: + self.logger.error("task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)) + ro_vim_item_update = {"vim_status": "VIM_ERROR", + "created": created, + "vim_details": str(e)} + return "FAILED", ro_vim_item_update + + def refresh_net(self, ro_task): + """Call VIM to get network status""" + ro_task_id = ro_task["_id"] + target_vim = self.my_vims[ro_task["target_id"]] + + vim_id = ro_task["vim_info"]["vim_id"] + net_to_refresh_list = [vim_id] + try: + vim_dict = target_vim.refresh_nets_status(net_to_refresh_list) + vim_info = vim_dict[vim_id] + if vim_info["status"] == "ACTIVE": + task_status = "DONE" + elif vim_info["status"] == "BUILD": + task_status = "BUILD" + else: + task_status = "FAILED" + except vimconn.VimConnException as e: + # Mark all tasks at VIM_ERROR status + self.logger.error("ro_task={} vim={} get-net={}: {}".format(ro_task_id, ro_task["target_id"], vim_id, e)) + vim_info = {"status": "VIM_ERROR", "error_msg": str(e)} + task_status = "FAILED" + + ro_vim_item_update = {} + if ro_task["vim_info"]["vim_status"] != vim_info["status"]: + ro_vim_item_update["vim_status"] = vim_info["status"] + if ro_task["vim_info"]["vim_name"] != vim_info.get("name"): + ro_vim_item_update["vim_name"] = vim_info.get("name") + if vim_info["status"] in ("ERROR", "VIM_ERROR"): + if ro_task["vim_info"]["vim_details"] != vim_info["error_msg"]: + ro_vim_item_update["vim_details"] = vim_info["error_msg"] + elif vim_info["status"] == "DELETED": + ro_vim_item_update["vim_id"] = None + ro_vim_item_update["vim_details"] = "Deleted externally" + else: + if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]: + ro_vim_item_update["vim_details"] = vim_info["vim_info"] + if ro_vim_item_update: + self.logger.debug("ro_task={} {} get-net={}: status={} {}".format( + ro_task_id, ro_task["target_id"], vim_id, ro_vim_item_update.get("vim_status"), + ro_vim_item_update.get("vim_details") if ro_vim_item_update.get("vim_status") != "ACTIVE" else '')) + return task_status, ro_vim_item_update + + def del_net(self, ro_task, task_index): + task = ro_task["tasks"][task_index] + task_id = task["task_id"] + net_vim_id = ro_task["vim_info"]["vim_id"] + ro_vim_item_update_ok = {"vim_status": "DELETED", + "created": False, + "vim_details": "DELETED", + "vim_id": None} + try: + if net_vim_id or ro_task["vim_info"]["created_items"]: + target_vim = self.my_vims[ro_task["target_id"]] + target_vim.delete_network(net_vim_id, ro_task["vim_info"]["created_items"]) + + except vimconn.VimConnNotFoundException: + ro_vim_item_update_ok["vim_details"] = "already deleted" + + except vimconn.VimConnException as e: + self.logger.error("ro_task={} vim={} del-net={}: {}".format(ro_task["_id"], ro_task["target_id"], + net_vim_id, e)) + ro_vim_item_update = {"vim_status": "VIM_ERROR", + "vim_details": "Error while deleting: {}".format(e)} + return "FAILED", ro_vim_item_update + + self.logger.debug("task={} {} del-net={} {}".format(task_id, ro_task["target_id"], net_vim_id, + ro_vim_item_update_ok.get("vim_details", ""))) + return "DONE", ro_vim_item_update_ok + + def new_vm(self, ro_task, task_index, task_depends): + task = ro_task["tasks"][task_index] + task_id = task["task_id"] + created = False + created_items = {} + target_vim = self.my_vims[ro_task["target_id"]] + try: + created = True + params = task["params"] + params_copy = deepcopy(params) + net_list = params_copy["net_list"] + for net in net_list: + if "net_id" in net and net["net_id"].startswith("TASK-"): # change task_id into network_id + network_id = task_depends[net["net_id"]] + if not network_id: + raise NsWorkerException("Cannot create VM because depends on a network not created or found " + "for {}".format(net["net_id"])) + net["net_id"] = network_id + if params_copy["image_id"].startswith("TASK-"): + params_copy["image_id"] = task_depends[params_copy["image_id"]] + if params_copy["flavor_id"].startswith("TASK-"): + params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]] + + vim_vm_id, created_items = target_vim.new_vminstance(**params_copy) + interfaces = [iface["vim_id"] for iface in params_copy["net_list"]] + + ro_vim_item_update = {"vim_id": vim_vm_id, + "vim_status": "BUILD", + "created": created, + "created_items": created_items, + "vim_details": None, + "interfaces_vim_ids": interfaces, + "interfaces": [], + } + self.logger.debug( + "task={} {} new-vm={} created={}".format(task_id, ro_task["target_id"], vim_vm_id, created)) + return "BUILD", ro_vim_item_update + except (vimconn.VimConnException, NsWorkerException) as e: + self.logger.error("task={} vim={} new-vm: {}".format(task_id, ro_task["target_id"], e)) + ro_vim_item_update = {"vim_status": "VIM_ERROR", + "created": created, + "vim_details": str(e)} + return "FAILED", ro_vim_item_update + + def del_vm(self, ro_task, task_index): + task = ro_task["tasks"][task_index] + task_id = task["task_id"] + vm_vim_id = ro_task["vim_info"]["vim_id"] + ro_vim_item_update_ok = {"vim_status": "DELETED", + "created": False, + "vim_details": "DELETED", + "vim_id": None} + try: + if vm_vim_id or ro_task["vim_info"]["created_items"]: + target_vim = self.my_vims[ro_task["target_id"]] + target_vim.delete_vminstance(vm_vim_id, ro_task["vim_info"]["created_items"]) + + except vimconn.VimConnNotFoundException: + ro_vim_item_update_ok["vim_details"] = "already deleted" + + except vimconn.VimConnException as e: + self.logger.error("ro_task={} vim={} del-vm={}: {}".format(ro_task["_id"], ro_task["target_id"], + vm_vim_id, e)) + ro_vim_item_update = {"vim_status": "VIM_ERROR", + "vim_details": "Error while deleting: {}".format(e)} + return "FAILED", ro_vim_item_update + + self.logger.debug("task={} {} del-vm={} {}".format(task_id, ro_task["target_id"], vm_vim_id, + ro_vim_item_update_ok.get("vim_details", ""))) + return "DONE", ro_vim_item_update_ok + + def refresh_vm(self, ro_task): + """Call VIM to get vm status""" + ro_task_id = ro_task["_id"] + target_vim = self.my_vims[ro_task["target_id"]] + + vim_id = ro_task["vim_info"]["vim_id"] + if not vim_id: + return None, None + vm_to_refresh_list = [vim_id] + try: + vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list) + vim_info = vim_dict[vim_id] + if vim_info["status"] == "ACTIVE": + task_status = "DONE" + elif vim_info["status"] == "BUILD": + task_status = "BUILD" + else: + task_status = "FAILED" + except vimconn.VimConnException as e: + # Mark all tasks at VIM_ERROR status + self.logger.error("ro_task={} vim={} get-vm={}: {}".format(ro_task_id, ro_task["target_id"], vim_id, e)) + vim_info = {"status": "VIM_ERROR", "error_msg": str(e)} + task_status = "FAILED" + + ro_vim_item_update = {} + # TODO check and update interfaces + vim_interfaces = [] + for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]: + iface = next((iface for iface in vim_info["interfaces"] if vim_iface_id == iface["vim_interface_id"]), None) + # if iface: + # iface.pop("vim_info", None) + vim_interfaces.append(iface) + + task = ro_task["tasks"][0] # TODO look for a task CREATE and active + if task.get("mgmt_vnf_interface") is not None: + vim_interfaces[task["mgmt_vnf_interface"]]["mgmt_vnf_interface"] = True + mgmt_vdu_iface = task.get("mgmt_vdu_interface", task.get("mgmt_vnf_interface", 0)) + vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True + + if ro_task["vim_info"]["interfaces"] != vim_interfaces: + ro_vim_item_update["interfaces"] = vim_interfaces + if ro_task["vim_info"]["vim_status"] != vim_info["status"]: + ro_vim_item_update["vim_status"] = vim_info["status"] + if ro_task["vim_info"]["vim_name"] != vim_info.get("name"): + ro_vim_item_update["vim_name"] = vim_info.get("name") + if vim_info["status"] in ("ERROR", "VIM_ERROR"): + if ro_task["vim_info"]["vim_details"] != vim_info["error_msg"]: + ro_vim_item_update["vim_details"] = vim_info["error_msg"] + elif vim_info["status"] == "DELETED": + ro_vim_item_update["vim_id"] = None + ro_vim_item_update["vim_details"] = "Deleted externally" + else: + if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]: + ro_vim_item_update["vim_details"] = vim_info["vim_info"] + if ro_vim_item_update: + self.logger.debug("ro_task={} {} get-vm={}: status={} {}".format( + ro_task_id, ro_task["target_id"], vim_id, ro_vim_item_update.get("vim_status"), + ro_vim_item_update.get("vim_details") if ro_vim_item_update.get("vim_status") != "ACTIVE" else '')) + return task_status, ro_vim_item_update + + def exec_vm(self, ro_task, task_index, task_depends): + task = ro_task["tasks"][task_index] + task_id = task["task_id"] + target_vim = self.my_vims[ro_task["target_id"]] + try: + params = task["params"] + params_copy = deepcopy(params) + params_copy["use_pri_key"] = self.db.decrypt(params_copy.pop("private_key"), + params_copy.pop("schema_version"), params_copy.pop("salt")) + + target_vim.inject_user_key(**params_copy) + self.logger.debug( + "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])) + return "DONE", params_copy["key"] + except (vimconn.VimConnException, NsWorkerException) as e: + self.logger.error("task={} vim={} new-vm: {}".format(task_id, ro_task["target_id"], e)) + ro_vim_item_update = {"vim_details": str(e)} + return "FAILED", ro_vim_item_update + + def run(self): + # load database + self.logger.debug("Starting") + while True: + try: + task = self.task_queue.get(block=False if self.my_vims else True) + if task[0] == "terminate": + break + if task[0] == "load_vim": + self._load_vim(task[1]) + continue + except queue.Empty: + pass + + try: + busy = False + ro_task = self._get_db_task() + if ro_task: + self._proccess_pending_tasks(ro_task) + busy = True + if not busy: + time.sleep(5) + except Exception as e: + self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True) + + self.logger.debug("Finishing") diff --git a/NG-RO/osm_ng_ro/ro.cfg b/NG-RO/osm_ng_ro/ro.cfg new file mode 100644 index 00000000..4af28309 --- /dev/null +++ b/NG-RO/osm_ng_ro/ro.cfg @@ -0,0 +1,93 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +[/] +# tools.secureheaders.on = True +tools.sessions.on = True +# increase security on sessions +tools.sessions.secure = True +tools.sessions.httponly = True +tools.encode.on: True, +tools.encode.encoding: 'utf-8' +tools.response_headers.on = True + +# tools.auth_basic.on: True, +# tools.auth_basic.realm: 'localhost', +# tools.auth_basic.checkpassword: get_tokens + + +[/static] +# use env OSMRO_STATIC_ON, OSMRO_STATIC_DIR to override +tools.staticdir.on: True +tools.staticdir.dir: "/app/RO/RO-NG/osm_ng_ro/html_public" + + +[global] +# use env OSMRO_SERVER_XXX, OSMRO_LOG_XXX, OSMRO_TEST_XXX or OSMRO_AUTH_XXX to override. Use value in yaml format +server.socket_host: "0.0.0.0" +server.socket_port: 9998 + +# server.ssl_module: "builtin" +# server.ssl_certificate: "./http/cert.pem" +# server.ssl_private_key: "./http/privkey.pem" +# server.ssl_pass_phrase: "osm4u" +server.thread_pool: 10 +server.ns_threads: 1 + +# Uncomment for allow basic authentication apart from bearer +# auth.allow_basic_authentication: True + +# comment or set to False to disable /test URL +server.enable_test: True + +log.screen: False +log.access_file: "" +log.error_file: "" + +log.level: "DEBUG" +#log.file: /var/log/osm/ro.log + + +[database] +# use env OSMRO_DATABASE_XXX to override +driver: "mongo" # mongo or memory +uri: "mongodb://mongo:27017" +name: "osm" +# user: "user" +# password: "password" +# commonkey: "commonkey" + +[storage] +# use env OSMRO_STORAGE_XXX to override +driver: "local" # local filesystem +# for local provide file path +path: "/app/storage" #"/home/atierno/OSM/osm/NBI/local/storage" + +loglevel: "DEBUG" +#logfile: /var/log/osm/ro-storage.log + +[message] +# use env OSMRO_MESSAGE_XXX to override +driver: "kafka" # local or kafka +# for local provide file path +path: "/app/storage/kafka" +host: "kafka" +port: 9092 + +loglevel: "DEBUG" +#logfile: /var/log/osm/ro-message.log +group_id: "ro-server" + +[authentication] +# use env OSMRO_AUTHENTICATION_XXX to override + diff --git a/NG-RO/osm_ng_ro/ro_main.py b/NG-RO/osm_ng_ro/ro_main.py new file mode 100644 index 00000000..35a93fe9 --- /dev/null +++ b/NG-RO/osm_ng_ro/ro_main.py @@ -0,0 +1,740 @@ +#!/usr/bin/python3 +# -*- coding: utf-8 -*- + +## +# Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +## + +import cherrypy +import time +import json +import yaml +import osm_ng_ro.html_out as html +import logging +import logging.handlers +import getopt +import sys + +from osm_ng_ro.ns import Ns, NsException +from osm_ng_ro.validation import ValidationError +from osm_common.dbbase import DbException +from osm_common.fsbase import FsException +from osm_common.msgbase import MsgException +from http import HTTPStatus +from codecs import getreader +from os import environ, path +from osm_ng_ro import version as ro_version, version_date as ro_version_date + +__author__ = "Alfonso Tierno " + +__version__ = "0.1." # file version, not NBI version +version_date = "May 2020" + +database_version = '1.2' +auth_database_version = '1.0' +ro_server = None # instance of Server class +# vim_threads = None # instance of VimThread class + +""" +RO North Bound Interface +URL: /ro GET POST PUT DELETE PATCH + /ns/v1/deploy O + / O O O + / O + /cancel O + +""" + +valid_query_string = ("ADMIN", "SET_PROJECT", "FORCE", "PUBLIC") +# ^ Contains possible administrative query string words: +# ADMIN=True(by default)|Project|Project-list: See all elements, or elements of a project +# (not owned by my session project). +# PUBLIC=True(by default)|False: See/hide public elements. Set/Unset a topic to be public +# FORCE=True(by default)|False: Force edition/deletion operations +# SET_PROJECT=Project|Project-list: Add/Delete the topic to the projects portfolio + +valid_url_methods = { + # contains allowed URL and methods, and the role_permission name + "admin": { + "v1": { + "tokens": { + "METHODS": ("POST",), + "ROLE_PERMISSION": "tokens:", + "": { + "METHODS": ("DELETE",), + "ROLE_PERMISSION": "tokens:id:" + } + }, + } + }, + "ns": { + "v1": { + "deploy": { + "METHODS": ("GET",), + "ROLE_PERMISSION": "deploy:", + "": { + "METHODS": ("GET", "POST", "DELETE"), + "ROLE_PERMISSION": "deploy:id:", + "": { + "METHODS": ("GET",), + "ROLE_PERMISSION": "deploy:id:id:", + "cancel": { + "METHODS": ("POST",), + "ROLE_PERMISSION": "deploy:id:id:cancel", + } + } + } + }, + } + }, +} + + +class RoException(Exception): + + def __init__(self, message, http_code=HTTPStatus.METHOD_NOT_ALLOWED): + Exception.__init__(self, message) + self.http_code = http_code + + +class AuthException(RoException): + pass + + +class Authenticator: + + def __init__(self, valid_url_methods, valid_query_string): + self.valid_url_methods = valid_url_methods + self.valid_query_string = valid_query_string + + def authorize(self, *args, **kwargs): + return {"token": "ok", "id": "ok"} + + def new_token(self, token_info, indata, remote): + return {"token": "ok", + "id": "ok", + "remote": remote} + + def del_token(self, token_id): + pass + + def start(self, engine_config): + pass + + +class Server(object): + instance = 0 + # to decode bytes to str + reader = getreader("utf-8") + + def __init__(self): + self.instance += 1 + self.authenticator = Authenticator(valid_url_methods, valid_query_string) + self.ns = Ns() + self.map_operation = { + "token:post": self.new_token, + "token:id:delete": self.del_token, + "deploy:get": self.ns.get_deploy, + "deploy:id:get": self.ns.get_actions, + "deploy:id:post": self.ns.deploy, + "deploy:id:delete": self.ns.delete, + "deploy:id:id:get": self.ns.status, + "deploy:id:id:cancel:post": self.ns.cancel, + } + + def _format_in(self, kwargs): + try: + indata = None + if cherrypy.request.body.length: + error_text = "Invalid input format " + + if "Content-Type" in cherrypy.request.headers: + if "application/json" in cherrypy.request.headers["Content-Type"]: + error_text = "Invalid json format " + indata = json.load(self.reader(cherrypy.request.body)) + cherrypy.request.headers.pop("Content-File-MD5", None) + elif "application/yaml" in cherrypy.request.headers["Content-Type"]: + error_text = "Invalid yaml format " + indata = yaml.load(cherrypy.request.body, Loader=yaml.SafeLoader) + cherrypy.request.headers.pop("Content-File-MD5", None) + elif "application/binary" in cherrypy.request.headers["Content-Type"] or \ + "application/gzip" in cherrypy.request.headers["Content-Type"] or \ + "application/zip" in cherrypy.request.headers["Content-Type"] or \ + "text/plain" in cherrypy.request.headers["Content-Type"]: + indata = cherrypy.request.body # .read() + elif "multipart/form-data" in cherrypy.request.headers["Content-Type"]: + if "descriptor_file" in kwargs: + filecontent = kwargs.pop("descriptor_file") + if not filecontent.file: + raise RoException("empty file or content", HTTPStatus.BAD_REQUEST) + indata = filecontent.file # .read() + if filecontent.content_type.value: + cherrypy.request.headers["Content-Type"] = filecontent.content_type.value + else: + # raise cherrypy.HTTPError(HTTPStatus.Not_Acceptable, + # "Only 'Content-Type' of type 'application/json' or + # 'application/yaml' for input format are available") + error_text = "Invalid yaml format " + indata = yaml.load(cherrypy.request.body, Loader=yaml.SafeLoader) + cherrypy.request.headers.pop("Content-File-MD5", None) + else: + error_text = "Invalid yaml format " + indata = yaml.load(cherrypy.request.body, Loader=yaml.SafeLoader) + cherrypy.request.headers.pop("Content-File-MD5", None) + if not indata: + indata = {} + + format_yaml = False + if cherrypy.request.headers.get("Query-String-Format") == "yaml": + format_yaml = True + + for k, v in kwargs.items(): + if isinstance(v, str): + if v == "": + kwargs[k] = None + elif format_yaml: + try: + kwargs[k] = yaml.load(v, Loader=yaml.SafeLoader) + except Exception: + pass + elif k.endswith(".gt") or k.endswith(".lt") or k.endswith(".gte") or k.endswith(".lte"): + try: + kwargs[k] = int(v) + except Exception: + try: + kwargs[k] = float(v) + except Exception: + pass + elif v.find(",") > 0: + kwargs[k] = v.split(",") + elif isinstance(v, (list, tuple)): + for index in range(0, len(v)): + if v[index] == "": + v[index] = None + elif format_yaml: + try: + v[index] = yaml.load(v[index], Loader=yaml.SafeLoader) + except Exception: + pass + + return indata + except (ValueError, yaml.YAMLError) as exc: + raise RoException(error_text + str(exc), HTTPStatus.BAD_REQUEST) + except KeyError as exc: + raise RoException("Query string error: " + str(exc), HTTPStatus.BAD_REQUEST) + except Exception as exc: + raise RoException(error_text + str(exc), HTTPStatus.BAD_REQUEST) + + @staticmethod + def _format_out(data, token_info=None, _format=None): + """ + return string of dictionary data according to requested json, yaml, xml. By default json + :param data: response to be sent. Can be a dict, text or file + :param token_info: Contains among other username and project + :param _format: The format to be set as Content-Type if data is a file + :return: None + """ + accept = cherrypy.request.headers.get("Accept") + if data is None: + if accept and "text/html" in accept: + return html.format(data, cherrypy.request, cherrypy.response, token_info) + # cherrypy.response.status = HTTPStatus.NO_CONTENT.value + return + elif hasattr(data, "read"): # file object + if _format: + cherrypy.response.headers["Content-Type"] = _format + elif "b" in data.mode: # binariy asssumig zip + cherrypy.response.headers["Content-Type"] = 'application/zip' + else: + cherrypy.response.headers["Content-Type"] = 'text/plain' + # TODO check that cherrypy close file. If not implement pending things to close per thread next + return data + if accept: + if "application/json" in accept: + cherrypy.response.headers["Content-Type"] = 'application/json; charset=utf-8' + a = json.dumps(data, indent=4) + "\n" + return a.encode("utf8") + elif "text/html" in accept: + return html.format(data, cherrypy.request, cherrypy.response, token_info) + + elif "application/yaml" in accept or "*/*" in accept or "text/plain" in accept: + pass + # if there is not any valid accept, raise an error. But if response is already an error, format in yaml + elif cherrypy.response.status >= 400: + raise cherrypy.HTTPError(HTTPStatus.NOT_ACCEPTABLE.value, + "Only 'Accept' of type 'application/json' or 'application/yaml' " + "for output format are available") + cherrypy.response.headers["Content-Type"] = 'application/yaml' + return yaml.safe_dump(data, explicit_start=True, indent=4, default_flow_style=False, tags=False, + encoding='utf-8', allow_unicode=True) # , canonical=True, default_style='"' + + @cherrypy.expose + def index(self, *args, **kwargs): + token_info = None + try: + if cherrypy.request.method == "GET": + token_info = self.authenticator.authorize() + outdata = token_info # Home page + else: + raise cherrypy.HTTPError(HTTPStatus.METHOD_NOT_ALLOWED.value, + "Method {} not allowed for tokens".format(cherrypy.request.method)) + + return self._format_out(outdata, token_info) + + except (NsException, AuthException) as e: + # cherrypy.log("index Exception {}".format(e)) + cherrypy.response.status = e.http_code.value + return self._format_out("Welcome to OSM!", token_info) + + @cherrypy.expose + def version(self, *args, **kwargs): + # TODO consider to remove and provide version using the static version file + try: + if cherrypy.request.method != "GET": + raise RoException("Only method GET is allowed", HTTPStatus.METHOD_NOT_ALLOWED) + elif args or kwargs: + raise RoException("Invalid URL or query string for version", HTTPStatus.METHOD_NOT_ALLOWED) + # TODO include version of other modules, pick up from some kafka admin message + osm_ng_ro_version = {"version": ro_version, "date": ro_version_date} + return self._format_out(osm_ng_ro_version) + except RoException as e: + cherrypy.response.status = e.http_code.value + problem_details = { + "code": e.http_code.name, + "status": e.http_code.value, + "detail": str(e), + } + return self._format_out(problem_details, None) + + def new_token(self, engine_session, indata, *args, **kwargs): + token_info = None + + try: + token_info = self.authenticator.authorize() + except Exception: + token_info = None + if kwargs: + indata.update(kwargs) + # This is needed to log the user when authentication fails + cherrypy.request.login = "{}".format(indata.get("username", "-")) + token_info = self.authenticator.new_token(token_info, indata, cherrypy.request.remote) + cherrypy.session['Authorization'] = token_info["id"] + self._set_location_header("admin", "v1", "tokens", token_info["id"]) + # for logging + + # cherrypy.response.cookie["Authorization"] = outdata["id"] + # cherrypy.response.cookie["Authorization"]['expires'] = 3600 + return token_info, token_info["id"], True + + def del_token(self, engine_session, indata, version, _id, *args, **kwargs): + token_id = _id + if not token_id and "id" in kwargs: + token_id = kwargs["id"] + elif not token_id: + token_info = self.authenticator.authorize() + # for logging + token_id = token_info["id"] + self.authenticator.del_token(token_id) + token_info = None + cherrypy.session['Authorization'] = "logout" + # cherrypy.response.cookie["Authorization"] = token_id + # cherrypy.response.cookie["Authorization"]['expires'] = 0 + return None, None, True + + @cherrypy.expose + def test(self, *args, **kwargs): + if not cherrypy.config.get("server.enable_test") or (isinstance(cherrypy.config["server.enable_test"], str) and + cherrypy.config["server.enable_test"].lower() == "false"): + cherrypy.response.status = HTTPStatus.METHOD_NOT_ALLOWED.value + return "test URL is disabled" + thread_info = None + if args and args[0] == "help": + return "
\ninit\nfile/  download file\ndb-clear/table\nfs-clear[/folder]\nlogin\nlogin2\n"\
+                   "sleep/
" + + elif args and args[0] == "init": + try: + # self.ns.load_dbase(cherrypy.request.app.config) + self.ns.create_admin() + return "Done. User 'admin', password 'admin' created" + except Exception: + cherrypy.response.status = HTTPStatus.FORBIDDEN.value + return self._format_out("Database already initialized") + elif args and args[0] == "file": + return cherrypy.lib.static.serve_file(cherrypy.tree.apps['/ro'].config["storage"]["path"] + "/" + args[1], + "text/plain", "attachment") + elif args and args[0] == "file2": + f_path = cherrypy.tree.apps['/ro'].config["storage"]["path"] + "/" + args[1] + f = open(f_path, "r") + cherrypy.response.headers["Content-type"] = "text/plain" + return f + + elif len(args) == 2 and args[0] == "db-clear": + deleted_info = self.ns.db.del_list(args[1], kwargs) + return "{} {} deleted\n".format(deleted_info["deleted"], args[1]) + elif len(args) and args[0] == "fs-clear": + if len(args) >= 2: + folders = (args[1],) + else: + folders = self.ns.fs.dir_ls(".") + for folder in folders: + self.ns.fs.file_delete(folder) + return ",".join(folders) + " folders deleted\n" + elif args and args[0] == "login": + if not cherrypy.request.headers.get("Authorization"): + cherrypy.response.headers["WWW-Authenticate"] = 'Basic realm="Access to OSM site", charset="UTF-8"' + cherrypy.response.status = HTTPStatus.UNAUTHORIZED.value + elif args and args[0] == "login2": + if not cherrypy.request.headers.get("Authorization"): + cherrypy.response.headers["WWW-Authenticate"] = 'Bearer realm="Access to OSM site"' + cherrypy.response.status = HTTPStatus.UNAUTHORIZED.value + elif args and args[0] == "sleep": + sleep_time = 5 + try: + sleep_time = int(args[1]) + except Exception: + cherrypy.response.status = HTTPStatus.FORBIDDEN.value + return self._format_out("Database already initialized") + thread_info = cherrypy.thread_data + print(thread_info) + time.sleep(sleep_time) + # thread_info + elif len(args) >= 2 and args[0] == "message": + main_topic = args[1] + return_text = "
{} ->\n".format(main_topic)
+            try:
+                if cherrypy.request.method == 'POST':
+                    to_send = yaml.load(cherrypy.request.body, Loader=yaml.SafeLoader)
+                    for k, v in to_send.items():
+                        self.ns.msg.write(main_topic, k, v)
+                        return_text += "  {}: {}\n".format(k, v)
+                elif cherrypy.request.method == 'GET':
+                    for k, v in kwargs.items():
+                        self.ns.msg.write(main_topic, k, yaml.load(v, Loader=yaml.SafeLoader))
+                        return_text += "  {}: {}\n".format(k, yaml.load(v, Loader=yaml.SafeLoader))
+            except Exception as e:
+                return_text += "Error: " + str(e)
+            return_text += "
\n" + return return_text + + return_text = ( + "
\nheaders:\n  args: {}\n".format(args) +
+            "  kwargs: {}\n".format(kwargs) +
+            "  headers: {}\n".format(cherrypy.request.headers) +
+            "  path_info: {}\n".format(cherrypy.request.path_info) +
+            "  query_string: {}\n".format(cherrypy.request.query_string) +
+            "  session: {}\n".format(cherrypy.session) +
+            "  cookie: {}\n".format(cherrypy.request.cookie) +
+            "  method: {}\n".format(cherrypy.request.method) +
+            "  session: {}\n".format(cherrypy.session.get('fieldname')) +
+            "  body:\n")
+        return_text += "    length: {}\n".format(cherrypy.request.body.length)
+        if cherrypy.request.body.length:
+            return_text += "    content: {}\n".format(
+                str(cherrypy.request.body.read(int(cherrypy.request.headers.get('Content-Length', 0)))))
+        if thread_info:
+            return_text += "thread: {}\n".format(thread_info)
+        return_text += "
" + return return_text + + @staticmethod + def _check_valid_url_method(method, *args): + if len(args) < 3: + raise RoException("URL must contain at least 'main_topic/version/topic'", HTTPStatus.METHOD_NOT_ALLOWED) + + reference = valid_url_methods + for arg in args: + if arg is None: + break + if not isinstance(reference, dict): + raise RoException("URL contains unexpected extra items '{}'".format(arg), + HTTPStatus.METHOD_NOT_ALLOWED) + + if arg in reference: + reference = reference[arg] + elif "" in reference: + reference = reference[""] + elif "*" in reference: + # reference = reference["*"] + break + else: + raise RoException("Unexpected URL item {}".format(arg), HTTPStatus.METHOD_NOT_ALLOWED) + if "TODO" in reference and method in reference["TODO"]: + raise RoException("Method {} not supported yet for this URL".format(method), HTTPStatus.NOT_IMPLEMENTED) + elif "METHODS" not in reference or method not in reference["METHODS"]: + raise RoException("Method {} not supported for this URL".format(method), HTTPStatus.METHOD_NOT_ALLOWED) + return reference["ROLE_PERMISSION"] + method.lower() + + @staticmethod + def _set_location_header(main_topic, version, topic, id): + """ + Insert response header Location with the URL of created item base on URL params + :param main_topic: + :param version: + :param topic: + :param id: + :return: None + """ + # Use cherrypy.request.base for absoluted path and make use of request.header HOST just in case behind aNAT + cherrypy.response.headers["Location"] = "/ro/{}/{}/{}/{}".format(main_topic, version, topic, id) + return + + @cherrypy.expose + def default(self, main_topic=None, version=None, topic=None, _id=None, _id2=None, *args, **kwargs): + token_info = None + outdata = None + _format = None + method = "DONE" + rollback = [] + engine_session = None + try: + if not main_topic or not version or not topic: + raise RoException("URL must contain at least 'main_topic/version/topic'", + HTTPStatus.METHOD_NOT_ALLOWED) + if main_topic not in ("admin", "ns",): + raise RoException("URL main_topic '{}' not supported".format(main_topic), + HTTPStatus.METHOD_NOT_ALLOWED) + if version != 'v1': + raise RoException("URL version '{}' not supported".format(version), HTTPStatus.METHOD_NOT_ALLOWED) + + if kwargs and "METHOD" in kwargs and kwargs["METHOD"] in ("PUT", "POST", "DELETE", "GET", "PATCH"): + method = kwargs.pop("METHOD") + else: + method = cherrypy.request.method + + role_permission = self._check_valid_url_method(method, main_topic, version, topic, _id, _id2, *args, + **kwargs) + # skip token validation if requesting a token + indata = self._format_in(kwargs) + if main_topic != "admin" or topic != "tokens": + token_info = self.authenticator.authorize(role_permission, _id) + outdata, created_id, done = self.map_operation[role_permission]( + engine_session, indata, version, _id, _id2, *args, *kwargs) + if created_id: + self._set_location_header(main_topic, version, topic, _id) + cherrypy.response.status = HTTPStatus.ACCEPTED.value if not done else HTTPStatus.OK.value if \ + outdata is not None else HTTPStatus.NO_CONTENT.value + return self._format_out(outdata, token_info, _format) + except Exception as e: + if isinstance(e, (RoException, NsException, DbException, FsException, MsgException, AuthException, + ValidationError)): + http_code_value = cherrypy.response.status = e.http_code.value + http_code_name = e.http_code.name + cherrypy.log("Exception {}".format(e)) + else: + http_code_value = cherrypy.response.status = HTTPStatus.BAD_REQUEST.value # INTERNAL_SERVER_ERROR + cherrypy.log("CRITICAL: Exception {}".format(e), traceback=True) + http_code_name = HTTPStatus.BAD_REQUEST.name + if hasattr(outdata, "close"): # is an open file + outdata.close() + error_text = str(e) + rollback.reverse() + for rollback_item in rollback: + try: + if rollback_item.get("operation") == "set": + self.ns.db.set_one(rollback_item["topic"], {"_id": rollback_item["_id"]}, + rollback_item["content"], fail_on_empty=False) + else: + self.ns.db.del_one(rollback_item["topic"], {"_id": rollback_item["_id"]}, + fail_on_empty=False) + except Exception as e2: + rollback_error_text = "Rollback Exception {}: {}".format(rollback_item, e2) + cherrypy.log(rollback_error_text) + error_text += ". " + rollback_error_text + # if isinstance(e, MsgException): + # error_text = "{} has been '{}' but other modules cannot be informed because an error on bus".format( + # engine_topic[:-1], method, error_text) + problem_details = { + "code": http_code_name, + "status": http_code_value, + "detail": error_text, + } + return self._format_out(problem_details, token_info) + # raise cherrypy.HTTPError(e.http_code.value, str(e)) + finally: + if token_info: + if method in ("PUT", "PATCH", "POST") and isinstance(outdata, dict): + for logging_id in ("id", "op_id", "nsilcmop_id", "nslcmop_id"): + if outdata.get(logging_id): + cherrypy.request.login += ";{}={}".format(logging_id, outdata[logging_id][:36]) + + +def _start_service(): + """ + Callback function called when cherrypy.engine starts + Override configuration with env variables + Set database, storage, message configuration + Init database with admin/admin user password + """ + global ro_server + # global vim_threads + cherrypy.log.error("Starting osm_ng_ro") + # update general cherrypy configuration + update_dict = {} + + engine_config = cherrypy.tree.apps['/ro'].config + for k, v in environ.items(): + if not k.startswith("OSMRO_"): + continue + k1, _, k2 = k[6:].lower().partition("_") + if not k2: + continue + try: + if k1 in ("server", "test", "auth", "log"): + # update [global] configuration + update_dict[k1 + '.' + k2] = yaml.safe_load(v) + elif k1 == "static": + # update [/static] configuration + engine_config["/static"]["tools.staticdir." + k2] = yaml.safe_load(v) + elif k1 == "tools": + # update [/] configuration + engine_config["/"]["tools." + k2.replace('_', '.')] = yaml.safe_load(v) + elif k1 in ("message", "database", "storage", "authentication"): + # update [message], [database], ... configuration + if k2 in ("port", "db_port"): + engine_config[k1][k2] = int(v) + else: + engine_config[k1][k2] = v + + except Exception as e: + raise RoException("Cannot load env '{}': {}".format(k, e)) + + if update_dict: + cherrypy.config.update(update_dict) + engine_config["global"].update(update_dict) + + # logging cherrypy + log_format_simple = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s" + log_formatter_simple = logging.Formatter(log_format_simple, datefmt='%Y-%m-%dT%H:%M:%S') + logger_server = logging.getLogger("cherrypy.error") + logger_access = logging.getLogger("cherrypy.access") + logger_cherry = logging.getLogger("cherrypy") + logger_nbi = logging.getLogger("ro") + + if "log.file" in engine_config["global"]: + file_handler = logging.handlers.RotatingFileHandler(engine_config["global"]["log.file"], + maxBytes=100e6, backupCount=9, delay=0) + file_handler.setFormatter(log_formatter_simple) + logger_cherry.addHandler(file_handler) + logger_nbi.addHandler(file_handler) + # log always to standard output + for format_, logger in {"ro.server %(filename)s:%(lineno)s": logger_server, + "ro.access %(filename)s:%(lineno)s": logger_access, + "%(name)s %(filename)s:%(lineno)s": logger_nbi + }.items(): + log_format_cherry = "%(asctime)s %(levelname)s {} %(message)s".format(format_) + log_formatter_cherry = logging.Formatter(log_format_cherry, datefmt='%Y-%m-%dT%H:%M:%S') + str_handler = logging.StreamHandler() + str_handler.setFormatter(log_formatter_cherry) + logger.addHandler(str_handler) + + if engine_config["global"].get("log.level"): + logger_cherry.setLevel(engine_config["global"]["log.level"]) + logger_nbi.setLevel(engine_config["global"]["log.level"]) + + # logging other modules + for k1, logname in {"message": "ro.msg", "database": "ro.db", "storage": "ro.fs"}.items(): + engine_config[k1]["logger_name"] = logname + logger_module = logging.getLogger(logname) + if "logfile" in engine_config[k1]: + file_handler = logging.handlers.RotatingFileHandler(engine_config[k1]["logfile"], + maxBytes=100e6, backupCount=9, delay=0) + file_handler.setFormatter(log_formatter_simple) + logger_module.addHandler(file_handler) + if "loglevel" in engine_config[k1]: + logger_module.setLevel(engine_config[k1]["loglevel"]) + # TODO add more entries, e.g.: storage + + engine_config["assignment"] = {} + # ^ each VIM, SDNc will be assigned one worker id. Ns class will add items and VimThread will auto-assign + cherrypy.tree.apps['/ro'].root.ns.start(engine_config) + cherrypy.tree.apps['/ro'].root.authenticator.start(engine_config) + cherrypy.tree.apps['/ro'].root.ns.init_db(target_version=database_version) + + # # start subscriptions thread: + # vim_threads = [] + # for thread_id in range(engine_config["global"]["server.ns_threads"]): + # vim_thread = VimThread(thread_id, config=engine_config, engine=ro_server.ns) + # vim_thread.start() + # vim_threads.append(vim_thread) + # # Do not capture except SubscriptionException + + backend = engine_config["authentication"]["backend"] + cherrypy.log.error("Starting OSM NBI Version '{} {}' with '{}' authentication backend" + .format(ro_version, ro_version_date, backend)) + + +def _stop_service(): + """ + Callback function called when cherrypy.engine stops + TODO: Ending database connections. + """ + # global vim_threads + # if vim_threads: + # for vim_thread in vim_threads: + # vim_thread.terminate() + # vim_threads = None + cherrypy.tree.apps['/ro'].root.ns.stop() + cherrypy.log.error("Stopping osm_ng_ro") + + +def ro_main(config_file): + global ro_server + ro_server = Server() + cherrypy.engine.subscribe('start', _start_service) + cherrypy.engine.subscribe('stop', _stop_service) + cherrypy.quickstart(ro_server, '/ro', config_file) + + +def usage(): + print("""Usage: {} [options] + -c|--config [configuration_file]: loads the configuration file (default: ./ro.cfg) + -h|--help: shows this help + """.format(sys.argv[0])) + # --log-socket-host HOST: send logs to this host") + # --log-socket-port PORT: send logs using this port (default: 9022)") + + +if __name__ == '__main__': + try: + # load parameters and configuration + opts, args = getopt.getopt(sys.argv[1:], "hvc:", ["config=", "help"]) + # TODO add "log-socket-host=", "log-socket-port=", "log-file=" + config_file = None + for o, a in opts: + if o in ("-h", "--help"): + usage() + sys.exit() + elif o in ("-c", "--config"): + config_file = a + else: + assert False, "Unhandled option" + if config_file: + if not path.isfile(config_file): + print("configuration file '{}' that not exist".format(config_file), file=sys.stderr) + exit(1) + else: + for config_file in (path.dirname(__file__) + "/ro.cfg", "./ro.cfg", "/etc/osm/ro.cfg"): + if path.isfile(config_file): + break + else: + print("No configuration file 'ro.cfg' found neither at local folder nor at /etc/osm/", file=sys.stderr) + exit(1) + ro_main(config_file) + except getopt.GetoptError as e: + print(str(e), file=sys.stderr) + # usage() + exit(1) diff --git a/NG-RO/osm_ng_ro/validation.py b/NG-RO/osm_ng_ro/validation.py new file mode 100644 index 00000000..060a3ebb --- /dev/null +++ b/NG-RO/osm_ng_ro/validation.py @@ -0,0 +1,118 @@ +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from jsonschema import validate as js_v, exceptions as js_e +from http import HTTPStatus + +__author__ = "Alfonso Tierno " +__version__ = "0.1" +version_date = "Jun 2020" + +""" +Validator of input data using JSON schemas +""" + +# Basis schemas +name_schema = {"type": "string", "minLength": 1, "maxLength": 255, "pattern": "^[^,;()'\"]+$"} +string_schema = {"type": "string", "minLength": 1, "maxLength": 255} +ssh_key_schema = {"type": "string", "minLength": 1} +id_schema = {"type": "string", "pattern": "^[a-fA-F0-9]{8}(-[a-fA-F0-9]{4}){3}-[a-fA-F0-9]{12}$"} +bool_schema = {"type": "boolean"} +null_schema = {"type": "null"} + +image_schema = { + "title": "image input validation", + "$schema": "http://json-schema.org/draft-04/schema#", + "type": "object", + # TODO +} + +flavor_schema = { + "title": "image input validation", + "$schema": "http://json-schema.org/draft-04/schema#", + "type": "object", + # TODO +} + +ns_schema = { + "title": "image input validation", + "$schema": "http://json-schema.org/draft-04/schema#", + "type": "object", + # TODO +} + +deploy_schema = { + "title": "deploy input validation", + "$schema": "http://json-schema.org/draft-04/schema#", + "type": "object", + "properties": { + "action_id": string_schema, + "name": name_schema, + "action": {"enum" ["inject_ssh_key"]}, + "key": ssh_key_schema, + "user": name_schema, + "password": string_schema, + "vnf": { + "type": "object", + "properties": { + "_id": id_schema, + # TODO + }, + "required": ["_id"], + "additionalProperties": True, + }, + "image": { + "type": "array", + "minItems": 1, + "items": image_schema + }, + "flavor": { + "type": "array", + "minItems": 1, + "items": flavor_schema + }, + "ns": ns_schema, + }, + + "required": ["name"], + "additionalProperties": False +} + + +class ValidationError(Exception): + def __init__(self, message, http_code=HTTPStatus.UNPROCESSABLE_ENTITY): + self.http_code = http_code + Exception.__init__(self, message) + + +def validate_input(indata, schema_to_use): + """ + Validates input data against json schema + :param indata: user input data. Should be a dictionary + :param schema_to_use: jsonschema to test + :return: None if ok, raises ValidationError exception on error + """ + try: + if schema_to_use: + js_v(indata, schema_to_use) + return None + except js_e.ValidationError as e: + if e.path: + error_pos = "at '" + ":".join(map(str, e.path)) + "'" + else: + error_pos = "" + raise ValidationError("Format error {} '{}' ".format(error_pos, e.message)) + except js_e.SchemaError: + raise ValidationError("Bad json schema {}".format(schema_to_use), http_code=HTTPStatus.INTERNAL_SERVER_ERROR) diff --git a/NG-RO/requirements.txt b/NG-RO/requirements.txt new file mode 100644 index 00000000..1ed02019 --- /dev/null +++ b/NG-RO/requirements.txt @@ -0,0 +1,22 @@ +## +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +## + +PyYAML +CherryPy==18.1.2 +osm-common @ git+https://osm.etsi.org/gerrit/osm/common.git#egg=osm-common +requests +cryptography +osm-im @ git+https://osm.etsi.org/gerrit/osm/IM.git#egg=osm-im +osm-ro-plugin @ git+https://osm.etsi.org/gerrit/osm/RO.git#egg=osm-ro-plugin&subdirectory=RO-plugin +logutils diff --git a/NG-RO/setup.py b/NG-RO/setup.py new file mode 100644 index 00000000..a0018361 --- /dev/null +++ b/NG-RO/setup.py @@ -0,0 +1,48 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +# Copyright 2020 Telefonica S.A. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from setuptools import setup, find_packages + +_name = "osm_ng_ro" +_readme = "osm-ng-ro is the New Generation Resource Orchestrator for OSM" +setup( + name=_name, + description='OSM Resource Orchestrator', + long_description=_readme, + version_command=('git describe --match v* --tags --long --dirty', 'pep440-git-full'), + author='ETSI OSM', + author_email='alfonso.tiernosepulveda@telefonica.com', + maintainer='Alfonso Tierno', + maintainer_email='alfonso.tiernosepulveda@telefonica.com', + url='https://osm.etsi.org/gitweb/?p=osm/RO.git;a=summary', + license='Apache 2.0', + + packages=find_packages(exclude=["temp", "local"]), + include_package_data=True, + install_requires=[ + 'CherryPy==18.1.2', + 'osm-common @ git+https://osm.etsi.org/gerrit/osm/common.git#egg=osm-common', + 'jsonschema', + 'PyYAML', + 'requests', + 'cryptography', + 'osm-im @ git+https://osm.etsi.org/gerrit/osm/IM.git#egg=osm-im', + "osm-ro-plugin @ git+https://osm.etsi.org/gerrit/osm/RO.git#egg=osm-ro-plugin&subdirectory=RO-plugin", + ], + setup_requires=['setuptools-version-command'], +) diff --git a/NG-RO/stdeb.cfg b/NG-RO/stdeb.cfg new file mode 100644 index 00000000..f7ebd4a0 --- /dev/null +++ b/NG-RO/stdeb.cfg @@ -0,0 +1,16 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +[DEFAULT] +X-Python3-Version : >= 3.5 +Depends3 : python3-osm-common, python3-osm-im, python3-cherrypy3, python3-yaml, python3-jsonschema, + python3-pip, python3-requests, python3-osm-ro-plugin diff --git a/NG-RO/tox.ini b/NG-RO/tox.ini new file mode 100644 index 00000000..081bc1c8 --- /dev/null +++ b/NG-RO/tox.ini @@ -0,0 +1,33 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +[tox] +envlist = flake8 +toxworkdir={toxinidir}/../.tox + +[testenv] +usedevelop = True +basepython = python3 +install_command = python3 -m pip install -r requirements.txt -U {opts} {packages} + +[testenv:flake8] +basepython = python3 +deps = flake8 +commands = flake8 osm_ng_ro --max-line-length 120 \ + --exclude .svn,CVS,.gz,.git,__pycache__,.tox,local,temp,osm_im --ignore W291,W293,E226,E402,W504 + +[testenv:build] +basepython = python3 +deps = stdeb + setuptools-version-command +commands = python3 setup.py --command-packages=stdeb.command bdist_deb diff --git a/RO-VIM-openstack/osm_rovim_openstack/vimconn_openstack.py b/RO-VIM-openstack/osm_rovim_openstack/vimconn_openstack.py index 289c8278..7cf5e883 100644 --- a/RO-VIM-openstack/osm_rovim_openstack/vimconn_openstack.py +++ b/RO-VIM-openstack/osm_rovim_openstack/vimconn_openstack.py @@ -1222,19 +1222,19 @@ class vimconnector(vimconn.VimConnector): vim_id: filled/added by this function floating_ip: True/False (or it can be None) 'cloud_config': (optional) dictionary with: - 'key-pairs': (optional) list of strings with the public key to be inserted to the default user - 'users': (optional) list of users to be inserted, each item is a dict with: - 'name': (mandatory) user name, - 'key-pairs': (optional) list of strings with the public key to be inserted to the user - 'user-data': (optional) string is a text script to be passed directly to cloud-init - 'config-files': (optional). List of files to be transferred. Each item is a dict with: - 'dest': (mandatory) string with the destination absolute path - 'encoding': (optional, by default text). Can be one of: - 'b64', 'base64', 'gz', 'gz+b64', 'gz+base64', 'gzip+b64', 'gzip+base64' - 'content' (mandatory): string with the content of the file - 'permissions': (optional) string with file permissions, typically octal notation '0644' - 'owner': (optional) file owner, string with the format 'owner:group' - 'boot-data-drive': boolean to indicate if user-data must be passed using a boot drive (hard disk) + 'key-pairs': (optional) list of strings with the public key to be inserted to the default user + 'users': (optional) list of users to be inserted, each item is a dict with: + 'name': (mandatory) user name, + 'key-pairs': (optional) list of strings with the public key to be inserted to the user + 'user-data': (optional) string is a text script to be passed directly to cloud-init + 'config-files': (optional). List of files to be transferred. Each item is a dict with: + 'dest': (mandatory) string with the destination absolute path + 'encoding': (optional, by default text). Can be one of: + 'b64', 'base64', 'gz', 'gz+b64', 'gz+base64', 'gzip+b64', 'gzip+base64' + 'content' (mandatory): string with the content of the file + 'permissions': (optional) string with file permissions, typically octal notation '0644' + 'owner': (optional) file owner, string with the format 'owner:group' + 'boot-data-drive': boolean to indicate if user-data must be passed using a boot drive (hard disk) 'disk_list': (optional) list with additional disks to the VM. Each item is a dict with: 'image_id': (optional). VIM id of an existing image. If not provided an empty disk must be mounted 'size': (mandatory) string with the size of the disk in GB diff --git a/RO/osm_ro/scripts/RO-start.sh b/RO/osm_ro/scripts/RO-start.sh index 94183e92..abe3c5c5 100755 --- a/RO/osm_ro/scripts/RO-start.sh +++ b/RO/osm_ro/scripts/RO-start.sh @@ -22,6 +22,10 @@ [ -z "$RO_DB_OVIM_HOST" ] && export RO_DB_OVIM_HOST="$RO_DB_HOST" [ -z "$RO_DB_OVIM_ROOT_PASSWORD" ] && export RO_DB_OVIM_ROOT_PASSWORD="$RO_DB_ROOT_PASSWORD" +# IF OSMRO_SERVER_NG use new server that not need any database init +[ -n "$OSMRO_SERVER_NG" ] && python3 -m osm_ng_ro.ro_main + + function is_db_created() { db_host=$1 db_port=$2