From: tierno Date: Mon, 28 Jan 2019 17:28:10 +0000 (+0000) Subject: new thread to read from kafka and terminate ns/nsi when autoremove X-Git-Tag: v5.0.3~1 X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=932499c09d729d235ccd1fc002156b8b23e9f165;p=osm%2FNBI.git new thread to read from kafka and terminate ns/nsi when autoremove Change-Id: Ibae5e0adb05716d506b1d571e583235f1e14aa40 Signed-off-by: tierno --- diff --git a/osm_nbi/engine.py b/osm_nbi/engine.py index f186a4c..b90f713 100644 --- a/osm_nbi/engine.py +++ b/osm_nbi/engine.py @@ -28,7 +28,7 @@ from os import urandom from threading import Lock __author__ = "Alfonso Tierno " -min_common_version = "0.1.8" +min_common_version = "0.1.16" class Engine(object): @@ -98,7 +98,7 @@ class Engine(object): self.msg.connect(config["message"]) else: raise EngineException("Invalid configuration param '{}' at '[message]':'driver'".format( - config["storage"]["driver"])) + config["message"]["driver"])) self.write_lock = Lock() # create one class per topic @@ -113,8 +113,8 @@ class Engine(object): self.db.db_disconnect() if self.fs: self.fs.fs_disconnect() - if self.fs: - self.fs.fs_disconnect() + if self.msg: + self.msg.disconnect() self.write_lock = None except (DbException, FsException, MsgException) as e: raise EngineException(str(e), http_code=e.http_code) diff --git a/osm_nbi/nbi.py b/osm_nbi/nbi.py index a1e8b50..0417c9e 100644 --- a/osm_nbi/nbi.py +++ b/osm_nbi/nbi.py @@ -27,6 +27,7 @@ import sys from authconn import AuthException from auth import Authenticator from engine import Engine, EngineException +from subscriptions import SubscriptionThread from validation import ValidationError from osm_common.dbbase import DbException from osm_common.fsbase import FsException @@ -41,6 +42,9 @@ __version__ = "0.1.3" version_date = "Jan 2019" database_version = '1.0' auth_database_version = '1.0' +nbi_server = None # instance of Server class +subscription_thread = None # instance of SubscriptionThread class + """ North Bound Interface (O: OSM specific; 5,X: SOL005 not implemented yet; O5: SOL005 implemented) @@ -904,6 +908,8 @@ def _start_service(): Set database, storage, message configuration Init database with admin/admin user password """ + global nbi_server + global subscription_thread cherrypy.log.error("Starting osm_nbi") # update general cherrypy configuration update_dict = {} @@ -988,6 +994,11 @@ def _start_service(): cherrypy.tree.apps['/osm'].root.engine.init_db(target_version=database_version) cherrypy.tree.apps['/osm'].root.authenticator.init_db(target_version=auth_database_version) + # start subscriptions thread: + subscription_thread = SubscriptionThread(config=engine_config, engine=nbi_server.engine) + subscription_thread.start() + # Do not capture except SubscriptionException + # load and print version. Ignore possible errors, e.g. file not found try: with open("{}/version".format(engine_config["/static"]['tools.staticdir.dir'])) as version_file: @@ -1002,11 +1013,15 @@ def _stop_service(): Callback function called when cherrypy.engine stops TODO: Ending database connections. """ + global subscription_thread + subscription_thread.terminate() + subscription_thread = None cherrypy.tree.apps['/osm'].root.engine.stop() cherrypy.log.error("Stopping osm_nbi") def nbi(config_file): + global nbi_server # conf = { # '/': { # #'request.dispatch': cherrypy.dispatch.MethodDispatcher(), @@ -1024,9 +1039,10 @@ def nbi(config_file): # cherrypy.config.update({'tools.auth_basic.on': True, # 'tools.auth_basic.realm': 'localhost', # 'tools.auth_basic.checkpassword': validate_password}) + nbi_server = Server() cherrypy.engine.subscribe('start', _start_service) cherrypy.engine.subscribe('stop', _stop_service) - cherrypy.quickstart(Server(), '/osm', config_file) + cherrypy.quickstart(nbi_server, '/osm', config_file) def usage(): diff --git a/osm_nbi/subscriptions.py b/osm_nbi/subscriptions.py new file mode 100644 index 0000000..64fc2bf --- /dev/null +++ b/osm_nbi/subscriptions.py @@ -0,0 +1,156 @@ +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +This module implements a thread that reads from kafka bus implementing all the subscriptions. +It is based on asyncio. +To avoid race conditions it uses same engine class as the main module for database changes +For the moment this module only deletes NS instances when they are terminated with the autoremove flag +""" + +import logging +import threading +import asyncio +from http import HTTPStatus +from osm_common import dbmongo, dbmemory, msglocal, msgkafka +from osm_common.dbbase import DbException +from osm_common.msgbase import MsgException +from engine import EngineException + +__author__ = "Alfonso Tierno " + + +class SubscriptionException(Exception): + + def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST): + self.http_code = http_code + Exception.__init__(self, message) + + +class SubscriptionThread(threading.Thread): + + def __init__(self, config, engine): + """ + Constructor of class + :param config: configuration parameters of database and messaging + :param engine: an instance of Engine class, used for deleting instances + """ + threading.Thread.__init__(self) + + self.config = config + self.db = None + self.msg = None + self.engine = engine + self.loop = None + self.logger = logging.getLogger("nbi.subscriptions") + self.aiomain_task = None # asyncio task for receiving kafka bus + self.internal_session = { # used for a session to the engine methods + "_id": "subscription", + "id": "subscription", + "project_id": "admin", + "admin": True + } + + def run(self): + """ + Start of the thread + :return: None + """ + self.loop = asyncio.new_event_loop() + try: + if not self.db: + if self.config["database"]["driver"] == "mongo": + self.db = dbmongo.DbMongo() + self.db.db_connect(self.config["database"]) + elif self.config["database"]["driver"] == "memory": + self.db = dbmemory.DbMemory() + self.db.db_connect(self.config["database"]) + else: + raise SubscriptionException("Invalid configuration param '{}' at '[database]':'driver'".format( + self.config["database"]["driver"])) + if not self.msg: + config_msg = self.config["message"].copy() + config_msg["loop"] = self.loop + if config_msg["driver"] == "local": + self.msg = msglocal.MsgLocal() + self.msg.connect(config_msg) + elif config_msg["driver"] == "kafka": + self.msg = msgkafka.MsgKafka() + self.msg.connect(config_msg) + else: + raise SubscriptionException("Invalid configuration param '{}' at '[message]':'driver'".format( + config_msg["driver"])) + + except (DbException, MsgException) as e: + raise SubscriptionException(str(e), http_code=e.http_code) + + self.logger.debug("Starting") + while True: + try: + self.aiomain_task = asyncio.ensure_future(self.msg.aioread("ns", loop=self.loop, + callback=self._msg_callback), + loop=self.loop) + self.loop.run_until_complete(self.aiomain_task) + except asyncio.CancelledError: + break # if cancelled it should end, breaking loop + except Exception as e: + self.logger.exception("Exception '{}' at messaging read loop".format(e), exc_info=True) + + self.logger.debug("Finishing") + self._stop() + self.loop.close() + + def _msg_callback(self, topic, command, params): + """ + Callback to process a received message from kafka + :param topic: topic received + :param command: command received + :param params: rest of parameters + :return: None + """ + try: + if topic == "ns": + if command == "terminated": + self.logger.debug("received ns terminated {}".format(params)) + if params.get("autoremove"): + self.engine.del_item(self.internal_session, "nsrs", _id=params["nsr_id"]) + self.logger.debug("ns={} deleted from database".format(params["nsr_id"])) + return + except (EngineException, DbException, MsgException) as e: + self.logger.error("Error while processing topic={} command={}: {}".format(topic, command, e)) + except Exception as e: + self.logger.exception("Exception while processing topic={} command={}: {}".format(topic, command, e), + exc_info=True) + + def _stop(self): + """ + Close all connections + :return: None + """ + try: + if self.db: + self.db.db_disconnect() + if self.msg: + self.msg.disconnect() + except (DbException, MsgException) as e: + raise SubscriptionException(str(e), http_code=e.http_code) + + def terminate(self): + """ + This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards, + but not immediately. + :return: None + """ + self.loop.call_soon_threadsafe(self.aiomain_task.cancel) diff --git a/osm_nbi/tests/test.py b/osm_nbi/tests/test.py index 8c5ad14..6bb04d9 100755 --- a/osm_nbi/tests/test.py +++ b/osm_nbi/tests/test.py @@ -1002,6 +1002,29 @@ class TestDeployHackfestCirros(TestDeploy): self.users = {'1': "cirros", '2': "cirros"} self.passwords = {'1': "cubswin:)", '2': "cubswin:)"} + def terminate(self, engine): + # Make a delete in one step, overriding the normal two step of TestDeploy that launched terminate and delete + if test_osm: + engine.test("Terminate and delete NS in one step", "DELETE", "/nslcm/v1/ns_instances_content/{}". + format(self.ns_id), headers_yaml, None, 202, None, "yaml") + + engine .wait_until_delete("/nslcm/v1/ns_instances/{}".format(self.ns_id), timeout_deploy) + else: + engine.test("Delete NS with FORCE", "DELETE", "/nslcm/v1/ns_instances/{}?FORCE=True".format(self.ns_id), + headers_yaml, None, 204, None, 0) + + # check all it is deleted + engine.test("Check NS is deleted", "GET", "/nslcm/v1/ns_instances/{}".format(self.ns_id), headers_yaml, None, + 404, None, "yaml") + r = engine.test("Check NSLCMOPs are deleted", "GET", + "/nslcm/v1/ns_lcm_op_occs?nsInstanceId={}".format(self.ns_id), headers_json, None, + 200, None, "json") + if not r: + return + nslcmops = r.json() + if not isinstance(nslcmops, list) or nslcmops: + raise TestException("NS {} deleted but with ns_lcm_op_occ active: {}".format(self.ns_id, nslcmops)) + class TestDeployHackfest1(TestDeploy): description = "Load and deploy Hackfest_1_vnfd example" @@ -1168,52 +1191,49 @@ class TestDeployHackfest4(TestDeploy): self.commands = {'1': ['ls -lrt', ], '2': ['ls -lrt', ]} self.users = {'1': "ubuntu", '2': "ubuntu"} self.passwords = {'1': "osm4u", '2': "osm4u"} - - def create_descriptors(self, engine): - super().create_descriptors(engine) # Modify VNFD to add scaling - self.descriptor_edit = { - "vnfd0": { - 'vnf-configuration': { - 'config-primitive': [{ - 'name': 'touch', - 'parameter': [{ - 'name': 'filename', - 'data-type': 'STRING', - 'default-value': '/home/ubuntu/touched' - }] - }] - }, - 'scaling-group-descriptor': [{ - 'name': 'scale_dataVM', - 'scaling-policy': [{ - 'threshold-time': 0, - 'name': 'auto_cpu_util_above_threshold', - 'scaling-type': 'automatic', - 'scaling-criteria': [{ - 'name': 'cpu_util_above_threshold', - 'vnf-monitoring-param-ref': 'all_aaa_cpu_util', - 'scale-out-relational-operation': 'GE', - 'scale-in-threshold': 15, - 'scale-out-threshold': 60, - 'scale-in-relational-operation': 'LE' - }], - 'cooldown-time': 60 - }], - 'max-instance-count': 10, - 'scaling-config-action': [ - {'vnf-config-primitive-name-ref': 'touch', - 'trigger': 'post-scale-out'}, - {'vnf-config-primitive-name-ref': 'touch', - 'trigger': 'pre-scale-in'} - ], - 'vdu': [{ - 'vdu-id-ref': 'dataVM', - 'count': 1 - }] - }] - } - } + # self.descriptor_edit = { + # "vnfd0": { + # 'vnf-configuration': { + # 'config-primitive': [{ + # 'name': 'touch', + # 'parameter': [{ + # 'name': 'filename', + # 'data-type': 'STRING', + # 'default-value': '/home/ubuntu/touched' + # }] + # }] + # }, + # 'scaling-group-descriptor': [{ + # 'name': 'scale_dataVM', + # 'scaling-policy': [{ + # 'threshold-time': 0, + # 'name': 'auto_cpu_util_above_threshold', + # 'scaling-type': 'automatic', + # 'scaling-criteria': [{ + # 'name': 'cpu_util_above_threshold', + # 'vnf-monitoring-param-ref': 'all_aaa_cpu_util', + # 'scale-out-relational-operation': 'GE', + # 'scale-in-threshold': 15, + # 'scale-out-threshold': 60, + # 'scale-in-relational-operation': 'LE' + # }], + # 'cooldown-time': 60 + # }], + # 'max-instance-count': 10, + # 'scaling-config-action': [ + # {'vnf-config-primitive-name-ref': 'touch', + # 'trigger': 'post-scale-out'}, + # {'vnf-config-primitive-name-ref': 'touch', + # 'trigger': 'pre-scale-in'} + # ], + # 'vdu': [{ + # 'vdu-id-ref': 'dataVM', + # 'count': 1 + # }] + # }] + # } + # } class TestDeployHackfest3Charmed(TestDeploy):