new thread to read from kafka and terminate ns/nsi when autoremove 31/7131/2
authortierno <alfonso.tiernosepulveda@telefonica.com>
Mon, 28 Jan 2019 17:28:10 +0000 (17:28 +0000)
committertierno <alfonso.tiernosepulveda@telefonica.com>
Tue, 29 Jan 2019 08:57:53 +0000 (08:57 +0000)
Change-Id: Ibae5e0adb05716d506b1d571e583235f1e14aa40
Signed-off-by: tierno <alfonso.tiernosepulveda@telefonica.com>
osm_nbi/engine.py
osm_nbi/nbi.py
osm_nbi/subscriptions.py [new file with mode: 0644]
osm_nbi/tests/test.py

index f186a4c..b90f713 100644 (file)
@@ -28,7 +28,7 @@ from os import urandom
 from threading import Lock
 
 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
-min_common_version = "0.1.8"
+min_common_version = "0.1.16"
 
 
 class Engine(object):
@@ -98,7 +98,7 @@ class Engine(object):
                     self.msg.connect(config["message"])
                 else:
                     raise EngineException("Invalid configuration param '{}' at '[message]':'driver'".format(
-                        config["storage"]["driver"]))
+                        config["message"]["driver"]))
 
             self.write_lock = Lock()
             # create one class per topic
@@ -113,8 +113,8 @@ class Engine(object):
                 self.db.db_disconnect()
             if self.fs:
                 self.fs.fs_disconnect()
-            if self.fs:
-                self.fs.fs_disconnect()
+            if self.msg:
+                self.msg.disconnect()
             self.write_lock = None
         except (DbException, FsException, MsgException) as e:
             raise EngineException(str(e), http_code=e.http_code)
index a1e8b50..0417c9e 100644 (file)
@@ -27,6 +27,7 @@ import sys
 from authconn import AuthException
 from auth import Authenticator
 from engine import Engine, EngineException
+from subscriptions import SubscriptionThread
 from validation import ValidationError
 from osm_common.dbbase import DbException
 from osm_common.fsbase import FsException
@@ -41,6 +42,9 @@ __version__ = "0.1.3"
 version_date = "Jan 2019"
 database_version = '1.0'
 auth_database_version = '1.0'
+nbi_server = None           # instance of Server class
+subscription_thread = None  # instance of SubscriptionThread class
+
 
 """
 North Bound Interface  (O: OSM specific; 5,X: SOL005 not implemented yet; O5: SOL005 implemented)
@@ -904,6 +908,8 @@ def _start_service():
     Set database, storage, message configuration
     Init database with admin/admin user password
     """
+    global nbi_server
+    global subscription_thread
     cherrypy.log.error("Starting osm_nbi")
     # update general cherrypy configuration
     update_dict = {}
@@ -988,6 +994,11 @@ def _start_service():
     cherrypy.tree.apps['/osm'].root.engine.init_db(target_version=database_version)
     cherrypy.tree.apps['/osm'].root.authenticator.init_db(target_version=auth_database_version)
 
+    # start subscriptions thread:
+    subscription_thread = SubscriptionThread(config=engine_config, engine=nbi_server.engine)
+    subscription_thread.start()
+    # Do not capture except SubscriptionException
+
     # load and print version. Ignore possible errors, e.g. file not found
     try:
         with open("{}/version".format(engine_config["/static"]['tools.staticdir.dir'])) as version_file:
@@ -1002,11 +1013,15 @@ def _stop_service():
     Callback function called when cherrypy.engine stops
     TODO: Ending database connections.
     """
+    global subscription_thread
+    subscription_thread.terminate()
+    subscription_thread = None
     cherrypy.tree.apps['/osm'].root.engine.stop()
     cherrypy.log.error("Stopping osm_nbi")
 
 
 def nbi(config_file):
+    global nbi_server
     # conf = {
     #     '/': {
     #         #'request.dispatch': cherrypy.dispatch.MethodDispatcher(),
@@ -1024,9 +1039,10 @@ def nbi(config_file):
     # cherrypy.config.update({'tools.auth_basic.on': True,
     #    'tools.auth_basic.realm': 'localhost',
     #    'tools.auth_basic.checkpassword': validate_password})
+    nbi_server = Server()
     cherrypy.engine.subscribe('start', _start_service)
     cherrypy.engine.subscribe('stop', _stop_service)
-    cherrypy.quickstart(Server(), '/osm', config_file)
+    cherrypy.quickstart(nbi_server, '/osm', config_file)
 
 
 def usage():
diff --git a/osm_nbi/subscriptions.py b/osm_nbi/subscriptions.py
new file mode 100644 (file)
index 0000000..64fc2bf
--- /dev/null
@@ -0,0 +1,156 @@
+# -*- coding: utf-8 -*-
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This module implements a thread that reads from kafka bus implementing all the subscriptions.
+It is based on asyncio.
+To avoid race conditions it uses same engine class as the main module for database changes
+For the moment this module only deletes NS instances when they are terminated with the autoremove flag
+"""
+
+import logging
+import threading
+import asyncio
+from http import HTTPStatus
+from osm_common import dbmongo, dbmemory, msglocal, msgkafka
+from osm_common.dbbase import DbException
+from osm_common.msgbase import MsgException
+from engine import EngineException
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+
+
+class SubscriptionException(Exception):
+
+    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
+        self.http_code = http_code
+        Exception.__init__(self, message)
+
+
+class SubscriptionThread(threading.Thread):
+
+    def __init__(self, config, engine):
+        """
+        Constructor of class
+        :param config: configuration parameters of database and messaging
+        :param engine: an instance of Engine class, used for deleting instances
+        """
+        threading.Thread.__init__(self)
+
+        self.config = config
+        self.db = None
+        self.msg = None
+        self.engine = engine
+        self.loop = None
+        self.logger = logging.getLogger("nbi.subscriptions")
+        self.aiomain_task = None  # asyncio task for receiving kafka bus
+        self.internal_session = {  # used for a session to the engine methods
+            "_id": "subscription",
+            "id": "subscription",
+            "project_id": "admin",
+            "admin": True
+        }
+
+    def run(self):
+        """
+        Start of the thread
+        :return: None
+        """
+        self.loop = asyncio.new_event_loop()
+        try:
+            if not self.db:
+                if self.config["database"]["driver"] == "mongo":
+                    self.db = dbmongo.DbMongo()
+                    self.db.db_connect(self.config["database"])
+                elif self.config["database"]["driver"] == "memory":
+                    self.db = dbmemory.DbMemory()
+                    self.db.db_connect(self.config["database"])
+                else:
+                    raise SubscriptionException("Invalid configuration param '{}' at '[database]':'driver'".format(
+                        self.config["database"]["driver"]))
+            if not self.msg:
+                config_msg = self.config["message"].copy()
+                config_msg["loop"] = self.loop
+                if config_msg["driver"] == "local":
+                    self.msg = msglocal.MsgLocal()
+                    self.msg.connect(config_msg)
+                elif config_msg["driver"] == "kafka":
+                    self.msg = msgkafka.MsgKafka()
+                    self.msg.connect(config_msg)
+                else:
+                    raise SubscriptionException("Invalid configuration param '{}' at '[message]':'driver'".format(
+                        config_msg["driver"]))
+
+        except (DbException, MsgException) as e:
+            raise SubscriptionException(str(e), http_code=e.http_code)
+
+        self.logger.debug("Starting")
+        while True:
+            try:
+                self.aiomain_task = asyncio.ensure_future(self.msg.aioread("ns", loop=self.loop,
+                                                                           callback=self._msg_callback),
+                                                          loop=self.loop)
+                self.loop.run_until_complete(self.aiomain_task)
+            except asyncio.CancelledError:
+                break  # if cancelled it should end, breaking loop
+            except Exception as e:
+                self.logger.exception("Exception '{}' at messaging read loop".format(e), exc_info=True)
+
+        self.logger.debug("Finishing")
+        self._stop()
+        self.loop.close()
+
+    def _msg_callback(self, topic, command, params):
+        """
+        Callback to process a received message from kafka
+        :param topic:  topic received
+        :param command:  command received
+        :param params: rest of parameters
+        :return: None
+        """
+        try:
+            if topic == "ns":
+                if command == "terminated":
+                    self.logger.debug("received ns terminated {}".format(params))
+                    if params.get("autoremove"):
+                        self.engine.del_item(self.internal_session, "nsrs", _id=params["nsr_id"])
+                        self.logger.debug("ns={} deleted from database".format(params["nsr_id"]))
+                    return
+        except (EngineException, DbException, MsgException) as e:
+            self.logger.error("Error while processing topic={} command={}: {}".format(topic, command, e))
+        except Exception as e:
+            self.logger.exception("Exception while processing topic={} command={}: {}".format(topic, command, e),
+                                  exc_info=True)
+
+    def _stop(self):
+        """
+        Close all connections
+        :return: None
+        """
+        try:
+            if self.db:
+                self.db.db_disconnect()
+            if self.msg:
+                self.msg.disconnect()
+        except (DbException, MsgException) as e:
+            raise SubscriptionException(str(e), http_code=e.http_code)
+
+    def terminate(self):
+        """
+        This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards,
+        but not immediately.
+        :return: None
+        """
+        self.loop.call_soon_threadsafe(self.aiomain_task.cancel)
index 8c5ad14..6bb04d9 100755 (executable)
@@ -1002,6 +1002,29 @@ class TestDeployHackfestCirros(TestDeploy):
         self.users = {'1': "cirros", '2': "cirros"}
         self.passwords = {'1': "cubswin:)", '2': "cubswin:)"}
 
+    def terminate(self, engine):
+        # Make a delete in one step, overriding the normal two step of TestDeploy that launched terminate and delete
+        if test_osm:
+            engine.test("Terminate and delete NS in one step", "DELETE", "/nslcm/v1/ns_instances_content/{}".
+                        format(self.ns_id), headers_yaml, None, 202, None, "yaml")
+
+            engine .wait_until_delete("/nslcm/v1/ns_instances/{}".format(self.ns_id), timeout_deploy)
+        else:
+            engine.test("Delete NS with FORCE", "DELETE", "/nslcm/v1/ns_instances/{}?FORCE=True".format(self.ns_id),
+                        headers_yaml, None, 204, None, 0)
+
+        # check all it is deleted
+        engine.test("Check NS is deleted", "GET", "/nslcm/v1/ns_instances/{}".format(self.ns_id), headers_yaml, None,
+                    404, None, "yaml")
+        r = engine.test("Check NSLCMOPs are deleted", "GET",
+                        "/nslcm/v1/ns_lcm_op_occs?nsInstanceId={}".format(self.ns_id), headers_json, None,
+                        200, None, "json")
+        if not r:
+            return
+        nslcmops = r.json()
+        if not isinstance(nslcmops, list) or nslcmops:
+            raise TestException("NS {} deleted but with ns_lcm_op_occ active: {}".format(self.ns_id, nslcmops))
+
 
 class TestDeployHackfest1(TestDeploy):
     description = "Load and deploy Hackfest_1_vnfd example"
@@ -1168,52 +1191,49 @@ class TestDeployHackfest4(TestDeploy):
         self.commands = {'1': ['ls -lrt', ], '2': ['ls -lrt', ]}
         self.users = {'1': "ubuntu", '2': "ubuntu"}
         self.passwords = {'1': "osm4u", '2': "osm4u"}
-
-    def create_descriptors(self, engine):
-        super().create_descriptors(engine)
         # Modify VNFD to add scaling
-        self.descriptor_edit = {
-            "vnfd0": {
-                'vnf-configuration': {
-                    'config-primitive': [{
-                        'name': 'touch',
-                        'parameter': [{
-                            'name': 'filename',
-                            'data-type': 'STRING',
-                            'default-value': '/home/ubuntu/touched'
-                        }]
-                    }]
-                },
-                'scaling-group-descriptor': [{
-                    'name': 'scale_dataVM',
-                    'scaling-policy': [{
-                        'threshold-time': 0,
-                        'name': 'auto_cpu_util_above_threshold',
-                        'scaling-type': 'automatic',
-                        'scaling-criteria': [{
-                            'name': 'cpu_util_above_threshold',
-                            'vnf-monitoring-param-ref': 'all_aaa_cpu_util',
-                            'scale-out-relational-operation': 'GE',
-                            'scale-in-threshold': 15,
-                            'scale-out-threshold': 60,
-                            'scale-in-relational-operation': 'LE'
-                        }],
-                        'cooldown-time': 60
-                    }],
-                    'max-instance-count': 10,
-                    'scaling-config-action': [
-                        {'vnf-config-primitive-name-ref': 'touch',
-                         'trigger': 'post-scale-out'},
-                        {'vnf-config-primitive-name-ref': 'touch',
-                         'trigger': 'pre-scale-in'}
-                    ],
-                    'vdu': [{
-                        'vdu-id-ref': 'dataVM',
-                        'count': 1
-                    }]
-                }]
-            }
-        }
+        self.descriptor_edit = {
+            "vnfd0": {
+                'vnf-configuration': {
+                    'config-primitive': [{
+                        'name': 'touch',
+                        'parameter': [{
+                            'name': 'filename',
+                            'data-type': 'STRING',
+                            'default-value': '/home/ubuntu/touched'
+                        }]
+                    }]
+                },
+                'scaling-group-descriptor': [{
+                    'name': 'scale_dataVM',
+                    'scaling-policy': [{
+                        'threshold-time': 0,
+                        'name': 'auto_cpu_util_above_threshold',
+                        'scaling-type': 'automatic',
+                        'scaling-criteria': [{
+                            'name': 'cpu_util_above_threshold',
+                            'vnf-monitoring-param-ref': 'all_aaa_cpu_util',
+                            'scale-out-relational-operation': 'GE',
+                            'scale-in-threshold': 15,
+                            'scale-out-threshold': 60,
+                            'scale-in-relational-operation': 'LE'
+                        }],
+                        'cooldown-time': 60
+                    }],
+                    'max-instance-count': 10,
+                    'scaling-config-action': [
+                        {'vnf-config-primitive-name-ref': 'touch',
+                         'trigger': 'post-scale-out'},
+                        {'vnf-config-primitive-name-ref': 'touch',
+                         'trigger': 'pre-scale-in'}
+                    ],
+                    'vdu': [{
+                        'vdu-id-ref': 'dataVM',
+                        'count': 1
+                    }]
+                }]
+            }
+        }
 
 
 class TestDeployHackfest3Charmed(TestDeploy):