Feature 7184 New Generation RO 14/9214/2
author tierno <alfonso.tiernosepulveda@telefonica.com>
Fri, 24 Apr 2020 14:02:51 +0000 (14:02 +0000)
committer tierno <alfonso.tiernosepulveda@telefonica.com>
Mon, 29 Jun 2020 11:23:22 +0000 (11:23 +0000)
Generates the package python3-osm-ng-ro for a new RO server
Which of the two servers runs is controlled by the environment variable OSMRO_NG

Change-Id: I1b563006eeb008d05b37d5116f9741dc4f12a9ba
Signed-off-by: tierno <alfonso.tiernosepulveda@telefonica.com>
20 files changed:
NG-RO/MANIFEST.in [new file with mode: 0644]
NG-RO/Makefile [new file with mode: 0644]
NG-RO/debian/python3-osm-ng-ro.postinst [new file with mode: 0755]
NG-RO/osm_ng_ro/__init__.py [new file with mode: 0644]
NG-RO/osm_ng_ro/html_out.py [new file with mode: 0644]
NG-RO/osm_ng_ro/html_public/OSM-logo.png [new file with mode: 0644]
NG-RO/osm_ng_ro/html_public/delete.png [new file with mode: 0644]
NG-RO/osm_ng_ro/html_public/login.js [new file with mode: 0755]
NG-RO/osm_ng_ro/html_public/style.css [new file with mode: 0644]
NG-RO/osm_ng_ro/ns.py [new file with mode: 0644]
NG-RO/osm_ng_ro/ns_thread.py [new file with mode: 0644]
NG-RO/osm_ng_ro/ro.cfg [new file with mode: 0644]
NG-RO/osm_ng_ro/ro_main.py [new file with mode: 0644]
NG-RO/osm_ng_ro/validation.py [new file with mode: 0644]
NG-RO/requirements.txt [new file with mode: 0644]
NG-RO/setup.py [new file with mode: 0644]
NG-RO/stdeb.cfg [new file with mode: 0644]
NG-RO/tox.ini [new file with mode: 0644]
RO-VIM-openstack/osm_rovim_openstack/vimconn_openstack.py
RO/osm_ro/scripts/RO-start.sh

diff --git a/NG-RO/MANIFEST.in b/NG-RO/MANIFEST.in
new file mode 100644 (file)
index 0000000..b774a75
--- /dev/null
@@ -0,0 +1,17 @@
+##
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+##
+
+recursive-include osm_ng_ro *.py *.sh *.cfg *.yml
+recursive-include osm_ng_ro/html_public *
diff --git a/NG-RO/Makefile b/NG-RO/Makefile
new file mode 100644 (file)
index 0000000..ee09e5c
--- /dev/null
@@ -0,0 +1,24 @@
+# Copyright 2020 Telefonica S.A.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+all: clean package
+
+clean:
+       rm -rf dist deb_dist osm_ng_ro-*.tar.gz osm_ng_ro.egg-info .eggs .temp-tox
+
+package:
+       python3 setup.py --command-packages=stdeb.command sdist_dsc
+       cp debian/python3-osm-ng-ro.postinst deb_dist/osm-ng-ro*/debian
+       cd deb_dist/osm-ng-ro*/ && dpkg-buildpackage -rfakeroot -uc -us
diff --git a/NG-RO/debian/python3-osm-ng-ro.postinst b/NG-RO/debian/python3-osm-ng-ro.postinst
new file mode 100755 (executable)
index 0000000..09b59d2
--- /dev/null
@@ -0,0 +1,25 @@
+#!/bin/bash
+
+##
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact with: OSM_TECH@list.etsi.org
+##
+
+echo "POST INSTALL OSM-RO-NG"
+echo "Installing python dependencies via pip..."
+
+# python3 -m pip install -U pip
+python3 -m pip install cherrypy==18.1.2
+
diff --git a/NG-RO/osm_ng_ro/__init__.py b/NG-RO/osm_ng_ro/__init__.py
new file mode 100644 (file)
index 0000000..d2ac4c4
--- /dev/null
@@ -0,0 +1,23 @@
+##
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+##
+
+version = '8.0.1.post0'
+version_date = '2020-06-29'
+
+# Obtain the installed package version. Ignore errors, e.g. if pkg_resources is not installed
+try:
+    from pkg_resources import get_distribution
+    version = get_distribution("osm_ng_ro").version
+except Exception:
+    pass
diff --git a/NG-RO/osm_ng_ro/html_out.py b/NG-RO/osm_ng_ro/html_out.py
new file mode 100644 (file)
index 0000000..4059400
--- /dev/null
@@ -0,0 +1,182 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Contains html text in variables to build an html response
+"""
+
+import yaml
+from http import HTTPStatus
+from html import escape as html_escape
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+
+html_start = """
+ <!DOCTYPE html>
+<html>
+<head>
+  <link href="/ro/static/style.css" rel="stylesheet">
+<title>Welcome to OSM</title>
+</head>
+<body>
+  <div id="osm_topmenu">
+    <div>
+      <a href="https://osm.etsi.org"> <img src="/ro/static/OSM-logo.png" height="42" width="100"
+        style="vertical-align:middle"> </a>
+      <a>( {} )</a>
+      <a href="/ro/ns/v1/deploy">NSs </a>
+      <a href="/ro/admin/v1/k8srepos">K8s_repos </a>
+      <a href="/ro/admin/v1/tokens?METHOD=DELETE">logout </a>
+    </div>
+  </div>
+"""
+
+html_body = """
+<h1>{item}</h1>
+"""
+
+html_end = """
+</body>
+</html>
+"""
+
+html_body_error = "<h2> Error <pre>{}</pre> </h2>"
+
+
+html_auth2 = """
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01//EN" "http://www.w3.org/TR/html4/strict.dtd">
+<html>
+<head><META http-equiv="Content-Type" content="text/html; charset=UTF-8">
+  <link href="/ro/static/style.css" rel="stylesheet">
+  <title>OSM Login</title>
+</head>
+<body>
+  <div id="osm_header">
+    <div>
+      <a href="https://osm.etsi.org"> <h1><img src="/ro/static/OSM-logo.png" style="vertical-align:middle"></h1> </a>
+    </div>
+  </div>
+  <div id="osm_error_message">
+    <h1>{error}</h1>
+  </div>
+  <div class="gerritBody" id="osm_body">
+    <h1>Sign in to OSM</h1>
+    <form action="/ro/admin/v1/tokens" id="login_form" method="POST">
+      <table style="border: 0;">
+        <tr><th>Username</th><td><input id="f_user" name="username" size="25" tabindex="1" type="text"></td></tr>
+        <tr><th>Password</th><td><input id="f_pass" name="password" size="25" tabindex="2" type="password"></td></tr>
+        <tr><td><input tabindex="3" type="submit" value="Sign In"></td></tr>
+      </table>
+    </form>
+    <div style="clear: both; margin-top: 15px; padding-top: 2px; margin-bottom: 15px;">
+      <div id="osm_footer">
+        <div></div>
+      </div>
+    </div>
+  </div>
+  <script src="/ro/static/login.js"> </script>
+</body>
+</html>
+"""
+
+
+html_nslcmop_body = """
+<a href="/ro/nslcm/v1/ns_lcm_op_occs?nsInstanceId={id}">nslcm operations </a>
+<a href="/ro/nslcm/v1/vnf_instances?nsr-id-ref={id}">VNFRS </a>
+<form action="/ro/nslcm/v1/ns_instances/{id}/terminate" method="post" enctype="multipart/form-data">
+    <h3> <table style="border: 0;"> <tr>
+        <td> <input type="submit" value="Terminate"/> </td>
+    </tr> </table> </h3>
+</form>
+"""
+
+html_nsilcmop_body = """
+<a href="/ro/nsilcm/v1/nsi_lcm_op_occs?netsliceInstanceId={id}">nsilcm operations </a>
+<form action="/ro/nsilcm/v1/netslice_instances/{id}/terminate" method="post" enctype="multipart/form-data">
+    <h3> <table style="border: 0;"> <tr>
+        <td> <input type="submit" value="Terminate"/> </td>
+    </tr> </table> </h3>
+</form>
+"""
+
+
+def format(data, request, response, toke_info):
+    """
+    Format a nice html response, depending on the data
+    :param data: data to render
+    :param request: cherrypy request
+    :param response: cherrypy response
+    :return: string with the html response
+    """
+    response.headers["Content-Type"] = 'text/html'
+    if response.status == HTTPStatus.UNAUTHORIZED.value:
+        if response.headers.get("WWW-Authenticate") and request.config.get("auth.allow_basic_authentication"):
+            response.headers["WWW-Authenticate"] = "Basic" + response.headers["WWW-Authenticate"][6:]
+            return
+        else:
+            return html_auth2.format(error=data)
+    if request.path_info in ("/version", "/system"):
+        return "<pre>" + yaml.safe_dump(data, explicit_start=False, indent=4, default_flow_style=False) + "</pre>"
+    body = html_body.format(item=request.path_info)
+    if response.status and response.status > 202:
+        body += html_body_error.format(yaml.safe_dump(data, explicit_start=True, indent=4, default_flow_style=False))
+    elif isinstance(data, (list, tuple)):
+        # if request.path_info == "/ns/v1/deploy":
+        #     body += html_upload_body.format(request.path_info + "_content", "VNFD")
+        # elif request.path_info == "/nsd/v1/ns_descriptors":
+        #     body += html_upload_body.format(request.path_info + "_content", "NSD")
+        # elif request.path_info == "/nst/v1/nst_templates":
+        #     body += html_upload_body.format(request.path_info + "_content", "NSTD")
+        for k in data:
+            if isinstance(k, dict):
+                data_id = k.pop("_id", None)
+            elif isinstance(k, str):
+                data_id = k
+            if request.path_info == "/ns/v1/deploy":
+                body += '<p> <a href="/ro/{url}/{id}?METHOD=DELETE"> <img src="/ro/static/delete.png" height="25"' \
+                        ' width="25"> </a><a href="/ro/{url}/{id}">{id}</a>: {t} </p>' \
+                    .format(url=request.path_info, id=data_id, t=html_escape(str(k)))
+            else:
+                body += '<p> <a href="/ro/{url}/{id}">{id}</a>: {t} </p>'.format(url=request.path_info, id=data_id,
+                                                                                 t=html_escape(str(k)))
+    elif isinstance(data, dict):
+        if "Location" in response.headers:
+            body += '<a href="{}"> show </a>'.format(response.headers["Location"])
+        else:
+            body += '<a href="/ro/{}?METHOD=DELETE"> <img src="/ro/static/delete.png" height="25" width="25"> </a>'\
+                .format(request.path_info[:request.path_info.rfind("/")])
+            if request.path_info.startswith("/nslcm/v1/ns_instances_content/") or \
+                    request.path_info.startswith("/nslcm/v1/ns_instances/"):
+                _id = request.path_info[request.path_info.rfind("/")+1:]
+                body += html_nslcmop_body.format(id=_id)
+            elif request.path_info.startswith("/nsilcm/v1/netslice_instances_content/") or \
+                    request.path_info.startswith("/nsilcm/v1/netslice_instances/"):
+                _id = request.path_info[request.path_info.rfind("/")+1:]
+                body += html_nsilcmop_body.format(id=_id)
+        body += "<pre>" + html_escape(yaml.safe_dump(data, explicit_start=True, indent=4, default_flow_style=False)) + \
+                "</pre>"
+    elif data is None:
+        if request.method == "DELETE" or "METHOD=DELETE" in request.query_string:
+            body += "<pre> deleted </pre>"
+    else:
+        body = html_escape(str(data))
+    user_text = "    "
+    if toke_info:
+        if toke_info.get("username"):
+            user_text += "user: {}".format(toke_info.get("username"))
+        if toke_info.get("project_id"):
+            user_text += ", project: {}".format(toke_info.get("project_name"))
+    return html_start.format(user_text) + body + html_end
+    # yaml.safe_dump(data, explicit_start=True, indent=4, default_flow_style=False)
+    # tags=False,
+    # encoding='utf-8', allow_unicode=True)
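
The format() helper above is intended to be invoked from the CherryPy server (ro_main.py, added in this same commit) whenever the client prefers an HTML rendering of a response. Below is a minimal sketch of how such wiring could look; the handler name, the token_info variable and the Accept-header check are illustrative assumptions, not the actual ro_main.py code.

    import cherrypy
    from osm_ng_ro import html_out as html

    def maybe_render_html(data, token_info=None):
        # Return an HTML page when the client asks for text/html, otherwise pass data through
        accept = cherrypy.request.headers.get("Accept", "")
        if "text/html" in accept:
            # format() sets the Content-Type header and builds the page (top menu, body, error block)
            return html.format(data, cherrypy.request, cherrypy.serving.response, token_info)
        return data
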
diff --git a/NG-RO/osm_ng_ro/html_public/OSM-logo.png b/NG-RO/osm_ng_ro/html_public/OSM-logo.png
new file mode 100644 (file)
index 0000000..7de447c
Binary files /dev/null and b/NG-RO/osm_ng_ro/html_public/OSM-logo.png differ
diff --git a/NG-RO/osm_ng_ro/html_public/delete.png b/NG-RO/osm_ng_ro/html_public/delete.png
new file mode 100644 (file)
index 0000000..d8fc8e9
Binary files /dev/null and b/NG-RO/osm_ng_ro/html_public/delete.png differ
diff --git a/NG-RO/osm_ng_ro/html_public/login.js b/NG-RO/osm_ng_ro/html_public/login.js
new file mode 100755 (executable)
index 0000000..20c4c97
--- /dev/null
@@ -0,0 +1,30 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+    var login_form = document.getElementById('login_form');
+    var f_user = document.getElementById('f_user');
+    var f_pass = document.getElementById('f_pass');
+    f_user.onkeydown = function(e) {
+      if (e.keyCode == 13) {
+        f_pass.focus();
+        return false;
+      }
+    }
+    f_pass.onkeydown = function(e) {
+      if (e.keyCode == 13) {
+        login_form.submit();
+        return false;
+      }
+    }
+    f_user.focus();
+
diff --git a/NG-RO/osm_ng_ro/html_public/style.css b/NG-RO/osm_ng_ro/html_public/style.css
new file mode 100644 (file)
index 0000000..da3c296
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
+Licensed under the Apache License, Version 2.0 (the "License"); you may
+not use this file except in compliance with the License. You may obtain
+a copy of the License at
+
+         http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+License for the specific language governing permissions and limitations
+under the License.
+*/
+
+#osm_header{
+        display: block;
+        position: relative;
+        top: 0px;
+        left: 160px;
+        margin-bottom: -60px;
+        width: 140px;
+        padding-left: 17px;
+    }
+#osm_topmenu {
+        background: none;
+        position: relative;
+        top: 0px;
+        left: 10px;
+        margin-right: 10px;
+}
+#osm_error_message {
+        padding: 5px;
+        margin: 2em;
+        width: 200em;
+        color: red;
+        font-weight: bold;
+}
+
diff --git a/NG-RO/osm_ng_ro/ns.py b/NG-RO/osm_ng_ro/ns.py
new file mode 100644 (file)
index 0000000..eda6c48
--- /dev/null
@@ -0,0 +1,806 @@
+# -*- coding: utf-8 -*-
+
+##
+# Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+import logging
+# import yaml
+from traceback import format_exc as traceback_format_exc
+from osm_ng_ro.ns_thread import NsWorker
+from osm_ng_ro.validation import validate_input, deploy_schema
+from osm_common import dbmongo, dbmemory, fslocal, fsmongo, msglocal, msgkafka, version as common_version
+from osm_common.dbbase import DbException
+from osm_common.fsbase import FsException
+from osm_common.msgbase import MsgException
+from http import HTTPStatus
+from uuid import uuid4
+from threading import Lock
+from random import choice as random_choice
+from time import time
+from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError
+from cryptography.hazmat.primitives import serialization as crypto_serialization
+from cryptography.hazmat.primitives.asymmetric import rsa
+from cryptography.hazmat.backends import default_backend as crypto_default_backend
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+min_common_version = "0.1.16"
+
+
+class NsException(Exception):
+
+    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
+        self.http_code = http_code
+        super(Exception, self).__init__(message)
+
+
+def get_process_id():
+    """
+    Obtain a unique ID for this process. If running inside docker, it returns the docker ID; otherwise it
+    returns a random one
+    :return: Obtained ID
+    """
+    # Try getting docker id. If fails, get pid
+    try:
+        with open("/proc/self/cgroup", "r") as f:
+            text_id_ = f.readline()
+            _, _, text_id = text_id_.rpartition("/")
+            text_id = text_id.replace("\n", "")[:12]
+            if text_id:
+                return text_id
+    except Exception:
+        pass
+    # Return a random id
+    return "".join(random_choice("0123456789abcdef") for _ in range(12))
+
+
+def versiontuple(v):
+    """utility for compare dot separate versions. Fills with zeros to proper number comparison"""
+    filled = []
+    for point in v.split("."):
+        filled.append(point.zfill(8))
+    return tuple(filled)
+
+
+class Ns(object):
+
+    def __init__(self):
+        self.db = None
+        self.fs = None
+        self.msg = None
+        self.config = None
+        # self.operations = None
+        self.logger = logging.getLogger("ro.ns")
+        self.map_topic = {}
+        self.write_lock = None
+        self.assignment = {}
+        self.next_worker = 0
+        self.plugins = {}
+        self.workers = []
+
+    def init_db(self, target_version):
+        pass
+
+    def start(self, config):
+        """
+        Connect to database, filesystem storage, and messaging
+        :param config: two-level dictionary with the configuration. Top level should contain
+            'database', 'storage' and 'message'
+        :return: None
+        """
+        self.config = config
+        self.config["process_id"] = get_process_id()  # used for HA identity
+        # check right version of common
+        if versiontuple(common_version) < versiontuple(min_common_version):
+            raise NsException("Not compatible osm/common version '{}'. Needed '{}' or higher".format(
+                common_version, min_common_version))
+
+        try:
+            if not self.db:
+                if config["database"]["driver"] == "mongo":
+                    self.db = dbmongo.DbMongo()
+                    self.db.db_connect(config["database"])
+                elif config["database"]["driver"] == "memory":
+                    self.db = dbmemory.DbMemory()
+                    self.db.db_connect(config["database"])
+                else:
+                    raise NsException("Invalid configuration param '{}' at '[database]':'driver'".format(
+                        config["database"]["driver"]))
+            if not self.fs:
+                if config["storage"]["driver"] == "local":
+                    self.fs = fslocal.FsLocal()
+                    self.fs.fs_connect(config["storage"])
+                elif config["storage"]["driver"] == "mongo":
+                    self.fs = fsmongo.FsMongo()
+                    self.fs.fs_connect(config["storage"])
+                else:
+                    raise NsException("Invalid configuration param '{}' at '[storage]':'driver'".format(
+                        config["storage"]["driver"]))
+            if not self.msg:
+                if config["message"]["driver"] == "local":
+                    self.msg = msglocal.MsgLocal()
+                    self.msg.connect(config["message"])
+                elif config["message"]["driver"] == "kafka":
+                    self.msg = msgkafka.MsgKafka()
+                    self.msg.connect(config["message"])
+                else:
+                    raise NsException("Invalid configuration param '{}' at '[message]':'driver'".format(
+                        config["message"]["driver"]))
+
+            # TODO load workers to deal with existing database tasks
+
+            self.write_lock = Lock()
+        except (DbException, FsException, MsgException) as e:
+            raise NsException(str(e), http_code=e.http_code)
+
+    def stop(self):
+        try:
+            if self.db:
+                self.db.db_disconnect()
+            if self.fs:
+                self.fs.fs_disconnect()
+            if self.msg:
+                self.msg.disconnect()
+            self.write_lock = None
+        except (DbException, FsException, MsgException) as e:
+            raise NsException(str(e), http_code=e.http_code)
+        for worker in self.workers:
+            worker.insert_task(("terminate",))
+
+    def _create_worker(self, vim_account_id):
+        # TODO make use of the limit self.config["global"]["server.ns_threads"]
+        worker_id = next((i for i in range(len(self.workers)) if not self.workers[i].is_alive()), None)
+        if worker_id is None:
+            worker_id = len(self.workers)
+            self.workers.append(NsWorker(worker_id, self.config, self.plugins, self.db))
+            self.workers[worker_id].start()
+        self.workers[worker_id].insert_task(("load_vim", vim_account_id))
+        return worker_id
+
+    def _assign_vim(self, vim_account_id):
+        if vim_account_id not in self.assignment:
+            self.assignment[vim_account_id] = self._create_worker(vim_account_id)
+
+    def _get_cloud_init(self, where):
+        """
+        Get the cloud-init content referenced by 'where'
+        :param where: can be 'vnfd_id:file:file_name' or 'vnfd_id:vdu:vdu_index'
+        :return: the cloud-init content
+        """
+        vnfd_id, _, other = where.partition(":")
+        _type, _, name = other.partition(":")
+        vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
+        if _type == "file":
+            base_folder = vnfd["_admin"]["storage"]
+            cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"], name)
+            with self.fs.file_open(cloud_init_file, "r") as ci_file:
+                cloud_init_content = ci_file.read()
+        elif _type == "vdu":
+            cloud_init_content = vnfd["vdu"][int(name)]["cloud-init"]
+        else:
+            raise NsException("Mismatch descriptor for cloud init: {}".format(where))
+        return cloud_init_content
+
+    def _parse_jinja2(self, cloud_init_content, params, context):
+        try:
+            env = Environment()
+            ast = env.parse(cloud_init_content)
+            mandatory_vars = meta.find_undeclared_variables(ast)
+            if mandatory_vars:
+                for var in mandatory_vars:
+                    if not params or var not in params:
+                        raise NsException(
+                            "Variable '{}' defined at vnfd='{}' must be provided in the instantiation parameters"
+                            "inside the 'additionalParamsForVnf' block".format(var, context))
+            template = Template(cloud_init_content)
+            return template.render(params or {})
+
+        except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
+            raise NsException("Error parsing Jinja2 to cloud-init content at vnfd='{}': {}".format(context, e))
+
+    def _create_db_ro_nsrs(self, nsr_id, now):
+        try:
+            key = rsa.generate_private_key(
+                backend=crypto_default_backend(),
+                public_exponent=65537,
+                key_size=2048
+            )
+            private_key = key.private_bytes(
+                crypto_serialization.Encoding.PEM,
+                crypto_serialization.PrivateFormat.PKCS8,
+                crypto_serialization.NoEncryption())
+            public_key = key.public_key().public_bytes(
+                crypto_serialization.Encoding.OpenSSH,
+                crypto_serialization.PublicFormat.OpenSSH
+            )
+            private_key = private_key.decode('utf8')
+            public_key = public_key.decode('utf8')
+        except Exception as e:
+            raise NsException("Cannot create ssh-keys: {}".format(e))
+
+        schema_version = "1.1"
+        private_key_encrypted = self.db.encrypt(private_key, schema_version=schema_version, salt=nsr_id)
+        db_content = {
+            "_id": nsr_id,
+            "_admin": {
+                "created": now,
+                "modified": now,
+                "schema_version": schema_version
+            },
+            "public_key": public_key,
+            "private_key": private_key_encrypted,
+            "actions": [],
+        }
+        self.db.create("ro_nsrs", db_content)
+        return db_content
+
+    def deploy(self, session, indata, version, nsr_id, *args, **kwargs):
+        print("ns.deploy session={} indata={} version={} nsr_id={}".format(session, indata, version, nsr_id))
+        validate_input(indata, deploy_schema)
+        action_id = indata.get("action_id", str(uuid4()))
+        task_index = 0
+        # get current deployment
+        db_nsr = None
+        # db_nslcmop = None
+        db_nsr_update = {}        # update operation on nsrs
+        db_vnfrs_update = {}
+        # db_nslcmop_update = {}        # update operation on nslcmops
+        db_vnfrs = {}     # vnf's info indexed by _id
+        vdu2cloud_init = {}
+        step = ''
+        logging_text = "Task deploy nsr_id={} action_id={} ".format(nsr_id, action_id)
+        self.logger.debug(logging_text + "Enter")
+        try:
+            step = "Getting ns and vnfr record from db"
+            # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
+            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+            db_ro_tasks = []
+            db_new_tasks = []
+            # read from db: vnf's of this ns
+            step = "Getting vnfrs from db"
+            db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
+            if not db_vnfrs_list:
+                raise NsException("Cannot obtain associated VNF for ns")
+            for vnfr in db_vnfrs_list:
+                db_vnfrs[vnfr["_id"]] = vnfr
+                db_vnfrs_update[vnfr["_id"]] = {}
+            now = time()
+            db_ro_nsr = self.db.get_one("ro_nsrs", {"_id": nsr_id}, fail_on_empty=False)
+            if not db_ro_nsr:
+                db_ro_nsr = self._create_db_ro_nsrs(nsr_id, now)
+            ro_nsr_public_key = db_ro_nsr["public_key"]
+
+            # check that action_id is not already in the list of actions; if it is, suffix it with ":index"
+            if action_id in db_ro_nsr["actions"]:
+                index = 1
+                while True:
+                    new_action_id = "{}:{}".format(action_id, index)
+                    if new_action_id not in db_ro_nsr["actions"]:
+                        action_id = new_action_id
+                        self.logger.debug(logging_text + "Changing action_id in use to {}".format(action_id))
+                        break
+                    index += 1
+
+            def _create_task(item, action, target_record, target_record_id, extra_dict=None):
+                nonlocal task_index
+                nonlocal action_id
+                nonlocal nsr_id
+
+                task = {
+                    "action_id": action_id,
+                    "nsr_id": nsr_id,
+                    "task_id": "{}:{}".format(action_id, task_index),
+                    "status": "SCHEDULED",
+                    "action": action,
+                    "item": item,
+                    "target_record": target_record,
+                    "target_record_id": target_record_id,
+                }
+                if extra_dict:
+                    task.update(extra_dict)   # params, find_params, depends_on
+                task_index += 1
+                return task
+
+            def _create_ro_task(vim_account_id, item, action, target_record, target_record_id, extra_dict=None):
+                nonlocal action_id
+                nonlocal task_index
+                nonlocal now
+
+                _id = action_id + ":" + str(task_index)
+                db_ro_task = {
+                    "_id": _id,
+                    "locked_by": None,
+                    "locked_at": 0.0,
+                    "target_id": "vim:" + vim_account_id,
+                    "vim_info": {
+                        "created": False,
+                        "created_items": None,
+                        "vim_id": None,
+                        "vim_name": None,
+                        "vim_status": None,
+                        "vim_details": None,
+                        "refresh_at": None,
+                    },
+                    "modified_at": now,
+                    "created_at": now,
+                    "to_check_at": now,
+                    "tasks": [_create_task(item, action, target_record, target_record_id, extra_dict)],
+                }
+                return db_ro_task
+
+            def _process_image_params(target_image, vim_info):
+                find_params = {}
+                if target_image.get("image"):
+                    find_params["filter_dict"] = {"name": target_image.get("image")}
+                if target_image.get("vim_image_id"):
+                    find_params["filter_dict"] = {"id": target_image.get("vim_image_id")}
+                if target_image.get("image_checksum"):
+                    find_params["filter_dict"] = {"checksum": target_image.get("image_checksum")}
+                return {"find_params": find_params}
+
+            def _process_flavor_params(target_flavor, vim_info):
+
+                def _get_resource_allocation_params(quota_descriptor):
+                    """
+                    read the quota_descriptor from vnfd and fetch the resource allocation properties from the
+                     descriptor object
+                    :param quota_descriptor: cpu/mem/vif/disk-io quota descriptor
+                    :return: quota params for limit, reserve, shares from the descriptor object
+                    """
+                    quota = {}
+                    if quota_descriptor.get("limit"):
+                        quota["limit"] = int(quota_descriptor["limit"])
+                    if quota_descriptor.get("reserve"):
+                        quota["reserve"] = int(quota_descriptor["reserve"])
+                    if quota_descriptor.get("shares"):
+                        quota["shares"] = int(quota_descriptor["shares"])
+                    return quota
+
+                flavor_data = {
+                    "disk": int(target_flavor["storage-gb"]),
+                    # "ram": max(int(target_flavor["memory-mb"]) // 1024, 1),
+                    # ^ TODO manage at vim_connectors MB instead of GB
+                    "ram": int(target_flavor["memory-mb"]),
+                    "vcpus": target_flavor["vcpu-count"],
+                }
+                if target_flavor.get("guest-epa"):
+                    extended = {}
+                    numa = {}
+                    epa_vcpu_set = False
+                    if target_flavor["guest-epa"].get("numa-node-policy"):
+                        numa_node_policy = target_flavor["guest-epa"].get("numa-node-policy")
+                        if numa_node_policy.get("node"):
+                            numa_node = numa_node_policy["node"][0]
+                            if numa_node.get("num-cores"):
+                                numa["cores"] = numa_node["num-cores"]
+                                epa_vcpu_set = True
+                            if numa_node.get("paired-threads"):
+                                if numa_node["paired-threads"].get("num-paired-threads"):
+                                    numa["paired-threads"] = int(numa_node["paired-threads"]["num-paired-threads"])
+                                    epa_vcpu_set = True
+                                if numa_node["paired-threads"].get("paired-thread-ids"):
+                                    numa["paired-threads-id"] = []
+                                    for pair in numa_node["paired-threads"]["paired-thread-ids"]:
+                                        numa["paired-threads-id"].append(
+                                            (str(pair["thread-a"]), str(pair["thread-b"]))
+                                        )
+                            if numa_node.get("num-threads"):
+                                numa["threads"] = int(numa_node["num-threads"])
+                                epa_vcpu_set = True
+                            if numa_node.get("memory-mb"):
+                                numa["memory"] = max(int(numa_node["memory-mb"] / 1024), 1)
+                    if target_flavor["guest-epa"].get("mempage-size"):
+                        extended["mempage-size"] = target_flavor["guest-epa"].get("mempage-size")
+                    if target_flavor["guest-epa"].get("cpu-pinning-policy") and not epa_vcpu_set:
+                        if target_flavor["guest-epa"]["cpu-pinning-policy"] == "DEDICATED":
+                            if target_flavor["guest-epa"].get("cpu-thread-pinning-policy") and \
+                                    target_flavor["guest-epa"]["cpu-thread-pinning-policy"] != "PREFER":
+                                numa["cores"] = max(flavor_data["vcpus"], 1)
+                            else:
+                                numa["threads"] = max(flavor_data["vcpus"], 1)
+                            epa_vcpu_set = True
+                    if target_flavor["guest-epa"].get("cpu-quota") and not epa_vcpu_set:
+                        cpuquota = _get_resource_allocation_params(target_flavor["guest-epa"].get("cpu-quota"))
+                        if cpuquota:
+                            extended["cpu-quota"] = cpuquota
+                    if target_flavor["guest-epa"].get("mem-quota"):
+                        vduquota = _get_resource_allocation_params(target_flavor["guest-epa"].get("mem-quota"))
+                        if vduquota:
+                            extended["mem-quota"] = vduquota
+                    if target_flavor["guest-epa"].get("disk-io-quota"):
+                        diskioquota = _get_resource_allocation_params(target_flavor["guest-epa"].get("disk-io-quota"))
+                        if diskioquota:
+                            extended["disk-io-quota"] = diskioquota
+                    if target_flavor["guest-epa"].get("vif-quota"):
+                        vifquota = _get_resource_allocation_params(target_flavor["guest-epa"].get("vif-quota"))
+                        if vifquota:
+                            extended["vif-quota"] = vifquota
+                if numa:
+                    extended["numas"] = [numa]
+                if extended:
+                    flavor_data["extended"] = extended
+
+                extra_dict = {"find_params": {"flavor_data": flavor_data}}
+                flavor_data_name = flavor_data.copy()
+                flavor_data_name["name"] = target_flavor["name"]
+                extra_dict["params"] = {"flavor_data": flavor_data_name}
+                return extra_dict
+
+            def _process_net_params(target_vld, vim_info):
+                nonlocal indata
+                extra_dict = {}
+                if vim_info.get("vim_network_name"):
+                    extra_dict["find_params"] = {"filter_dict": {"name": vim_info.get("vim_network_name")}}
+                elif vim_info.get("vim_network_id"):
+                    extra_dict["find_params"] = {"filter_dict": {"id": vim_info.get("vim_network_id")}}
+                elif target_vld.get("mgmt-network"):
+                    extra_dict["find_params"] = {"mgmt": True, "name": target_vld["id"]}
+                else:
+                    # create
+                    extra_dict["params"] = {
+                        "net_name": "{}-{}".format(indata["name"][:16], target_vld.get("name", target_vld["id"])[:16]),
+                        "ip_profile": vim_info.get('ip_profile'),
+                        "provider_network_profile": vim_info.get('provider_network'),
+                    }
+                    if not target_vld.get("underlay"):
+                        extra_dict["params"]["net_type"] = "bridge"
+                    else:
+                        extra_dict["params"]["net_type"] = "ptp" if target_vld.get("type") == "ELINE" else "data"
+                return extra_dict
+
+            def _process_vdu_params(target_vdu, vim_info):
+                nonlocal vnfr_id
+                nonlocal nsr_id
+                nonlocal indata
+                nonlocal vnfr
+                nonlocal vdu2cloud_init
+                vnf_preffix = "vnfrs:{}".format(vnfr_id)
+                ns_preffix = "nsrs:{}".format(nsr_id)
+                image_text = ns_preffix + ":image." + target_vdu["ns-image-id"]
+                flavor_text = ns_preffix + ":flavor." + target_vdu["ns-flavor-id"]
+                extra_dict = {"depends_on": [image_text, flavor_text]}
+                net_list = []
+                for iface_index, interface in enumerate(target_vdu["interfaces"]):
+                    if interface.get("ns-vld-id"):
+                        net_text = ns_preffix + ":vld." + interface["ns-vld-id"]
+                    else:
+                        net_text = vnf_preffix + ":vld." + interface["vnf-vld-id"]
+                    extra_dict["depends_on"].append(net_text)
+                    net_item = {
+                        "name": interface["name"],
+                        "net_id": "TASK-" + net_text,
+                        "vpci": interface.get("vpci"),
+                        "type": "virtual",
+                        # TODO mac_address: used for  SR-IOV ifaces #TODO for other types
+                        # TODO floating_ip: True/False (or it can be None)
+                    }
+                    if interface.get("type") in ("SR-IOV", "PCI-PASSTHROUGH"):
+                        net_item["use"] = "data"
+                        net_item["model"] = interface["type"]
+                        net_item["type"] = interface["type"]
+                    elif interface.get("type") == "OM-MGMT" or interface.get("mgmt-interface") or \
+                            interface.get("mgmt-vnf"):
+                        net_item["use"] = "mgmt"
+                    else:   # if interface.get("type") in ("VIRTIO", "E1000", "PARAVIRT"):
+                        net_item["use"] = "bridge"
+                        net_item["model"] = interface.get("type")
+                    net_list.append(net_item)
+                    if interface.get("mgmt-vnf"):
+                        extra_dict["mgmt_vnf_interface"] = iface_index
+                    elif interface.get("mgmt-interface"):
+                        extra_dict["mgmt_vdu_interface"] = iface_index
+
+                # cloud config
+                cloud_config = {}
+                if target_vdu.get("cloud-init"):
+                    if target_vdu["cloud-init"] not in vdu2cloud_init:
+                        vdu2cloud_init[target_vdu["cloud-init"]] = self._get_cloud_init(target_vdu["cloud-init"])
+                    cloud_content_ = vdu2cloud_init[target_vdu["cloud-init"]]
+                    cloud_config["user-data"] = self._parse_jinja2(cloud_content_, target_vdu.get("additionalParams"),
+                                                                   target_vdu["cloud-init"])
+                if target_vdu.get("boot-data-drive"):
+                    cloud_config["boot-data-drive"] = target_vdu.get("boot-data-drive")
+                ssh_keys = []
+                if target_vdu.get("ssh-keys"):
+                    ssh_keys += target_vdu.get("ssh-keys")
+                if target_vdu.get("ssh-access-required"):
+                    ssh_keys.append(ro_nsr_public_key)
+                if ssh_keys:
+                    cloud_config["key-pairs"] = ssh_keys
+
+                extra_dict["params"] = {
+                    "name": "{}-{}-{}-{}".format(indata["name"][:16], vnfr["member-vnf-index-ref"][:16],
+                                                 target_vdu["vdu-name"][:32], target_vdu.get("count-index") or 0),
+                    "description": target_vdu["vdu-name"],
+                    "start": True,
+                    "image_id": "TASK-" + image_text,
+                    "flavor_id": "TASK-" + flavor_text,
+                    "net_list": net_list,
+                    "cloud_config": cloud_config or None,
+                    "disk_list": None,  # TODO
+                    "availability_zone_index": None,  # TODO
+                    "availability_zone_list": None,  # TODO
+                }
+                return extra_dict
+
+            def _process_items(target_list, existing_list, db_record, db_update, db_path, item, process_params):
+                nonlocal db_ro_tasks
+                nonlocal db_new_tasks
+                nonlocal task_index
+
+                # ensure all target_list elements have an "id"; if not, assign the index
+                for target_index, tl in enumerate(target_list):
+                    if tl and not tl.get("id"):
+                        tl["id"] = str(target_index)
+
+                # step 1 networks to be deleted/updated
+                for vld_index, existing_vld in enumerate(existing_list):
+                    target_vld = next((vld for vld in target_list if vld["id"] == existing_vld["id"]), None)
+                    for existing_vim_index, existing_vim_info in enumerate(existing_vld.get("vim_info", ())):
+                        if not existing_vim_info:
+                            continue
+                        if target_vld:
+                            target_viminfo = next((target_viminfo for target_viminfo in target_vld.get("vim_info", ())
+                                                   if existing_vim_info["vim_account_id"] == target_viminfo[
+                                                       "vim_account_id"]), None)
+                        else:
+                            target_viminfo = None
+                        if not target_viminfo:
+                            # must be deleted
+                            self._assign_vim(existing_vim_info["vim_account_id"])
+                            db_new_tasks.append(_create_task(
+                                item, "DELETE",
+                                target_record="{}.{}.vim_info.{}".format(db_record, vld_index, existing_vim_index),
+                                target_record_id="{}.{}".format(db_record, existing_vld["id"])))
+                            # TODO delete
+                    # TODO check one by one the vims to be created/deleted
+
+                # step 2 networks to be created
+                for target_vld in target_list:
+                    vld_index = -1
+                    for vld_index, existing_vld in enumerate(existing_list):
+                        if existing_vld["id"] == target_vld["id"]:
+                            break
+                    else:
+                        vld_index += 1
+                        db_update[db_path + ".{}".format(vld_index)] = target_vld
+                        existing_list.append(target_vld)
+                        existing_vld = None
+
+                    for vim_index, vim_info in enumerate(target_vld["vim_info"]):
+                        existing_viminfo = None
+                        if existing_vld:
+                            existing_viminfo = next(
+                                (existing_viminfo for existing_viminfo in existing_vld.get("vim_info", ())
+                                 if vim_info["vim_account_id"] == existing_viminfo["vim_account_id"]), None)
+                        # TODO check if different. Delete and create???
+                        # TODO delete if not exist
+                        if existing_viminfo:
+                            continue
+
+                        extra_dict = process_params(target_vld, vim_info)
+
+                        self._assign_vim(vim_info["vim_account_id"])
+                        db_ro_tasks.append(_create_ro_task(
+                            vim_info["vim_account_id"], item, "CREATE",
+                            target_record="{}.{}.vim_info.{}".format(db_record, vld_index, vim_index),
+                            target_record_id="{}.{}".format(db_record, target_vld["id"]),
+                            extra_dict=extra_dict))
+
+                        db_update[db_path + ".{}".format(vld_index)] = target_vld
+
+            def _process_action(indata):
+                nonlocal db_ro_tasks
+                nonlocal db_new_tasks
+                nonlocal task_index
+                nonlocal db_vnfrs
+                nonlocal db_ro_nsr
+
+                if indata["action"] == "inject_ssh_key":
+                    key = indata.get("key")
+                    user = indata.get("user")
+                    password = indata.get("password")
+                    for vnf in indata.get("vnf", ()):
+                        if vnf.get("_id") not in db_vnfrs:
+                            raise NsException("Invalid vnf={}".format(vnf["_id"]))
+                        db_vnfr = db_vnfrs[vnf["_id"]]
+                        for target_vdu in vnf.get("vdur", ()):
+                            vdu_index, vdur = next((i_v for i_v in enumerate(db_vnfr["vdur"]) if
+                                                    i_v[1]["id"] == target_vdu["id"]), (None, None))
+                            if not vdur:
+                                raise NsException("Invalid vdu vnf={}.{}".format(vnf["_id"], target_vdu["id"]))
+                            vim_info = vdur["vim_info"][0]
+                            self._assign_vim(vim_info["vim_account_id"])
+                            target_record = "vnfrs:{}:vdur.{}.ssh_keys".format(vnf["_id"], vdu_index)
+                            extra_dict = {
+                                "depends_on": ["vnfrs:{}:vdur.{}".format(vnf["_id"], vdur["id"])],
+                                "params": {
+                                    "ip_address": vdur.gt("ip_address"),
+                                    "user": user,
+                                    "key": key,
+                                    "password": password,
+                                    "private_key": db_ro_nsr["private_key"],
+                                    "salt": db_ro_nsr["_id"],
+                                    "schema_version": db_ro_nsr["_admin"]["schema_version"]
+                                }
+                            }
+                            db_ro_tasks.append(_create_ro_task(vim_info["vim_account_id"], "vdu", "EXEC",
+                                                               target_record=target_record,
+                                                               target_record_id=None,
+                                                               extra_dict=extra_dict))
+
+            with self.write_lock:
+                if indata.get("action"):
+                    _process_action(indata)
+                else:
+                    # compute network differences
+                    # NS.vld
+                    step = "process NS VLDs"
+                    _process_items(target_list=indata["ns"]["vld"] or [], existing_list=db_nsr.get("vld") or [],
+                                   db_record="nsrs:{}:vld".format(nsr_id), db_update=db_nsr_update,
+                                   db_path="vld", item="net", process_params=_process_net_params)
+
+                    step = "process NS images"
+                    _process_items(target_list=indata["image"] or [], existing_list=db_nsr.get("image") or [],
+                                   db_record="nsrs:{}:image".format(nsr_id),
+                                   db_update=db_nsr_update, db_path="image", item="image",
+                                   process_params=_process_image_params)
+
+                    step = "process NS flavors"
+                    _process_items(target_list=indata["flavor"] or [], existing_list=db_nsr.get("flavor") or [],
+                                   db_record="nsrs:{}:flavor".format(nsr_id),
+                                   db_update=db_nsr_update, db_path="flavor", item="flavor",
+                                   process_params=_process_flavor_params)
+
+                    # VNF.vld
+                    for vnfr_id, vnfr in db_vnfrs.items():
+                        # vnfr_id needs to be set in the enclosing scope because, among others, the nested method _process_vdu_params reads it
+                        step = "process VNF={} VLDs".format(vnfr_id)
+                        target_vnf = next((vnf for vnf in indata.get("vnf", ()) if vnf["_id"] == vnfr_id), None)
+                        target_list = target_vnf.get("vld") if target_vnf else None
+                        _process_items(target_list=target_list or [], existing_list=vnfr.get("vld") or [],
+                                       db_record="vnfrs:{}:vld".format(vnfr_id), db_update=db_vnfrs_update[vnfr["_id"]],
+                                       db_path="vld", item="net", process_params=_process_net_params)
+
+                        target_list = target_vnf.get("vdur") if target_vnf else None
+                        step = "process VNF={} VDUs".format(vnfr_id)
+                        _process_items(target_list=target_list or [], existing_list=vnfr.get("vdur") or [],
+                                       db_record="vnfrs:{}:vdur".format(vnfr_id),
+                                       db_update=db_vnfrs_update[vnfr["_id"]], db_path="vdur", item="vdu",
+                                       process_params=_process_vdu_params)
+
+                step = "Updating database, Creating ro_tasks"
+                if db_ro_tasks:
+                    self.db.create_list("ro_tasks", db_ro_tasks)
+                step = "Updating database, Appending tasks to ro_tasks"
+                for task in db_new_tasks:
+                    if not self.db.set_one("ro_tasks", q_filter={"tasks.target_record": task["target_record"]},
+                                           update_dict={"to_check_at": now, "modified_at": now},
+                                           push={"tasks": task}, fail_on_empty=False):
+                        self.logger.error(logging_text + "Cannot find task for target_record={}".
+                                          format(task["target_record"]))
+                    # TODO something else apart from logging?
+                step = "Updating database, nsrs"
+                if db_nsr_update:
+                    self.db.set_one("nsrs", {"_id": nsr_id}, db_nsr_update)
+                for vnfr_id, db_vnfr_update in db_vnfrs_update.items():
+                    if db_vnfr_update:
+                        step = "Updating database, vnfrs={}".format(vnfr_id)
+                        self.db.set_one("vnfrs", {"_id": vnfr_id}, db_vnfr_update)
+
+            self.logger.debug(logging_text + "Exit")
+            return {"status": "ok", "nsr_id": nsr_id, "action_id": action_id}, action_id, True
+
+        except Exception as e:
+            if isinstance(e, (DbException, NsException)):
+                self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
+            else:
+                e = traceback_format_exc()
+                self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(step, e), exc_info=True)
+            raise NsException(e)
+
+    def delete(self, session, indata, version, nsr_id, *args, **kwargs):
+        print("ns.delete session={} indata={} version={} nsr_id={}".format(session, indata, version, nsr_id))
+        # TODO delete the ro_task when ALL "tasks.nsr_id" entries are either None or this nsr_id
+        # self.db.del_list({"_id": ro_task["_id"], "tasks.nsr_id.ne": nsr_id})
+        retries = 5
+        for retry in range(retries):
+            with self.write_lock:
+                ro_tasks = self.db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
+                if not ro_tasks:
+                    break
+                now = time()
+                conflict = False
+                for ro_task in ro_tasks:
+                    db_update = {}
+                    to_delete = True
+                    for index, task in enumerate(ro_task["tasks"]):
+                        if not task:
+                            pass
+                        elif task["nsr_id"] == nsr_id:
+                            db_update["tasks.{}".format(index)] = None
+                        else:
+                            to_delete = False  # used by other nsr, cannot be deleted
+                    # delete or update only if nobody has changed the ro_task meanwhile; modified_at is used to detect changes
+                    if to_delete:
+                        if not self.db.del_one("ro_tasks",
+                                               q_filter={"_id": ro_task["_id"], "modified_at": ro_task["modified_at"]},
+                                               fail_on_empty=False):
+                            conflict = True
+                    elif db_update:
+                        db_update["modified_at"] = now
+                        if not self.db.set_one("ro_tasks",
+                                               q_filter={"_id": ro_task["_id"], "modified_at": ro_task["modified_at"]},
+                                               update_dict=db_update,
+                                               fail_on_empty=False):
+                            conflict = True
+                if not conflict:
+                    break
+        else:
+            raise NsException("Exceeded {} retries".format(retries))
+
+        return None, None, True
+
+    def status(self, session, indata, version, nsr_id, action_id, *args, **kwargs):
+        print("ns.status session={} indata={} version={} nsr_id={}, action_id={}".format(session, indata, version,
+                                                                                         nsr_id, action_id))
+        task_list = []
+        done = 0
+        total = 0
+        ro_tasks = self.db.get_list("ro_tasks", {"tasks.action_id": action_id})
+        global_status = "DONE"
+        details = []
+        for ro_task in ro_tasks:
+            for task in ro_task["tasks"]:
+                if task["action_id"] == action_id:
+                    task_list.append(task)
+                    total += 1
+                    if task["status"] == "FAILED":
+                        global_status = "FAILED"
+                        details.append(ro_task["vim_info"].get("vim_details", ''))
+                    elif task["status"] in ("SCHEDULED", "BUILD"):
+                        if global_status != "FAILED":
+                            global_status = "BUILD"
+                    else:
+                        done += 1
+        return_data = {
+            "status": global_status,
+            "details": ". ".join(details) if details else "progress {}/{}".format(done, total),
+            "nsr_id": nsr_id,
+            "action_id": action_id,
+            "tasks": task_list
+        }
+        return return_data, None, True
+
+    def cancel(self, session, indata, version, nsr_id, action_id, *args, **kwargs):
+        print("ns.cancel session={} indata={} version={} nsr_id={}, action_id={}".format(session, indata, version,
+                                                                                         nsr_id, action_id))
+        return None, None, True
+
+    def get_deploy(self, session, indata, version, nsr_id, action_id, *args, **kwargs):
+        nsrs = self.db.get_list("nsrs", {})
+        return_data = []
+        for ns in nsrs:
+            return_data.append({"_id": ns["_id"], "name": ns["name"]})
+        return return_data, None, True
+
+    def get_actions(self, session, indata, version, nsr_id, action_id, *args, **kwargs):
+        ro_tasks = self.db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
+        return_data = []
+        for ro_task in ro_tasks:
+            for task in ro_task["tasks"]:
+                if task["action_id"] not in return_data:
+                    return_data.append(task["action_id"])
+        return return_data, None, True
diff --git a/NG-RO/osm_ng_ro/ns_thread.py b/NG-RO/osm_ng_ro/ns_thread.py
new file mode 100644 (file)
index 0000000..0b96c53
--- /dev/null
@@ -0,0 +1,919 @@
+# -*- coding: utf-8 -*-
+
+##
+# Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+##
+
+""""
+This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
+The tasks are stored at database in table ro_tasks
+A single ro_task refers to a VIM element (flavor, image, network, ...).
+A ro_task can contain several 'tasks', each one with a target, where to store the results
+"""
+
+import threading
+import time
+import queue
+import logging
+from pkg_resources import iter_entry_points
+# from osm_common import dbmongo, dbmemory, fslocal, fsmongo, msglocal, msgkafka, version as common_version
+from osm_common.dbbase import DbException
+# from osm_common.fsbase import FsException
+# from osm_common.msgbase import MsgException
+from osm_ro_plugin.vim_dummy import VimDummyConnector
+from osm_ro_plugin import vimconn
+from copy import deepcopy
+from unittest.mock import Mock
+
+__author__ = "Alfonso Tierno"
+__date__ = "$28-Sep-2017 12:07:15$"
+
+
+def deep_get(target_dict, *args, **kwargs):
+    """
+    Get a value from target_dict following the nested keys. If a key does not exist, it returns None.
+    Example: target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
+    :param target_dict: dictionary to be read
+    :param args: list of keys to read from target_dict
+    :param kwargs: can only contain default=value, returned if a key is not present in the nested dictionary
+    :return: the wanted value if it exists, None or default otherwise
+    """
+    for key in args:
+        if not isinstance(target_dict, dict) or key not in target_dict:
+            return kwargs.get("default")
+        target_dict = target_dict[key]
+    return target_dict
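+# Usage examples (illustrative, matching how the function is called below):
+#   deep_get({"config": {"management_network_id": "1234"}}, "config", "management_network_id") -> "1234"
+#   deep_get({}, "config", "management_network_id", default="none") -> "none"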
+
+
+class NsWorkerException(Exception):
+    pass
+
+
+class FailingConnector:
+    def __init__(self, error_msg):
+        self.error_msg = error_msg
+        for method in dir(vimconn.VimConnector):
+            if method[0] != "_":
+                setattr(self, method, Mock(side_effect=vimconn.VimConnException(error_msg)))
+
+
+class NsWorkerExceptionNotFound(NsWorkerException):
+    pass
+
+
+class NsWorker(threading.Thread):
+    REFRESH_BUILD = 5  # 5 seconds
+    REFRESH_ACTIVE = 60  # 1 minute
+    REFRESH_ERROR = 600  # 10 minutes
+    REFRESH_IMAGE = 3600 * 10  # 10 hours
+    REFRESH_DELETE = 3600 * 10  # 10 hours
+    QUEUE_SIZE = 2000
+    # TODO delete assigment_lock = Lock()
+    terminate = False
+    # TODO delete assignment = {}
+    MAX_TIME_LOCKED = 3600  # 1 hour; an ro_task locked for longer than this is considered stale and can be re-locked
+
+    def __init__(self, worker, config, plugins, db):
+        """Init a thread.
+        Arguments:
+            'id' number of thead
+            'name' name of thread
+            'host','user':  host ip or name to manage and user
+            'db', 'db_lock': database class and lock to use it in exclusion
+        """
+        threading.Thread.__init__(self)
+        self.config = config
+        self.plugins = plugins
+        self.plugin_name = "unknown"
+        self.logger = logging.getLogger('ro.worker{}'.format(worker))
+        self.task_lock = threading.Lock()  # protects task status changes done in del_task
+        self.worker_id = worker
+        self.task_queue = queue.Queue(self.QUEUE_SIZE)
+        self.my_vims = {}   # targetvim: vimplugin class
+        self.db_vims = {}   # targetvim: vim information from database
+        self.vim_targets = []   # targetvim list
+        self.my_id = config["process_id"] + ":" + str(worker)
+        self.db = db
+        self.item2create = {
+            "net": self.new_net,
+            "vdu": self.new_vm,
+            "image": self.new_image,
+            "flavor": self.new_flavor,
+        }
+        self.item2refresh = {
+            "net": self.refresh_net,
+            "vdu": self.refresh_vm,
+            "image": self.refresh_ok,
+            "flavor": self.refresh_ok,
+        }
+        self.item2delete = {
+            "net": self.del_net,
+            "vdu": self.del_vm,
+            "image": self.delete_ok,
+            "flavor": self.del_flavor,
+        }
+        self.item2action = {
+            "vdu": self.exec_vm,
+        }
+        self.time_last_task_processed = None
+
+    def insert_task(self, task):
+        try:
+            self.task_queue.put(task, False)
+            return None
+        except queue.Full:
+            raise NsWorkerException("task queue is full, cannot insert the task")
+
+    def terminate(self):
+        self.insert_task(("terminate",))  # run() breaks its loop when it reads a "terminate" task
+
+    def del_task(self, task):
+        with self.task_lock:
+            if task["status"] == "SCHEDULED":
+                task["status"] = "SUPERSEDED"
+                return True
+            else:  # task["status"] == "processing"; the context manager releases the lock
+                return False
+
+    def _load_plugin(self, name, type="vim"):
+        # type can be vim or sdn
+        if "rovim_dummy" not in self.plugins:
+            self.plugins["rovim_dummy"] = VimDummyConnector
+        if name in self.plugins:
+            return self.plugins[name]
+        try:
+            for v in iter_entry_points('osm_ro{}.plugins'.format(type), name):
+                self.plugins[name] = v.load()
+        except Exception as e:
+            self.logger.critical("Cannot load osm_{}: {}".format(name, e))
+            if name:
+                self.plugins[name] = FailingConnector("Cannot load osm_{}: {}".format(name, e))
+        if name and name not in self.plugins:
+            error_text = "Cannot load a module for {t} type '{n}'. The plugin 'osm_{n}' has not been" \
+                         " registered".format(t=type, n=name)
+            self.logger.critical(error_text)
+            self.plugins[name] = FailingConnector(error_text)
+
+        return self.plugins[name]
+
+    def _load_vim(self, vim_account_id):
+        target_id = "vim:" + vim_account_id
+        plugin_name = ""
+        vim = None
+        try:
+            step = "Getting vim={} from db".format(vim_account_id)
+            vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
+
+            # if deep_get(vim, "config", "sdn-controller"):
+            #     step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
+            #     db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
+
+            step = "Decrypt password"
+            schema_version = vim.get("schema_version")
+            self.db.encrypt_decrypt_fields(vim, "decrypt", fields=('password', 'secret'),
+                                           schema_version=schema_version, salt=vim_account_id)
+
+            step = "Load plugin 'rovim_{}'".format(vim.get("vim_type"))
+            plugin_name = "rovim_" + vim["vim_type"]
+            vim_module_conn = self._load_plugin(plugin_name)
+            self.my_vims[target_id] = vim_module_conn(
+                uuid=vim['_id'], name=vim['name'],
+                tenant_id=vim.get('vim_tenant_id'), tenant_name=vim.get('vim_tenant_name'),
+                url=vim['vim_url'], url_admin=None,
+                user=vim['vim_user'], passwd=vim['vim_password'],
+                config=vim.get('config'), persistent_info={}
+            )
+            self.vim_targets.append(target_id)
+            self.db_vims[target_id] = vim
+            self.error_status = None
+            self.logger.info("Vim Connector loaded for vim_account={}, plugin={}".format(
+                vim_account_id, plugin_name))
+        except Exception as e:
+            self.logger.error("Cannot load vimconnector for vim_account={} plugin={}: {} {}".format(
+                vim_account_id, plugin_name, step, e))
+            self.db_vims[target_id] = vim or {}
+            self.my_vims[target_id] = FailingConnector(str(e))
+            self.error_status = "Error loading vimconnector: {}".format(e)
+
+    def _get_db_task(self):
+        """
+        Get and lock one pending ro_task of this worker's VIM targets from the database
+        :return: the locked ro_task, or None if there is nothing to process
+        """
+        now = time.time()
+        if not self.time_last_task_processed:
+            self.time_last_task_processed = now
+        try:
+            while True:
+                locked = self.db.set_one(
+                    "ro_tasks",
+                    q_filter={"target_id": self.vim_targets,
+                              "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
+                              "locked_at.lt": now - self.MAX_TIME_LOCKED,
+                              "to_check_at.lt": self.time_last_task_processed},
+                    update_dict={"locked_by": self.my_id, "locked_at": now},
+                    fail_on_empty=False)
+                if locked:
+                    # read and return
+                    ro_task = self.db.get_one(
+                        "ro_tasks",
+                        q_filter={"target_id": self.vim_targets,
+                                  "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
+                                  "locked_at": now})
+                    return ro_task
+                if self.time_last_task_processed == now:
+                    self.time_last_task_processed = None
+                    return None
+                else:
+                    self.time_last_task_processed = now
+                    # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
+
+        except DbException as e:
+            self.logger.error("Database exception at _get_db_task: {}".format(e))
+        except Exception as e:
+            self.logger.critical("Unexpected exception at _get_db_task: {}".format(e), exc_info=True)
+        return None
+
+    def _delete_task(self, ro_task, task_index, task_depends, db_update):
+        """
+        Determine whether this task must actually be deleted at the VIM or can be superseded
+        :return: (task_status, ro_vim_item_update) tuple
+        """
+        my_task = ro_task["tasks"][task_index]
+        task_id = my_task["task_id"]
+        needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get("created_items", False)
+        if my_task["status"] == "FAILED":
+            return None, None  # TODO should it be retried?
+        try:
+            for index, task in enumerate(ro_task["tasks"]):
+                if index == task_index:
+                    continue  # own task
+                if my_task["target_record"] == task["target_record"] and task["action"] == "CREATE":
+                    # set to finished
+                    db_update["tasks.{}.status".format(index)] = task["status"] = "FINISHED"
+                elif task["action"] == "CREATE" and task["status"] not in ("FINISHED", "SUPERSEDED"):
+                    needed_delete = False
+            if needed_delete:
+                return self.item2delete[my_task["item"]](ro_task, task_index)
+            else:
+                return "SUPERSEDED", None
+        except Exception as e:
+            if not isinstance(e, NsWorkerException):
+                self.logger.critical("Unexpected exception at _delete_task task={}: {}".format(task_id, e),
+                                     exc_info=True)
+            return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e)}
+
+    def _create_task(self, ro_task, task_index, task_depends, db_update):
+        """
+        Determine whether this task needs to be created at the VIM
+        :return: (task_status, ro_vim_item_update) tuple
+        """
+        my_task = ro_task["tasks"][task_index]
+        task_id = my_task["task_id"]
+        task_status = None
+        if my_task["status"] == "FAILED":
+            return None, None  # TODO should it be retried?
+        elif my_task["status"] == "SCHEDULED":
+            # check if already created by another task
+            for index, task in enumerate(ro_task["tasks"]):
+                if index == task_index:
+                    continue  # own task
+                if task["action"] == "CREATE" and task["status"] not in ("SCHEDULED", "FINISHED", "SUPERSEDED"):
+                    return task["status"], "COPY_VIM_INFO"
+
+            try:
+                task_status, ro_vim_item_update = self.item2create[my_task["item"]](ro_task, task_index, task_depends)
+                # TODO update other CREATE tasks
+            except Exception as e:
+                if not isinstance(e, NsWorkerException):
+                    self.logger.error("Error executing task={}: {}".format(task_id, e), exc_info=True)
+                task_status = "FAILED"
+                ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
+                # TODO update    ro_vim_item_update
+            return task_status, ro_vim_item_update
+        else:
+            return None, None
+
+    def _get_dependency(self, task_id, ro_task=None, target_id=None):
+        if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
+            ro_task_dependency = self.db.get_one(
+                "ro_tasks",
+                q_filter={"target_id": target_id,
+                          "tasks.target_record_id": task_id
+                          },
+                fail_on_empty=False)
+            if ro_task_dependency:
+                for task_index, task in enumerate(ro_task_dependency["tasks"]):
+                    if task["target_record_id"] == task_id:
+                        return ro_task_dependency, task_index
+
+        else:
+            if ro_task:
+                for task_index, task in enumerate(ro_task["tasks"]):
+                    if task["task_id"] == task_id:
+                        return ro_task, task_index
+            ro_task_dependency = self.db.get_one(
+                "ro_tasks",
+                q_filter={"tasks.ANYINDEX.task_id": task_id,
+                          "tasks.ANYINDEX.target_record.ne": None
+                          },
+                fail_on_empty=False)
+            if ro_task_dependency:
+                for task_index, task in enumerate(ro_task_dependency["tasks"]):
+                    if task["task_id"] == task_id:
+                        return ro_task_dependency, task_index
+        raise NsWorkerException("Cannot get depending task {}".format(task_id))
+
+    def _proccess_pending_tasks(self, ro_task):
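+        """Process the tasks of a locked ro_task: resolve dependencies, call the per-item
+        create/delete/refresh/exec handlers, propagate the results to the target records and
+        re-schedule the next check (to_check_at) before unlocking the ro_task."""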
+        ro_task_id = ro_task["_id"]
+        now = time.time()
+        next_check_at = now + (24*60*60)   # one day
+        db_ro_task_update = {}
+
+        def _update_refresh(new_status):
+            # compute next_refresh
+            nonlocal task
+            nonlocal next_check_at
+            nonlocal db_ro_task_update
+            nonlocal ro_task
+
+            next_refresh = time.time()
+            if task["item"] in ("image", "flavor"):
+                next_refresh += self.REFRESH_IMAGE
+            elif new_status == "BUILD":
+                next_refresh += self.REFRESH_BUILD
+            elif new_status == "DONE":
+                next_refresh += self.REFRESH_ACTIVE
+            else:
+                next_refresh += self.REFRESH_ERROR
+            next_check_at = min(next_check_at, next_refresh)
+            db_ro_task_update["vim_info.refresh_at"] = next_refresh
+            ro_task["vim_info"]["refresh_at"] = next_refresh
+
+        try:
+            # 0 get task_status_create
+            task_status_create = None
+            task_create = next((t for t in ro_task["tasks"] if t["action"] == "CREATE" and
+                                t["status"] in ("BUILD", "DONE")), None)
+            if task_create:
+                task_status_create = task_create["status"]
+            # 1. process SCHEDULED tasks; for CREATE actions, also refresh those already in DONE or BUILD
+            for task_action in ("DELETE", "CREATE", "EXEC"):
+                db_vim_update = None
+                for task_index, task in enumerate(ro_task["tasks"]):
+                    target_update = None
+                    new_status = None          # reset per task to avoid reusing values from a previous iteration
+                    db_vim_info_update = None
+                    if (task_action in ("DELETE", "EXEC") and task["status"] != "SCHEDULED") or\
+                            task["action"] != task_action or \
+                            (task_action == "CREATE" and task["status"] in ("FINISHED", "SUPERSEDED")):
+                        continue
+                    task_path = "tasks.{}.status".format(task_index)
+                    try:
+                        if task["status"] == "SCHEDULED":
+                            task_depends = {}
+                            # check if tasks that this depends on have been completed
+                            dependency_not_completed = False
+                            for dependency_task_id in (task.get("depends_on") or ()):
+                                dependency_ro_task, dependency_task_index = \
+                                    self._get_dependency(dependency_task_id, target_id=ro_task["target_id"])
+                                dependency_task = dependency_ro_task["tasks"][dependency_task_index]
+                                if dependency_task["status"] == "SCHEDULED":
+                                    dependency_not_completed = True
+                                    next_check_at = min(next_check_at, dependency_ro_task["to_check_at"])
+                                    break
+                                elif dependency_task["status"] == "FAILED":
+                                    error_text = "Cannot {} {} because it depends on failed {} {} (id={}): {}".format(
+                                        task["action"], task["item"], dependency_task["action"],
+                                        dependency_task["item"], dependency_task_id,
+                                        dependency_ro_task["vim_info"].get("vim_details"))
+                                    self.logger.error("task={} {}".format(task["task_id"], error_text))
+                                    raise NsWorkerException(error_text)
+
+                                task_depends[dependency_task_id] = dependency_ro_task["vim_info"]["vim_id"]
+                                task_depends["TASK-{}".format(dependency_task_id)] = \
+                                    dependency_ro_task["vim_info"]["vim_id"]
+                            if dependency_not_completed:
+                                # TODO set at vim_info.vim_details that it is waiting
+                                continue
+
+                        if task["action"] == "DELETE":
+                            new_status, db_vim_info_update = self._delete_task(ro_task, task_index,
+                                                                               task_depends, db_ro_task_update)
+                            new_status = "FINISHED" if new_status == "DONE" else new_status
+                            # ^with FINISHED instead of DONE it will not be refreshing
+                            if new_status in ("FINISHED", "SUPERSEDED"):
+                                target_update = "DELETE"
+                        elif task["action"] == "EXEC":
+                            new_status, db_vim_info_update = \
+                                self.item2action[task["item"]](ro_task, task_index, task_depends)
+                            new_status = "FINISHED" if new_status == "DONE" else new_status
+                            # ^with FINISHED instead of DONE it will not be refreshing
+                            if new_status in ("FINISHED", "SUPERSEDED"):
+                                target_update = "DELETE"
+                        elif task["action"] == "CREATE":
+                            if task["status"] == "SCHEDULED":
+                                if task_status_create:
+                                    new_status = task_status_create
+                                    target_update = "COPY_VIM_INFO"
+                                else:
+                                    new_status, db_vim_info_update = \
+                                        self.item2create[task["item"]](ro_task, task_index, task_depends)
+                                    # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
+                                    _update_refresh(new_status)
+                            else:
+                                if ro_task["vim_info"]["refresh_at"] and now > ro_task["vim_info"]["refresh_at"]:
+                                    new_status, db_vim_info_update = self.item2refresh[task["item"]](ro_task)
+                                    _update_refresh(new_status)
+                    except Exception as e:
+                        new_status = "FAILED"
+                        db_vim_info_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
+                        if not isinstance(e, (NsWorkerException, vimconn.VimConnException)):
+                            self.logger.error("Unexpected exception at _delete_task task={}: {}".
+                                              format(task["task_id"], e), exc_info=True)
+
+                    try:
+                        if db_vim_info_update:
+                            db_vim_update = db_vim_info_update.copy()
+                            db_ro_task_update.update({"vim_info." + k: v for k, v in db_vim_info_update.items()})
+                            ro_task["vim_info"].update(db_vim_info_update)
+
+                        if new_status:
+                            if task_action == "CREATE":
+                                task_status_create = new_status
+                            db_ro_task_update[task_path] = new_status
+                        if target_update or db_vim_update:
+
+                            if target_update == "DELETE":
+                                self._update_target(task, None)
+                            elif target_update == "COPY_VIM_INFO":
+                                self._update_target(task, ro_task["vim_info"])
+                            else:
+                                self._update_target(task, db_vim_update)
+
+                    except Exception as e:
+                        self.logger.error("Unexpected exception at _update_target task={}: {}".
+                                          format(task["task_id"], e), exc_info=True)
+
+            # modify own ro_task. Filter also by to_check_at to detect race conditions: if to_check_at has been
+            # modified outside this task (by ro_nbi), do not update it
+            db_ro_task_update["locked_by"] = None
+            # locked_at is converted to int only for debugging: when it has no decimals it means it has been unlocked
+            db_ro_task_update["locked_at"] = int(now - self.MAX_TIME_LOCKED)
+            db_ro_task_update["to_check_at"] = next_check_at
+            if not self.db.set_one("ro_tasks",
+                                   update_dict=db_ro_task_update,
+                                   q_filter={"_id": ro_task["_id"], "to_check_at": ro_task["to_check_at"]},
+                                   fail_on_empty=False):
+                del db_ro_task_update["to_check_at"]
+                self.db.set_one("ro_tasks",
+                                q_filter={"_id": ro_task["_id"]},
+                                update_dict=db_ro_task_update,
+                                fail_on_empty=True)
+        except DbException as e:
+            self.logger.error("ro_task={} Error updating database {}".format(ro_task_id, e))
+        except Exception as e:
+            self.logger.error("Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True)
+
+    def _update_target(self, task, ro_vim_item_update):
+        try:
+            table, _id, path = task["target_record"].split(":")
+            if ro_vim_item_update:
+                update_dict = {path + "." + k: v for k, v in ro_vim_item_update.items() if k in
+                               ('vim_id', 'vim_details', 'vim_name', 'vim_status', 'interfaces')}
+                if ro_vim_item_update.get("interfaces"):
+                    path_vdu = path[:path.rfind(".")]
+                    path_vdu = path_vdu[:path_vdu.rfind(".")]
+                    path_interfaces = path_vdu + ".interfaces"
+                    for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
+                        if iface:
+                            update_dict.update({path_interfaces + ".{}.".format(i) + k: v for k, v in iface.items() if
+                                                k in ('ip_address', 'mac_address', 'vlan', 'compute_node', 'pci')})
+                            if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
+                                update_dict["ip-address"] = iface.get("ip_address").split(";")[0]
+                            if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
+                                update_dict[path_vdu + ".ip-address"] = iface.get("ip_address").split(";")[0]
+
+                self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
+            else:
+                self.db.set_one(table, q_filter={"_id": _id}, update_dict=None,
+                                unset={path: None})
+        except DbException as e:
+            self.logger.error("Cannot update database '{}': '{}'".format(task["target_record"], e))
+
+    def new_image(self, ro_task, task_index, task_depends):
+        task = ro_task["tasks"][task_index]
+        task_id = task["task_id"]
+        created = False
+        created_items = {}
+        target_vim = self.my_vims[ro_task["target_id"]]
+        try:
+            # FIND
+            if task.get("find_params"):
+                vim_images = target_vim.get_image_list(**task["find_params"])
+                if not vim_images:
+                    raise NsWorkerExceptionNotFound("Image not found with this criteria: '{}'".format(
+                        task["find_params"]))
+                elif len(vim_images) > 1:
+                    raise NsWorkerException(
+                        "More than one network found with this criteria: '{}'".format(task["find_params"]))
+                else:
+                    vim_image_id = vim_images[0]["id"]
+
+            ro_vim_item_update = {"vim_id": vim_image_id,
+                                  "vim_status": "DONE",
+                                  "created": created,
+                                  "created_items": created_items,
+                                  "vim_details": None}
+            self.logger.debug(
+                "task={} {} new-image={} created={}".format(task_id, ro_task["target_id"], vim_image_id, created))
+            return "DONE", ro_vim_item_update
+        except (NsWorkerException, vimconn.VimConnException) as e:
+            self.logger.error("task={} {} new-image: {}".format(task_id, ro_task["target_id"], e))
+            ro_vim_item_update = {"vim_status": "VIM_ERROR",
+                                  "created": created,
+                                  "vim_details": str(e)}
+            return "FAILED", ro_vim_item_update
+
+    def del_flavor(self, ro_task, task_index):
+        task = ro_task["tasks"][task_index]
+        task_id = task["task_id"]
+        flavor_vim_id = ro_task["vim_info"]["vim_id"]
+        ro_vim_item_update_ok = {"vim_status": "DELETED",
+                                 "created": False,
+                                 "vim_details": "DELETED",
+                                 "vim_id": None}
+        try:
+            if flavor_vim_id:
+                target_vim = self.my_vims[ro_task["target_id"]]
+                target_vim.delete_flavor(flavor_vim_id)
+
+        except vimconn.VimConnNotFoundException:
+            ro_vim_item_update_ok["vim_details"] = "already deleted"
+
+        except vimconn.VimConnException as e:
+            self.logger.error("ro_task={} vim={} del-flavor={}: {}".format(
+                ro_task["_id"], ro_task["target_id"], flavor_vim_id, e))
+            ro_vim_item_update = {"vim_status": "VIM_ERROR",
+                                  "vim_details": "Error while deleting: {}".format(e)}
+            return "FAILED", ro_vim_item_update
+
+        self.logger.debug("task={} {} del-flavor={} {}".format(
+            task_id, ro_task["target_id"], flavor_vim_id, ro_vim_item_update_ok.get("vim_details", "")))
+        return "DONE", ro_vim_item_update_ok
+
+    def refresh_ok(self, ro_task):
+        """skip calling VIM to get image status. Assumes ok"""
+        if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
+            return "FAILED", {}
+        return "DONE", {}
+
+    def delete_ok(self, ro_task):
+        """skip calling VIM to delete image status. Assumes ok"""
+        return "DONE", {}
+
+    def new_flavor(self, ro_task, task_index, task_depends):
+        task = ro_task["tasks"][task_index]
+        task_id = task["task_id"]
+        created = False
+        created_items = {}
+        target_vim = self.my_vims[ro_task["target_id"]]
+        try:
+            # FIND
+            vim_flavor_id = None
+            if task.get("find_params"):
+                try:
+                    flavor_data = task["find_params"]["flavor_data"]
+                    vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
+                except vimconn.VimConnNotFoundException:
+                    pass
+
+            if not vim_flavor_id and task.get("params"):
+                # CREATE
+                flavor_data = task["params"]["flavor_data"]
+                vim_flavor_id = target_vim.new_flavor(flavor_data)
+                created = True
+
+            ro_vim_item_update = {"vim_id": vim_flavor_id,
+                                  "vim_status": "DONE",
+                                  "created": created,
+                                  "created_items": created_items,
+                                  "vim_details": None}
+            self.logger.debug(
+                "task={} {} new-flavor={} created={}".format(task_id, ro_task["target_id"], vim_flavor_id, created))
+            return "DONE", ro_vim_item_update
+        except (vimconn.VimConnException, NsWorkerException) as e:
+            self.logger.error("task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e))
+            ro_vim_item_update = {"vim_status": "VIM_ERROR",
+                                  "created": created,
+                                  "vim_details": str(e)}
+            return "FAILED", ro_vim_item_update
+
+    def new_net(self, ro_task, task_index, task_depends):
+        vim_net_id = None
+        task = ro_task["tasks"][task_index]
+        task_id = task["task_id"]
+        created = False
+        created_items = {}
+        target_vim = self.my_vims[ro_task["target_id"]]
+        try:
+            # FIND
+            if task.get("find_params"):
+                # if management, get configuration of VIM
+                if task["find_params"].get("filter_dict"):
+                    vim_filter = task["find_params"]["filter_dict"]
+                elif task["find_params"].get("mgmt"):   # mamagement network
+                    if deep_get(self.db_vims[ro_task["target_id"]], "config", "management_network_id"):
+                        vim_filter = {"id": self.db_vims[ro_task["target_id"]]["config"]["management_network_id"]}
+                    elif deep_get(self.db_vims[ro_task["target_id"]], "config", "management_network_name"):
+                        vim_filter = {"name": self.db_vims[ro_task["target_id"]]["config"]["management_network_name"]}
+                    else:
+                        vim_filter = {"name": task["find_params"]["name"]}
+                else:
+                    raise NsWorkerExceptionNotFound("Invalid find_params for new_net {}".format(task["find_params"]))
+
+                vim_nets = target_vim.get_network_list(vim_filter)
+                if not vim_nets and not task.get("params"):
+                    raise NsWorkerExceptionNotFound("Network not found with this criteria: '{}'".format(
+                        task.get("find_params")))
+                elif len(vim_nets) > 1:
+                    raise NsWorkerException(
+                        "More than one network found with this criteria: '{}'".format(task["find_params"]))
+                if vim_nets:
+                    vim_net_id = vim_nets[0]["id"]
+            else:
+                # CREATE
+                params = task["params"]
+                vim_net_id, created_items = target_vim.new_network(**params)
+                created = True
+
+            ro_vim_item_update = {"vim_id": vim_net_id,
+                                  "vim_status": "BUILD",
+                                  "created": created,
+                                  "created_items": created_items,
+                                  "vim_details": None}
+            self.logger.debug(
+                "task={} {} new-net={} created={}".format(task_id, ro_task["target_id"], vim_net_id, created))
+            return "BUILD", ro_vim_item_update
+        except (vimconn.VimConnException, NsWorkerException) as e:
+            self.logger.error("task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e))
+            ro_vim_item_update = {"vim_status": "VIM_ERROR",
+                                  "created": created,
+                                  "vim_details": str(e)}
+            return "FAILED", ro_vim_item_update
+
+    def refresh_net(self, ro_task):
+        """Call VIM to get network status"""
+        ro_task_id = ro_task["_id"]
+        target_vim = self.my_vims[ro_task["target_id"]]
+
+        vim_id = ro_task["vim_info"]["vim_id"]
+        net_to_refresh_list = [vim_id]
+        try:
+            vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
+            vim_info = vim_dict[vim_id]
+            if vim_info["status"] == "ACTIVE":
+                task_status = "DONE"
+            elif vim_info["status"] == "BUILD":
+                task_status = "BUILD"
+            else:
+                task_status = "FAILED"
+        except vimconn.VimConnException as e:
+            # Mark all tasks at VIM_ERROR status
+            self.logger.error("ro_task={} vim={} get-net={}: {}".format(ro_task_id, ro_task["target_id"], vim_id, e))
+            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
+            task_status = "FAILED"
+
+        ro_vim_item_update = {}
+        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
+            ro_vim_item_update["vim_status"] = vim_info["status"]
+        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
+            ro_vim_item_update["vim_name"] = vim_info.get("name")
+        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
+            if ro_task["vim_info"]["vim_details"] != vim_info["error_msg"]:
+                ro_vim_item_update["vim_details"] = vim_info["error_msg"]
+        elif vim_info["status"] == "DELETED":
+            ro_vim_item_update["vim_id"] = None
+            ro_vim_item_update["vim_details"] = "Deleted externally"
+        else:
+            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
+                ro_vim_item_update["vim_details"] = vim_info["vim_info"]
+        if ro_vim_item_update:
+            self.logger.debug("ro_task={} {} get-net={}: status={} {}".format(
+                ro_task_id, ro_task["target_id"], vim_id, ro_vim_item_update.get("vim_status"),
+                ro_vim_item_update.get("vim_details") if ro_vim_item_update.get("vim_status") != "ACTIVE" else ''))
+        return task_status, ro_vim_item_update
+
+    def del_net(self, ro_task, task_index):
+        task = ro_task["tasks"][task_index]
+        task_id = task["task_id"]
+        net_vim_id = ro_task["vim_info"]["vim_id"]
+        ro_vim_item_update_ok = {"vim_status": "DELETED",
+                                 "created": False,
+                                 "vim_details": "DELETED",
+                                 "vim_id": None}
+        try:
+            if net_vim_id or ro_task["vim_info"]["created_items"]:
+                target_vim = self.my_vims[ro_task["target_id"]]
+                target_vim.delete_network(net_vim_id, ro_task["vim_info"]["created_items"])
+
+        except vimconn.VimConnNotFoundException:
+            ro_vim_item_update_ok["vim_details"] = "already deleted"
+
+        except vimconn.VimConnException as e:
+            self.logger.error("ro_task={} vim={} del-net={}: {}".format(ro_task["_id"], ro_task["target_id"],
+                                                                        net_vim_id, e))
+            ro_vim_item_update = {"vim_status": "VIM_ERROR",
+                                  "vim_details": "Error while deleting: {}".format(e)}
+            return "FAILED", ro_vim_item_update
+
+        self.logger.debug("task={} {} del-net={} {}".format(task_id, ro_task["target_id"], net_vim_id,
+                                                            ro_vim_item_update_ok.get("vim_details", "")))
+        return "DONE", ro_vim_item_update_ok
+
+    def new_vm(self, ro_task, task_index, task_depends):
+        task = ro_task["tasks"][task_index]
+        task_id = task["task_id"]
+        created = False
+        created_items = {}
+        target_vim = self.my_vims[ro_task["target_id"]]
+        try:
+            created = True
+            params = task["params"]
+            params_copy = deepcopy(params)
+            net_list = params_copy["net_list"]
+            for net in net_list:
+                if "net_id" in net and net["net_id"].startswith("TASK-"):  # change task_id into network_id
+                    network_id = task_depends[net["net_id"]]
+                    if not network_id:
+                        raise NsWorkerException("Cannot create VM because depends on a network not created or found "
+                                                "for {}".format(net["net_id"]))
+                    net["net_id"] = network_id
+            if params_copy["image_id"].startswith("TASK-"):
+                params_copy["image_id"] = task_depends[params_copy["image_id"]]
+            if params_copy["flavor_id"].startswith("TASK-"):
+                params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]
+
+            vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
+            interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]
+
+            ro_vim_item_update = {"vim_id": vim_vm_id,
+                                  "vim_status": "BUILD",
+                                  "created": created,
+                                  "created_items": created_items,
+                                  "vim_details": None,
+                                  "interfaces_vim_ids": interfaces,
+                                  "interfaces": [],
+                                  }
+            self.logger.debug(
+                "task={} {} new-vm={} created={}".format(task_id, ro_task["target_id"], vim_vm_id, created))
+            return "BUILD", ro_vim_item_update
+        except (vimconn.VimConnException, NsWorkerException) as e:
+            self.logger.error("task={} vim={} new-vm: {}".format(task_id, ro_task["target_id"], e))
+            ro_vim_item_update = {"vim_status": "VIM_ERROR",
+                                  "created": created,
+                                  "vim_details": str(e)}
+            return "FAILED", ro_vim_item_update
+
+    def del_vm(self, ro_task, task_index):
+        task = ro_task["tasks"][task_index]
+        task_id = task["task_id"]
+        vm_vim_id = ro_task["vim_info"]["vim_id"]
+        ro_vim_item_update_ok = {"vim_status": "DELETED",
+                                 "created": False,
+                                 "vim_details": "DELETED",
+                                 "vim_id": None}
+        try:
+            if vm_vim_id or ro_task["vim_info"]["created_items"]:
+                target_vim = self.my_vims[ro_task["target_id"]]
+                target_vim.delete_vminstance(vm_vim_id, ro_task["vim_info"]["created_items"])
+
+        except vimconn.VimConnNotFoundException:
+            ro_vim_item_update_ok["vim_details"] = "already deleted"
+
+        except vimconn.VimConnException as e:
+            self.logger.error("ro_task={} vim={} del-vm={}: {}".format(ro_task["_id"], ro_task["target_id"],
+                                                                       vm_vim_id, e))
+            ro_vim_item_update = {"vim_status": "VIM_ERROR",
+                                  "vim_details": "Error while deleting: {}".format(e)}
+            return "FAILED", ro_vim_item_update
+
+        self.logger.debug("task={} {} del-vm={} {}".format(task_id, ro_task["target_id"], vm_vim_id,
+                                                           ro_vim_item_update_ok.get("vim_details", "")))
+        return "DONE", ro_vim_item_update_ok
+
+    def refresh_vm(self, ro_task):
+        """Call VIM to get vm status"""
+        ro_task_id = ro_task["_id"]
+        target_vim = self.my_vims[ro_task["target_id"]]
+
+        vim_id = ro_task["vim_info"]["vim_id"]
+        if not vim_id:
+            return None, None
+        vm_to_refresh_list = [vim_id]
+        try:
+            vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
+            vim_info = vim_dict[vim_id]
+            if vim_info["status"] == "ACTIVE":
+                task_status = "DONE"
+            elif vim_info["status"] == "BUILD":
+                task_status = "BUILD"
+            else:
+                task_status = "FAILED"
+        except vimconn.VimConnException as e:
+            # Mark all tasks at VIM_ERROR status
+            self.logger.error("ro_task={} vim={} get-vm={}: {}".format(ro_task_id, ro_task["target_id"], vim_id, e))
+            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
+            task_status = "FAILED"
+
+        ro_vim_item_update = {}
+        # TODO check and update interfaces
+        vim_interfaces = []
+        for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
+            iface = next((iface for iface in vim_info["interfaces"] if vim_iface_id == iface["vim_interface_id"]), None)
+            # if iface:
+            #     iface.pop("vim_info", None)
+            vim_interfaces.append(iface)
+
+        task = ro_task["tasks"][0]  # TODO look for a task CREATE and active
+        if task.get("mgmt_vnf_interface") is not None:
+            vim_interfaces[task["mgmt_vnf_interface"]]["mgmt_vnf_interface"] = True
+        mgmt_vdu_iface = task.get("mgmt_vdu_interface", task.get("mgmt_vnf_interface", 0))
+        vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True
+
+        if ro_task["vim_info"]["interfaces"] != vim_interfaces:
+            ro_vim_item_update["interfaces"] = vim_interfaces
+        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
+            ro_vim_item_update["vim_status"] = vim_info["status"]
+        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
+            ro_vim_item_update["vim_name"] = vim_info.get("name")
+        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
+            if ro_task["vim_info"]["vim_details"] != vim_info["error_msg"]:
+                ro_vim_item_update["vim_details"] = vim_info["error_msg"]
+        elif vim_info["status"] == "DELETED":
+            ro_vim_item_update["vim_id"] = None
+            ro_vim_item_update["vim_details"] = "Deleted externally"
+        else:
+            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
+                ro_vim_item_update["vim_details"] = vim_info["vim_info"]
+        if ro_vim_item_update:
+            self.logger.debug("ro_task={} {} get-vm={}: status={} {}".format(
+                ro_task_id, ro_task["target_id"], vim_id, ro_vim_item_update.get("vim_status"),
+                ro_vim_item_update.get("vim_details") if ro_vim_item_update.get("vim_status") != "ACTIVE" else ''))
+        return task_status, ro_vim_item_update
+
+    def exec_vm(self, ro_task, task_index, task_depends):
+        task = ro_task["tasks"][task_index]
+        task_id = task["task_id"]
+        target_vim = self.my_vims[ro_task["target_id"]]
+        try:
+            params = task["params"]
+            params_copy = deepcopy(params)
+            params_copy["use_pri_key"] = self.db.decrypt(params_copy.pop("private_key"),
+                                                         params_copy.pop("schema_version"), params_copy.pop("salt"))
+
+            target_vim.inject_user_key(**params_copy)
+            self.logger.debug(
+                "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"]))
+            return "DONE", params_copy["key"]
+        except (vimconn.VimConnException, NsWorkerException) as e:
+            self.logger.error("task={} vim={} new-vm: {}".format(task_id, ro_task["target_id"], e))
+            ro_vim_item_update = {"vim_details": str(e)}
+            return "FAILED", ro_vim_item_update
+
+    def run(self):
+        # load database
+        self.logger.debug("Starting")
+        while True:
+            try:
+                task = self.task_queue.get(block=False if self.my_vims else True)
+                if task[0] == "terminate":
+                    break
+                if task[0] == "load_vim":
+                    self._load_vim(task[1])
+                continue
+            except queue.Empty:
+                pass
+
+            try:
+                busy = False
+                ro_task = self._get_db_task()
+                if ro_task:
+                    self._proccess_pending_tasks(ro_task)
+                    busy = True
+                if not busy:
+                    time.sleep(5)
+            except Exception as e:
+                self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)
+
+        self.logger.debug("Finishing")
diff --git a/NG-RO/osm_ng_ro/ro.cfg b/NG-RO/osm_ng_ro/ro.cfg
new file mode 100644 (file)
index 0000000..4af2830
--- /dev/null
@@ -0,0 +1,93 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+[/]
+# tools.secureheaders.on = True
+tools.sessions.on = True
+# increase security on sessions
+tools.sessions.secure = True
+tools.sessions.httponly = True
+tools.encode.on: True
+tools.encode.encoding: 'utf-8'
+tools.response_headers.on = True
+
+# tools.auth_basic.on: True,
+# tools.auth_basic.realm: 'localhost',
+# tools.auth_basic.checkpassword: get_tokens
+
+
+[/static]
+# use env OSMRO_STATIC_ON, OSMRO_STATIC_DIR to override
+tools.staticdir.on: True
+tools.staticdir.dir: "/app/RO/NG-RO/osm_ng_ro/html_public"
+
+
+[global]
+# use env OSMRO_SERVER_XXX, OSMRO_LOG_XXX, OSMRO_TEST_XXX or OSMRO_AUTH_XXX to override. Provide values in YAML format
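+# e.g. (illustrative; names assumed to follow the OSMRO_<SECTION>_<KEY> pattern described above):
+#   OSMRO_SERVER_SOCKET_PORT=9999   OSMRO_LOG_LEVEL=INFO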
+server.socket_host: "0.0.0.0"
+server.socket_port: 9998
+
+# server.ssl_module: "builtin"
+# server.ssl_certificate: "./http/cert.pem"
+# server.ssl_private_key: "./http/privkey.pem"
+# server.ssl_pass_phrase: "osm4u"
+server.thread_pool: 10
+server.ns_threads: 1
+
+# Uncomment to allow basic authentication in addition to bearer tokens
+# auth.allow_basic_authentication: True
+
+# comment or set to False to disable /test URL
+server.enable_test: True
+
+log.screen: False
+log.access_file: ""
+log.error_file: ""
+
+log.level: "DEBUG"
+#log.file: /var/log/osm/ro.log
+
+
+[database]
+# use env OSMRO_DATABASE_XXX to override
+driver: "mongo"            # mongo or memory
+uri:    "mongodb://mongo:27017"
+name: "osm"
+# user: "user"
+# password: "password"
+# commonkey: "commonkey"
+
+[storage]
+# use env OSMRO_STORAGE_XXX to override
+driver: "local"            # local filesystem
+# for local provide file path
+path: "/app/storage"       #"/home/atierno/OSM/osm/NBI/local/storage"
+
+loglevel:  "DEBUG"
+#logfile: /var/log/osm/ro-storage.log
+
+[message]
+# use env OSMRO_MESSAGE_XXX to override
+driver: "kafka"             # local or kafka
+# for local provide file path
+path: "/app/storage/kafka"
+host: "kafka"
+port: 9092
+
+loglevel:  "DEBUG"
+#logfile: /var/log/osm/ro-message.log
+group_id: "ro-server"
+
+[authentication]
+# use env OSMRO_AUTHENTICATION_XXX to override
+
diff --git a/NG-RO/osm_ng_ro/ro_main.py b/NG-RO/osm_ng_ro/ro_main.py
new file mode 100644 (file)
index 0000000..35a93fe
--- /dev/null
@@ -0,0 +1,740 @@
+#!/usr/bin/python3
+# -*- coding: utf-8 -*-
+
+##
+# Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+import cherrypy
+import time
+import json
+import yaml
+import osm_ng_ro.html_out as html
+import logging
+import logging.handlers
+import getopt
+import sys
+
+from osm_ng_ro.ns import Ns, NsException
+from osm_ng_ro.validation import ValidationError
+from osm_common.dbbase import DbException
+from osm_common.fsbase import FsException
+from osm_common.msgbase import MsgException
+from http import HTTPStatus
+from codecs import getreader
+from os import environ, path
+from osm_ng_ro import version as ro_version, version_date as ro_version_date
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+
+__version__ = "0.1."    # file version, not NBI version
+version_date = "May 2020"
+
+database_version = '1.2'
+auth_database_version = '1.0'
+ro_server = None           # instance of Server class
+# vim_threads = None  # instance of VimThread class
+
+"""
+RO North Bound Interface
+URL: /ro                                                       GET     POST    PUT     DELETE  PATCH
+        /ns/v1/deploy                                           O
+            /<nsrs_id>                                          O       O               O
+                /<action_id>                                    O
+                    /cancel                                             O
+
+"""
+
+valid_query_string = ("ADMIN", "SET_PROJECT", "FORCE", "PUBLIC")
+# ^ Contains the possible administrative query string words:
+#     ADMIN=True(by default)|Project|Project-list: see all elements, or the elements of a project
+#           (not owned by my session project).
+#     PUBLIC=True(by default)|False: see/hide public elements. Set/unset a topic as public
+#     FORCE=True(by default)|False: force edit/delete operations
+#     SET_PROJECT=Project|Project-list: add the topic to / remove it from the projects portfolio
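+# Illustrative use: appending e.g. "?FORCE=True" or "?PUBLIC=False" to a request URL sets the corresponding flag.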
+
+valid_url_methods = {
+    # contains allowed URL and methods, and the role_permission name
+    "admin": {
+        "v1": {
+            "tokens": {
+                "METHODS": ("POST",),
+                "ROLE_PERMISSION": "tokens:",
+                "<ID>": {
+                    "METHODS": ("DELETE",),
+                    "ROLE_PERMISSION": "tokens:id:"
+                }
+            },
+        }
+    },
+    "ns": {
+        "v1": {
+            "deploy": {
+                "METHODS": ("GET",),
+                "ROLE_PERMISSION": "deploy:",
+                "<ID>": {
+                    "METHODS": ("GET", "POST", "DELETE"),
+                    "ROLE_PERMISSION": "deploy:id:",
+                    "<ID>": {
+                        "METHODS": ("GET",),
+                        "ROLE_PERMISSION": "deploy:id:id:",
+                        "cancel": {
+                            "METHODS": ("POST",),
+                            "ROLE_PERMISSION": "deploy:id:id:cancel",
+                        }
+                    }
+                }
+            },
+        }
+    },
+}
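+# Illustrative mapping (derived from the structure above and Server.map_operation below):
+#   POST /ro/ns/v1/deploy/<nsr_id>  ->  ROLE_PERMISSION "deploy:id:"  ->  operation "deploy:id:post" (Ns.deploy)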
+
+
+class RoException(Exception):
+
+    def __init__(self, message, http_code=HTTPStatus.METHOD_NOT_ALLOWED):
+        Exception.__init__(self, message)
+        self.http_code = http_code
+
+
+class AuthException(RoException):
+    pass
+
+
+class Authenticator:
+    
+    def __init__(self, valid_url_methods, valid_query_string):
+        self.valid_url_methods = valid_url_methods
+        self.valid_query_string = valid_query_string
+
+    def authorize(self, *args, **kwargs):
+        return {"token": "ok", "id": "ok"}
+    
+    def new_token(self, token_info, indata, remote):
+        return {"token": "ok",
+                "id": "ok",
+                "remote": remote}
+
+    def del_token(self, token_id):
+        pass
+
+    def start(self, engine_config):
+        pass
+
+
+class Server(object):
+    instance = 0
+    # to decode bytes to str
+    reader = getreader("utf-8")
+
+    def __init__(self):
+        self.instance += 1
+        self.authenticator = Authenticator(valid_url_methods, valid_query_string)
+        self.ns = Ns()
+        self.map_operation = {
+            "token:post": self.new_token,
+            "token:id:delete": self.del_token,
+            "deploy:get": self.ns.get_deploy,
+            "deploy:id:get": self.ns.get_actions,
+            "deploy:id:post": self.ns.deploy,
+            "deploy:id:delete": self.ns.delete,
+            "deploy:id:id:get": self.ns.status,
+            "deploy:id:id:cancel:post": self.ns.cancel,
+        }
+
+    def _format_in(self, kwargs):
+        try:
+            indata = None
+            if cherrypy.request.body.length:
+                error_text = "Invalid input format "
+
+                if "Content-Type" in cherrypy.request.headers:
+                    if "application/json" in cherrypy.request.headers["Content-Type"]:
+                        error_text = "Invalid json format "
+                        indata = json.load(self.reader(cherrypy.request.body))
+                        cherrypy.request.headers.pop("Content-File-MD5", None)
+                    elif "application/yaml" in cherrypy.request.headers["Content-Type"]:
+                        error_text = "Invalid yaml format "
+                        indata = yaml.load(cherrypy.request.body, Loader=yaml.SafeLoader)
+                        cherrypy.request.headers.pop("Content-File-MD5", None)
+                    elif "application/binary" in cherrypy.request.headers["Content-Type"] or \
+                         "application/gzip" in cherrypy.request.headers["Content-Type"] or \
+                         "application/zip" in cherrypy.request.headers["Content-Type"] or \
+                         "text/plain" in cherrypy.request.headers["Content-Type"]:
+                        indata = cherrypy.request.body  # .read()
+                    elif "multipart/form-data" in cherrypy.request.headers["Content-Type"]:
+                        if "descriptor_file" in kwargs:
+                            filecontent = kwargs.pop("descriptor_file")
+                            if not filecontent.file:
+                                raise RoException("empty file or content", HTTPStatus.BAD_REQUEST)
+                            indata = filecontent.file  # .read()
+                            if filecontent.content_type.value:
+                                cherrypy.request.headers["Content-Type"] = filecontent.content_type.value
+                    else:
+                        # raise cherrypy.HTTPError(HTTPStatus.Not_Acceptable,
+                        #                          "Only 'Content-Type' of type 'application/json' or
+                        # 'application/yaml' for input format are available")
+                        error_text = "Invalid yaml format "
+                        indata = yaml.load(cherrypy.request.body, Loader=yaml.SafeLoader)
+                        cherrypy.request.headers.pop("Content-File-MD5", None)
+                else:
+                    error_text = "Invalid yaml format "
+                    indata = yaml.load(cherrypy.request.body, Loader=yaml.SafeLoader)
+                    cherrypy.request.headers.pop("Content-File-MD5", None)
+            if not indata:
+                indata = {}
+
+            format_yaml = False
+            if cherrypy.request.headers.get("Query-String-Format") == "yaml":
+                format_yaml = True
+
+            for k, v in kwargs.items():
+                if isinstance(v, str):
+                    if v == "":
+                        kwargs[k] = None
+                    elif format_yaml:
+                        try:
+                            kwargs[k] = yaml.load(v, Loader=yaml.SafeLoader)
+                        except Exception:
+                            pass
+                    elif k.endswith(".gt") or k.endswith(".lt") or k.endswith(".gte") or k.endswith(".lte"):
+                        try:
+                            kwargs[k] = int(v)
+                        except Exception:
+                            try:
+                                kwargs[k] = float(v)
+                            except Exception:
+                                pass
+                    elif v.find(",") > 0:
+                        kwargs[k] = v.split(",")
+                elif isinstance(v, (list, tuple)):
+                    for index in range(0, len(v)):
+                        if v[index] == "":
+                            v[index] = None
+                        elif format_yaml:
+                            try:
+                                v[index] = yaml.load(v[index], Loader=yaml.SafeLoader)
+                            except Exception:
+                                pass
+
+            return indata
+        except (ValueError, yaml.YAMLError) as exc:
+            raise RoException(error_text + str(exc), HTTPStatus.BAD_REQUEST)
+        except KeyError as exc:
+            raise RoException("Query string error: " + str(exc), HTTPStatus.BAD_REQUEST)
+        except Exception as exc:
+            raise RoException(error_text + str(exc), HTTPStatus.BAD_REQUEST)
+
+    @staticmethod
+    def _format_out(data, token_info=None, _format=None):
+        """
+        Return the data formatted according to the requested Accept header: json, yaml or html (yaml by default)
+        :param data: response to be sent. Can be a dict, text or file
+        :param token_info: contains, among others, the username and project
+        :param _format: the format to be set as Content-Type if data is a file
+        :return: the response body (str/bytes) or the file object itself
+        """
+        accept = cherrypy.request.headers.get("Accept")
+        if data is None:
+            if accept and "text/html" in accept:
+                return html.format(data, cherrypy.request, cherrypy.response, token_info)
+            # cherrypy.response.status = HTTPStatus.NO_CONTENT.value
+            return
+        elif hasattr(data, "read"):  # file object
+            if _format:
+                cherrypy.response.headers["Content-Type"] = _format
+            elif "b" in data.mode:  # binariy asssumig zip
+                cherrypy.response.headers["Content-Type"] = 'application/zip'
+            else:
+                cherrypy.response.headers["Content-Type"] = 'text/plain'
+            # TODO check that cherrypy closes the file. If not, implement pending per-thread cleanup
+            return data
+        if accept:
+            if "application/json" in accept:
+                cherrypy.response.headers["Content-Type"] = 'application/json; charset=utf-8'
+                a = json.dumps(data, indent=4) + "\n"
+                return a.encode("utf8")
+            elif "text/html" in accept:
+                return html.format(data, cherrypy.request, cherrypy.response, token_info)
+
+            elif "application/yaml" in accept or "*/*" in accept or "text/plain" in accept:
+                pass
+            # if there is no valid Accept, raise an error; but if the response is already an error, format it in yaml
+            elif cherrypy.response.status >= 400:
+                raise cherrypy.HTTPError(HTTPStatus.NOT_ACCEPTABLE.value,
+                                         "Only 'Accept' of type 'application/json' or 'application/yaml' "
+                                         "for output format are available")
+        cherrypy.response.headers["Content-Type"] = 'application/yaml'
+        return yaml.safe_dump(data, explicit_start=True, indent=4, default_flow_style=False, tags=False,
+                              encoding='utf-8', allow_unicode=True)  # , canonical=True, default_style='"'
+
+    @cherrypy.expose
+    def index(self, *args, **kwargs):
+        token_info = None
+        try:
+            if cherrypy.request.method == "GET":
+                token_info = self.authenticator.authorize()
+                outdata = token_info   # Home page
+            else:
+                raise cherrypy.HTTPError(HTTPStatus.METHOD_NOT_ALLOWED.value,
+                                         "Method {} not allowed for tokens".format(cherrypy.request.method))
+
+            return self._format_out(outdata, token_info)
+
+        except (NsException, AuthException) as e:
+            # cherrypy.log("index Exception {}".format(e))
+            cherrypy.response.status = e.http_code.value
+            return self._format_out("Welcome to OSM!", token_info)
+
+    @cherrypy.expose
+    def version(self, *args, **kwargs):
+        # TODO consider removing this and providing the version from the static version file
+        try:
+            if cherrypy.request.method != "GET":
+                raise RoException("Only method GET is allowed", HTTPStatus.METHOD_NOT_ALLOWED)
+            elif args or kwargs:
+                raise RoException("Invalid URL or query string for version", HTTPStatus.METHOD_NOT_ALLOWED)
+            # TODO include version of other modules, pick up from some kafka admin message
+            osm_ng_ro_version = {"version": ro_version, "date": ro_version_date}
+            return self._format_out(osm_ng_ro_version)
+        except RoException as e:
+            cherrypy.response.status = e.http_code.value
+            problem_details = {
+                "code": e.http_code.name,
+                "status": e.http_code.value,
+                "detail": str(e),
+            }
+            return self._format_out(problem_details, None)
+
+    def new_token(self, engine_session, indata, *args, **kwargs):
+        token_info = None
+
+        try:
+            token_info = self.authenticator.authorize()
+        except Exception:
+            token_info = None
+        if kwargs:
+            indata.update(kwargs)
+        # This is needed to log the user when authentication fails
+        cherrypy.request.login = "{}".format(indata.get("username", "-"))
+        token_info = self.authenticator.new_token(token_info, indata, cherrypy.request.remote)
+        cherrypy.session['Authorization'] = token_info["id"]
+        self._set_location_header("admin", "v1", "tokens", token_info["id"])
+        # for logging
+
+        # cherrypy.response.cookie["Authorization"] = outdata["id"]
+        # cherrypy.response.cookie["Authorization"]['expires'] = 3600
+        return token_info, token_info["id"], True
+
+    def del_token(self, engine_session, indata, version, _id, *args, **kwargs):
+        token_id = _id
+        if not token_id and "id" in kwargs:
+            token_id = kwargs["id"]
+        elif not token_id:
+            token_info = self.authenticator.authorize()
+            # for logging
+            token_id = token_info["id"]
+        self.authenticator.del_token(token_id)
+        token_info = None
+        cherrypy.session['Authorization'] = "logout"
+        # cherrypy.response.cookie["Authorization"] = token_id
+        # cherrypy.response.cookie["Authorization"]['expires'] = 0
+        return None, None, True
+    
+    @cherrypy.expose
+    def test(self, *args, **kwargs):
+        if not cherrypy.config.get("server.enable_test") or (isinstance(cherrypy.config["server.enable_test"], str) and
+                                                             cherrypy.config["server.enable_test"].lower() == "false"):
+            cherrypy.response.status = HTTPStatus.METHOD_NOT_ALLOWED.value
+            return "test URL is disabled"
+        thread_info = None
+        if args and args[0] == "help":
+            return "<html><pre>\ninit\nfile/<name>  download file\ndb-clear/table\nfs-clear[/folder]\nlogin\nlogin2\n"\
+                   "sleep/<time>\nmessage/topic\n</pre></html>"
+
+        elif args and args[0] == "init":
+            try:
+                # self.ns.load_dbase(cherrypy.request.app.config)
+                self.ns.create_admin()
+                return "Done. User 'admin', password 'admin' created"
+            except Exception:
+                cherrypy.response.status = HTTPStatus.FORBIDDEN.value
+                return self._format_out("Database already initialized")
+        elif args and args[0] == "file":
+            return cherrypy.lib.static.serve_file(cherrypy.tree.apps['/ro'].config["storage"]["path"] + "/" + args[1],
+                                                  "text/plain", "attachment")
+        elif args and args[0] == "file2":
+            f_path = cherrypy.tree.apps['/ro'].config["storage"]["path"] + "/" + args[1]
+            f = open(f_path, "r")
+            cherrypy.response.headers["Content-type"] = "text/plain"
+            return f
+
+        elif len(args) == 2 and args[0] == "db-clear":
+            deleted_info = self.ns.db.del_list(args[1], kwargs)
+            return "{} {} deleted\n".format(deleted_info["deleted"], args[1])
+        elif len(args) and args[0] == "fs-clear":
+            if len(args) >= 2:
+                folders = (args[1],)
+            else:
+                folders = self.ns.fs.dir_ls(".")
+            for folder in folders:
+                self.ns.fs.file_delete(folder)
+            return ",".join(folders) + " folders deleted\n"
+        elif args and args[0] == "login":
+            if not cherrypy.request.headers.get("Authorization"):
+                cherrypy.response.headers["WWW-Authenticate"] = 'Basic realm="Access to OSM site", charset="UTF-8"'
+                cherrypy.response.status = HTTPStatus.UNAUTHORIZED.value
+        elif args and args[0] == "login2":
+            if not cherrypy.request.headers.get("Authorization"):
+                cherrypy.response.headers["WWW-Authenticate"] = 'Bearer realm="Access to OSM site"'
+                cherrypy.response.status = HTTPStatus.UNAUTHORIZED.value
+        elif args and args[0] == "sleep":
+            sleep_time = 5
+            try:
+                sleep_time = int(args[1])
+            except Exception:
+                cherrypy.response.status = HTTPStatus.FORBIDDEN.value
+                return self._format_out("Database already initialized")
+            thread_info = cherrypy.thread_data
+            print(thread_info)
+            time.sleep(sleep_time)
+            # thread_info
+        elif len(args) >= 2 and args[0] == "message":
+            main_topic = args[1]
+            return_text = "<html><pre>{} ->\n".format(main_topic)
+            try:
+                if cherrypy.request.method == 'POST':
+                    to_send = yaml.load(cherrypy.request.body, Loader=yaml.SafeLoader)
+                    for k, v in to_send.items():
+                        self.ns.msg.write(main_topic, k, v)
+                        return_text += "  {}: {}\n".format(k, v)
+                elif cherrypy.request.method == 'GET':
+                    for k, v in kwargs.items():
+                        self.ns.msg.write(main_topic, k, yaml.load(v, Loader=yaml.SafeLoader))
+                        return_text += "  {}: {}\n".format(k, yaml.load(v, Loader=yaml.SafeLoader))
+            except Exception as e:
+                return_text += "Error: " + str(e)
+            return_text += "</pre></html>\n"
+            return return_text
+
+        return_text = (
+            "<html><pre>\nheaders:\n  args: {}\n".format(args) +
+            "  kwargs: {}\n".format(kwargs) +
+            "  headers: {}\n".format(cherrypy.request.headers) +
+            "  path_info: {}\n".format(cherrypy.request.path_info) +
+            "  query_string: {}\n".format(cherrypy.request.query_string) +
+            "  session: {}\n".format(cherrypy.session) +
+            "  cookie: {}\n".format(cherrypy.request.cookie) +
+            "  method: {}\n".format(cherrypy.request.method) +
+            "  session: {}\n".format(cherrypy.session.get('fieldname')) +
+            "  body:\n")
+        return_text += "    length: {}\n".format(cherrypy.request.body.length)
+        if cherrypy.request.body.length:
+            return_text += "    content: {}\n".format(
+                str(cherrypy.request.body.read(int(cherrypy.request.headers.get('Content-Length', 0)))))
+        if thread_info:
+            return_text += "thread: {}\n".format(thread_info)
+        return_text += "</pre></html>"
+        return return_text
+
+    @staticmethod
+    def _check_valid_url_method(method, *args):
+        if len(args) < 3:
+            raise RoException("URL must contain at least 'main_topic/version/topic'", HTTPStatus.METHOD_NOT_ALLOWED)
+
+        reference = valid_url_methods
+        for arg in args:
+            if arg is None:
+                break
+            if not isinstance(reference, dict):
+                raise RoException("URL contains unexpected extra items '{}'".format(arg),
+                                  HTTPStatus.METHOD_NOT_ALLOWED)
+
+            if arg in reference:
+                reference = reference[arg]
+            elif "<ID>" in reference:
+                reference = reference["<ID>"]
+            elif "*" in reference:
+                # reference = reference["*"]
+                break
+            else:
+                raise RoException("Unexpected URL item {}".format(arg), HTTPStatus.METHOD_NOT_ALLOWED)
+        if "TODO" in reference and method in reference["TODO"]:
+            raise RoException("Method {} not supported yet for this URL".format(method), HTTPStatus.NOT_IMPLEMENTED)
+        elif "METHODS" not in reference or method not in reference["METHODS"]:
+            raise RoException("Method {} not supported for this URL".format(method), HTTPStatus.METHOD_NOT_ALLOWED)
+        return reference["ROLE_PERMISSION"] + method.lower()
+
+    @staticmethod
+    def _set_location_header(main_topic, version, topic, id):
+        """
+        Insert the Location response header with the URL of the created item, based on the URL params
+        :param main_topic:
+        :param version:
+        :param topic:
+        :param id:
+        :return: None
+        """
+        # Use cherrypy.request.base for an absolute path; use the HOST request header in case of being behind a NAT
+        cherrypy.response.headers["Location"] = "/ro/{}/{}/{}/{}".format(main_topic, version, topic, id)
+        return
+
+    @cherrypy.expose
+    def default(self, main_topic=None, version=None, topic=None, _id=None, _id2=None, *args, **kwargs):
+        token_info = None
+        outdata = None
+        _format = None
+        method = "DONE"
+        rollback = []
+        engine_session = None
+        try:
+            if not main_topic or not version or not topic:
+                raise RoException("URL must contain at least 'main_topic/version/topic'",
+                                  HTTPStatus.METHOD_NOT_ALLOWED)
+            if main_topic not in ("admin", "ns",):
+                raise RoException("URL main_topic '{}' not supported".format(main_topic),
+                                  HTTPStatus.METHOD_NOT_ALLOWED)
+            if version != 'v1':
+                raise RoException("URL version '{}' not supported".format(version), HTTPStatus.METHOD_NOT_ALLOWED)
+
+            if kwargs and "METHOD" in kwargs and kwargs["METHOD"] in ("PUT", "POST", "DELETE", "GET", "PATCH"):
+                method = kwargs.pop("METHOD")
+            else:
+                method = cherrypy.request.method
+
+            role_permission = self._check_valid_url_method(method, main_topic, version, topic, _id, _id2, *args,
+                                                           **kwargs)
+            # skip token validation if requesting a token
+            indata = self._format_in(kwargs)
+            if main_topic != "admin" or topic != "tokens":
+                token_info = self.authenticator.authorize(role_permission, _id)
+            outdata, created_id, done = self.map_operation[role_permission](
+                engine_session, indata, version, _id, _id2, *args, **kwargs)
+            if created_id:
+                self._set_location_header(main_topic, version, topic, _id)
+            cherrypy.response.status = HTTPStatus.ACCEPTED.value if not done else HTTPStatus.OK.value if \
+                outdata is not None else HTTPStatus.NO_CONTENT.value
+            return self._format_out(outdata, token_info, _format)
+        except Exception as e:
+            if isinstance(e, (RoException, NsException, DbException, FsException, MsgException, AuthException,
+                              ValidationError)):
+                http_code_value = cherrypy.response.status = e.http_code.value
+                http_code_name = e.http_code.name
+                cherrypy.log("Exception {}".format(e))
+            else:
+                http_code_value = cherrypy.response.status = HTTPStatus.BAD_REQUEST.value  # INTERNAL_SERVER_ERROR
+                cherrypy.log("CRITICAL: Exception {}".format(e), traceback=True)
+                http_code_name = HTTPStatus.BAD_REQUEST.name
+            if hasattr(outdata, "close"):  # is an open file
+                outdata.close()
+            error_text = str(e)
+            rollback.reverse()
+            for rollback_item in rollback:
+                try:
+                    if rollback_item.get("operation") == "set":
+                        self.ns.db.set_one(rollback_item["topic"], {"_id": rollback_item["_id"]},
+                                           rollback_item["content"], fail_on_empty=False)
+                    else:
+                        self.ns.db.del_one(rollback_item["topic"], {"_id": rollback_item["_id"]},
+                                           fail_on_empty=False)
+                except Exception as e2:
+                    rollback_error_text = "Rollback Exception {}: {}".format(rollback_item, e2)
+                    cherrypy.log(rollback_error_text)
+                    error_text += ". " + rollback_error_text
+            # if isinstance(e, MsgException):
+            #     error_text = "{} has been '{}' but other modules cannot be informed because an error on bus".format(
+            #         engine_topic[:-1], method, error_text)
+            problem_details = {
+                "code": http_code_name,
+                "status": http_code_value,
+                "detail": error_text,
+            }
+            return self._format_out(problem_details, token_info)
+            # raise cherrypy.HTTPError(e.http_code.value, str(e))
+        finally:
+            if token_info:
+                if method in ("PUT", "PATCH", "POST") and isinstance(outdata, dict):
+                    for logging_id in ("id", "op_id", "nsilcmop_id", "nslcmop_id"):
+                        if outdata.get(logging_id):
+                            cherrypy.request.login += ";{}={}".format(logging_id, outdata[logging_id][:36])
+
+
+def _start_service():
+    """
+    Callback function called when cherrypy.engine starts
+    Override configuration with env variables
+    Set database, storage, message configuration
+    Init database with admin/admin user password
+    """
+    global ro_server
+    # global vim_threads
+    cherrypy.log.error("Starting osm_ng_ro")
+    # update general cherrypy configuration
+    update_dict = {}
+
+    engine_config = cherrypy.tree.apps['/ro'].config
+    for k, v in environ.items():
+        if not k.startswith("OSMRO_"):
+            continue
+        k1, _, k2 = k[6:].lower().partition("_")
+        if not k2:
+            continue
+        try:
+            if k1 in ("server", "test", "auth", "log"):
+                # update [global] configuration
+                update_dict[k1 + '.' + k2] = yaml.safe_load(v)
+            elif k1 == "static":
+                # update [/static] configuration
+                engine_config["/static"]["tools.staticdir." + k2] = yaml.safe_load(v)
+            elif k1 == "tools":
+                # update [/] configuration
+                engine_config["/"]["tools." + k2.replace('_', '.')] = yaml.safe_load(v)
+            elif k1 in ("message", "database", "storage", "authentication"):
+                # update [message], [database], ... configuration
+                if k2 in ("port", "db_port"):
+                    engine_config[k1][k2] = int(v)
+                else:
+                    engine_config[k1][k2] = v
+
+        except Exception as e:
+            raise RoException("Cannot load env '{}': {}".format(k, e))
+
+    if update_dict:
+        cherrypy.config.update(update_dict)
+        engine_config["global"].update(update_dict)
+
+    # logging cherrypy
+    log_format_simple = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
+    log_formatter_simple = logging.Formatter(log_format_simple, datefmt='%Y-%m-%dT%H:%M:%S')
+    logger_server = logging.getLogger("cherrypy.error")
+    logger_access = logging.getLogger("cherrypy.access")
+    logger_cherry = logging.getLogger("cherrypy")
+    logger_nbi = logging.getLogger("ro")
+
+    if "log.file" in engine_config["global"]:
+        file_handler = logging.handlers.RotatingFileHandler(engine_config["global"]["log.file"],
+                                                            maxBytes=100e6, backupCount=9, delay=0)
+        file_handler.setFormatter(log_formatter_simple)
+        logger_cherry.addHandler(file_handler)
+        logger_nbi.addHandler(file_handler)
+    # log always to standard output
+    for format_, logger in {"ro.server %(filename)s:%(lineno)s": logger_server,
+                            "ro.access %(filename)s:%(lineno)s": logger_access,
+                            "%(name)s %(filename)s:%(lineno)s": logger_nbi
+                            }.items():
+        log_format_cherry = "%(asctime)s %(levelname)s {} %(message)s".format(format_)
+        log_formatter_cherry = logging.Formatter(log_format_cherry, datefmt='%Y-%m-%dT%H:%M:%S')
+        str_handler = logging.StreamHandler()
+        str_handler.setFormatter(log_formatter_cherry)
+        logger.addHandler(str_handler)
+
+    if engine_config["global"].get("log.level"):
+        logger_cherry.setLevel(engine_config["global"]["log.level"])
+        logger_nbi.setLevel(engine_config["global"]["log.level"])
+
+    # logging other modules
+    for k1, logname in {"message": "ro.msg", "database": "ro.db", "storage": "ro.fs"}.items():
+        engine_config[k1]["logger_name"] = logname
+        logger_module = logging.getLogger(logname)
+        if "logfile" in engine_config[k1]:
+            file_handler = logging.handlers.RotatingFileHandler(engine_config[k1]["logfile"],
+                                                                maxBytes=100e6, backupCount=9, delay=0)
+            file_handler.setFormatter(log_formatter_simple)
+            logger_module.addHandler(file_handler)
+        if "loglevel" in engine_config[k1]:
+            logger_module.setLevel(engine_config[k1]["loglevel"])
+    # TODO add more entries, e.g.: storage
+
+    engine_config["assignment"] = {}
+    # ^ each VIM/SDNC will be assigned one worker id. The Ns class will add items and VimThread will auto-assign them
+    cherrypy.tree.apps['/ro'].root.ns.start(engine_config)
+    cherrypy.tree.apps['/ro'].root.authenticator.start(engine_config)
+    cherrypy.tree.apps['/ro'].root.ns.init_db(target_version=database_version)
+
+    # # start subscriptions thread:
+    # vim_threads = []
+    # for thread_id in range(engine_config["global"]["server.ns_threads"]):
+    #     vim_thread = VimThread(thread_id, config=engine_config, engine=ro_server.ns)
+    #     vim_thread.start()
+    #     vim_threads.append(vim_thread)
+    # # Do not capture except SubscriptionException
+
+    backend = engine_config["authentication"]["backend"]
+    cherrypy.log.error("Starting OSM NBI Version '{} {}' with '{}' authentication backend"
+                       .format(ro_version, ro_version_date, backend))
+
+
+def _stop_service():
+    """
+    Callback function called when cherrypy.engine stops
+    TODO: Ending database connections.
+    """
+    # global vim_threads
+    # if vim_threads:
+    #     for vim_thread in vim_threads:
+    #         vim_thread.terminate()
+    # vim_threads = None
+    cherrypy.tree.apps['/ro'].root.ns.stop()
+    cherrypy.log.error("Stopping osm_ng_ro")
+
+
+def ro_main(config_file):
+    global ro_server
+    ro_server = Server()
+    cherrypy.engine.subscribe('start', _start_service)
+    cherrypy.engine.subscribe('stop', _stop_service)
+    cherrypy.quickstart(ro_server, '/ro', config_file)
+
+
+def usage():
+    print("""Usage: {} [options]
+        -c|--config [configuration_file]: loads the configuration file (default: ./ro.cfg)
+        -h|--help: shows this help
+        """.format(sys.argv[0]))
+    # --log-socket-host HOST: send logs to this host")
+    # --log-socket-port PORT: send logs using this port (default: 9022)")
+
+
+if __name__ == '__main__':
+    try:
+        # load parameters and configuration
+        opts, args = getopt.getopt(sys.argv[1:], "hvc:", ["config=", "help"])
+        # TODO add  "log-socket-host=", "log-socket-port=", "log-file="
+        config_file = None
+        for o, a in opts:
+            if o in ("-h", "--help"):
+                usage()
+                sys.exit()
+            elif o in ("-c", "--config"):
+                config_file = a
+            else:
+                assert False, "Unhandled option"
+        if config_file:
+            if not path.isfile(config_file):
+                print("configuration file '{}' that not exist".format(config_file), file=sys.stderr)
+                exit(1)
+        else:
+            for config_file in (path.dirname(__file__) + "/ro.cfg", "./ro.cfg", "/etc/osm/ro.cfg"):
+                if path.isfile(config_file):
+                    break
+            else:
+                print("No configuration file 'ro.cfg' found neither at local folder nor at /etc/osm/", file=sys.stderr)
+                exit(1)
+        ro_main(config_file)
+    except getopt.GetoptError as e:
+        print(str(e), file=sys.stderr)
+        # usage()
+        exit(1)
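
For reference, a minimal sketch of how Server.default resolves a request: the URL segments are walked through the valid_url_methods tree (literal names or "<ID>" wildcards), the resulting ROLE_PERMISSION plus the lowercase HTTP method forms a key, and that key selects the handler in map_operation. The trimmed tree and helper below are illustrative only; they mirror, rather than reproduce, the code added by this change.

    # Illustrative sketch, not part of the patch
    VALID_URL_METHODS = {
        "ns": {
            "v1": {
                "deploy": {
                    "METHODS": ("GET",),
                    "ROLE_PERMISSION": "deploy:",
                    "<ID>": {
                        "METHODS": ("GET", "POST", "DELETE"),
                        "ROLE_PERMISSION": "deploy:id:",
                    },
                },
            },
        },
    }

    def resolve_permission(method, *path_items):
        """Walk the tree, accepting literal segments or '<ID>' wildcards."""
        reference = VALID_URL_METHODS
        for item in path_items:
            if item is None:
                break
            if item in reference:
                reference = reference[item]
            elif "<ID>" in reference:
                reference = reference["<ID>"]
            else:
                raise ValueError("Unexpected URL item {}".format(item))
        if method not in reference.get("METHODS", ()):
            raise ValueError("Method {} not supported for this URL".format(method))
        return reference["ROLE_PERMISSION"] + method.lower()

    # POST /ro/ns/v1/deploy/1234 -> 'deploy:id:post', which Server.default uses to pick
    # self.map_operation["deploy:id:post"] (Ns.deploy)
    print(resolve_permission("POST", "ns", "v1", "deploy", "1234"))

Along the same lines, _start_service maps environment variables of the form OSMRO_<SECTION>_<KEY> onto the cherrypy configuration, so that, for example, OSMRO_DATABASE_HOST sets engine_config["database"]["host"] and OSMRO_SERVER_PORT updates the [global] "server.port" entry.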
diff --git a/NG-RO/osm_ng_ro/validation.py b/NG-RO/osm_ng_ro/validation.py
new file mode 100644 (file)
index 0000000..060a3eb
--- /dev/null
@@ -0,0 +1,118 @@
+# -*- coding: utf-8 -*-
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from jsonschema import validate as js_v, exceptions as js_e
+from http import HTTPStatus
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+__version__ = "0.1"
+version_date = "Jun 2020"
+
+"""
+Validator of input data using JSON schemas
+"""
+
+# Basic schemas
+name_schema = {"type": "string", "minLength": 1, "maxLength": 255, "pattern": "^[^,;()'\"]+$"}
+string_schema = {"type": "string", "minLength": 1, "maxLength": 255}
+ssh_key_schema = {"type": "string", "minLength": 1}
+id_schema = {"type": "string", "pattern": "^[a-fA-F0-9]{8}(-[a-fA-F0-9]{4}){3}-[a-fA-F0-9]{12}$"}
+bool_schema = {"type": "boolean"}
+null_schema = {"type": "null"}
+
+image_schema = {
+    "title": "image input validation",
+    "$schema": "http://json-schema.org/draft-04/schema#",
+    "type": "object",
+    # TODO
+}
+
+flavor_schema = {
+    "title": "image input validation",
+    "$schema": "http://json-schema.org/draft-04/schema#",
+    "type": "object",
+    # TODO
+}
+
+ns_schema = {
+    "title": "image input validation",
+    "$schema": "http://json-schema.org/draft-04/schema#",
+    "type": "object",
+    # TODO
+}
+
+deploy_schema = {
+    "title": "deploy input validation",
+    "$schema": "http://json-schema.org/draft-04/schema#",
+    "type": "object",
+    "properties": {
+        "action_id": string_schema,
+        "name": name_schema,
+        "action": {"enum" ["inject_ssh_key"]},
+        "key": ssh_key_schema,
+        "user": name_schema,
+        "password": string_schema,
+        "vnf": {
+            "type": "object",
+            "properties": {
+                "_id": id_schema,
+                # TODO
+            },
+            "required": ["_id"],
+            "additionalProperties": True,
+        },
+        "image": {
+            "type": "array",
+            "minItems": 1,
+            "items": image_schema
+        },
+        "flavor": {
+            "type": "array",
+            "minItems": 1,
+            "items": flavor_schema
+        },
+        "ns": ns_schema,
+    },
+
+    "required": ["name"],
+    "additionalProperties": False
+}
+
+
+class ValidationError(Exception):
+    def __init__(self, message, http_code=HTTPStatus.UNPROCESSABLE_ENTITY):
+        self.http_code = http_code
+        Exception.__init__(self, message)
+
+
+def validate_input(indata, schema_to_use):
+    """
+    Validates input data against json schema
+    :param indata: user input data. Should be a dictionary
+    :param schema_to_use: jsonschema to test
+    :return: None if ok, raises ValidationError exception on error
+    """
+    try:
+        if schema_to_use:
+            js_v(indata, schema_to_use)
+        return None
+    except js_e.ValidationError as e:
+        if e.path:
+            error_pos = "at '" + ":".join(map(str, e.path)) + "'"
+        else:
+            error_pos = ""
+        raise ValidationError("Format error {} '{}' ".format(error_pos, e.message))
+    except js_e.SchemaError:
+        raise ValidationError("Bad json schema {}".format(schema_to_use), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
diff --git a/NG-RO/requirements.txt b/NG-RO/requirements.txt
new file mode 100644 (file)
index 0000000..1ed0201
--- /dev/null
@@ -0,0 +1,22 @@
+##
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+##
+
+PyYAML
+CherryPy==18.1.2
+osm-common @ git+https://osm.etsi.org/gerrit/osm/common.git#egg=osm-common
+requests
+cryptography
+osm-im @ git+https://osm.etsi.org/gerrit/osm/IM.git#egg=osm-im
+osm-ro-plugin @ git+https://osm.etsi.org/gerrit/osm/RO.git#egg=osm-ro-plugin&subdirectory=RO-plugin
+logutils
diff --git a/NG-RO/setup.py b/NG-RO/setup.py
new file mode 100644 (file)
index 0000000..a001836
--- /dev/null
@@ -0,0 +1,48 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+# Copyright 2020 Telefonica S.A.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from setuptools import setup, find_packages
+
+_name = "osm_ng_ro"
+_readme = "osm-ng-ro is the New Generation Resource Orchestrator for OSM"
+setup(
+    name=_name,
+    description='OSM Resource Orchestrator',
+    long_description=_readme,
+    version_command=('git describe --match v* --tags --long --dirty', 'pep440-git-full'),
+    author='ETSI OSM',
+    author_email='alfonso.tiernosepulveda@telefonica.com',
+    maintainer='Alfonso Tierno',
+    maintainer_email='alfonso.tiernosepulveda@telefonica.com',
+    url='https://osm.etsi.org/gitweb/?p=osm/RO.git;a=summary',
+    license='Apache 2.0',
+
+    packages=find_packages(exclude=["temp", "local"]),
+    include_package_data=True,
+    install_requires=[
+        'CherryPy==18.1.2',
+        'osm-common @ git+https://osm.etsi.org/gerrit/osm/common.git#egg=osm-common',
+        'jsonschema',
+        'PyYAML',
+        'requests',
+        'cryptography',
+        'osm-im @ git+https://osm.etsi.org/gerrit/osm/IM.git#egg=osm-im',
+        "osm-ro-plugin @ git+https://osm.etsi.org/gerrit/osm/RO.git#egg=osm-ro-plugin&subdirectory=RO-plugin",
+    ],
+    setup_requires=['setuptools-version-command'],
+)
diff --git a/NG-RO/stdeb.cfg b/NG-RO/stdeb.cfg
new file mode 100644 (file)
index 0000000..f7ebd4a
--- /dev/null
@@ -0,0 +1,16 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+#         http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+[DEFAULT]
+X-Python3-Version : >= 3.5
+Depends3 : python3-osm-common, python3-osm-im, python3-cherrypy3, python3-yaml, python3-jsonschema,
+    python3-pip, python3-requests, python3-osm-ro-plugin
diff --git a/NG-RO/tox.ini b/NG-RO/tox.ini
new file mode 100644 (file)
index 0000000..081bc1c
--- /dev/null
@@ -0,0 +1,33 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+[tox]
+envlist = flake8
+toxworkdir={toxinidir}/../.tox
+
+[testenv]
+usedevelop = True
+basepython = python3
+install_command = python3 -m pip install -r requirements.txt   -U {opts} {packages}
+
+[testenv:flake8]
+basepython = python3
+deps = flake8
+commands = flake8 osm_ng_ro  --max-line-length 120 \
+    --exclude .svn,CVS,.gz,.git,__pycache__,.tox,local,temp,osm_im --ignore W291,W293,E226,E402,W504
+
+[testenv:build]
+basepython = python3
+deps = stdeb
+       setuptools-version-command
+commands = python3 setup.py --command-packages=stdeb.command bdist_deb
index 289c827..7cf5e88 100644 (file)
@@ -1222,19 +1222,19 @@ class vimconnector(vimconn.VimConnector):
                 vim_id: filled/added by this function
                 floating_ip: True/False (or it can be None)
             'cloud_config': (optional) dictionary with:
-            'key-pairs': (optional) list of strings with the public key to be inserted to the default user
-            'users': (optional) list of users to be inserted, each item is a dict with:
-                'name': (mandatory) user name,
-                'key-pairs': (optional) list of strings with the public key to be inserted to the user
-            'user-data': (optional) string is a text script to be passed directly to cloud-init
-            'config-files': (optional). List of files to be transferred. Each item is a dict with:
-                'dest': (mandatory) string with the destination absolute path
-                'encoding': (optional, by default text). Can be one of:
-                    'b64', 'base64', 'gz', 'gz+b64', 'gz+base64', 'gzip+b64', 'gzip+base64'
-                'content' (mandatory): string with the content of the file
-                'permissions': (optional) string with file permissions, typically octal notation '0644'
-                'owner': (optional) file owner, string with the format 'owner:group'
-            'boot-data-drive': boolean to indicate if user-data must be passed using a boot drive (hard disk)
+                'key-pairs': (optional) list of strings with the public key to be inserted to the default user
+                'users': (optional) list of users to be inserted, each item is a dict with:
+                    'name': (mandatory) user name,
+                    'key-pairs': (optional) list of strings with the public key to be inserted to the user
+                'user-data': (optional) string is a text script to be passed directly to cloud-init
+                'config-files': (optional). List of files to be transferred. Each item is a dict with:
+                    'dest': (mandatory) string with the destination absolute path
+                    'encoding': (optional, by default text). Can be one of:
+                        'b64', 'base64', 'gz', 'gz+b64', 'gz+base64', 'gzip+b64', 'gzip+base64'
+                    'content' (mandatory): string with the content of the file
+                    'permissions': (optional) string with file permissions, typically octal notation '0644'
+                    'owner': (optional) file owner, string with the format 'owner:group'
+                'boot-data-drive': boolean to indicate if user-data must be passed using a boot drive (hard disk)
             'disk_list': (optional) list with additional disks to the VM. Each item is a dict with:
                 'image_id': (optional). VIM id of an existing image. If not provided an empty disk must be mounted
                 'size': (mandatory) string with the size of the disk in GB
index 94183e9..abe3c5c 100755 (executable)
 [ -z "$RO_DB_OVIM_HOST" ] && export RO_DB_OVIM_HOST="$RO_DB_HOST"
 [ -z "$RO_DB_OVIM_ROOT_PASSWORD" ] && export RO_DB_OVIM_ROOT_PASSWORD="$RO_DB_ROOT_PASSWORD"
 
+# If OSMRO_SERVER_NG is set, use the new server, which does not need any database init
+[ -n "$OSMRO_SERVER_NG" ] && python3 -m osm_ng_ro.ro_main
+
+
 function is_db_created() {
     db_host=$1
     db_port=$2