from threading import Lock
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
-min_common_version = "0.1.8"
+min_common_version = "0.1.16"
class Engine(object):
self.msg.connect(config["message"])
else:
raise EngineException("Invalid configuration param '{}' at '[message]':'driver'".format(
- config["storage"]["driver"]))
+ config["message"]["driver"]))
self.write_lock = Lock()
# create one class per topic
self.db.db_disconnect()
if self.fs:
self.fs.fs_disconnect()
- if self.fs:
- self.fs.fs_disconnect()
+ if self.msg:
+ self.msg.disconnect()
self.write_lock = None
except (DbException, FsException, MsgException) as e:
raise EngineException(str(e), http_code=e.http_code)
from authconn import AuthException
from auth import Authenticator
from engine import Engine, EngineException
+from subscriptions import SubscriptionThread
from validation import ValidationError
from osm_common.dbbase import DbException
from osm_common.fsbase import FsException
version_date = "Jan 2019"
database_version = '1.0'
auth_database_version = '1.0'
+nbi_server = None # instance of Server class
+subscription_thread = None # instance of SubscriptionThread class
+
"""
North Bound Interface (O: OSM specific; 5,X: SOL005 not implemented yet; O5: SOL005 implemented)
Set database, storage, message configuration
Init database with admin/admin user password
"""
+ global nbi_server
+ global subscription_thread
cherrypy.log.error("Starting osm_nbi")
# update general cherrypy configuration
update_dict = {}
cherrypy.tree.apps['/osm'].root.engine.init_db(target_version=database_version)
cherrypy.tree.apps['/osm'].root.authenticator.init_db(target_version=auth_database_version)
+ # start subscriptions thread:
+ subscription_thread = SubscriptionThread(config=engine_config, engine=nbi_server.engine)
+ subscription_thread.start()
+ # Do not capture except SubscriptionException
+
# load and print version. Ignore possible errors, e.g. file not found
try:
with open("{}/version".format(engine_config["/static"]['tools.staticdir.dir'])) as version_file:
Callback function called when cherrypy.engine stops
TODO: Ending database connections.
"""
+ global subscription_thread
+ subscription_thread.terminate()
+ subscription_thread = None
cherrypy.tree.apps['/osm'].root.engine.stop()
cherrypy.log.error("Stopping osm_nbi")
def nbi(config_file):
+ global nbi_server
# conf = {
# '/': {
# #'request.dispatch': cherrypy.dispatch.MethodDispatcher(),
# cherrypy.config.update({'tools.auth_basic.on': True,
# 'tools.auth_basic.realm': 'localhost',
# 'tools.auth_basic.checkpassword': validate_password})
+ nbi_server = Server()
cherrypy.engine.subscribe('start', _start_service)
cherrypy.engine.subscribe('stop', _stop_service)
- cherrypy.quickstart(Server(), '/osm', config_file)
+ cherrypy.quickstart(nbi_server, '/osm', config_file)
def usage():
--- /dev/null
+# -*- coding: utf-8 -*-
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This module implements a thread that reads from kafka bus implementing all the subscriptions.
+It is based on asyncio.
+To avoid race conditions it uses same engine class as the main module for database changes
+For the moment this module only deletes NS instances when they are terminated with the autoremove flag
+"""
+
+import logging
+import threading
+import asyncio
+from http import HTTPStatus
+from osm_common import dbmongo, dbmemory, msglocal, msgkafka
+from osm_common.dbbase import DbException
+from osm_common.msgbase import MsgException
+from engine import EngineException
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+
+
+class SubscriptionException(Exception):
+
+ def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
+ self.http_code = http_code
+ Exception.__init__(self, message)
+
+
+class SubscriptionThread(threading.Thread):
+
+ def __init__(self, config, engine):
+ """
+ Constructor of class
+ :param config: configuration parameters of database and messaging
+ :param engine: an instance of Engine class, used for deleting instances
+ """
+ threading.Thread.__init__(self)
+
+ self.config = config
+ self.db = None
+ self.msg = None
+ self.engine = engine
+ self.loop = None
+ self.logger = logging.getLogger("nbi.subscriptions")
+ self.aiomain_task = None # asyncio task for receiving kafka bus
+ self.internal_session = { # used for a session to the engine methods
+ "_id": "subscription",
+ "id": "subscription",
+ "project_id": "admin",
+ "admin": True
+ }
+
+ def run(self):
+ """
+ Start of the thread
+ :return: None
+ """
+ self.loop = asyncio.new_event_loop()
+ try:
+ if not self.db:
+ if self.config["database"]["driver"] == "mongo":
+ self.db = dbmongo.DbMongo()
+ self.db.db_connect(self.config["database"])
+ elif self.config["database"]["driver"] == "memory":
+ self.db = dbmemory.DbMemory()
+ self.db.db_connect(self.config["database"])
+ else:
+ raise SubscriptionException("Invalid configuration param '{}' at '[database]':'driver'".format(
+ self.config["database"]["driver"]))
+ if not self.msg:
+ config_msg = self.config["message"].copy()
+ config_msg["loop"] = self.loop
+ if config_msg["driver"] == "local":
+ self.msg = msglocal.MsgLocal()
+ self.msg.connect(config_msg)
+ elif config_msg["driver"] == "kafka":
+ self.msg = msgkafka.MsgKafka()
+ self.msg.connect(config_msg)
+ else:
+ raise SubscriptionException("Invalid configuration param '{}' at '[message]':'driver'".format(
+ config_msg["driver"]))
+
+ except (DbException, MsgException) as e:
+ raise SubscriptionException(str(e), http_code=e.http_code)
+
+ self.logger.debug("Starting")
+ while True:
+ try:
+ self.aiomain_task = asyncio.ensure_future(self.msg.aioread("ns", loop=self.loop,
+ callback=self._msg_callback),
+ loop=self.loop)
+ self.loop.run_until_complete(self.aiomain_task)
+ except asyncio.CancelledError:
+ break # if cancelled it should end, breaking loop
+ except Exception as e:
+ self.logger.exception("Exception '{}' at messaging read loop".format(e), exc_info=True)
+
+ self.logger.debug("Finishing")
+ self._stop()
+ self.loop.close()
+
+ def _msg_callback(self, topic, command, params):
+ """
+ Callback to process a received message from kafka
+ :param topic: topic received
+ :param command: command received
+ :param params: rest of parameters
+ :return: None
+ """
+ try:
+ if topic == "ns":
+ if command == "terminated":
+ self.logger.debug("received ns terminated {}".format(params))
+ if params.get("autoremove"):
+ self.engine.del_item(self.internal_session, "nsrs", _id=params["nsr_id"])
+ self.logger.debug("ns={} deleted from database".format(params["nsr_id"]))
+ return
+ except (EngineException, DbException, MsgException) as e:
+ self.logger.error("Error while processing topic={} command={}: {}".format(topic, command, e))
+ except Exception as e:
+ self.logger.exception("Exception while processing topic={} command={}: {}".format(topic, command, e),
+ exc_info=True)
+
+ def _stop(self):
+ """
+ Close all connections
+ :return: None
+ """
+ try:
+ if self.db:
+ self.db.db_disconnect()
+ if self.msg:
+ self.msg.disconnect()
+ except (DbException, MsgException) as e:
+ raise SubscriptionException(str(e), http_code=e.http_code)
+
+ def terminate(self):
+ """
+ This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards,
+ but not immediately.
+ :return: None
+ """
+ self.loop.call_soon_threadsafe(self.aiomain_task.cancel)
self.users = {'1': "cirros", '2': "cirros"}
self.passwords = {'1': "cubswin:)", '2': "cubswin:)"}
+ def terminate(self, engine):
+ # Make a delete in one step, overriding the normal two step of TestDeploy that launched terminate and delete
+ if test_osm:
+ engine.test("Terminate and delete NS in one step", "DELETE", "/nslcm/v1/ns_instances_content/{}".
+ format(self.ns_id), headers_yaml, None, 202, None, "yaml")
+
+ engine .wait_until_delete("/nslcm/v1/ns_instances/{}".format(self.ns_id), timeout_deploy)
+ else:
+ engine.test("Delete NS with FORCE", "DELETE", "/nslcm/v1/ns_instances/{}?FORCE=True".format(self.ns_id),
+ headers_yaml, None, 204, None, 0)
+
+ # check all it is deleted
+ engine.test("Check NS is deleted", "GET", "/nslcm/v1/ns_instances/{}".format(self.ns_id), headers_yaml, None,
+ 404, None, "yaml")
+ r = engine.test("Check NSLCMOPs are deleted", "GET",
+ "/nslcm/v1/ns_lcm_op_occs?nsInstanceId={}".format(self.ns_id), headers_json, None,
+ 200, None, "json")
+ if not r:
+ return
+ nslcmops = r.json()
+ if not isinstance(nslcmops, list) or nslcmops:
+ raise TestException("NS {} deleted but with ns_lcm_op_occ active: {}".format(self.ns_id, nslcmops))
+
class TestDeployHackfest1(TestDeploy):
description = "Load and deploy Hackfest_1_vnfd example"
self.commands = {'1': ['ls -lrt', ], '2': ['ls -lrt', ]}
self.users = {'1': "ubuntu", '2': "ubuntu"}
self.passwords = {'1': "osm4u", '2': "osm4u"}
-
- def create_descriptors(self, engine):
- super().create_descriptors(engine)
# Modify VNFD to add scaling
- self.descriptor_edit = {
- "vnfd0": {
- 'vnf-configuration': {
- 'config-primitive': [{
- 'name': 'touch',
- 'parameter': [{
- 'name': 'filename',
- 'data-type': 'STRING',
- 'default-value': '/home/ubuntu/touched'
- }]
- }]
- },
- 'scaling-group-descriptor': [{
- 'name': 'scale_dataVM',
- 'scaling-policy': [{
- 'threshold-time': 0,
- 'name': 'auto_cpu_util_above_threshold',
- 'scaling-type': 'automatic',
- 'scaling-criteria': [{
- 'name': 'cpu_util_above_threshold',
- 'vnf-monitoring-param-ref': 'all_aaa_cpu_util',
- 'scale-out-relational-operation': 'GE',
- 'scale-in-threshold': 15,
- 'scale-out-threshold': 60,
- 'scale-in-relational-operation': 'LE'
- }],
- 'cooldown-time': 60
- }],
- 'max-instance-count': 10,
- 'scaling-config-action': [
- {'vnf-config-primitive-name-ref': 'touch',
- 'trigger': 'post-scale-out'},
- {'vnf-config-primitive-name-ref': 'touch',
- 'trigger': 'pre-scale-in'}
- ],
- 'vdu': [{
- 'vdu-id-ref': 'dataVM',
- 'count': 1
- }]
- }]
- }
- }
+ # self.descriptor_edit = {
+ # "vnfd0": {
+ # 'vnf-configuration': {
+ # 'config-primitive': [{
+ # 'name': 'touch',
+ # 'parameter': [{
+ # 'name': 'filename',
+ # 'data-type': 'STRING',
+ # 'default-value': '/home/ubuntu/touched'
+ # }]
+ # }]
+ # },
+ # 'scaling-group-descriptor': [{
+ # 'name': 'scale_dataVM',
+ # 'scaling-policy': [{
+ # 'threshold-time': 0,
+ # 'name': 'auto_cpu_util_above_threshold',
+ # 'scaling-type': 'automatic',
+ # 'scaling-criteria': [{
+ # 'name': 'cpu_util_above_threshold',
+ # 'vnf-monitoring-param-ref': 'all_aaa_cpu_util',
+ # 'scale-out-relational-operation': 'GE',
+ # 'scale-in-threshold': 15,
+ # 'scale-out-threshold': 60,
+ # 'scale-in-relational-operation': 'LE'
+ # }],
+ # 'cooldown-time': 60
+ # }],
+ # 'max-instance-count': 10,
+ # 'scaling-config-action': [
+ # {'vnf-config-primitive-name-ref': 'touch',
+ # 'trigger': 'post-scale-out'},
+ # {'vnf-config-primitive-name-ref': 'touch',
+ # 'trigger': 'pre-scale-in'}
+ # ],
+ # 'vdu': [{
+ # 'vdu-id-ref': 'dataVM',
+ # 'count': 1
+ # }]
+ # }]
+ # }
+ # }
class TestDeployHackfest3Charmed(TestDeploy):