--- /dev/null
+##
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+##
+
+recursive-include osm_ng_ro *.py *.sh *.cfg *.yml
+recursive-include osm_ng_ro/html_public *
--- /dev/null
+# Copyright 2020 Telefonica S.A.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+all: clean package
+
+clean:
+ rm -rf dist deb_dist osm_ng_ro-*.tar.gz osm_ng_ro.egg-info .eggs .temp-tox
+
+package:
+ python3 setup.py --command-packages=stdeb.command sdist_dsc
+ cp debian/python3-osm-ng-ro.postinst deb_dist/osm-ng-ro*/debian
+ cd deb_dist/osm-ng-ro*/ && dpkg-buildpackage -rfakeroot -uc -us
--- /dev/null
+#!/bin/bash
+
+##
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact with: OSM_TECH@list.etsi.org
+##
+
+echo "POST INSTALL OSM-RO-NG"
+echo "Installing python dependencies via pip..."
+
+# python3 -m pip install -U pip
+python3 -m pip install cherrypy==18.1.2
+
--- /dev/null
+##
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+##
+
+version = '8.0.1.post0'
+version_date = '2020-06-29'
+
+# Obtain installed package version. Ignore if error, e.g. pkg_resources not installed
+try:
+ from pkg_resources import get_distribution
+ version = get_distribution("osm_ng_ro").version
+except Exception:
+ pass
--- /dev/null
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Contains html text in variables to make and html response
+"""
+
+import yaml
+from http import HTTPStatus
+from html import escape as html_escape
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+
+html_start = """
+ <!DOCTYPE html>
+<html>
+<head>
+ <link href="/ro/static/style.css" rel="stylesheet">
+<title>Welcome to OSM</title>
+</head>
+<body>
+ <div id="osm_topmenu">
+ <div>
+ <a href="https://osm.etsi.org"> <img src="/ro/static/OSM-logo.png" height="42" width="100"
+ style="vertical-align:middle"> </a>
+ <a>( {} )</a>
+ <a href="/ro/ns/v1/deploy">NSs </a>
+ <a href="/ro/admin/v1/k8srepos">K8s_repos </a>
+ <a href="/ro/admin/v1/tokens?METHOD=DELETE">logout </a>
+ </div>
+ </div>
+"""
+
+html_body = """
+<h1>{item}</h1>
+"""
+
+html_end = """
+</body>
+</html>
+"""
+
+html_body_error = "<h2> Error <pre>{}</pre> </h2>"
+
+
+html_auth2 = """
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01//EN" "http://www.w3.org/TR/html4/strict.dtd">
+<html>
+<head><META http-equiv="Content-Type" content="text/html; charset=UTF-8">
+ <link href="/ro/static/style.css" rel="stylesheet">
+ <title>OSM Login</title>
+</head>
+<body>
+ <div id="osm_header">
+ <div>
+ <a href="https://osm.etsi.org"> <h1><img src="/ro/static/OSM-logo.png" style="vertical-align:middle"></h1> </a>
+ </div>
+ </div>
+ <div id="osm_error_message">
+ <h1>{error}</h1>
+ </div>
+ <div class="gerritBody" id="osm_body">
+ <h1>Sign in to OSM</h1>
+ <form action="/ro/admin/v1/tokens" id="login_form" method="POST">
+ <table style="border: 0;">
+ <tr><th>Username</th><td><input id="f_user" name="username" size="25" tabindex="1" type="text"></td></tr>
+ <tr><th>Password</th><td><input id="f_pass" name="password" size="25" tabindex="2" type="password"></td></tr>
+ <tr><td><input tabindex="3" type="submit" value="Sign In"></td></tr>
+ </table>
+ </form>
+ <div style="clear: both; margin-top: 15px; padding-top: 2px; margin-bottom: 15px;">
+ <div id="osm_footer">
+ <div></div>
+ </div>
+ </div>
+ </div>
+ <script src="/ro/static/login.js"> </script>
+</body>
+</html>
+"""
+
+
+html_nslcmop_body = """
+<a href="/ro/nslcm/v1/ns_lcm_op_occs?nsInstanceId={id}">nslcm operations </a>
+<a href="/ro/nslcm/v1/vnf_instances?nsr-id-ref={id}">VNFRS </a>
+<form action="/ro/nslcm/v1/ns_instances/{id}/terminate" method="post" enctype="multipart/form-data">
+ <h3> <table style="border: 0;"> <tr>
+ <td> <input type="submit" value="Terminate"/> </td>
+ </tr> </table> </h3>
+</form>
+"""
+
+html_nsilcmop_body = """
+<a href="/ro/nsilcm/v1/nsi_lcm_op_occs?netsliceInstanceId={id}">nsilcm operations </a>
+<form action="/ro/nsilcm/v1/netslice_instances/{id}/terminate" method="post" enctype="multipart/form-data">
+ <h3> <table style="border: 0;"> <tr>
+ <td> <input type="submit" value="Terminate"/> </td>
+ </tr> </table> </h3>
+</form>
+"""
+
+
+def format(data, request, response, toke_info):
+ """
+ Format a nice html response, depending on the data
+ :param data:
+ :param request: cherrypy request
+ :param response: cherrypy response
+ :return: string with teh html response
+ """
+ response.headers["Content-Type"] = 'text/html'
+ if response.status == HTTPStatus.UNAUTHORIZED.value:
+ if response.headers.get("WWW-Authenticate") and request.config.get("auth.allow_basic_authentication"):
+ response.headers["WWW-Authenticate"] = "Basic" + response.headers["WWW-Authenticate"][6:]
+ return
+ else:
+ return html_auth2.format(error=data)
+ if request.path_info in ("/version", "/system"):
+ return "<pre>" + yaml.safe_dump(data, explicit_start=False, indent=4, default_flow_style=False) + "</pre>"
+ body = html_body.format(item=request.path_info)
+ if response.status and response.status > 202:
+ body += html_body_error.format(yaml.safe_dump(data, explicit_start=True, indent=4, default_flow_style=False))
+ elif isinstance(data, (list, tuple)):
+ # if request.path_info == "/ns/v1/deploy":
+ # body += html_upload_body.format(request.path_info + "_content", "VNFD")
+ # elif request.path_info == "/nsd/v1/ns_descriptors":
+ # body += html_upload_body.format(request.path_info + "_content", "NSD")
+ # elif request.path_info == "/nst/v1/nst_templates":
+ # body += html_upload_body.format(request.path_info + "_content", "NSTD")
+ for k in data:
+ if isinstance(k, dict):
+ data_id = k.pop("_id", None)
+ elif isinstance(k, str):
+ data_id = k
+ if request.path_info == "/ns/v1/deploy":
+ body += '<p> <a href="/ro/{url}/{id}?METHOD=DELETE"> <img src="/ro/static/delete.png" height="25"' \
+ ' width="25"> </a><a href="/ro/{url}/{id}">{id}</a>: {t} </p>' \
+ .format(url=request.path_info, id=data_id, t=html_escape(str(k)))
+ else:
+ body += '<p> <a href="/ro/{url}/{id}">{id}</a>: {t} </p>'.format(url=request.path_info, id=data_id,
+ t=html_escape(str(k)))
+ elif isinstance(data, dict):
+ if "Location" in response.headers:
+ body += '<a href="{}"> show </a>'.format(response.headers["Location"])
+ else:
+ body += '<a href="/ro/{}?METHOD=DELETE"> <img src="/ro/static/delete.png" height="25" width="25"> </a>'\
+ .format(request.path_info[:request.path_info.rfind("/")])
+ if request.path_info.startswith("/nslcm/v1/ns_instances_content/") or \
+ request.path_info.startswith("/nslcm/v1/ns_instances/"):
+ _id = request.path_info[request.path_info.rfind("/")+1:]
+ body += html_nslcmop_body.format(id=_id)
+ elif request.path_info.startswith("/nsilcm/v1/netslice_instances_content/") or \
+ request.path_info.startswith("/nsilcm/v1/netslice_instances/"):
+ _id = request.path_info[request.path_info.rfind("/")+1:]
+ body += html_nsilcmop_body.format(id=_id)
+ body += "<pre>" + html_escape(yaml.safe_dump(data, explicit_start=True, indent=4, default_flow_style=False)) + \
+ "</pre>"
+ elif data is None:
+ if request.method == "DELETE" or "METHOD=DELETE" in request.query_string:
+ body += "<pre> deleted </pre>"
+ else:
+ body = html_escape(str(data))
+ user_text = " "
+ if toke_info:
+ if toke_info.get("username"):
+ user_text += "user: {}".format(toke_info.get("username"))
+ if toke_info.get("project_id"):
+ user_text += ", project: {}".format(toke_info.get("project_name"))
+ return html_start.format(user_text) + body + html_end
+ # yaml.safe_dump(data, explicit_start=True, indent=4, default_flow_style=False)
+ # tags=False,
+ # encoding='utf-8', allow_unicode=True)
--- /dev/null
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+ var login_form = document.getElementById('login_form');
+ var f_user = document.getElementById('f_user');
+ var f_pass = document.getElementById('f_pass');
+ f_user.onkeydown = function(e) {
+ if (e.keyCode == 13) {
+ f_pass.focus();
+ return false;
+ }
+ }
+ f_pass.onkeydown = function(e) {
+ if (e.keyCode == 13) {
+ login_form.submit();
+ return false;
+ }
+ }
+ f_user.focus();
+
--- /dev/null
+/*
+Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
+Licensed under the Apache License, Version 2.0 (the "License"); you may
+not use this file except in compliance with the License. You may obtain
+a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+License for the specific language governing permissions and limitations
+under the License.
+*/
+
+#osm_header{
+ display: block;
+ position: relative;
+ top: 0px;
+ left: 160px;
+ margin-bottom: -60px;
+ width: 140px;
+ padding-left: 17px;
+ }
+#osm_topmenu {
+ background: none;
+ position: relative;
+ top: 0px;
+ left: 10px;
+ margin-right: 10px;
+}
+#osm_error_message {
+ padding: 5px;
+ margin: 2em;
+ width: 200em;
+ color: red;
+ font-weight: bold;
+}
+
--- /dev/null
+# -*- coding: utf-8 -*-
+
+##
+# Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+import logging
+# import yaml
+from traceback import format_exc as traceback_format_exc
+from osm_ng_ro.ns_thread import NsWorker
+from osm_ng_ro.validation import validate_input, deploy_schema
+from osm_common import dbmongo, dbmemory, fslocal, fsmongo, msglocal, msgkafka, version as common_version
+from osm_common.dbbase import DbException
+from osm_common.fsbase import FsException
+from osm_common.msgbase import MsgException
+from http import HTTPStatus
+from uuid import uuid4
+from threading import Lock
+from random import choice as random_choice
+from time import time
+from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError
+from cryptography.hazmat.primitives import serialization as crypto_serialization
+from cryptography.hazmat.primitives.asymmetric import rsa
+from cryptography.hazmat.backends import default_backend as crypto_default_backend
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+min_common_version = "0.1.16"
+
+
+class NsException(Exception):
+
+ def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
+ self.http_code = http_code
+ super(Exception, self).__init__(message)
+
+
+def get_process_id():
+ """
+ Obtain a unique ID for this process. If running from inside docker, it will get docker ID. If not it
+ will provide a random one
+ :return: Obtained ID
+ """
+ # Try getting docker id. If fails, get pid
+ try:
+ with open("/proc/self/cgroup", "r") as f:
+ text_id_ = f.readline()
+ _, _, text_id = text_id_.rpartition("/")
+ text_id = text_id.replace("\n", "")[:12]
+ if text_id:
+ return text_id
+ except Exception:
+ pass
+ # Return a random id
+ return "".join(random_choice("0123456789abcdef") for _ in range(12))
+
+
+def versiontuple(v):
+ """utility for compare dot separate versions. Fills with zeros to proper number comparison"""
+ filled = []
+ for point in v.split("."):
+ filled.append(point.zfill(8))
+ return tuple(filled)
+
+
+class Ns(object):
+
+ def __init__(self):
+ self.db = None
+ self.fs = None
+ self.msg = None
+ self.config = None
+ # self.operations = None
+ self.logger = logging.getLogger("ro.ns")
+ self.map_topic = {}
+ self.write_lock = None
+ self.assignment = {}
+ self.next_worker = 0
+ self.plugins = {}
+ self.workers = []
+
+ def init_db(self, target_version):
+ pass
+
+ def start(self, config):
+ """
+ Connect to database, filesystem storage, and messaging
+ :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
+ :param config: Configuration of db, storage, etc
+ :return: None
+ """
+ self.config = config
+ self.config["process_id"] = get_process_id() # used for HA identity
+ # check right version of common
+ if versiontuple(common_version) < versiontuple(min_common_version):
+ raise NsException("Not compatible osm/common version '{}'. Needed '{}' or higher".format(
+ common_version, min_common_version))
+
+ try:
+ if not self.db:
+ if config["database"]["driver"] == "mongo":
+ self.db = dbmongo.DbMongo()
+ self.db.db_connect(config["database"])
+ elif config["database"]["driver"] == "memory":
+ self.db = dbmemory.DbMemory()
+ self.db.db_connect(config["database"])
+ else:
+ raise NsException("Invalid configuration param '{}' at '[database]':'driver'".format(
+ config["database"]["driver"]))
+ if not self.fs:
+ if config["storage"]["driver"] == "local":
+ self.fs = fslocal.FsLocal()
+ self.fs.fs_connect(config["storage"])
+ elif config["storage"]["driver"] == "mongo":
+ self.fs = fsmongo.FsMongo()
+ self.fs.fs_connect(config["storage"])
+ else:
+ raise NsException("Invalid configuration param '{}' at '[storage]':'driver'".format(
+ config["storage"]["driver"]))
+ if not self.msg:
+ if config["message"]["driver"] == "local":
+ self.msg = msglocal.MsgLocal()
+ self.msg.connect(config["message"])
+ elif config["message"]["driver"] == "kafka":
+ self.msg = msgkafka.MsgKafka()
+ self.msg.connect(config["message"])
+ else:
+ raise NsException("Invalid configuration param '{}' at '[message]':'driver'".format(
+ config["message"]["driver"]))
+
+ # TODO load workers to deal with exising database tasks
+
+ self.write_lock = Lock()
+ except (DbException, FsException, MsgException) as e:
+ raise NsException(str(e), http_code=e.http_code)
+
+ def stop(self):
+ try:
+ if self.db:
+ self.db.db_disconnect()
+ if self.fs:
+ self.fs.fs_disconnect()
+ if self.msg:
+ self.msg.disconnect()
+ self.write_lock = None
+ except (DbException, FsException, MsgException) as e:
+ raise NsException(str(e), http_code=e.http_code)
+ for worker in self.workers:
+ worker.insert_task(("terminate",))
+
+ def _create_worker(self, vim_account_id):
+ # TODO make use of the limit self.config["global"]["server.ns_threads"]
+ worker_id = next((i for i in range(len(self.workers)) if not self.workers[i].is_alive()), None)
+ if worker_id is None:
+ worker_id = len(self.workers)
+ self.workers.append(NsWorker(worker_id, self.config, self.plugins, self.db))
+ self.workers[worker_id].start()
+ self.workers[worker_id].insert_task(("load_vim", vim_account_id))
+ return worker_id
+
+ def _assign_vim(self, vim_account_id):
+ if vim_account_id not in self.assignment:
+ self.assignment[vim_account_id] = self._create_worker(vim_account_id)
+
+ def _get_cloud_init(self, where):
+ """
+
+ :param where: can be 'vnfr_id:file:file_name' or 'vnfr_id:vdu:vdu_idex'
+ :return:
+ """
+ vnfd_id, _, other = where.partition(":")
+ _type, _, name = other.partition(":")
+ vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
+ if _type == "file":
+ base_folder = vnfd["_admin"]["storage"]
+ cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"], name)
+ with self.fs.file_open(cloud_init_file, "r") as ci_file:
+ cloud_init_content = ci_file.read()
+ elif _type == "vdu":
+ cloud_init_content = vnfd["vdu"][int(name)]["cloud-init"]
+ else:
+ raise NsException("Mismatch descriptor for cloud init: {}".format(where))
+ return cloud_init_content
+
+ def _parse_jinja2(self, cloud_init_content, params, context):
+ try:
+ env = Environment()
+ ast = env.parse(cloud_init_content)
+ mandatory_vars = meta.find_undeclared_variables(ast)
+ if mandatory_vars:
+ for var in mandatory_vars:
+ if not params or var not in params:
+ raise NsException(
+ "Variable '{}' defined at vnfd='{}' must be provided in the instantiation parameters"
+ "inside the 'additionalParamsForVnf' block".format(var, context))
+ template = Template(cloud_init_content)
+ return template.render(params or {})
+
+ except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
+ raise NsException("Error parsing Jinja2 to cloud-init content at vnfd='{}': {}".format(context, e))
+
+ def _create_db_ro_nsrs(self, nsr_id, now):
+ try:
+ key = rsa.generate_private_key(
+ backend=crypto_default_backend(),
+ public_exponent=65537,
+ key_size=2048
+ )
+ private_key = key.private_bytes(
+ crypto_serialization.Encoding.PEM,
+ crypto_serialization.PrivateFormat.PKCS8,
+ crypto_serialization.NoEncryption())
+ public_key = key.public_key().public_bytes(
+ crypto_serialization.Encoding.OpenSSH,
+ crypto_serialization.PublicFormat.OpenSSH
+ )
+ private_key = private_key.decode('utf8')
+ public_key = public_key.decode('utf8')
+ except Exception as e:
+ raise NsException("Cannot create ssh-keys: {}".format(e))
+
+ schema_version = "1.1"
+ private_key_encrypted = self.db.encrypt(private_key, schema_version=schema_version, salt=nsr_id)
+ db_content = {
+ "_id": nsr_id,
+ "_admin": {
+ "created": now,
+ "modified": now,
+ "schema_version": schema_version
+ },
+ "public_key": public_key,
+ "private_key": private_key_encrypted,
+ "actions": [],
+ }
+ self.db.create("ro_nsrs", db_content)
+ return db_content
+
+ def deploy(self, session, indata, version, nsr_id, *args, **kwargs):
+ print("ns.deploy session={} indata={} version={} nsr_id={}".format(session, indata, version, nsr_id))
+ validate_input(indata, deploy_schema)
+ action_id = indata.get("action_id", str(uuid4()))
+ task_index = 0
+ # get current deployment
+ db_nsr = None
+ # db_nslcmop = None
+ db_nsr_update = {} # update operation on nsrs
+ db_vnfrs_update = {}
+ # db_nslcmop_update = {} # update operation on nslcmops
+ db_vnfrs = {} # vnf's info indexed by _id
+ vdu2cloud_init = {}
+ step = ''
+ logging_text = "Task deploy nsr_id={} action_id={} ".format(nsr_id, action_id)
+ self.logger.debug(logging_text + "Enter")
+ try:
+ step = "Getting ns and vnfr record from db"
+ # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ db_ro_tasks = []
+ db_new_tasks = []
+ # read from db: vnf's of this ns
+ step = "Getting vnfrs from db"
+ db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
+ if not db_vnfrs_list:
+ raise NsException("Cannot obtain associated VNF for ns")
+ for vnfr in db_vnfrs_list:
+ db_vnfrs[vnfr["_id"]] = vnfr
+ db_vnfrs_update[vnfr["_id"]] = {}
+ now = time()
+ db_ro_nsr = self.db.get_one("ro_nsrs", {"_id": nsr_id}, fail_on_empty=False)
+ if not db_ro_nsr:
+ db_ro_nsr = self._create_db_ro_nsrs(nsr_id, now)
+ ro_nsr_public_key = db_ro_nsr["public_key"]
+
+ # check that action_id is not in the list of actions. Suffixed with :index
+ if action_id in db_ro_nsr["actions"]:
+ index = 1
+ while True:
+ new_action_id = "{}:{}".format(action_id, index)
+ if new_action_id not in db_ro_nsr["actions"]:
+ action_id = new_action_id
+ self.logger.debug(logging_text + "Changing action_id in use to {}".format(action_id))
+ break
+ index += 1
+
+ def _create_task(item, action, target_record, target_record_id, extra_dict=None):
+ nonlocal task_index
+ nonlocal action_id
+ nonlocal nsr_id
+
+ task = {
+ "action_id": action_id,
+ "nsr_id": nsr_id,
+ "task_id": "{}:{}".format(action_id, task_index),
+ "status": "SCHEDULED",
+ "action": action,
+ "item": item,
+ "target_record": target_record,
+ "target_record_id": target_record_id,
+ }
+ if extra_dict:
+ task.update(extra_dict) # params, find_params, depends_on
+ task_index += 1
+ return task
+
+ def _create_ro_task(vim_account_id, item, action, target_record, target_record_id, extra_dict=None):
+ nonlocal action_id
+ nonlocal task_index
+ nonlocal now
+
+ _id = action_id + ":" + str(task_index)
+ db_ro_task = {
+ "_id": _id,
+ "locked_by": None,
+ "locked_at": 0.0,
+ "target_id": "vim:" + vim_account_id,
+ "vim_info": {
+ "created": False,
+ "created_items": None,
+ "vim_id": None,
+ "vim_name": None,
+ "vim_status": None,
+ "vim_details": None,
+ "refresh_at": None,
+ },
+ "modified_at": now,
+ "created_at": now,
+ "to_check_at": now,
+ "tasks": [_create_task(item, action, target_record, target_record_id, extra_dict)],
+ }
+ return db_ro_task
+
+ def _process_image_params(target_image, vim_info):
+ find_params = {}
+ if target_image.get("image"):
+ find_params["filter_dict"] = {"name": target_image.get("image")}
+ if target_image.get("vim_image_id"):
+ find_params["filter_dict"] = {"id": target_image.get("vim_image_id")}
+ if target_image.get("image_checksum"):
+ find_params["filter_dict"] = {"checksum": target_image.get("image_checksum")}
+ return {"find_params": find_params}
+
+ def _process_flavor_params(target_flavor, vim_info):
+
+ def _get_resource_allocation_params(quota_descriptor):
+ """
+ read the quota_descriptor from vnfd and fetch the resource allocation properties from the
+ descriptor object
+ :param quota_descriptor: cpu/mem/vif/disk-io quota descriptor
+ :return: quota params for limit, reserve, shares from the descriptor object
+ """
+ quota = {}
+ if quota_descriptor.get("limit"):
+ quota["limit"] = int(quota_descriptor["limit"])
+ if quota_descriptor.get("reserve"):
+ quota["reserve"] = int(quota_descriptor["reserve"])
+ if quota_descriptor.get("shares"):
+ quota["shares"] = int(quota_descriptor["shares"])
+ return quota
+
+ flavor_data = {
+ "disk": int(target_flavor["storage-gb"]),
+ # "ram": max(int(target_flavor["memory-mb"]) // 1024, 1),
+ # ^ TODO manage at vim_connectors MB instead of GB
+ "ram": int(target_flavor["memory-mb"]),
+ "vcpus": target_flavor["vcpu-count"],
+ }
+ if target_flavor.get("guest-epa"):
+ extended = {}
+ numa = {}
+ epa_vcpu_set = False
+ if target_flavor["guest-epa"].get("numa-node-policy"):
+ numa_node_policy = target_flavor["guest-epa"].get("numa-node-policy")
+ if numa_node_policy.get("node"):
+ numa_node = numa_node_policy["node"][0]
+ if numa_node.get("num-cores"):
+ numa["cores"] = numa_node["num-cores"]
+ epa_vcpu_set = True
+ if numa_node.get("paired-threads"):
+ if numa_node["paired-threads"].get("num-paired-threads"):
+ numa["paired-threads"] = int(numa_node["paired-threads"]["num-paired-threads"])
+ epa_vcpu_set = True
+ if len(numa_node["paired-threads"].get("paired-thread-ids")):
+ numa["paired-threads-id"] = []
+ for pair in numa_node["paired-threads"]["paired-thread-ids"]:
+ numa["paired-threads-id"].append(
+ (str(pair["thread-a"]), str(pair["thread-b"]))
+ )
+ if numa_node.get("num-threads"):
+ numa["threads"] = int(numa_node["num-threads"])
+ epa_vcpu_set = True
+ if numa_node.get("memory-mb"):
+ numa["memory"] = max(int(numa_node["memory-mb"] / 1024), 1)
+ if target_flavor["guest-epa"].get("mempage-size"):
+ extended["mempage-size"] = target_flavor["guest-epa"].get("mempage-size")
+ if target_flavor["guest-epa"].get("cpu-pinning-policy") and not epa_vcpu_set:
+ if target_flavor["guest-epa"]["cpu-pinning-policy"] == "DEDICATED":
+ if target_flavor["guest-epa"].get("cpu-thread-pinning-policy") and \
+ target_flavor["guest-epa"]["cpu-thread-pinning-policy"] != "PREFER":
+ numa["cores"] = max(flavor_data["vcpus"], 1)
+ else:
+ numa["threads"] = max(flavor_data["vcpus"], 1)
+ epa_vcpu_set = True
+ if target_flavor["guest-epa"].get("cpu-quota") and not epa_vcpu_set:
+ cpuquota = _get_resource_allocation_params(target_flavor["guest-epa"].get("cpu-quota"))
+ if cpuquota:
+ extended["cpu-quota"] = cpuquota
+ if target_flavor["guest-epa"].get("mem-quota"):
+ vduquota = _get_resource_allocation_params(target_flavor["guest-epa"].get("mem-quota"))
+ if vduquota:
+ extended["mem-quota"] = vduquota
+ if target_flavor["guest-epa"].get("disk-io-quota"):
+ diskioquota = _get_resource_allocation_params(target_flavor["guest-epa"].get("disk-io-quota"))
+ if diskioquota:
+ extended["disk-io-quota"] = diskioquota
+ if target_flavor["guest-epa"].get("vif-quota"):
+ vifquota = _get_resource_allocation_params(target_flavor["guest-epa"].get("vif-quota"))
+ if vifquota:
+ extended["vif-quota"] = vifquota
+ if numa:
+ extended["numas"] = [numa]
+ if extended:
+ flavor_data["extended"] = extended
+
+ extra_dict = {"find_params": {"flavor_data": flavor_data}}
+ flavor_data_name = flavor_data.copy()
+ flavor_data_name["name"] = target_flavor["name"]
+ extra_dict["params"] = {"flavor_data": flavor_data_name}
+ return extra_dict
+
+ def _process_net_params(target_vld, vim_info):
+ nonlocal indata
+ extra_dict = {}
+ if vim_info.get("vim_network_name"):
+ extra_dict["find_params"] = {"filter_dict": {"name": vim_info.get("vim_network_name")}}
+ elif vim_info.get("vim_network_id"):
+ extra_dict["find_params"] = {"filter_dict": {"id": vim_info.get("vim_network_id")}}
+ elif target_vld.get("mgmt-network"):
+ extra_dict["find_params"] = {"mgmt": True, "name": target_vld["id"]}
+ else:
+ # create
+ extra_dict["params"] = {
+ "net_name": "{}-{}".format(indata["name"][:16], target_vld.get("name", target_vld["id"])[:16]),
+ "ip_profile": vim_info.get('ip_profile'),
+ "provider_network_profile": vim_info.get('provider_network'),
+ }
+ if not target_vld.get("underlay"):
+ extra_dict["params"]["net_type"] = "bridge"
+ else:
+ extra_dict["params"]["net_type"] = "ptp" if target_vld.get("type") == "ELINE" else "data"
+ return extra_dict
+
+ def _process_vdu_params(target_vdu, vim_info):
+ nonlocal vnfr_id
+ nonlocal nsr_id
+ nonlocal indata
+ nonlocal vnfr
+ nonlocal vdu2cloud_init
+ vnf_preffix = "vnfrs:{}".format(vnfr_id)
+ ns_preffix = "nsrs:{}".format(nsr_id)
+ image_text = ns_preffix + ":image." + target_vdu["ns-image-id"]
+ flavor_text = ns_preffix + ":flavor." + target_vdu["ns-flavor-id"]
+ extra_dict = {"depends_on": [image_text, flavor_text]}
+ net_list = []
+ for iface_index, interface in enumerate(target_vdu["interfaces"]):
+ if interface.get("ns-vld-id"):
+ net_text = ns_preffix + ":vld." + interface["ns-vld-id"]
+ else:
+ net_text = vnf_preffix + ":vld." + interface["vnf-vld-id"]
+ extra_dict["depends_on"].append(net_text)
+ net_item = {
+ "name": interface["name"],
+ "net_id": "TASK-" + net_text,
+ "vpci": interface.get("vpci"),
+ "type": "virtual",
+ # TODO mac_address: used for SR-IOV ifaces #TODO for other types
+ # TODO floating_ip: True/False (or it can be None)
+ }
+ if interface.get("type") in ("SR-IOV", "PCI-PASSTHROUGH"):
+ net_item["use"] = "data"
+ net_item["model"] = interface["type"]
+ net_item["type"] = interface["type"]
+ elif interface.get("type") == "OM-MGMT" or interface.get("mgmt-interface") or \
+ interface.get("mgmt-vnf"):
+ net_item["use"] = "mgmt"
+ else: # if interface.get("type") in ("VIRTIO", "E1000", "PARAVIRT"):
+ net_item["use"] = "bridge"
+ net_item["model"] = interface.get("type")
+ net_list.append(net_item)
+ if interface.get("mgmt-vnf"):
+ extra_dict["mgmt_vnf_interface"] = iface_index
+ elif interface.get("mgmt-interface"):
+ extra_dict["mgmt_vdu_interface"] = iface_index
+
+ # cloud config
+ cloud_config = {}
+ if target_vdu.get("cloud-init"):
+ if target_vdu["cloud-init"] not in vdu2cloud_init:
+ vdu2cloud_init[target_vdu["cloud-init"]] = self._get_cloud_init(target_vdu["cloud-init"])
+ cloud_content_ = vdu2cloud_init[target_vdu["cloud-init"]]
+ cloud_config["user-data"] = self._parse_jinja2(cloud_content_, target_vdu.get("additionalParams"),
+ target_vdu["cloud-init"])
+ if target_vdu.get("boot-data-drive"):
+ cloud_config["boot-data-drive"] = target_vdu.get("boot-data-drive")
+ ssh_keys = []
+ if target_vdu.get("ssh-keys"):
+ ssh_keys += target_vdu.get("ssh-keys")
+ if target_vdu.get("ssh-access-required"):
+ ssh_keys.append(ro_nsr_public_key)
+ if ssh_keys:
+ cloud_config["key-pairs"] = ssh_keys
+
+ extra_dict["params"] = {
+ "name": "{}-{}-{}-{}".format(indata["name"][:16], vnfr["member-vnf-index-ref"][:16],
+ target_vdu["vdu-name"][:32], target_vdu.get("count-index") or 0),
+ "description": target_vdu["vdu-name"],
+ "start": True,
+ "image_id": "TASK-" + image_text,
+ "flavor_id": "TASK-" + flavor_text,
+ "net_list": net_list,
+ "cloud_config": cloud_config or None,
+ "disk_list": None, # TODO
+ "availability_zone_index": None, # TODO
+ "availability_zone_list": None, # TODO
+ }
+ return extra_dict
+
+ def _process_items(target_list, existing_list, db_record, db_update, db_path, item, process_params):
+ nonlocal db_ro_tasks
+ nonlocal db_new_tasks
+ nonlocal task_index
+
+ # ensure all the target_list elements has an "id". If not assign the index
+ for target_index, tl in enumerate(target_list):
+ if tl and not tl.get("id"):
+ tl["id"] = str(target_index)
+
+ # step 1 networks to be deleted/updated
+ for vld_index, existing_vld in enumerate(existing_list):
+ target_vld = next((vld for vld in target_list if vld["id"] == existing_vld["id"]), None)
+ for existing_vim_index, existing_vim_info in enumerate(existing_vld.get("vim_info", ())):
+ if not existing_vim_info:
+ continue
+ if target_vld:
+ target_viminfo = next((target_viminfo for target_viminfo in target_vld.get("vim_info", ())
+ if existing_vim_info["vim_account_id"] == target_viminfo[
+ "vim_account_id"]), None)
+ else:
+ target_viminfo = None
+ if not target_viminfo:
+ # must be deleted
+ self._assign_vim(existing_vim_info["vim_account_id"])
+ db_new_tasks.append(_create_task(
+ item, "DELETE",
+ target_record="{}.{}.vim_info.{}".format(db_record, vld_index, existing_vim_index),
+ target_record_id="{}.{}".format(db_record, existing_vld["id"])))
+ # TODO delete
+ # TODO check one by one the vims to be created/deleted
+
+ # step 2 networks to be created
+ for target_vld in target_list:
+ vld_index = -1
+ for vld_index, existing_vld in enumerate(existing_list):
+ if existing_vld["id"] == target_vld["id"]:
+ break
+ else:
+ vld_index += 1
+ db_update[db_path + ".{}".format(vld_index)] = target_vld
+ existing_list.append(target_vld)
+ existing_vld = None
+
+ for vim_index, vim_info in enumerate(target_vld["vim_info"]):
+ existing_viminfo = None
+ if existing_vld:
+ existing_viminfo = next(
+ (existing_viminfo for existing_viminfo in existing_vld.get("vim_info", ())
+ if vim_info["vim_account_id"] == existing_viminfo["vim_account_id"]), None)
+ # TODO check if different. Delete and create???
+ # TODO delete if not exist
+ if existing_viminfo:
+ continue
+
+ extra_dict = process_params(target_vld, vim_info)
+
+ self._assign_vim(vim_info["vim_account_id"])
+ db_ro_tasks.append(_create_ro_task(
+ vim_info["vim_account_id"], item, "CREATE",
+ target_record="{}.{}.vim_info.{}".format(db_record, vld_index, vim_index),
+ target_record_id="{}.{}".format(db_record, target_vld["id"]),
+ extra_dict=extra_dict))
+
+ db_update[db_path + ".{}".format(vld_index)] = target_vld
+
+ def _process_action(indata):
+ nonlocal db_ro_tasks
+ nonlocal db_new_tasks
+ nonlocal task_index
+ nonlocal db_vnfrs
+ nonlocal db_ro_nsr
+
+ if indata["action"] == "inject_ssh_key":
+ key = indata.get("key")
+ user = indata.get("user")
+ password = indata.get("password")
+ for vnf in indata.get("vnf", ()):
+ if vnf.get("_id") not in db_vnfrs:
+ raise NsException("Invalid vnf={}".format(vnf["_id"]))
+ db_vnfr = db_vnfrs[vnf["_id"]]
+ for target_vdu in vnf.get("vdur", ()):
+ vdu_index, vdur = next((i_v for i_v in enumerate(db_vnfr["vdur"]) if
+ i_v[1]["id"] == target_vdu["id"]), (None, None))
+ if not vdur:
+ raise NsException("Invalid vdu vnf={}.{}".format(vnf["_id"], target_vdu["id"]))
+ vim_info = vdur["vim_info"][0]
+ self._assign_vim(vim_info["vim_account_id"])
+ target_record = "vnfrs:{}:vdur.{}.ssh_keys".format(vnf["_id"], vdu_index)
+ extra_dict = {
+ "depends_on": ["vnfrs:{}:vdur.{}".format(vnf["_id"], vdur["id"])],
+ "params": {
+ "ip_address": vdur.gt("ip_address"),
+ "user": user,
+ "key": key,
+ "password": password,
+ "private_key": db_ro_nsr["private_key"],
+ "salt": db_ro_nsr["_id"],
+ "schema_version": db_ro_nsr["_admin"]["schema_version"]
+ }
+ }
+ db_ro_tasks.append(_create_ro_task(vim_info["vim_account_id"], "vdu", "EXEC",
+ target_record=target_record,
+ target_record_id=None,
+ extra_dict=extra_dict))
+
+ with self.write_lock:
+ if indata.get("action"):
+ _process_action(indata)
+ else:
+ # compute network differences
+ # NS.vld
+ step = "process NS VLDs"
+ _process_items(target_list=indata["ns"]["vld"] or [], existing_list=db_nsr.get("vld") or [],
+ db_record="nsrs:{}:vld".format(nsr_id), db_update=db_nsr_update,
+ db_path="vld", item="net", process_params=_process_net_params)
+
+ step = "process NS images"
+ _process_items(target_list=indata["image"] or [], existing_list=db_nsr.get("image") or [],
+ db_record="nsrs:{}:image".format(nsr_id),
+ db_update=db_nsr_update, db_path="image", item="image",
+ process_params=_process_image_params)
+
+ step = "process NS flavors"
+ _process_items(target_list=indata["flavor"] or [], existing_list=db_nsr.get("flavor") or [],
+ db_record="nsrs:{}:flavor".format(nsr_id),
+ db_update=db_nsr_update, db_path="flavor", item="flavor",
+ process_params=_process_flavor_params)
+
+ # VNF.vld
+ for vnfr_id, vnfr in db_vnfrs.items():
+ # vnfr_id need to be set as global variable for among others nested method _process_vdu_params
+ step = "process VNF={} VLDs".format(vnfr_id)
+ target_vnf = next((vnf for vnf in indata.get("vnf", ()) if vnf["_id"] == vnfr_id), None)
+ target_list = target_vnf.get("vld") if target_vnf else None
+ _process_items(target_list=target_list or [], existing_list=vnfr.get("vld") or [],
+ db_record="vnfrs:{}:vld".format(vnfr_id), db_update=db_vnfrs_update[vnfr["_id"]],
+ db_path="vld", item="net", process_params=_process_net_params)
+
+ target_list = target_vnf.get("vdur") if target_vnf else None
+ step = "process VNF={} VDUs".format(vnfr_id)
+ _process_items(target_list=target_list or [], existing_list=vnfr.get("vdur") or [],
+ db_record="vnfrs:{}:vdur".format(vnfr_id),
+ db_update=db_vnfrs_update[vnfr["_id"]], db_path="vdur", item="vdu",
+ process_params=_process_vdu_params)
+
+ step = "Updating database, Creating ro_tasks"
+ if db_ro_tasks:
+ self.db.create_list("ro_tasks", db_ro_tasks)
+ step = "Updating database, Appending tasks to ro_tasks"
+ for task in db_new_tasks:
+ if not self.db.set_one("ro_tasks", q_filter={"tasks.target_record": task["target_record"]},
+ update_dict={"to_check_at": now, "modified_at": now},
+ push={"tasks": task}, fail_on_empty=False):
+ self.logger.error(logging_text + "Cannot find task for target_record={}".
+ format(task["target_record"]))
+ # TODO something else appart from logging?
+ step = "Updating database, nsrs"
+ if db_nsr_update:
+ self.db.set_one("nsrs", {"_id": nsr_id}, db_nsr_update)
+ for vnfr_id, db_vnfr_update in db_vnfrs_update.items():
+ if db_vnfr_update:
+ step = "Updating database, vnfrs={}".format(vnfr_id)
+ self.db.set_one("vnfrs", {"_id": vnfr_id}, db_vnfr_update)
+
+ self.logger.debug(logging_text + "Exit")
+ return {"status": "ok", "nsr_id": nsr_id, "action_id": action_id}, action_id, True
+
+ except Exception as e:
+ if isinstance(e, (DbException, NsException)):
+ self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
+ else:
+ e = traceback_format_exc()
+ self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(step, e), exc_info=True)
+ raise NsException(e)
+
+ def delete(self, session, indata, version, nsr_id, *args, **kwargs):
+ print("ns.delete session={} indata={} version={} nsr_id={}".format(session, indata, version, nsr_id))
+ # TODO del when ALL "tasks.nsr_id" are None of nsr_id
+ # self.db.del_list({"_id": ro_task["_id"], "tasks.nsr_id.ne": nsr_id})
+ retries = 5
+ for retry in range(retries):
+ with self.write_lock:
+ ro_tasks = self.db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
+ if not ro_tasks:
+ break
+ now = time()
+ conflict = False
+ for ro_task in ro_tasks:
+ db_update = {}
+ to_delete = True
+ for index, task in enumerate(ro_task["tasks"]):
+ if not task:
+ pass
+ elif task["nsr_id"] == nsr_id:
+ db_update["tasks.{}".format(index)] = None
+ else:
+ to_delete = False # used by other nsr, cannot be deleted
+ # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
+ if to_delete:
+ if not self.db.del_one("ro_tasks",
+ q_filter={"_id": ro_task["_id"], "modified_at": ro_task["modified_at"]},
+ fail_on_empty=False):
+ conflict = True
+ elif db_update:
+ db_update["modified_at"] = now
+ if not self.db.set_one("ro_tasks",
+ q_filter={"_id": ro_task["_id"], "modified_at": ro_task["modified_at"]},
+ update_dict=db_update,
+ fail_on_empty=False):
+ conflict = True
+ if not conflict:
+ break
+ else:
+ raise NsException("Exceeded {} retries".format(retries))
+
+ return None, None, True
+
+ def status(self, session, indata, version, nsr_id, action_id, *args, **kwargs):
+ print("ns.status session={} indata={} version={} nsr_id={}, action_id={}".format(session, indata, version,
+ nsr_id, action_id))
+ task_list = []
+ done = 0
+ total = 0
+ ro_tasks = self.db.get_list("ro_tasks", {"tasks.action_id": action_id})
+ global_status = "DONE"
+ details = []
+ for ro_task in ro_tasks:
+ for task in ro_task["tasks"]:
+ if task["action_id"] == action_id:
+ task_list.append(task)
+ total += 1
+ if task["status"] == "FAILED":
+ global_status = "FAILED"
+ details.append(ro_task.get("vim_details", ''))
+ elif task["status"] in ("SCHEDULED", "BUILD"):
+ if global_status != "FAILED":
+ global_status = "BUILD"
+ else:
+ done += 1
+ return_data = {
+ "status": global_status,
+ "details": ". ".join(details) if details else "progress {}/{}".format(done, total),
+ "nsr_id": nsr_id,
+ "action_id": action_id,
+ "tasks": task_list
+ }
+ return return_data, None, True
+
+ def cancel(self, session, indata, version, nsr_id, action_id, *args, **kwargs):
+ print("ns.cancel session={} indata={} version={} nsr_id={}, action_id={}".format(session, indata, version,
+ nsr_id, action_id))
+ return None, None, True
+
+ def get_deploy(self, session, indata, version, nsr_id, action_id, *args, **kwargs):
+ nsrs = self.db.get_list("nsrs", {})
+ return_data = []
+ for ns in nsrs:
+ return_data.append({"_id": ns["_id"], "name": ns["name"]})
+ return return_data, None, True
+
+ def get_actions(self, session, indata, version, nsr_id, action_id, *args, **kwargs):
+ ro_tasks = self.db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
+ return_data = []
+ for ro_task in ro_tasks:
+ for task in ro_task["tasks"]:
+ if task["action_id"] not in return_data:
+ return_data.append(task["action_id"])
+ return return_data, None, True
--- /dev/null
+# -*- coding: utf-8 -*-
+
+##
+# Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+##
+
+""""
+This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
+The tasks are stored at database in table ro_tasks
+A single ro_task refers to a VIM element (flavor, image, network, ...).
+A ro_task can contain several 'tasks', each one with a target, where to store the results
+"""
+
+import threading
+import time
+import queue
+import logging
+from pkg_resources import iter_entry_points
+# from osm_common import dbmongo, dbmemory, fslocal, fsmongo, msglocal, msgkafka, version as common_version
+from osm_common.dbbase import DbException
+# from osm_common.fsbase import FsException
+# from osm_common.msgbase import MsgException
+from osm_ro_plugin.vim_dummy import VimDummyConnector
+from osm_ro_plugin import vimconn
+from copy import deepcopy
+from unittest.mock import Mock
+
+__author__ = "Alfonso Tierno"
+__date__ = "$28-Sep-2017 12:07:15$"
+
+
+def deep_get(target_dict, *args, **kwargs):
+ """
+ Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
+ Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
+ :param target_dict: dictionary to be read
+ :param args: list of keys to read from target_dict
+ :param kwargs: only can contain default=value to return if key is not present in the nested dictionary
+ :return: The wanted value if exist, None or default otherwise
+ """
+ for key in args:
+ if not isinstance(target_dict, dict) or key not in target_dict:
+ return kwargs.get("default")
+ target_dict = target_dict[key]
+ return target_dict
+
+
+class NsWorkerException(Exception):
+ pass
+
+
+class FailingConnector:
+ def __init__(self, error_msg):
+ self.error_msg = error_msg
+ for method in dir(vimconn.VimConnector):
+ if method[0] != "_":
+ setattr(self, method, Mock(side_effect=vimconn.VimConnException(error_msg)))
+
+
+class NsWorkerExceptionNotFound(NsWorkerException):
+ pass
+
+
+class NsWorker(threading.Thread):
+ REFRESH_BUILD = 5 # 5 seconds
+ REFRESH_ACTIVE = 60 # 1 minute
+ REFRESH_ERROR = 600
+ REFRESH_IMAGE = 3600 * 10
+ REFRESH_DELETE = 3600 * 10
+ QUEUE_SIZE = 2000
+ # TODO delete assigment_lock = Lock()
+ terminate = False
+ # TODO delete assignment = {}
+ MAX_TIME_LOCKED = 3600
+
+ def __init__(self, worker, config, plugins, db):
+ """Init a thread.
+ Arguments:
+ 'id' number of thead
+ 'name' name of thread
+ 'host','user': host ip or name to manage and user
+ 'db', 'db_lock': database class and lock to use it in exclusion
+ """
+ threading.Thread.__init__(self)
+ self.config = config
+ self.plugins = plugins
+ self.plugin_name = "unknown"
+ self.logger = logging.getLogger('ro.worker{}'.format("worker"))
+ self.worker_id = worker
+ self.task_queue = queue.Queue(self.QUEUE_SIZE)
+ self.my_vims = {} # targetvim: vimplugin class
+ self.db_vims = {} # targetvim: vim information from database
+ self.vim_targets = [] # targetvim list
+ self.my_id = config["process_id"] + ":" + str(worker)
+ self.db = db
+ self.item2create = {
+ "net": self.new_net,
+ "vdu": self.new_vm,
+ "image": self.new_image,
+ "flavor": self.new_flavor,
+ }
+ self.item2refresh = {
+ "net": self.refresh_net,
+ "vdu": self.refresh_vm,
+ "image": self.refresh_ok,
+ "flavor": self.refresh_ok,
+ }
+ self.item2delete = {
+ "net": self.del_net,
+ "vdu": self.del_vm,
+ "image": self.delete_ok,
+ "flavor": self.del_flavor,
+ }
+ self.item2action = {
+ "vdu": self.exec_vm,
+ }
+ self.time_last_task_processed = None
+
+ def insert_task(self, task):
+ try:
+ self.task_queue.put(task, False)
+ return None
+ except queue.Full:
+ raise NsWorkerException("timeout inserting a task")
+
+ def terminate(self):
+ self.insert_task("exit")
+
+ def del_task(self, task):
+ with self.task_lock:
+ if task["status"] == "SCHEDULED":
+ task["status"] = "SUPERSEDED"
+ return True
+ else: # task["status"] == "processing"
+ self.task_lock.release()
+ return False
+
+ def _load_plugin(self, name, type="vim"):
+ # type can be vim or sdn
+ if "rovim_dummy" not in self.plugins:
+ self.plugins["rovim_dummy"] = VimDummyConnector
+ if name in self.plugins:
+ return self.plugins[name]
+ try:
+ for v in iter_entry_points('osm_ro{}.plugins'.format(type), name):
+ self.plugins[name] = v.load()
+ except Exception as e:
+ self.logger.critical("Cannot load osm_{}: {}".format(name, e))
+ if name:
+ self.plugins[name] = FailingConnector("Cannot load osm_{}: {}".format(name, e))
+ if name and name not in self.plugins:
+ error_text = "Cannot load a module for {t} type '{n}'. The plugin 'osm_{n}' has not been" \
+ " registered".format(t=type, n=name)
+ self.logger.critical(error_text)
+ self.plugins[name] = FailingConnector(error_text)
+
+ return self.plugins[name]
+
+ def _load_vim(self, vim_account_id):
+ target_id = "vim:" + vim_account_id
+ plugin_name = ""
+ vim = None
+ try:
+ step = "Getting vim={} from db".format(vim_account_id)
+ vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
+
+ # if deep_get(vim, "config", "sdn-controller"):
+ # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
+ # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
+
+ step = "Decrypt password"
+ schema_version = vim.get("schema_version")
+ self.db.encrypt_decrypt_fields(vim, "decrypt", fields=('password', 'secret'),
+ schema_version=schema_version, salt=vim_account_id)
+
+ step = "Load plugin 'rovim_{}'".format(vim.get("vim_type"))
+ plugin_name = "rovim_" + vim["vim_type"]
+ vim_module_conn = self._load_plugin(plugin_name)
+ self.my_vims[target_id] = vim_module_conn(
+ uuid=vim['_id'], name=vim['name'],
+ tenant_id=vim.get('vim_tenant_id'), tenant_name=vim.get('vim_tenant_name'),
+ url=vim['vim_url'], url_admin=None,
+ user=vim['vim_user'], passwd=vim['vim_password'],
+ config=vim.get('config'), persistent_info={}
+ )
+ self.vim_targets.append(target_id)
+ self.db_vims[target_id] = vim
+ self.error_status = None
+ self.logger.info("Vim Connector loaded for vim_account={}, plugin={}".format(
+ vim_account_id, plugin_name))
+ except Exception as e:
+ self.logger.error("Cannot load vimconnector for vim_account={} plugin={}: {} {}".format(
+ vim_account_id, plugin_name, step, e))
+ self.db_vims[target_id] = vim or {}
+ self.my_vims[target_id] = FailingConnector(str(e))
+ self.error_status = "Error loading vimconnector: {}".format(e)
+
+ def _get_db_task(self):
+ """
+ Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
+ :return: None
+ """
+ now = time.time()
+ if not self.time_last_task_processed:
+ self.time_last_task_processed = now
+ try:
+ while True:
+ locked = self.db.set_one(
+ "ro_tasks",
+ q_filter={"target_id": self.vim_targets,
+ "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
+ "locked_at.lt": now - self.MAX_TIME_LOCKED,
+ "to_check_at.lt": self.time_last_task_processed},
+ update_dict={"locked_by": self.my_id, "locked_at": now},
+ fail_on_empty=False)
+ if locked:
+ # read and return
+ ro_task = self.db.get_one(
+ "ro_tasks",
+ q_filter={"target_id": self.vim_targets,
+ "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
+ "locked_at": now})
+ return ro_task
+ if self.time_last_task_processed == now:
+ self.time_last_task_processed = None
+ return None
+ else:
+ self.time_last_task_processed = now
+ # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
+
+ except DbException as e:
+ self.logger.error("Database exception at _get_db_task: {}".format(e))
+ except Exception as e:
+ self.logger.critical("Unexpected exception at _get_db_task: {}".format(e), exc_info=True)
+ return None
+
+ def _delete_task(self, ro_task, task_index, task_depends, db_update):
+ """
+ Determine if this task need to be done or superseded
+ :return: None
+ """
+ my_task = ro_task["tasks"][task_index]
+ task_id = my_task["task_id"]
+ needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get("created_items", False)
+ if my_task["status"] == "FAILED":
+ return None, None # TODO need to be retry??
+ try:
+ for index, task in enumerate(ro_task["tasks"]):
+ if index == task_index:
+ continue # own task
+ if my_task["target_record"] == task["target_record"] and task["action"] == "CREATE":
+ # set to finished
+ db_update["tasks.{}.status".format(index)] = task["status"] = "FINISHED"
+ elif task["action"] == "CREATE" and task["status"] not in ("FINISHED", "SUPERSEDED"):
+ needed_delete = False
+ if needed_delete:
+ return self.item2delete[my_task["item"]](ro_task, task_index)
+ else:
+ return "SUPERSEDED", None
+ except Exception as e:
+ if not isinstance(e, NsWorkerException):
+ self.logger.critical("Unexpected exception at _delete_task task={}: {}".format(task_id, e),
+ exc_info=True)
+ return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e)}
+
+ def _create_task(self, ro_task, task_index, task_depends, db_update):
+ """
+ Determine if this task need to be created
+ :return: None
+ """
+ my_task = ro_task["tasks"][task_index]
+ task_id = my_task["task_id"]
+ task_status = None
+ if my_task["status"] == "FAILED":
+ return None, None # TODO need to be retry??
+ elif my_task["status"] == "SCHEDULED":
+ # check if already created by another task
+ for index, task in enumerate(ro_task["tasks"]):
+ if index == task_index:
+ continue # own task
+ if task["action"] == "CREATE" and task["status"] not in ("SCHEDULED", "FINISHED", "SUPERSEDED"):
+ return task["status"], "COPY_VIM_INFO"
+
+ try:
+ task_status, ro_vim_item_update = self.item2create[my_task["item"]](ro_task, task_index, task_depends)
+ # TODO update other CREATE tasks
+ except Exception as e:
+ if not isinstance(e, NsWorkerException):
+ self.logger.error("Error executing task={}: {}".format(task_id, e), exc_info=True)
+ task_status = "FAILED"
+ ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
+ # TODO update ro_vim_item_update
+ return task_status, ro_vim_item_update
+ else:
+ return None, None
+
+ def _get_dependency(self, task_id, ro_task=None, target_id=None):
+ if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
+ ro_task_dependency = self.db.get_one(
+ "ro_tasks",
+ q_filter={"target_id": target_id,
+ "tasks.target_record_id": task_id
+ },
+ fail_on_empty=False)
+ if ro_task_dependency:
+ for task_index, task in enumerate(ro_task_dependency["tasks"]):
+ if task["target_record_id"] == task_id:
+ return ro_task_dependency, task_index
+
+ else:
+ if ro_task:
+ for task_index, task in enumerate(ro_task["tasks"]):
+ if task["task_id"] == task_id:
+ return ro_task, task_index
+ ro_task_dependency = self.db.get_one(
+ "ro_tasks",
+ q_filter={"tasks.ANYINDEX.task_id": task_id,
+ "tasks.ANYINDEX.target_record.ne": None
+ },
+ fail_on_empty=False)
+ if ro_task_dependency:
+ for task_index, task in ro_task_dependency["tasks"]:
+ if task["task_id"] == task_id:
+ return ro_task_dependency, task_index
+ raise NsWorkerException("Cannot get depending task {}".format(task_id))
+
+ def _proccess_pending_tasks(self, ro_task):
+ ro_task_id = ro_task["_id"]
+ now = time.time()
+ next_check_at = now + (24*60*60) # one day
+ db_ro_task_update = {}
+
+ def _update_refresh(new_status):
+ # compute next_refresh
+ nonlocal task
+ nonlocal next_check_at
+ nonlocal db_ro_task_update
+ nonlocal ro_task
+
+ next_refresh = time.time()
+ if task["item"] in ("image", "flavor"):
+ next_refresh += self.REFRESH_IMAGE
+ elif new_status == "BUILD":
+ next_refresh += self.REFRESH_BUILD
+ elif new_status == "DONE":
+ next_refresh += self.REFRESH_ACTIVE
+ else:
+ next_refresh += self.REFRESH_ERROR
+ next_check_at = min(next_check_at, next_refresh)
+ db_ro_task_update["vim_info.refresh_at"] = next_refresh
+ ro_task["vim_info"]["refresh_at"] = next_refresh
+
+ try:
+ # 0 get task_status_create
+ task_status_create = None
+ task_create = next((t for t in ro_task["tasks"] if t["action"] == "CREATE" and
+ t["status"] in ("BUILD", "DONE")), None)
+ if task_create:
+ task_status_create = task_create["status"]
+ # 1. look for SCHEDULED or if CREATE also DONE,BUILD
+ for task_action in ("DELETE", "CREATE", "EXEC"):
+ db_vim_update = None
+ for task_index, task in enumerate(ro_task["tasks"]):
+ target_update = None
+ if (task_action in ("DELETE", "EXEC") and task["status"] != "SCHEDULED") or\
+ task["action"] != task_action or \
+ (task_action == "CREATE" and task["status"] in ("FINISHED", "SUPERSEDED")):
+ continue
+ task_path = "tasks.{}.status".format(task_index)
+ try:
+ if task["status"] == "SCHEDULED":
+ task_depends = {}
+ # check if tasks that this depends on have been completed
+ dependency_not_completed = False
+ for dependency_task_id in (task.get("depends_on") or ()):
+ dependency_ro_task, dependency_task_index = \
+ self._get_dependency(dependency_task_id, target_id=ro_task["target_id"])
+ dependency_task = dependency_ro_task["tasks"][dependency_task_index]
+ if dependency_task["status"] == "SCHEDULED":
+ dependency_not_completed = True
+ next_check_at = min(next_check_at, dependency_ro_task["to_check_at"])
+ break
+ elif dependency_task["status"] == "FAILED":
+ error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
+ task["action"], task["item"], dependency_task["action"],
+ dependency_task["item"], dependency_task_id,
+ dependency_ro_task["vim_info"].get("vim_details"))
+ self.logger.error("task={} {}".format(task["task_id"], error_text))
+ raise NsWorkerException(error_text)
+
+ task_depends[dependency_task_id] = dependency_ro_task["vim_info"]["vim_id"]
+ task_depends["TASK-{}".format(dependency_task_id)] = \
+ dependency_ro_task["vim_info"]["vim_id"]
+ if dependency_not_completed:
+ # TODO set at vim_info.vim_details that it is waiting
+ continue
+
+ if task["action"] == "DELETE":
+ new_status, db_vim_info_update = self._delete_task(ro_task, task_index,
+ task_depends, db_ro_task_update)
+ new_status = "FINISHED" if new_status == "DONE" else new_status
+ # ^with FINISHED instead of DONE it will not be refreshing
+ if new_status in ("FINISHED", "SUPERSEDED"):
+ target_update = "DELETE"
+ elif task["action"] == "EXEC":
+ self.item2action[task["item"]](ro_task, task_index, task_depends, db_ro_task_update)
+ new_status = "FINISHED" if new_status == "DONE" else new_status
+ # ^with FINISHED instead of DONE it will not be refreshing
+ if new_status in ("FINISHED", "SUPERSEDED"):
+ target_update = "DELETE"
+ elif task["action"] == "CREATE":
+ if task["status"] == "SCHEDULED":
+ if task_status_create:
+ new_status = task_status_create
+ target_update = "COPY_VIM_INFO"
+ else:
+ new_status, db_vim_info_update = \
+ self.item2create[task["item"]](ro_task, task_index, task_depends)
+ # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
+ _update_refresh(new_status)
+ else:
+ if ro_task["vim_info"]["refresh_at"] and now > ro_task["vim_info"]["refresh_at"]:
+ new_status, db_vim_info_update = self.item2refresh[task["item"]](ro_task)
+ _update_refresh(new_status)
+ except Exception as e:
+ new_status = "FAILED"
+ db_vim_info_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
+ if not isinstance(e, (NsWorkerException, vimconn.VimConnException)):
+ self.logger.error("Unexpected exception at _delete_task task={}: {}".
+ format(task["task_id"], e), exc_info=True)
+
+ try:
+ if db_vim_info_update:
+ db_vim_update = db_vim_info_update.copy()
+ db_ro_task_update.update({"vim_info." + k: v for k, v in db_vim_info_update.items()})
+ ro_task["vim_info"].update(db_vim_info_update)
+
+ if new_status:
+ if task_action == "CREATE":
+ task_status_create = new_status
+ db_ro_task_update[task_path] = new_status
+ if target_update or db_vim_update:
+
+ if target_update == "DELETE":
+ self._update_target(task, None)
+ elif target_update == "COPY_VIM_INFO":
+ self._update_target(task, ro_task["vim_info"])
+ else:
+ self._update_target(task, db_vim_update)
+
+ except Exception as e:
+ self.logger.error("Unexpected exception at _update_target task={}: {}".
+ format(task["task_id"], e), exc_info=True)
+
+ # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
+ # outside this task (by ro_nbi) do not update it
+ db_ro_task_update["locked_by"] = None
+ # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
+ db_ro_task_update["locked_at"] = int(now - self.MAX_TIME_LOCKED)
+ db_ro_task_update["to_check_at"] = next_check_at
+ if not self.db.set_one("ro_tasks",
+ update_dict=db_ro_task_update,
+ q_filter={"_id": ro_task["_id"], "to_check_at": ro_task["to_check_at"]},
+ fail_on_empty=False):
+ del db_ro_task_update["to_check_at"]
+ self.db.set_one("ro_tasks",
+ q_filter={"_id": ro_task["_id"]},
+ update_dict=db_ro_task_update,
+ fail_on_empty=True)
+ except DbException as e:
+ self.logger.error("ro_task={} Error updating database {}".format(ro_task_id, e))
+ except Exception as e:
+ self.logger.error("Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True)
+
+ def _update_target(self, task, ro_vim_item_update):
+ try:
+ table, _id, path = task["target_record"].split(":")
+ if ro_vim_item_update:
+ update_dict = {path + "." + k: v for k, v in ro_vim_item_update.items() if k in
+ ('vim_id', 'vim_details', 'vim_name', 'vim_status', 'interfaces')}
+ if ro_vim_item_update.get("interfaces"):
+ path_vdu = path[:path.rfind(".")]
+ path_vdu = path_vdu[:path_vdu.rfind(".")]
+ path_interfaces = path_vdu + ".interfaces"
+ for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
+ if iface:
+ update_dict.update({path_interfaces + ".{}.".format(i) + k: v for k, v in iface.items() if
+ k in ('ip_address', 'mac_address', 'vlan', 'compute_node', 'pci')})
+ if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
+ update_dict["ip-address"] = iface.get("ip_address").split(";")[0]
+ if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
+ update_dict[path_vdu + ".ip-address"] = iface.get("ip_address").split(";")[0]
+
+ self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
+ else:
+ self.db.set_one(table, q_filter={"_id": _id}, update_dict=None,
+ unset={path: None})
+ except DbException as e:
+ self.logger.error("Cannot update database '{}': '{}'".format(task["target_record"], e))
+
+ def new_image(self, ro_task, task_index, task_depends):
+ task = ro_task["tasks"][task_index]
+ task_id = task["task_id"]
+ created = False
+ created_items = {}
+ target_vim = self.my_vims[ro_task["target_id"]]
+ try:
+ # FIND
+ if task.get("find_params"):
+ vim_images = target_vim.get_image_list(**task["find_params"])
+ if not vim_images:
+ raise NsWorkerExceptionNotFound("Image not found with this criteria: '{}'".format(
+ task["find_params"]))
+ elif len(vim_images) > 1:
+ raise NsWorkerException(
+ "More than one network found with this criteria: '{}'".format(task["find_params"]))
+ else:
+ vim_image_id = vim_images[0]["id"]
+
+ ro_vim_item_update = {"vim_id": vim_image_id,
+ "vim_status": "DONE",
+ "created": created,
+ "created_items": created_items,
+ "vim_details": None}
+ self.logger.debug(
+ "task={} {} new-image={} created={}".format(task_id, ro_task["target_id"], vim_image_id, created))
+ return "DONE", ro_vim_item_update
+ except (NsWorkerException, vimconn.VimConnException) as e:
+ self.logger.error("task={} {} new-image: {}".format(task_id, ro_task["target_id"], e))
+ ro_vim_item_update = {"vim_status": "VIM_ERROR",
+ "created": created,
+ "vim_details": str(e)}
+ return "FAILED", ro_vim_item_update
+
+ def del_flavor(self, ro_task, task_index):
+ task = ro_task["tasks"][task_index]
+ task_id = task["task_id"]
+ flavor_vim_id = ro_task["vim_info"]["vim_id"]
+ ro_vim_item_update_ok = {"vim_status": "DELETED",
+ "created": False,
+ "vim_details": "DELETED",
+ "vim_id": None}
+ try:
+ if flavor_vim_id:
+ target_vim = self.my_vims[ro_task["target_id"]]
+ target_vim.delete_flavor(flavor_vim_id)
+
+ except vimconn.VimConnNotFoundException:
+ ro_vim_item_update_ok["vim_details"] = "already deleted"
+
+ except vimconn.VimConnException as e:
+ self.logger.error("ro_task={} vim={} del-flavor={}: {}".format(
+ ro_task["_id"], ro_task["target_id"], flavor_vim_id, e))
+ ro_vim_item_update = {"vim_status": "VIM_ERROR",
+ "vim_details": "Error while deleting: {}".format(e)}
+ return "FAILED", ro_vim_item_update
+
+ self.logger.debug("task={} {} del-flavor={} {}".format(
+ task_id, ro_task["target_id"], flavor_vim_id, ro_vim_item_update_ok.get("vim_details", "")))
+ return "DONE", ro_vim_item_update_ok
+
+ def refresh_ok(self, ro_task):
+ """skip calling VIM to get image status. Assumes ok"""
+ if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
+ return "FAILED", {}
+ return "DONE", {}
+
+ def delete_ok(self, ro_task):
+ """skip calling VIM to delete image status. Assumes ok"""
+ return "DONE", {}
+
+ def new_flavor(self, ro_task, task_index, task_depends):
+ task = ro_task["tasks"][task_index]
+ task_id = task["task_id"]
+ created = False
+ created_items = {}
+ target_vim = self.my_vims[ro_task["target_id"]]
+ try:
+ # FIND
+ vim_flavor_id = None
+ if task.get("find_params"):
+ try:
+ flavor_data = task["find_params"]["flavor_data"]
+ vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
+ except vimconn.VimConnNotFoundException:
+ pass
+
+ if not vim_flavor_id and task.get("params"):
+ # CREATE
+ flavor_data = task["params"]["flavor_data"]
+ vim_flavor_id = target_vim.new_flavor(flavor_data)
+ created = True
+
+ ro_vim_item_update = {"vim_id": vim_flavor_id,
+ "vim_status": "DONE",
+ "created": created,
+ "created_items": created_items,
+ "vim_details": None}
+ self.logger.debug(
+ "task={} {} new-flavor={} created={}".format(task_id, ro_task["target_id"], vim_flavor_id, created))
+ return "DONE", ro_vim_item_update
+ except (vimconn.VimConnException, NsWorkerException) as e:
+ self.logger.error("task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e))
+ ro_vim_item_update = {"vim_status": "VIM_ERROR",
+ "created": created,
+ "vim_details": str(e)}
+ return "FAILED", ro_vim_item_update
+
+ def new_net(self, ro_task, task_index, task_depends):
+ vim_net_id = None
+ task = ro_task["tasks"][task_index]
+ task_id = task["task_id"]
+ created = False
+ created_items = {}
+ target_vim = self.my_vims[ro_task["target_id"]]
+ try:
+ # FIND
+ if task.get("find_params"):
+ # if management, get configuration of VIM
+ if task["find_params"].get("filter_dict"):
+ vim_filter = task["find_params"]["filter_dict"]
+ elif task["find_params"].get("mgmt"): # mamagement network
+ if deep_get(self.db_vims[ro_task["target_id"]], "config", "management_network_id"):
+ vim_filter = {"id": self.db_vims[ro_task["target_id"]]["config"]["management_network_id"]}
+ elif deep_get(self.db_vims[ro_task["target_id"]], "config", "management_network_name"):
+ vim_filter = {"name": self.db_vims[ro_task["target_id"]]["config"]["management_network_name"]}
+ else:
+ vim_filter = {"name": task["find_params"]["name"]}
+ else:
+ raise NsWorkerExceptionNotFound("Invalid find_params for new_net {}".format(task["find_params"]))
+
+ vim_nets = target_vim.get_network_list(vim_filter)
+ if not vim_nets and not task.get("params"):
+ raise NsWorkerExceptionNotFound("Network not found with this criteria: '{}'".format(
+ task.get("find_params")))
+ elif len(vim_nets) > 1:
+ raise NsWorkerException(
+ "More than one network found with this criteria: '{}'".format(task["find_params"]))
+ if vim_nets:
+ vim_net_id = vim_nets[0]["id"]
+ else:
+ # CREATE
+ params = task["params"]
+ vim_net_id, created_items = target_vim.new_network(**params)
+ created = True
+
+ ro_vim_item_update = {"vim_id": vim_net_id,
+ "vim_status": "BUILD",
+ "created": created,
+ "created_items": created_items,
+ "vim_details": None}
+ self.logger.debug(
+ "task={} {} new-net={} created={}".format(task_id, ro_task["target_id"], vim_net_id, created))
+ return "BUILD", ro_vim_item_update
+ except (vimconn.VimConnException, NsWorkerException) as e:
+ self.logger.error("task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e))
+ ro_vim_item_update = {"vim_status": "VIM_ERROR",
+ "created": created,
+ "vim_details": str(e)}
+ return "FAILED", ro_vim_item_update
+
+ def refresh_net(self, ro_task):
+ """Call VIM to get network status"""
+ ro_task_id = ro_task["_id"]
+ target_vim = self.my_vims[ro_task["target_id"]]
+
+ vim_id = ro_task["vim_info"]["vim_id"]
+ net_to_refresh_list = [vim_id]
+ try:
+ vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
+ vim_info = vim_dict[vim_id]
+ if vim_info["status"] == "ACTIVE":
+ task_status = "DONE"
+ elif vim_info["status"] == "BUILD":
+ task_status = "BUILD"
+ else:
+ task_status = "FAILED"
+ except vimconn.VimConnException as e:
+ # Mark all tasks at VIM_ERROR status
+ self.logger.error("ro_task={} vim={} get-net={}: {}".format(ro_task_id, ro_task["target_id"], vim_id, e))
+ vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
+ task_status = "FAILED"
+
+ ro_vim_item_update = {}
+ if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
+ ro_vim_item_update["vim_status"] = vim_info["status"]
+ if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
+ ro_vim_item_update["vim_name"] = vim_info.get("name")
+ if vim_info["status"] in ("ERROR", "VIM_ERROR"):
+ if ro_task["vim_info"]["vim_details"] != vim_info["error_msg"]:
+ ro_vim_item_update["vim_details"] = vim_info["error_msg"]
+ elif vim_info["status"] == "DELETED":
+ ro_vim_item_update["vim_id"] = None
+ ro_vim_item_update["vim_details"] = "Deleted externally"
+ else:
+ if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
+ ro_vim_item_update["vim_details"] = vim_info["vim_info"]
+ if ro_vim_item_update:
+ self.logger.debug("ro_task={} {} get-net={}: status={} {}".format(
+ ro_task_id, ro_task["target_id"], vim_id, ro_vim_item_update.get("vim_status"),
+ ro_vim_item_update.get("vim_details") if ro_vim_item_update.get("vim_status") != "ACTIVE" else ''))
+ return task_status, ro_vim_item_update
+
+ def del_net(self, ro_task, task_index):
+ task = ro_task["tasks"][task_index]
+ task_id = task["task_id"]
+ net_vim_id = ro_task["vim_info"]["vim_id"]
+ ro_vim_item_update_ok = {"vim_status": "DELETED",
+ "created": False,
+ "vim_details": "DELETED",
+ "vim_id": None}
+ try:
+ if net_vim_id or ro_task["vim_info"]["created_items"]:
+ target_vim = self.my_vims[ro_task["target_id"]]
+ target_vim.delete_network(net_vim_id, ro_task["vim_info"]["created_items"])
+
+ except vimconn.VimConnNotFoundException:
+ ro_vim_item_update_ok["vim_details"] = "already deleted"
+
+ except vimconn.VimConnException as e:
+ self.logger.error("ro_task={} vim={} del-net={}: {}".format(ro_task["_id"], ro_task["target_id"],
+ net_vim_id, e))
+ ro_vim_item_update = {"vim_status": "VIM_ERROR",
+ "vim_details": "Error while deleting: {}".format(e)}
+ return "FAILED", ro_vim_item_update
+
+ self.logger.debug("task={} {} del-net={} {}".format(task_id, ro_task["target_id"], net_vim_id,
+ ro_vim_item_update_ok.get("vim_details", "")))
+ return "DONE", ro_vim_item_update_ok
+
+ def new_vm(self, ro_task, task_index, task_depends):
+ task = ro_task["tasks"][task_index]
+ task_id = task["task_id"]
+ created = False
+ created_items = {}
+ target_vim = self.my_vims[ro_task["target_id"]]
+ try:
+ created = True
+ params = task["params"]
+ params_copy = deepcopy(params)
+ net_list = params_copy["net_list"]
+ for net in net_list:
+ if "net_id" in net and net["net_id"].startswith("TASK-"): # change task_id into network_id
+ network_id = task_depends[net["net_id"]]
+ if not network_id:
+ raise NsWorkerException("Cannot create VM because depends on a network not created or found "
+ "for {}".format(net["net_id"]))
+ net["net_id"] = network_id
+ if params_copy["image_id"].startswith("TASK-"):
+ params_copy["image_id"] = task_depends[params_copy["image_id"]]
+ if params_copy["flavor_id"].startswith("TASK-"):
+ params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]
+
+ vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
+ interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]
+
+ ro_vim_item_update = {"vim_id": vim_vm_id,
+ "vim_status": "BUILD",
+ "created": created,
+ "created_items": created_items,
+ "vim_details": None,
+ "interfaces_vim_ids": interfaces,
+ "interfaces": [],
+ }
+ self.logger.debug(
+ "task={} {} new-vm={} created={}".format(task_id, ro_task["target_id"], vim_vm_id, created))
+ return "BUILD", ro_vim_item_update
+ except (vimconn.VimConnException, NsWorkerException) as e:
+ self.logger.error("task={} vim={} new-vm: {}".format(task_id, ro_task["target_id"], e))
+ ro_vim_item_update = {"vim_status": "VIM_ERROR",
+ "created": created,
+ "vim_details": str(e)}
+ return "FAILED", ro_vim_item_update
+
+ def del_vm(self, ro_task, task_index):
+ task = ro_task["tasks"][task_index]
+ task_id = task["task_id"]
+ vm_vim_id = ro_task["vim_info"]["vim_id"]
+ ro_vim_item_update_ok = {"vim_status": "DELETED",
+ "created": False,
+ "vim_details": "DELETED",
+ "vim_id": None}
+ try:
+ if vm_vim_id or ro_task["vim_info"]["created_items"]:
+ target_vim = self.my_vims[ro_task["target_id"]]
+ target_vim.delete_vminstance(vm_vim_id, ro_task["vim_info"]["created_items"])
+
+ except vimconn.VimConnNotFoundException:
+ ro_vim_item_update_ok["vim_details"] = "already deleted"
+
+ except vimconn.VimConnException as e:
+ self.logger.error("ro_task={} vim={} del-vm={}: {}".format(ro_task["_id"], ro_task["target_id"],
+ vm_vim_id, e))
+ ro_vim_item_update = {"vim_status": "VIM_ERROR",
+ "vim_details": "Error while deleting: {}".format(e)}
+ return "FAILED", ro_vim_item_update
+
+ self.logger.debug("task={} {} del-vm={} {}".format(task_id, ro_task["target_id"], vm_vim_id,
+ ro_vim_item_update_ok.get("vim_details", "")))
+ return "DONE", ro_vim_item_update_ok
+
+ def refresh_vm(self, ro_task):
+ """Call VIM to get vm status"""
+ ro_task_id = ro_task["_id"]
+ target_vim = self.my_vims[ro_task["target_id"]]
+
+ vim_id = ro_task["vim_info"]["vim_id"]
+ if not vim_id:
+ return None, None
+ vm_to_refresh_list = [vim_id]
+ try:
+ vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
+ vim_info = vim_dict[vim_id]
+ if vim_info["status"] == "ACTIVE":
+ task_status = "DONE"
+ elif vim_info["status"] == "BUILD":
+ task_status = "BUILD"
+ else:
+ task_status = "FAILED"
+ except vimconn.VimConnException as e:
+ # Mark all tasks at VIM_ERROR status
+ self.logger.error("ro_task={} vim={} get-vm={}: {}".format(ro_task_id, ro_task["target_id"], vim_id, e))
+ vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
+ task_status = "FAILED"
+
+ ro_vim_item_update = {}
+ # TODO check and update interfaces
+ vim_interfaces = []
+ for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
+ iface = next((iface for iface in vim_info["interfaces"] if vim_iface_id == iface["vim_interface_id"]), None)
+ # if iface:
+ # iface.pop("vim_info", None)
+ vim_interfaces.append(iface)
+
+ task = ro_task["tasks"][0] # TODO look for a task CREATE and active
+ if task.get("mgmt_vnf_interface") is not None:
+ vim_interfaces[task["mgmt_vnf_interface"]]["mgmt_vnf_interface"] = True
+ mgmt_vdu_iface = task.get("mgmt_vdu_interface", task.get("mgmt_vnf_interface", 0))
+ vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True
+
+ if ro_task["vim_info"]["interfaces"] != vim_interfaces:
+ ro_vim_item_update["interfaces"] = vim_interfaces
+ if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
+ ro_vim_item_update["vim_status"] = vim_info["status"]
+ if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
+ ro_vim_item_update["vim_name"] = vim_info.get("name")
+ if vim_info["status"] in ("ERROR", "VIM_ERROR"):
+ if ro_task["vim_info"]["vim_details"] != vim_info["error_msg"]:
+ ro_vim_item_update["vim_details"] = vim_info["error_msg"]
+ elif vim_info["status"] == "DELETED":
+ ro_vim_item_update["vim_id"] = None
+ ro_vim_item_update["vim_details"] = "Deleted externally"
+ else:
+ if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
+ ro_vim_item_update["vim_details"] = vim_info["vim_info"]
+ if ro_vim_item_update:
+ self.logger.debug("ro_task={} {} get-vm={}: status={} {}".format(
+ ro_task_id, ro_task["target_id"], vim_id, ro_vim_item_update.get("vim_status"),
+ ro_vim_item_update.get("vim_details") if ro_vim_item_update.get("vim_status") != "ACTIVE" else ''))
+ return task_status, ro_vim_item_update
+
+ def exec_vm(self, ro_task, task_index, task_depends):
+ task = ro_task["tasks"][task_index]
+ task_id = task["task_id"]
+ target_vim = self.my_vims[ro_task["target_id"]]
+ try:
+ params = task["params"]
+ params_copy = deepcopy(params)
+ params_copy["use_pri_key"] = self.db.decrypt(params_copy.pop("private_key"),
+ params_copy.pop("schema_version"), params_copy.pop("salt"))
+
+ target_vim.inject_user_key(**params_copy)
+ self.logger.debug(
+ "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"]))
+ return "DONE", params_copy["key"]
+ except (vimconn.VimConnException, NsWorkerException) as e:
+ self.logger.error("task={} vim={} new-vm: {}".format(task_id, ro_task["target_id"], e))
+ ro_vim_item_update = {"vim_details": str(e)}
+ return "FAILED", ro_vim_item_update
+
+ def run(self):
+ # load database
+ self.logger.debug("Starting")
+ while True:
+ try:
+ task = self.task_queue.get(block=False if self.my_vims else True)
+ if task[0] == "terminate":
+ break
+ if task[0] == "load_vim":
+ self._load_vim(task[1])
+ continue
+ except queue.Empty:
+ pass
+
+ try:
+ busy = False
+ ro_task = self._get_db_task()
+ if ro_task:
+ self._proccess_pending_tasks(ro_task)
+ busy = True
+ if not busy:
+ time.sleep(5)
+ except Exception as e:
+ self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)
+
+ self.logger.debug("Finishing")
--- /dev/null
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+[/]
+# tools.secureheaders.on = True
+tools.sessions.on = True
+# increase security on sessions
+tools.sessions.secure = True
+tools.sessions.httponly = True
+tools.encode.on: True,
+tools.encode.encoding: 'utf-8'
+tools.response_headers.on = True
+
+# tools.auth_basic.on: True,
+# tools.auth_basic.realm: 'localhost',
+# tools.auth_basic.checkpassword: get_tokens
+
+
+[/static]
+# use env OSMRO_STATIC_ON, OSMRO_STATIC_DIR to override
+tools.staticdir.on: True
+tools.staticdir.dir: "/app/RO/RO-NG/osm_ng_ro/html_public"
+
+
+[global]
+# use env OSMRO_SERVER_XXX, OSMRO_LOG_XXX, OSMRO_TEST_XXX or OSMRO_AUTH_XXX to override. Use value in yaml format
+server.socket_host: "0.0.0.0"
+server.socket_port: 9998
+
+# server.ssl_module: "builtin"
+# server.ssl_certificate: "./http/cert.pem"
+# server.ssl_private_key: "./http/privkey.pem"
+# server.ssl_pass_phrase: "osm4u"
+server.thread_pool: 10
+server.ns_threads: 1
+
+# Uncomment for allow basic authentication apart from bearer
+# auth.allow_basic_authentication: True
+
+# comment or set to False to disable /test URL
+server.enable_test: True
+
+log.screen: False
+log.access_file: ""
+log.error_file: ""
+
+log.level: "DEBUG"
+#log.file: /var/log/osm/ro.log
+
+
+[database]
+# use env OSMRO_DATABASE_XXX to override
+driver: "mongo" # mongo or memory
+uri: "mongodb://mongo:27017"
+name: "osm"
+# user: "user"
+# password: "password"
+# commonkey: "commonkey"
+
+[storage]
+# use env OSMRO_STORAGE_XXX to override
+driver: "local" # local filesystem
+# for local provide file path
+path: "/app/storage" #"/home/atierno/OSM/osm/NBI/local/storage"
+
+loglevel: "DEBUG"
+#logfile: /var/log/osm/ro-storage.log
+
+[message]
+# use env OSMRO_MESSAGE_XXX to override
+driver: "kafka" # local or kafka
+# for local provide file path
+path: "/app/storage/kafka"
+host: "kafka"
+port: 9092
+
+loglevel: "DEBUG"
+#logfile: /var/log/osm/ro-message.log
+group_id: "ro-server"
+
+[authentication]
+# use env OSMRO_AUTHENTICATION_XXX to override
+
--- /dev/null
+#!/usr/bin/python3
+# -*- coding: utf-8 -*-
+
+##
+# Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+import cherrypy
+import time
+import json
+import yaml
+import osm_ng_ro.html_out as html
+import logging
+import logging.handlers
+import getopt
+import sys
+
+from osm_ng_ro.ns import Ns, NsException
+from osm_ng_ro.validation import ValidationError
+from osm_common.dbbase import DbException
+from osm_common.fsbase import FsException
+from osm_common.msgbase import MsgException
+from http import HTTPStatus
+from codecs import getreader
+from os import environ, path
+from osm_ng_ro import version as ro_version, version_date as ro_version_date
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+
+__version__ = "0.1." # file version, not NBI version
+version_date = "May 2020"
+
+database_version = '1.2'
+auth_database_version = '1.0'
+ro_server = None # instance of Server class
+# vim_threads = None # instance of VimThread class
+
+"""
+RO North Bound Interface
+URL: /ro GET POST PUT DELETE PATCH
+ /ns/v1/deploy O
+ /<nsrs_id> O O O
+ /<action_id> O
+ /cancel O
+
+"""
+
+valid_query_string = ("ADMIN", "SET_PROJECT", "FORCE", "PUBLIC")
+# ^ Contains possible administrative query string words:
+# ADMIN=True(by default)|Project|Project-list: See all elements, or elements of a project
+# (not owned by my session project).
+# PUBLIC=True(by default)|False: See/hide public elements. Set/Unset a topic to be public
+# FORCE=True(by default)|False: Force edition/deletion operations
+# SET_PROJECT=Project|Project-list: Add/Delete the topic to the projects portfolio
+
+valid_url_methods = {
+ # contains allowed URL and methods, and the role_permission name
+ "admin": {
+ "v1": {
+ "tokens": {
+ "METHODS": ("POST",),
+ "ROLE_PERMISSION": "tokens:",
+ "<ID>": {
+ "METHODS": ("DELETE",),
+ "ROLE_PERMISSION": "tokens:id:"
+ }
+ },
+ }
+ },
+ "ns": {
+ "v1": {
+ "deploy": {
+ "METHODS": ("GET",),
+ "ROLE_PERMISSION": "deploy:",
+ "<ID>": {
+ "METHODS": ("GET", "POST", "DELETE"),
+ "ROLE_PERMISSION": "deploy:id:",
+ "<ID>": {
+ "METHODS": ("GET",),
+ "ROLE_PERMISSION": "deploy:id:id:",
+ "cancel": {
+ "METHODS": ("POST",),
+ "ROLE_PERMISSION": "deploy:id:id:cancel",
+ }
+ }
+ }
+ },
+ }
+ },
+}
+
+
+class RoException(Exception):
+
+ def __init__(self, message, http_code=HTTPStatus.METHOD_NOT_ALLOWED):
+ Exception.__init__(self, message)
+ self.http_code = http_code
+
+
+class AuthException(RoException):
+ pass
+
+
+class Authenticator:
+
+ def __init__(self, valid_url_methods, valid_query_string):
+ self.valid_url_methods = valid_url_methods
+ self.valid_query_string = valid_query_string
+
+ def authorize(self, *args, **kwargs):
+ return {"token": "ok", "id": "ok"}
+
+ def new_token(self, token_info, indata, remote):
+ return {"token": "ok",
+ "id": "ok",
+ "remote": remote}
+
+ def del_token(self, token_id):
+ pass
+
+ def start(self, engine_config):
+ pass
+
+
+class Server(object):
+ instance = 0
+ # to decode bytes to str
+ reader = getreader("utf-8")
+
+ def __init__(self):
+ self.instance += 1
+ self.authenticator = Authenticator(valid_url_methods, valid_query_string)
+ self.ns = Ns()
+ self.map_operation = {
+ "token:post": self.new_token,
+ "token:id:delete": self.del_token,
+ "deploy:get": self.ns.get_deploy,
+ "deploy:id:get": self.ns.get_actions,
+ "deploy:id:post": self.ns.deploy,
+ "deploy:id:delete": self.ns.delete,
+ "deploy:id:id:get": self.ns.status,
+ "deploy:id:id:cancel:post": self.ns.cancel,
+ }
+
+ def _format_in(self, kwargs):
+ try:
+ indata = None
+ if cherrypy.request.body.length:
+ error_text = "Invalid input format "
+
+ if "Content-Type" in cherrypy.request.headers:
+ if "application/json" in cherrypy.request.headers["Content-Type"]:
+ error_text = "Invalid json format "
+ indata = json.load(self.reader(cherrypy.request.body))
+ cherrypy.request.headers.pop("Content-File-MD5", None)
+ elif "application/yaml" in cherrypy.request.headers["Content-Type"]:
+ error_text = "Invalid yaml format "
+ indata = yaml.load(cherrypy.request.body, Loader=yaml.SafeLoader)
+ cherrypy.request.headers.pop("Content-File-MD5", None)
+ elif "application/binary" in cherrypy.request.headers["Content-Type"] or \
+ "application/gzip" in cherrypy.request.headers["Content-Type"] or \
+ "application/zip" in cherrypy.request.headers["Content-Type"] or \
+ "text/plain" in cherrypy.request.headers["Content-Type"]:
+ indata = cherrypy.request.body # .read()
+ elif "multipart/form-data" in cherrypy.request.headers["Content-Type"]:
+ if "descriptor_file" in kwargs:
+ filecontent = kwargs.pop("descriptor_file")
+ if not filecontent.file:
+ raise RoException("empty file or content", HTTPStatus.BAD_REQUEST)
+ indata = filecontent.file # .read()
+ if filecontent.content_type.value:
+ cherrypy.request.headers["Content-Type"] = filecontent.content_type.value
+ else:
+ # raise cherrypy.HTTPError(HTTPStatus.Not_Acceptable,
+ # "Only 'Content-Type' of type 'application/json' or
+ # 'application/yaml' for input format are available")
+ error_text = "Invalid yaml format "
+ indata = yaml.load(cherrypy.request.body, Loader=yaml.SafeLoader)
+ cherrypy.request.headers.pop("Content-File-MD5", None)
+ else:
+ error_text = "Invalid yaml format "
+ indata = yaml.load(cherrypy.request.body, Loader=yaml.SafeLoader)
+ cherrypy.request.headers.pop("Content-File-MD5", None)
+ if not indata:
+ indata = {}
+
+ format_yaml = False
+ if cherrypy.request.headers.get("Query-String-Format") == "yaml":
+ format_yaml = True
+
+ for k, v in kwargs.items():
+ if isinstance(v, str):
+ if v == "":
+ kwargs[k] = None
+ elif format_yaml:
+ try:
+ kwargs[k] = yaml.load(v, Loader=yaml.SafeLoader)
+ except Exception:
+ pass
+ elif k.endswith(".gt") or k.endswith(".lt") or k.endswith(".gte") or k.endswith(".lte"):
+ try:
+ kwargs[k] = int(v)
+ except Exception:
+ try:
+ kwargs[k] = float(v)
+ except Exception:
+ pass
+ elif v.find(",") > 0:
+ kwargs[k] = v.split(",")
+ elif isinstance(v, (list, tuple)):
+ for index in range(0, len(v)):
+ if v[index] == "":
+ v[index] = None
+ elif format_yaml:
+ try:
+ v[index] = yaml.load(v[index], Loader=yaml.SafeLoader)
+ except Exception:
+ pass
+
+ return indata
+ except (ValueError, yaml.YAMLError) as exc:
+ raise RoException(error_text + str(exc), HTTPStatus.BAD_REQUEST)
+ except KeyError as exc:
+ raise RoException("Query string error: " + str(exc), HTTPStatus.BAD_REQUEST)
+ except Exception as exc:
+ raise RoException(error_text + str(exc), HTTPStatus.BAD_REQUEST)
+
+ @staticmethod
+ def _format_out(data, token_info=None, _format=None):
+ """
+ return string of dictionary data according to requested json, yaml, xml. By default json
+ :param data: response to be sent. Can be a dict, text or file
+ :param token_info: Contains among other username and project
+ :param _format: The format to be set as Content-Type if data is a file
+ :return: None
+ """
+ accept = cherrypy.request.headers.get("Accept")
+ if data is None:
+ if accept and "text/html" in accept:
+ return html.format(data, cherrypy.request, cherrypy.response, token_info)
+ # cherrypy.response.status = HTTPStatus.NO_CONTENT.value
+ return
+ elif hasattr(data, "read"): # file object
+ if _format:
+ cherrypy.response.headers["Content-Type"] = _format
+ elif "b" in data.mode: # binariy asssumig zip
+ cherrypy.response.headers["Content-Type"] = 'application/zip'
+ else:
+ cherrypy.response.headers["Content-Type"] = 'text/plain'
+ # TODO check that cherrypy close file. If not implement pending things to close per thread next
+ return data
+ if accept:
+ if "application/json" in accept:
+ cherrypy.response.headers["Content-Type"] = 'application/json; charset=utf-8'
+ a = json.dumps(data, indent=4) + "\n"
+ return a.encode("utf8")
+ elif "text/html" in accept:
+ return html.format(data, cherrypy.request, cherrypy.response, token_info)
+
+ elif "application/yaml" in accept or "*/*" in accept or "text/plain" in accept:
+ pass
+ # if there is not any valid accept, raise an error. But if response is already an error, format in yaml
+ elif cherrypy.response.status >= 400:
+ raise cherrypy.HTTPError(HTTPStatus.NOT_ACCEPTABLE.value,
+ "Only 'Accept' of type 'application/json' or 'application/yaml' "
+ "for output format are available")
+ cherrypy.response.headers["Content-Type"] = 'application/yaml'
+ return yaml.safe_dump(data, explicit_start=True, indent=4, default_flow_style=False, tags=False,
+ encoding='utf-8', allow_unicode=True) # , canonical=True, default_style='"'
+
+ @cherrypy.expose
+ def index(self, *args, **kwargs):
+ token_info = None
+ try:
+ if cherrypy.request.method == "GET":
+ token_info = self.authenticator.authorize()
+ outdata = token_info # Home page
+ else:
+ raise cherrypy.HTTPError(HTTPStatus.METHOD_NOT_ALLOWED.value,
+ "Method {} not allowed for tokens".format(cherrypy.request.method))
+
+ return self._format_out(outdata, token_info)
+
+ except (NsException, AuthException) as e:
+ # cherrypy.log("index Exception {}".format(e))
+ cherrypy.response.status = e.http_code.value
+ return self._format_out("Welcome to OSM!", token_info)
+
+ @cherrypy.expose
+ def version(self, *args, **kwargs):
+ # TODO consider to remove and provide version using the static version file
+ try:
+ if cherrypy.request.method != "GET":
+ raise RoException("Only method GET is allowed", HTTPStatus.METHOD_NOT_ALLOWED)
+ elif args or kwargs:
+ raise RoException("Invalid URL or query string for version", HTTPStatus.METHOD_NOT_ALLOWED)
+ # TODO include version of other modules, pick up from some kafka admin message
+ osm_ng_ro_version = {"version": ro_version, "date": ro_version_date}
+ return self._format_out(osm_ng_ro_version)
+ except RoException as e:
+ cherrypy.response.status = e.http_code.value
+ problem_details = {
+ "code": e.http_code.name,
+ "status": e.http_code.value,
+ "detail": str(e),
+ }
+ return self._format_out(problem_details, None)
+
+ def new_token(self, engine_session, indata, *args, **kwargs):
+ token_info = None
+
+ try:
+ token_info = self.authenticator.authorize()
+ except Exception:
+ token_info = None
+ if kwargs:
+ indata.update(kwargs)
+ # This is needed to log the user when authentication fails
+ cherrypy.request.login = "{}".format(indata.get("username", "-"))
+ token_info = self.authenticator.new_token(token_info, indata, cherrypy.request.remote)
+ cherrypy.session['Authorization'] = token_info["id"]
+ self._set_location_header("admin", "v1", "tokens", token_info["id"])
+ # for logging
+
+ # cherrypy.response.cookie["Authorization"] = outdata["id"]
+ # cherrypy.response.cookie["Authorization"]['expires'] = 3600
+ return token_info, token_info["id"], True
+
+ def del_token(self, engine_session, indata, version, _id, *args, **kwargs):
+ token_id = _id
+ if not token_id and "id" in kwargs:
+ token_id = kwargs["id"]
+ elif not token_id:
+ token_info = self.authenticator.authorize()
+ # for logging
+ token_id = token_info["id"]
+ self.authenticator.del_token(token_id)
+ token_info = None
+ cherrypy.session['Authorization'] = "logout"
+ # cherrypy.response.cookie["Authorization"] = token_id
+ # cherrypy.response.cookie["Authorization"]['expires'] = 0
+ return None, None, True
+
+ @cherrypy.expose
+ def test(self, *args, **kwargs):
+ if not cherrypy.config.get("server.enable_test") or (isinstance(cherrypy.config["server.enable_test"], str) and
+ cherrypy.config["server.enable_test"].lower() == "false"):
+ cherrypy.response.status = HTTPStatus.METHOD_NOT_ALLOWED.value
+ return "test URL is disabled"
+ thread_info = None
+ if args and args[0] == "help":
+ return "<html><pre>\ninit\nfile/<name> download file\ndb-clear/table\nfs-clear[/folder]\nlogin\nlogin2\n"\
+ "sleep/<time>\nmessage/topic\n</pre></html>"
+
+ elif args and args[0] == "init":
+ try:
+ # self.ns.load_dbase(cherrypy.request.app.config)
+ self.ns.create_admin()
+ return "Done. User 'admin', password 'admin' created"
+ except Exception:
+ cherrypy.response.status = HTTPStatus.FORBIDDEN.value
+ return self._format_out("Database already initialized")
+ elif args and args[0] == "file":
+ return cherrypy.lib.static.serve_file(cherrypy.tree.apps['/ro'].config["storage"]["path"] + "/" + args[1],
+ "text/plain", "attachment")
+ elif args and args[0] == "file2":
+ f_path = cherrypy.tree.apps['/ro'].config["storage"]["path"] + "/" + args[1]
+ f = open(f_path, "r")
+ cherrypy.response.headers["Content-type"] = "text/plain"
+ return f
+
+ elif len(args) == 2 and args[0] == "db-clear":
+ deleted_info = self.ns.db.del_list(args[1], kwargs)
+ return "{} {} deleted\n".format(deleted_info["deleted"], args[1])
+ elif len(args) and args[0] == "fs-clear":
+ if len(args) >= 2:
+ folders = (args[1],)
+ else:
+ folders = self.ns.fs.dir_ls(".")
+ for folder in folders:
+ self.ns.fs.file_delete(folder)
+ return ",".join(folders) + " folders deleted\n"
+ elif args and args[0] == "login":
+ if not cherrypy.request.headers.get("Authorization"):
+ cherrypy.response.headers["WWW-Authenticate"] = 'Basic realm="Access to OSM site", charset="UTF-8"'
+ cherrypy.response.status = HTTPStatus.UNAUTHORIZED.value
+ elif args and args[0] == "login2":
+ if not cherrypy.request.headers.get("Authorization"):
+ cherrypy.response.headers["WWW-Authenticate"] = 'Bearer realm="Access to OSM site"'
+ cherrypy.response.status = HTTPStatus.UNAUTHORIZED.value
+ elif args and args[0] == "sleep":
+ sleep_time = 5
+ try:
+ sleep_time = int(args[1])
+ except Exception:
+ cherrypy.response.status = HTTPStatus.FORBIDDEN.value
+ return self._format_out("Database already initialized")
+ thread_info = cherrypy.thread_data
+ print(thread_info)
+ time.sleep(sleep_time)
+ # thread_info
+ elif len(args) >= 2 and args[0] == "message":
+ main_topic = args[1]
+ return_text = "<html><pre>{} ->\n".format(main_topic)
+ try:
+ if cherrypy.request.method == 'POST':
+ to_send = yaml.load(cherrypy.request.body, Loader=yaml.SafeLoader)
+ for k, v in to_send.items():
+ self.ns.msg.write(main_topic, k, v)
+ return_text += " {}: {}\n".format(k, v)
+ elif cherrypy.request.method == 'GET':
+ for k, v in kwargs.items():
+ self.ns.msg.write(main_topic, k, yaml.load(v, Loader=yaml.SafeLoader))
+ return_text += " {}: {}\n".format(k, yaml.load(v, Loader=yaml.SafeLoader))
+ except Exception as e:
+ return_text += "Error: " + str(e)
+ return_text += "</pre></html>\n"
+ return return_text
+
+ return_text = (
+ "<html><pre>\nheaders:\n args: {}\n".format(args) +
+ " kwargs: {}\n".format(kwargs) +
+ " headers: {}\n".format(cherrypy.request.headers) +
+ " path_info: {}\n".format(cherrypy.request.path_info) +
+ " query_string: {}\n".format(cherrypy.request.query_string) +
+ " session: {}\n".format(cherrypy.session) +
+ " cookie: {}\n".format(cherrypy.request.cookie) +
+ " method: {}\n".format(cherrypy.request.method) +
+ " session: {}\n".format(cherrypy.session.get('fieldname')) +
+ " body:\n")
+ return_text += " length: {}\n".format(cherrypy.request.body.length)
+ if cherrypy.request.body.length:
+ return_text += " content: {}\n".format(
+ str(cherrypy.request.body.read(int(cherrypy.request.headers.get('Content-Length', 0)))))
+ if thread_info:
+ return_text += "thread: {}\n".format(thread_info)
+ return_text += "</pre></html>"
+ return return_text
+
+ @staticmethod
+ def _check_valid_url_method(method, *args):
+ if len(args) < 3:
+ raise RoException("URL must contain at least 'main_topic/version/topic'", HTTPStatus.METHOD_NOT_ALLOWED)
+
+ reference = valid_url_methods
+ for arg in args:
+ if arg is None:
+ break
+ if not isinstance(reference, dict):
+ raise RoException("URL contains unexpected extra items '{}'".format(arg),
+ HTTPStatus.METHOD_NOT_ALLOWED)
+
+ if arg in reference:
+ reference = reference[arg]
+ elif "<ID>" in reference:
+ reference = reference["<ID>"]
+ elif "*" in reference:
+ # reference = reference["*"]
+ break
+ else:
+ raise RoException("Unexpected URL item {}".format(arg), HTTPStatus.METHOD_NOT_ALLOWED)
+ if "TODO" in reference and method in reference["TODO"]:
+ raise RoException("Method {} not supported yet for this URL".format(method), HTTPStatus.NOT_IMPLEMENTED)
+ elif "METHODS" not in reference or method not in reference["METHODS"]:
+ raise RoException("Method {} not supported for this URL".format(method), HTTPStatus.METHOD_NOT_ALLOWED)
+ return reference["ROLE_PERMISSION"] + method.lower()
+
+ @staticmethod
+ def _set_location_header(main_topic, version, topic, id):
+ """
+ Insert response header Location with the URL of created item base on URL params
+ :param main_topic:
+ :param version:
+ :param topic:
+ :param id:
+ :return: None
+ """
+ # Use cherrypy.request.base for absoluted path and make use of request.header HOST just in case behind aNAT
+ cherrypy.response.headers["Location"] = "/ro/{}/{}/{}/{}".format(main_topic, version, topic, id)
+ return
+
+ @cherrypy.expose
+ def default(self, main_topic=None, version=None, topic=None, _id=None, _id2=None, *args, **kwargs):
+ token_info = None
+ outdata = None
+ _format = None
+ method = "DONE"
+ rollback = []
+ engine_session = None
+ try:
+ if not main_topic or not version or not topic:
+ raise RoException("URL must contain at least 'main_topic/version/topic'",
+ HTTPStatus.METHOD_NOT_ALLOWED)
+ if main_topic not in ("admin", "ns",):
+ raise RoException("URL main_topic '{}' not supported".format(main_topic),
+ HTTPStatus.METHOD_NOT_ALLOWED)
+ if version != 'v1':
+ raise RoException("URL version '{}' not supported".format(version), HTTPStatus.METHOD_NOT_ALLOWED)
+
+ if kwargs and "METHOD" in kwargs and kwargs["METHOD"] in ("PUT", "POST", "DELETE", "GET", "PATCH"):
+ method = kwargs.pop("METHOD")
+ else:
+ method = cherrypy.request.method
+
+ role_permission = self._check_valid_url_method(method, main_topic, version, topic, _id, _id2, *args,
+ **kwargs)
+ # skip token validation if requesting a token
+ indata = self._format_in(kwargs)
+ if main_topic != "admin" or topic != "tokens":
+ token_info = self.authenticator.authorize(role_permission, _id)
+ outdata, created_id, done = self.map_operation[role_permission](
+ engine_session, indata, version, _id, _id2, *args, *kwargs)
+ if created_id:
+ self._set_location_header(main_topic, version, topic, _id)
+ cherrypy.response.status = HTTPStatus.ACCEPTED.value if not done else HTTPStatus.OK.value if \
+ outdata is not None else HTTPStatus.NO_CONTENT.value
+ return self._format_out(outdata, token_info, _format)
+ except Exception as e:
+ if isinstance(e, (RoException, NsException, DbException, FsException, MsgException, AuthException,
+ ValidationError)):
+ http_code_value = cherrypy.response.status = e.http_code.value
+ http_code_name = e.http_code.name
+ cherrypy.log("Exception {}".format(e))
+ else:
+ http_code_value = cherrypy.response.status = HTTPStatus.BAD_REQUEST.value # INTERNAL_SERVER_ERROR
+ cherrypy.log("CRITICAL: Exception {}".format(e), traceback=True)
+ http_code_name = HTTPStatus.BAD_REQUEST.name
+ if hasattr(outdata, "close"): # is an open file
+ outdata.close()
+ error_text = str(e)
+ rollback.reverse()
+ for rollback_item in rollback:
+ try:
+ if rollback_item.get("operation") == "set":
+ self.ns.db.set_one(rollback_item["topic"], {"_id": rollback_item["_id"]},
+ rollback_item["content"], fail_on_empty=False)
+ else:
+ self.ns.db.del_one(rollback_item["topic"], {"_id": rollback_item["_id"]},
+ fail_on_empty=False)
+ except Exception as e2:
+ rollback_error_text = "Rollback Exception {}: {}".format(rollback_item, e2)
+ cherrypy.log(rollback_error_text)
+ error_text += ". " + rollback_error_text
+ # if isinstance(e, MsgException):
+ # error_text = "{} has been '{}' but other modules cannot be informed because an error on bus".format(
+ # engine_topic[:-1], method, error_text)
+ problem_details = {
+ "code": http_code_name,
+ "status": http_code_value,
+ "detail": error_text,
+ }
+ return self._format_out(problem_details, token_info)
+ # raise cherrypy.HTTPError(e.http_code.value, str(e))
+ finally:
+ if token_info:
+ if method in ("PUT", "PATCH", "POST") and isinstance(outdata, dict):
+ for logging_id in ("id", "op_id", "nsilcmop_id", "nslcmop_id"):
+ if outdata.get(logging_id):
+ cherrypy.request.login += ";{}={}".format(logging_id, outdata[logging_id][:36])
+
+
+def _start_service():
+ """
+ Callback function called when cherrypy.engine starts
+ Override configuration with env variables
+ Set database, storage, message configuration
+ Init database with admin/admin user password
+ """
+ global ro_server
+ # global vim_threads
+ cherrypy.log.error("Starting osm_ng_ro")
+ # update general cherrypy configuration
+ update_dict = {}
+
+ engine_config = cherrypy.tree.apps['/ro'].config
+ for k, v in environ.items():
+ if not k.startswith("OSMRO_"):
+ continue
+ k1, _, k2 = k[6:].lower().partition("_")
+ if not k2:
+ continue
+ try:
+ if k1 in ("server", "test", "auth", "log"):
+ # update [global] configuration
+ update_dict[k1 + '.' + k2] = yaml.safe_load(v)
+ elif k1 == "static":
+ # update [/static] configuration
+ engine_config["/static"]["tools.staticdir." + k2] = yaml.safe_load(v)
+ elif k1 == "tools":
+ # update [/] configuration
+ engine_config["/"]["tools." + k2.replace('_', '.')] = yaml.safe_load(v)
+ elif k1 in ("message", "database", "storage", "authentication"):
+ # update [message], [database], ... configuration
+ if k2 in ("port", "db_port"):
+ engine_config[k1][k2] = int(v)
+ else:
+ engine_config[k1][k2] = v
+
+ except Exception as e:
+ raise RoException("Cannot load env '{}': {}".format(k, e))
+
+ if update_dict:
+ cherrypy.config.update(update_dict)
+ engine_config["global"].update(update_dict)
+
+ # logging cherrypy
+ log_format_simple = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
+ log_formatter_simple = logging.Formatter(log_format_simple, datefmt='%Y-%m-%dT%H:%M:%S')
+ logger_server = logging.getLogger("cherrypy.error")
+ logger_access = logging.getLogger("cherrypy.access")
+ logger_cherry = logging.getLogger("cherrypy")
+ logger_nbi = logging.getLogger("ro")
+
+ if "log.file" in engine_config["global"]:
+ file_handler = logging.handlers.RotatingFileHandler(engine_config["global"]["log.file"],
+ maxBytes=100e6, backupCount=9, delay=0)
+ file_handler.setFormatter(log_formatter_simple)
+ logger_cherry.addHandler(file_handler)
+ logger_nbi.addHandler(file_handler)
+ # log always to standard output
+ for format_, logger in {"ro.server %(filename)s:%(lineno)s": logger_server,
+ "ro.access %(filename)s:%(lineno)s": logger_access,
+ "%(name)s %(filename)s:%(lineno)s": logger_nbi
+ }.items():
+ log_format_cherry = "%(asctime)s %(levelname)s {} %(message)s".format(format_)
+ log_formatter_cherry = logging.Formatter(log_format_cherry, datefmt='%Y-%m-%dT%H:%M:%S')
+ str_handler = logging.StreamHandler()
+ str_handler.setFormatter(log_formatter_cherry)
+ logger.addHandler(str_handler)
+
+ if engine_config["global"].get("log.level"):
+ logger_cherry.setLevel(engine_config["global"]["log.level"])
+ logger_nbi.setLevel(engine_config["global"]["log.level"])
+
+ # logging other modules
+ for k1, logname in {"message": "ro.msg", "database": "ro.db", "storage": "ro.fs"}.items():
+ engine_config[k1]["logger_name"] = logname
+ logger_module = logging.getLogger(logname)
+ if "logfile" in engine_config[k1]:
+ file_handler = logging.handlers.RotatingFileHandler(engine_config[k1]["logfile"],
+ maxBytes=100e6, backupCount=9, delay=0)
+ file_handler.setFormatter(log_formatter_simple)
+ logger_module.addHandler(file_handler)
+ if "loglevel" in engine_config[k1]:
+ logger_module.setLevel(engine_config[k1]["loglevel"])
+ # TODO add more entries, e.g.: storage
+
+ engine_config["assignment"] = {}
+ # ^ each VIM, SDNc will be assigned one worker id. Ns class will add items and VimThread will auto-assign
+ cherrypy.tree.apps['/ro'].root.ns.start(engine_config)
+ cherrypy.tree.apps['/ro'].root.authenticator.start(engine_config)
+ cherrypy.tree.apps['/ro'].root.ns.init_db(target_version=database_version)
+
+ # # start subscriptions thread:
+ # vim_threads = []
+ # for thread_id in range(engine_config["global"]["server.ns_threads"]):
+ # vim_thread = VimThread(thread_id, config=engine_config, engine=ro_server.ns)
+ # vim_thread.start()
+ # vim_threads.append(vim_thread)
+ # # Do not capture except SubscriptionException
+
+ backend = engine_config["authentication"]["backend"]
+ cherrypy.log.error("Starting OSM NBI Version '{} {}' with '{}' authentication backend"
+ .format(ro_version, ro_version_date, backend))
+
+
+def _stop_service():
+ """
+ Callback function called when cherrypy.engine stops
+ TODO: Ending database connections.
+ """
+ # global vim_threads
+ # if vim_threads:
+ # for vim_thread in vim_threads:
+ # vim_thread.terminate()
+ # vim_threads = None
+ cherrypy.tree.apps['/ro'].root.ns.stop()
+ cherrypy.log.error("Stopping osm_ng_ro")
+
+
+def ro_main(config_file):
+ global ro_server
+ ro_server = Server()
+ cherrypy.engine.subscribe('start', _start_service)
+ cherrypy.engine.subscribe('stop', _stop_service)
+ cherrypy.quickstart(ro_server, '/ro', config_file)
+
+
+def usage():
+ print("""Usage: {} [options]
+ -c|--config [configuration_file]: loads the configuration file (default: ./ro.cfg)
+ -h|--help: shows this help
+ """.format(sys.argv[0]))
+ # --log-socket-host HOST: send logs to this host")
+ # --log-socket-port PORT: send logs using this port (default: 9022)")
+
+
+if __name__ == '__main__':
+ try:
+ # load parameters and configuration
+ opts, args = getopt.getopt(sys.argv[1:], "hvc:", ["config=", "help"])
+ # TODO add "log-socket-host=", "log-socket-port=", "log-file="
+ config_file = None
+ for o, a in opts:
+ if o in ("-h", "--help"):
+ usage()
+ sys.exit()
+ elif o in ("-c", "--config"):
+ config_file = a
+ else:
+ assert False, "Unhandled option"
+ if config_file:
+ if not path.isfile(config_file):
+ print("configuration file '{}' that not exist".format(config_file), file=sys.stderr)
+ exit(1)
+ else:
+ for config_file in (path.dirname(__file__) + "/ro.cfg", "./ro.cfg", "/etc/osm/ro.cfg"):
+ if path.isfile(config_file):
+ break
+ else:
+ print("No configuration file 'ro.cfg' found neither at local folder nor at /etc/osm/", file=sys.stderr)
+ exit(1)
+ ro_main(config_file)
+ except getopt.GetoptError as e:
+ print(str(e), file=sys.stderr)
+ # usage()
+ exit(1)
--- /dev/null
+# -*- coding: utf-8 -*-
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from jsonschema import validate as js_v, exceptions as js_e
+from http import HTTPStatus
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+__version__ = "0.1"
+version_date = "Jun 2020"
+
+"""
+Validator of input data using JSON schemas
+"""
+
+# Basis schemas
+name_schema = {"type": "string", "minLength": 1, "maxLength": 255, "pattern": "^[^,;()'\"]+$"}
+string_schema = {"type": "string", "minLength": 1, "maxLength": 255}
+ssh_key_schema = {"type": "string", "minLength": 1}
+id_schema = {"type": "string", "pattern": "^[a-fA-F0-9]{8}(-[a-fA-F0-9]{4}){3}-[a-fA-F0-9]{12}$"}
+bool_schema = {"type": "boolean"}
+null_schema = {"type": "null"}
+
+image_schema = {
+ "title": "image input validation",
+ "$schema": "http://json-schema.org/draft-04/schema#",
+ "type": "object",
+ # TODO
+}
+
+flavor_schema = {
+ "title": "image input validation",
+ "$schema": "http://json-schema.org/draft-04/schema#",
+ "type": "object",
+ # TODO
+}
+
+ns_schema = {
+ "title": "image input validation",
+ "$schema": "http://json-schema.org/draft-04/schema#",
+ "type": "object",
+ # TODO
+}
+
+deploy_schema = {
+ "title": "deploy input validation",
+ "$schema": "http://json-schema.org/draft-04/schema#",
+ "type": "object",
+ "properties": {
+ "action_id": string_schema,
+ "name": name_schema,
+ "action": {"enum" ["inject_ssh_key"]},
+ "key": ssh_key_schema,
+ "user": name_schema,
+ "password": string_schema,
+ "vnf": {
+ "type": "object",
+ "properties": {
+ "_id": id_schema,
+ # TODO
+ },
+ "required": ["_id"],
+ "additionalProperties": True,
+ },
+ "image": {
+ "type": "array",
+ "minItems": 1,
+ "items": image_schema
+ },
+ "flavor": {
+ "type": "array",
+ "minItems": 1,
+ "items": flavor_schema
+ },
+ "ns": ns_schema,
+ },
+
+ "required": ["name"],
+ "additionalProperties": False
+}
+
+
+class ValidationError(Exception):
+ def __init__(self, message, http_code=HTTPStatus.UNPROCESSABLE_ENTITY):
+ self.http_code = http_code
+ Exception.__init__(self, message)
+
+
+def validate_input(indata, schema_to_use):
+ """
+ Validates input data against json schema
+ :param indata: user input data. Should be a dictionary
+ :param schema_to_use: jsonschema to test
+ :return: None if ok, raises ValidationError exception on error
+ """
+ try:
+ if schema_to_use:
+ js_v(indata, schema_to_use)
+ return None
+ except js_e.ValidationError as e:
+ if e.path:
+ error_pos = "at '" + ":".join(map(str, e.path)) + "'"
+ else:
+ error_pos = ""
+ raise ValidationError("Format error {} '{}' ".format(error_pos, e.message))
+ except js_e.SchemaError:
+ raise ValidationError("Bad json schema {}".format(schema_to_use), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
--- /dev/null
+##
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+##
+
+PyYAML
+CherryPy==18.1.2
+osm-common @ git+https://osm.etsi.org/gerrit/osm/common.git#egg=osm-common
+requests
+cryptography
+osm-im @ git+https://osm.etsi.org/gerrit/osm/IM.git#egg=osm-im
+osm-ro-plugin @ git+https://osm.etsi.org/gerrit/osm/RO.git#egg=osm-ro-plugin&subdirectory=RO-plugin
+logutils
--- /dev/null
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+# Copyright 2020 Telefonica S.A.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from setuptools import setup, find_packages
+
+_name = "osm_ng_ro"
+_readme = "osm-ng-ro is the New Generation Resource Orchestrator for OSM"
+setup(
+ name=_name,
+ description='OSM Resource Orchestrator',
+ long_description=_readme,
+ version_command=('git describe --match v* --tags --long --dirty', 'pep440-git-full'),
+ author='ETSI OSM',
+ author_email='alfonso.tiernosepulveda@telefonica.com',
+ maintainer='Alfonso Tierno',
+ maintainer_email='alfonso.tiernosepulveda@telefonica.com',
+ url='https://osm.etsi.org/gitweb/?p=osm/RO.git;a=summary',
+ license='Apache 2.0',
+
+ packages=find_packages(exclude=["temp", "local"]),
+ include_package_data=True,
+ install_requires=[
+ 'CherryPy==18.1.2',
+ 'osm-common @ git+https://osm.etsi.org/gerrit/osm/common.git#egg=osm-common',
+ 'jsonschema',
+ 'PyYAML',
+ 'requests',
+ 'cryptography',
+ 'osm-im @ git+https://osm.etsi.org/gerrit/osm/IM.git#egg=osm-im',
+ "osm-ro-plugin @ git+https://osm.etsi.org/gerrit/osm/RO.git#egg=osm-ro-plugin&subdirectory=RO-plugin",
+ ],
+ setup_requires=['setuptools-version-command'],
+)
--- /dev/null
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+[DEFAULT]
+X-Python3-Version : >= 3.5
+Depends3 : python3-osm-common, python3-osm-im, python3-cherrypy3, python3-yaml, python3-jsonschema,
+ python3-pip, python3-requests, python3-osm-ro-plugin
--- /dev/null
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+[tox]
+envlist = flake8
+toxworkdir={toxinidir}/../.tox
+
+[testenv]
+usedevelop = True
+basepython = python3
+install_command = python3 -m pip install -r requirements.txt -U {opts} {packages}
+
+[testenv:flake8]
+basepython = python3
+deps = flake8
+commands = flake8 osm_ng_ro --max-line-length 120 \
+ --exclude .svn,CVS,.gz,.git,__pycache__,.tox,local,temp,osm_im --ignore W291,W293,E226,E402,W504
+
+[testenv:build]
+basepython = python3
+deps = stdeb
+ setuptools-version-command
+commands = python3 setup.py --command-packages=stdeb.command bdist_deb
vim_id: filled/added by this function
floating_ip: True/False (or it can be None)
'cloud_config': (optional) dictionary with:
- 'key-pairs': (optional) list of strings with the public key to be inserted to the default user
- 'users': (optional) list of users to be inserted, each item is a dict with:
- 'name': (mandatory) user name,
- 'key-pairs': (optional) list of strings with the public key to be inserted to the user
- 'user-data': (optional) string is a text script to be passed directly to cloud-init
- 'config-files': (optional). List of files to be transferred. Each item is a dict with:
- 'dest': (mandatory) string with the destination absolute path
- 'encoding': (optional, by default text). Can be one of:
- 'b64', 'base64', 'gz', 'gz+b64', 'gz+base64', 'gzip+b64', 'gzip+base64'
- 'content' (mandatory): string with the content of the file
- 'permissions': (optional) string with file permissions, typically octal notation '0644'
- 'owner': (optional) file owner, string with the format 'owner:group'
- 'boot-data-drive': boolean to indicate if user-data must be passed using a boot drive (hard disk)
+ 'key-pairs': (optional) list of strings with the public key to be inserted to the default user
+ 'users': (optional) list of users to be inserted, each item is a dict with:
+ 'name': (mandatory) user name,
+ 'key-pairs': (optional) list of strings with the public key to be inserted to the user
+ 'user-data': (optional) string is a text script to be passed directly to cloud-init
+ 'config-files': (optional). List of files to be transferred. Each item is a dict with:
+ 'dest': (mandatory) string with the destination absolute path
+ 'encoding': (optional, by default text). Can be one of:
+ 'b64', 'base64', 'gz', 'gz+b64', 'gz+base64', 'gzip+b64', 'gzip+base64'
+ 'content' (mandatory): string with the content of the file
+ 'permissions': (optional) string with file permissions, typically octal notation '0644'
+ 'owner': (optional) file owner, string with the format 'owner:group'
+ 'boot-data-drive': boolean to indicate if user-data must be passed using a boot drive (hard disk)
'disk_list': (optional) list with additional disks to the VM. Each item is a dict with:
'image_id': (optional). VIM id of an existing image. If not provided an empty disk must be mounted
'size': (mandatory) string with the size of the disk in GB
[ -z "$RO_DB_OVIM_HOST" ] && export RO_DB_OVIM_HOST="$RO_DB_HOST"
[ -z "$RO_DB_OVIM_ROOT_PASSWORD" ] && export RO_DB_OVIM_ROOT_PASSWORD="$RO_DB_ROOT_PASSWORD"
+# IF OSMRO_SERVER_NG use new server that not need any database init
+[ -n "$OSMRO_SERVER_NG" ] && python3 -m osm_ng_ro.ro_main
+
+
function is_db_created() {
db_host=$1
db_port=$2