| tierno | 932499c | 2019-01-28 17:28:10 +0000 | [diff] [blame] | 1 | # -*- coding: utf-8 -*- |
| 2 | |
| 3 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | # you may not use this file except in compliance with the License. |
| 5 | # You may obtain a copy of the License at |
| 6 | # |
| 7 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | # |
| 9 | # Unless required by applicable law or agreed to in writing, software |
| 10 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
| 12 | # implied. |
| 13 | # See the License for the specific language governing permissions and |
| 14 | # limitations under the License. |
| 15 | |
| 16 | """ |
| 17 | This module implements a thread that reads from kafka bus implementing all the subscriptions. |
| 18 | It is based on asyncio. |
| 19 | To avoid race conditions it uses same engine class as the main module for database changes |
| 20 | For the moment this module only deletes NS instances when they are terminated with the autoremove flag |
| 21 | """ |
| 22 | |
| 23 | import logging |
| 24 | import threading |
| 25 | import asyncio |
| 26 | from http import HTTPStatus |
| 27 | from osm_common import dbmongo, dbmemory, msglocal, msgkafka |
| 28 | from osm_common.dbbase import DbException |
| 29 | from osm_common.msgbase import MsgException |
| 30 | from engine import EngineException |
| 31 | |
| 32 | __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>" |
| 33 | |
| 34 | |
class SubscriptionException(Exception):
    """Error raised by the subscription thread; carries an HTTP status code."""

    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
        """
        :param message: text describing the error
        :param http_code: HTTP status stored in 'http_code'. BAD_REQUEST by default
        """
        super().__init__(message)
        self.http_code = http_code
| 40 | |
| 41 | |
| 42 | class SubscriptionThread(threading.Thread): |
| 43 | |
| 44 | def __init__(self, config, engine): |
| 45 | """ |
| 46 | Constructor of class |
| 47 | :param config: configuration parameters of database and messaging |
| 48 | :param engine: an instance of Engine class, used for deleting instances |
| 49 | """ |
| 50 | threading.Thread.__init__(self) |
| tierno | 65ca36d | 2019-02-12 19:27:52 +0100 | [diff] [blame] | 51 | self.to_terminate = False |
| tierno | 932499c | 2019-01-28 17:28:10 +0000 | [diff] [blame] | 52 | self.config = config |
| 53 | self.db = None |
| 54 | self.msg = None |
| 55 | self.engine = engine |
| 56 | self.loop = None |
| 57 | self.logger = logging.getLogger("nbi.subscriptions") |
| 58 | self.aiomain_task = None # asyncio task for receiving kafka bus |
| 59 | self.internal_session = { # used for a session to the engine methods |
| tierno | 86e916a | 2019-05-29 21:39:37 +0000 | [diff] [blame] | 60 | "project_id": (), |
| 61 | "set_project": (), |
| 62 | "admin": True, |
| 63 | "force": False, |
| 64 | "public": None, |
| 65 | "method": "delete", |
| tierno | 932499c | 2019-01-28 17:28:10 +0000 | [diff] [blame] | 66 | } |
| 67 | |
| 68 | def run(self): |
| 69 | """ |
| 70 | Start of the thread |
| 71 | :return: None |
| 72 | """ |
| 73 | self.loop = asyncio.new_event_loop() |
| 74 | try: |
| 75 | if not self.db: |
| 76 | if self.config["database"]["driver"] == "mongo": |
| 77 | self.db = dbmongo.DbMongo() |
| 78 | self.db.db_connect(self.config["database"]) |
| 79 | elif self.config["database"]["driver"] == "memory": |
| 80 | self.db = dbmemory.DbMemory() |
| 81 | self.db.db_connect(self.config["database"]) |
| 82 | else: |
| 83 | raise SubscriptionException("Invalid configuration param '{}' at '[database]':'driver'".format( |
| 84 | self.config["database"]["driver"])) |
| 85 | if not self.msg: |
| 86 | config_msg = self.config["message"].copy() |
| 87 | config_msg["loop"] = self.loop |
| 88 | if config_msg["driver"] == "local": |
| 89 | self.msg = msglocal.MsgLocal() |
| 90 | self.msg.connect(config_msg) |
| 91 | elif config_msg["driver"] == "kafka": |
| 92 | self.msg = msgkafka.MsgKafka() |
| 93 | self.msg.connect(config_msg) |
| 94 | else: |
| 95 | raise SubscriptionException("Invalid configuration param '{}' at '[message]':'driver'".format( |
| 96 | config_msg["driver"])) |
| 97 | |
| 98 | except (DbException, MsgException) as e: |
| 99 | raise SubscriptionException(str(e), http_code=e.http_code) |
| 100 | |
| 101 | self.logger.debug("Starting") |
| tierno | 65ca36d | 2019-02-12 19:27:52 +0100 | [diff] [blame] | 102 | while not self.to_terminate: |
| tierno | 932499c | 2019-01-28 17:28:10 +0000 | [diff] [blame] | 103 | try: |
| Felipe Vicens | 09e6542 | 2019-01-22 15:06:46 +0100 | [diff] [blame] | 104 | self.aiomain_task = asyncio.ensure_future(self.msg.aioread(("ns", "nsi"), loop=self.loop, |
| tierno | 932499c | 2019-01-28 17:28:10 +0000 | [diff] [blame] | 105 | callback=self._msg_callback), |
| 106 | loop=self.loop) |
| 107 | self.loop.run_until_complete(self.aiomain_task) |
| tierno | 65ca36d | 2019-02-12 19:27:52 +0100 | [diff] [blame] | 108 | # except asyncio.CancelledError: |
| 109 | # break # if cancelled it should end, breaking loop |
| tierno | 932499c | 2019-01-28 17:28:10 +0000 | [diff] [blame] | 110 | except Exception as e: |
| tierno | 65ca36d | 2019-02-12 19:27:52 +0100 | [diff] [blame] | 111 | if not self.to_terminate: |
| 112 | self.logger.exception("Exception '{}' at messaging read loop".format(e), exc_info=True) |
| tierno | 932499c | 2019-01-28 17:28:10 +0000 | [diff] [blame] | 113 | |
| 114 | self.logger.debug("Finishing") |
| 115 | self._stop() |
| 116 | self.loop.close() |
| 117 | |
| 118 | def _msg_callback(self, topic, command, params): |
| 119 | """ |
| 120 | Callback to process a received message from kafka |
| 121 | :param topic: topic received |
| 122 | :param command: command received |
| 123 | :param params: rest of parameters |
| 124 | :return: None |
| 125 | """ |
| 126 | try: |
| 127 | if topic == "ns": |
| Felipe Vicens | 09e6542 | 2019-01-22 15:06:46 +0100 | [diff] [blame] | 128 | if command == "terminated" and params["operationState"] in ("COMPLETED", "PARTIALLY_COMPLETED"): |
| tierno | 932499c | 2019-01-28 17:28:10 +0000 | [diff] [blame] | 129 | self.logger.debug("received ns terminated {}".format(params)) |
| 130 | if params.get("autoremove"): |
| 131 | self.engine.del_item(self.internal_session, "nsrs", _id=params["nsr_id"]) |
| 132 | self.logger.debug("ns={} deleted from database".format(params["nsr_id"])) |
| 133 | return |
| Felipe Vicens | 09e6542 | 2019-01-22 15:06:46 +0100 | [diff] [blame] | 134 | if topic == "nsi": |
| 135 | if command == "terminated" and params["operationState"] in ("COMPLETED", "PARTIALLY_COMPLETED"): |
| 136 | self.logger.debug("received nsi terminated {}".format(params)) |
| 137 | if params.get("autoremove"): |
| 138 | self.engine.del_item(self.internal_session, "nsis", _id=params["nsir_id"]) |
| 139 | self.logger.debug("nsis={} deleted from database".format(params["nsir_id"])) |
| 140 | return |
| tierno | 932499c | 2019-01-28 17:28:10 +0000 | [diff] [blame] | 141 | except (EngineException, DbException, MsgException) as e: |
| 142 | self.logger.error("Error while processing topic={} command={}: {}".format(topic, command, e)) |
| 143 | except Exception as e: |
| 144 | self.logger.exception("Exception while processing topic={} command={}: {}".format(topic, command, e), |
| 145 | exc_info=True) |
| 146 | |
| 147 | def _stop(self): |
| 148 | """ |
| 149 | Close all connections |
| 150 | :return: None |
| 151 | """ |
| 152 | try: |
| 153 | if self.db: |
| 154 | self.db.db_disconnect() |
| 155 | if self.msg: |
| 156 | self.msg.disconnect() |
| 157 | except (DbException, MsgException) as e: |
| 158 | raise SubscriptionException(str(e), http_code=e.http_code) |
| 159 | |
| 160 | def terminate(self): |
| 161 | """ |
| 162 | This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards, |
| 163 | but not immediately. |
| 164 | :return: None |
| 165 | """ |
| tierno | 65ca36d | 2019-02-12 19:27:52 +0100 | [diff] [blame] | 166 | self.to_terminate = True |
| tierno | 932499c | 2019-01-28 17:28:10 +0000 | [diff] [blame] | 167 | self.loop.call_soon_threadsafe(self.aiomain_task.cancel) |