Feature 7184 New Generation RO enhancemnt
[osm/RO.git] / NG-RO / osm_ng_ro / vim_admin.py
diff --git a/NG-RO/osm_ng_ro/vim_admin.py b/NG-RO/osm_ng_ro/vim_admin.py
new file mode 100644 (file)
index 0000000..d7a7c1d
--- /dev/null
@@ -0,0 +1,226 @@
+# -*- coding: utf-8 -*-
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This module implements a thread that reads from kafka bus reading VIM messages.
+It is based on asyncio.
+It is in charge of load tasks assigned to VIMs that nobody is in chage of it
+"""
+
+import logging
+import threading
+import asyncio
+from http import HTTPStatus
+
+from osm_common import dbmongo, dbmemory, msglocal, msgkafka
+from osm_common.dbbase import DbException
+from osm_common.msgbase import MsgException
+from osm_ng_ro.ns import NsException
+from time import time
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+
+
+class VimAdminException(Exception):
+
+    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
+        self.http_code = http_code
+        Exception.__init__(self, message)
+
+
+class VimAdminThread(threading.Thread):
+    MAX_TIME_LOCKED = 3600  # 1h
+    MAX_TIME_UNATTENDED = 60  # 600  # 10min
+    kafka_topics = ("vim_account", "wim_account", "sdn")
+
+    def __init__(self, config, engine):
+        """
+        Constructor of class
+        :param config: configuration parameters of database and messaging
+        :param engine: an instance of Engine class, used for deleting instances
+        """
+        threading.Thread.__init__(self)
+        self.to_terminate = False
+        self.config = config
+        self.db = None
+        self.msg = None
+        self.engine = engine
+        self.loop = None
+        self.last_rotask_time = 0
+        self.logger = logging.getLogger("ro.vimadmin")
+        self.aiomain_task_kafka = None  # asyncio task for receiving vim actions from kafka bus
+        self.aiomain_task_vim = None  # asyncio task for watching ro_tasks not processed by nobody
+
+    async def vim_watcher(self):
+        """ Reads database periodically looking for tasks not processed by nobody because of a restar
+        in order to load this vim"""
+        while not self.to_terminate:
+            now = time()
+            if not self.last_rotask_time:
+                self.last_rotask_time = 0
+            ro_tasks = self.db.get_list("ro_tasks",
+                                        q_filter={"target_id.ncont": self.engine.assignment_list,
+                                                  "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
+                                                  "locked_at.lt": now - self.MAX_TIME_LOCKED,
+                                                  "to_check_at.gt": self.last_rotask_time,
+                                                  "to_check_at.lte": now - self.MAX_TIME_UNATTENDED})
+            self.last_rotask_time = now - self.MAX_TIME_UNATTENDED
+            for ro_task in ro_tasks:
+                if ro_task["target_id"] not in self.engine.assignment_list:
+                    self.engine.assign_vim(ro_task["target_id"])
+                    self.logger.debug("ordered to load {}. Inactivity detected".format(ro_task["target_id"]))
+
+            await asyncio.sleep(300, loop=self.loop)
+
+    async def aiomain(self):
+        kafka_working = True
+        while not self.to_terminate:
+            try:
+                if not self.aiomain_task_kafka:
+                    # await self.msg.aiowrite("admin", "echo", "dummy message", loop=self.loop)
+                    await self.msg.aiowrite("vim_account", "echo", "dummy message", loop=self.loop)
+                    kafka_working = True
+                    self.logger.debug("Starting vim_account subscription task")
+                    self.aiomain_task_kafka = asyncio.ensure_future(
+                        self.msg.aioread(self.kafka_topics, loop=self.loop, group_id=False,
+                                         aiocallback=self._msg_callback),
+                        loop=self.loop)
+                if not self.aiomain_task_vim:
+                    self.aiomain_task_vim = asyncio.ensure_future(
+                        self.vim_watcher(),
+                        loop=self.loop)
+                done, _ = await asyncio.wait([self.aiomain_task_kafka, self.aiomain_task_vim],
+                                             timeout=None, loop=self.loop, return_when=asyncio.FIRST_COMPLETED)
+                try:
+                    if self.aiomain_task_kafka in done:
+                        exc = self.aiomain_task_kafka.exception()
+                        self.logger.error("kafka subscription task exception: {}".format(exc))
+                        self.aiomain_task_kafka = None
+                    if self.aiomain_task_vim in done:
+                        exc = self.aiomain_task_vim.exception()
+                        self.logger.error("vim_account watcher task exception: {}".format(exc))
+                        self.aiomain_task_vim = None
+                except asyncio.CancelledError:
+                    pass
+
+            except Exception as e:
+                if self.to_terminate:
+                    return
+                if kafka_working:
+                    # logging only first time
+                    self.logger.critical("Error accessing kafka '{}'. Retrying ...".format(e))
+                    kafka_working = False
+            await asyncio.sleep(10, loop=self.loop)
+
+    def run(self):
+        """
+        Start of the thread
+        :return: None
+        """
+        self.loop = asyncio.new_event_loop()
+        try:
+            if not self.db:
+                if self.config["database"]["driver"] == "mongo":
+                    self.db = dbmongo.DbMongo()
+                    self.db.db_connect(self.config["database"])
+                elif self.config["database"]["driver"] == "memory":
+                    self.db = dbmemory.DbMemory()
+                    self.db.db_connect(self.config["database"])
+                else:
+                    raise VimAdminException("Invalid configuration param '{}' at '[database]':'driver'".format(
+                        self.config["database"]["driver"]))
+            if not self.msg:
+                config_msg = self.config["message"].copy()
+                config_msg["loop"] = self.loop
+                if config_msg["driver"] == "local":
+                    self.msg = msglocal.MsgLocal()
+                    self.msg.connect(config_msg)
+                elif config_msg["driver"] == "kafka":
+                    self.msg = msgkafka.MsgKafka()
+                    self.msg.connect(config_msg)
+                else:
+                    raise VimAdminException("Invalid configuration param '{}' at '[message]':'driver'".format(
+                        config_msg["driver"]))
+        except (DbException, MsgException) as e:
+            raise VimAdminException(str(e), http_code=e.http_code)
+
+        self.logger.debug("Starting")
+        while not self.to_terminate:
+            try:
+                self.loop.run_until_complete(asyncio.ensure_future(self.aiomain(), loop=self.loop))
+            # except asyncio.CancelledError:
+            #     break  # if cancelled it should end, breaking loop
+            except Exception as e:
+                if not self.to_terminate:
+                    self.logger.exception("Exception '{}' at messaging read loop".format(e), exc_info=True)
+
+        self.logger.debug("Finishing")
+        self._stop()
+        self.loop.close()
+
+    async def _msg_callback(self, topic, command, params):
+        """
+        Callback to process a received message from kafka
+        :param topic:  topic received
+        :param command:  command received
+        :param params: rest of parameters
+        :return: None
+        """
+        try:
+            if command == "echo":
+                return
+            if topic in self.kafka_topics:
+                target = topic[0:3]   # vim, wim or sdn
+                target_id = target + ":" + params["_id"]
+                if command in ("edited", "edit"):
+                    self.engine.reload_vim(target_id)
+                    self.logger.debug("ordered to reload {}".format(target_id))
+                elif command in ("deleted", "delete"):
+                    self.engine.unload_vim(target_id)
+                    self.logger.debug("ordered to unload {}".format(target_id))
+                elif command in ("create", "created"):
+                    self.engine.check_vim(target_id)
+                    self.logger.debug("ordered to check {}".format(target_id))
+
+        except (NsException, DbException, MsgException) as e:
+            self.logger.error("Error while processing topic={} command={}: {}".format(topic, command, e))
+        except Exception as e:
+            self.logger.exception("Exception while processing topic={} command={}: {}".format(topic, command, e),
+                                  exc_info=True)
+
+    def _stop(self):
+        """
+        Close all connections
+        :return: None
+        """
+        try:
+            if self.db:
+                self.db.db_disconnect()
+            if self.msg:
+                self.msg.disconnect()
+        except (DbException, MsgException) as e:
+            raise VimAdminException(str(e), http_code=e.http_code)
+
+    def terminate(self):
+        """
+        This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards,
+        but not immediately.
+        :return: None
+        """
+        self.to_terminate = True
+        if self.aiomain_task_kafka:
+            self.loop.call_soon_threadsafe(self.aiomain_task_kafka.cancel)
+        if self.aiomain_task_vim:
+            self.loop.call_soon_threadsafe(self.aiomain_task_vim.cancel)