| tierno | 932499c | 2019-01-28 17:28:10 +0000 | [diff] [blame] | 1 | # -*- coding: utf-8 -*- |
| 2 | |
| 3 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | # you may not use this file except in compliance with the License. |
| 5 | # You may obtain a copy of the License at |
| 6 | # |
| 7 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | # |
| 9 | # Unless required by applicable law or agreed to in writing, software |
| 10 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
| 12 | # implied. |
| 13 | # See the License for the specific language governing permissions and |
| 14 | # limitations under the License. |
| 15 | |
| 16 | """ |
| 17 | This module implements a thread that reads from kafka bus implementing all the subscriptions. |
| 18 | It is based on asyncio. |
| 19 | To avoid race conditions it uses the same engine class as the main module for database changes. |
| 20 | It deletes NS and NSI instances when they are terminated with the autoremove flag, notifies NS/VNF LCM subscribers and handles token revocation requests received on the admin topic. |
| 21 | """ |
| 22 | |
| 23 | import logging |
| 24 | import threading |
| 25 | import asyncio |
| 26 | from http import HTTPStatus |
| K Sai Kiran | bb70c81 | 2020-04-28 14:48:31 +0530 | [diff] [blame] | 27 | |
| tierno | 932499c | 2019-01-28 17:28:10 +0000 | [diff] [blame] | 28 | from osm_common import dbmongo, dbmemory, msglocal, msgkafka |
| 29 | from osm_common.dbbase import DbException |
| 30 | from osm_common.msgbase import MsgException |
| tierno | 23acf40 | 2019-08-28 13:36:34 +0000 | [diff] [blame] | 31 | from osm_nbi.engine import EngineException |
| selvi.j | f100459 | 2022-04-29 05:42:35 +0000 | [diff] [blame] | 32 | from osm_nbi.notifications import NsLcmNotification, VnfLcmNotification |
| tierno | 932499c | 2019-01-28 17:28:10 +0000 | [diff] [blame] | 33 | |
| 34 | __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>" |
| 35 | |
| 36 | |
class SubscriptionException(Exception):
    """Error raised by the subscription thread; carries an HTTP status code in `http_code`."""

    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
        """
        :param message: text describing the error
        :param http_code: HTTP status associated to the error, BAD_REQUEST by default
        """
        super().__init__(message)
        self.http_code = http_code
| 41 | |
| 42 | |
class SubscriptionThread(threading.Thread):
    """
    Thread that reads from the kafka bus and serves all the subscriptions.

    The thread owns its own asyncio event loop (created by asyncio.run() inside run()). Two reading tasks
    are kept alive: one for the "admin" topic and one for the "ns", "nsi" and "vnf" topics. Every received
    message is dispatched to _msg_callback.
    """

    def __init__(self, config, engine):
        """
        Constructor of class
        :param config: configuration parameters of database and messaging
        :param engine: an instance of Engine class, used for deleting instances
        """
        threading.Thread.__init__(self)
        self.to_terminate = False
        self.config = config
        self.db = None
        self.msg = None
        self.engine = engine
        self.logger = logging.getLogger("nbi.subscriptions")
        # asyncio task for receiving admin actions from kafka bus
        self.aiomain_task_admin = None
        # asyncio task for receiving normal actions from kafka bus
        self.aiomain_task = None
        # The event loop only keeps weak references to tasks. Notification tasks are stored here until they
        # finish so that they cannot be garbage collected in the middle of their execution.
        self._notification_tasks = set()
        self.internal_session = {  # used for a session to the engine methods
            "project_id": (),
            "set_project": (),
            "admin": True,
            "force": False,
            "public": None,
            "method": "delete",
        }
        self.nslcm = None
        self.vnflcm = None

    async def start_kafka(self):
        """
        Main coroutine of the thread. Keeps the admin and non-admin reading tasks running, restarting them
        when they finish and retrying every 10 seconds while kafka is not reachable.
        :return: None, once termination has been requested
        """
        # Every asyncio.run() creates a new event loop. Tasks remaining from a previous execution are bound
        # to a loop that is already closed and cannot be awaited here, so they are dropped.
        self.aiomain_task = None
        self.aiomain_task_admin = None
        # timeout_wait_for_kafka = 3*60
        kafka_working = True
        while not self.to_terminate:
            try:
                # bug 710 635. The library aiokafka does not receive anything when the topic at kafka has not
                # been created.
                # Before subscribe, send dummy messages
                for topic in ("admin", "ns", "nsi", "vnf"):
                    await self.msg.aiowrite(topic, "echo", "dummy message")
                if not kafka_working:
                    self.logger.critical("kafka is working again")
                    kafka_working = True
                if not self.aiomain_task_admin:
                    await asyncio.sleep(10)
                    self.logger.debug("Starting admin subscription task")
                    self.aiomain_task_admin = asyncio.ensure_future(
                        self.msg.aioread(
                            ("admin",),
                            group_id=False,
                            aiocallback=self._msg_callback,
                        ),
                    )
                if not self.aiomain_task:
                    await asyncio.sleep(10)
                    self.logger.debug("Starting non-admin subscription task")
                    self.aiomain_task = asyncio.ensure_future(
                        self.msg.aioread(
                            ("ns", "nsi", "vnf"),
                            aiocallback=self._msg_callback,
                        ),
                    )
                if self.to_terminate:
                    # terminate() may have been called before the tasks existed, so nobody would cancel
                    # them and the wait below would never end. asyncio.run() cancels the pending tasks.
                    return
                done, _ = await asyncio.wait(
                    [self.aiomain_task, self.aiomain_task_admin],
                    timeout=None,
                    return_when=asyncio.FIRST_COMPLETED,
                )
                # Each finished task is handled on its own and its reference is always cleared, so that
                # a cancelled task neither hides the other one nor is awaited again in the next iteration.
                if self.aiomain_task_admin in done:
                    self._log_finished_task(self.aiomain_task_admin, "admin")
                    self.aiomain_task_admin = None
                if self.aiomain_task in done:
                    self._log_finished_task(self.aiomain_task, "non-admin")
                    self.aiomain_task = None
            except Exception as e:
                if self.to_terminate:
                    return
                if kafka_working:
                    # logging only first time
                    self.logger.critical(
                        "Error accessing kafka '{}'. Retrying ...".format(e)
                    )
                    kafka_working = False
                await asyncio.sleep(10)

    def _log_finished_task(self, task, label):
        """
        Log the result of a subscription task that has finished. A cancelled task is not logged
        :param task: finished asyncio task
        :param label: "admin" or "non-admin", used at the log message
        :return: None
        """
        if task.cancelled():
            return
        self.logger.error(
            "{} subscription task exception: {}".format(label, task.exception())
        )

    def run(self):
        """
        Start of the thread. Connects to database and messaging if not done yet, and runs the kafka
        reading loop until termination is requested. Finally it closes the connections.
        :return: None
        :raises SubscriptionException: on wrong driver configuration or connection error
        """
        try:
            if not self.db:
                db_driver = self.config["database"]["driver"]
                if db_driver == "mongo":
                    self.db = dbmongo.DbMongo()
                elif db_driver == "memory":
                    self.db = dbmemory.DbMemory()
                else:
                    raise SubscriptionException(
                        "Invalid configuration param '{}' at '[database]':'driver'".format(
                            db_driver
                        )
                    )
                self.db.db_connect(self.config["database"])
            if not self.msg:
                config_msg = self.config["message"].copy()
                if config_msg["driver"] == "local":
                    self.msg = msglocal.MsgLocal()
                elif config_msg["driver"] == "kafka":
                    self.msg = msgkafka.MsgKafka()
                else:
                    raise SubscriptionException(
                        "Invalid configuration param '{}' at '[message]':'driver'".format(
                            config_msg["driver"]
                        )
                    )
                self.msg.connect(config_msg)
            self.nslcm = NsLcmNotification(self.db)
            self.vnflcm = VnfLcmNotification(self.db)
        except (DbException, MsgException) as e:
            raise SubscriptionException(str(e), http_code=e.http_code) from e

        self.logger.debug("Starting")
        while not self.to_terminate:
            try:
                asyncio.run(self.start_kafka())
            except Exception as e:
                if not self.to_terminate:
                    # logger.exception already appends the traceback
                    self.logger.exception(
                        "Exception '{}' at messaging read loop".format(e)
                    )

        self.logger.debug("Finishing")
        self._stop()

    async def _msg_callback(self, topic, command, params):
        """
        Callback to process a received message from kafka
        :param topic: topic received
        :param command: command received
        :param params: rest of parameters
        :return: None
        """
        msg_to_send = []
        try:
            if topic == "ns":
                self._process_ns(topic, command, params, msg_to_send)
            elif topic == "vnf":
                self._process_vnf(topic, command, params)
            elif topic == "nsi":
                self._process_nsi(command, params, msg_to_send)
            elif topic == "admin":
                self._process_admin(topic, command, params)
            # writing to kafka must be done with our own loop. For this reason it is not allowed Engine to do that,
            # but content to be written is stored at msg_to_send
            for msg in msg_to_send:
                await self.msg.aiowrite(*msg)
        except (EngineException, DbException, MsgException) as e:
            self.logger.error(
                "Error while processing topic={} command={}: {}".format(
                    topic, command, e
                )
            )
        except Exception as e:
            self.logger.exception(
                "Exception while processing topic={} command={}: {}".format(
                    topic, command, e
                )
            )

    def _spawn_notification(self, coro):
        """
        Schedule a notification coroutine at the running loop, keeping a reference until it finishes
        :param coro: coroutine that sends the notifications
        :return: None
        """
        task = asyncio.ensure_future(coro)
        self._notification_tasks.add(task)
        task.add_done_callback(self._notification_tasks.discard)

    def _process_ns(self, topic, command, params, msg_to_send):
        """
        Process a message of topic "ns": removes the terminated NS when autoremove is set, and notifies
        the nslcm subscribers
        :param topic: topic received
        :param command: command received
        :param params: rest of parameters
        :param msg_to_send: list filled by the engine with the messages to be written to kafka
        :return: None
        """
        if command == "terminated" and params["operationState"] in (
            "COMPLETED",
            "PARTIALLY_COMPLETED",
        ):
            self.logger.debug("received ns terminated {}".format(params))
            if params.get("autoremove"):
                self.engine.del_item(
                    self.internal_session,
                    "nsrs",
                    _id=params["nsr_id"],
                    not_send_msg=msg_to_send,
                )
                self.logger.debug(
                    "ns={} deleted from database".format(params["nsr_id"])
                )
        # Check for nslcm notification
        if not isinstance(params, dict):
            self.logger.debug("Message can not be used for notification of nslcm")
            return
        # Check availability of operationState and command
        if (
            (not params.get("operationState"))
            or (not command)
            or (not params.get("operationParams"))
        ):
            self.logger.debug("Message can not be used for notification of nslcm")
            return
        nsd_id = params["operationParams"].get("nsdId")
        ns_instance_id = params["operationParams"].get("nsInstanceId")
        # Any one among nsd_id, ns_instance_id should be present.
        if not (nsd_id or ns_instance_id):
            self.logger.debug("Message can not be used for notification of nslcm")
            return
        op_state = params["operationState"]
        event_details = {
            "topic": topic,
            "command": command.upper(),
            "params": params,
        }
        subscribers = self.nslcm.get_subscribers(
            nsd_id,
            ns_instance_id,
            command.upper(),
            op_state,
            event_details,
        )
        if subscribers:
            self._spawn_notification(self.nslcm.send_notifications(subscribers))

    def _process_vnf(self, topic, command, params):
        """
        Process a message of topic "vnf": notifies the vnflcm subscribers. Non dictionary params are ignored
        :param topic: topic received
        :param command: command received
        :param params: rest of parameters
        :return: None
        """
        if not isinstance(params, dict):
            return
        vnfd_id = params["vnfdId"]
        vnf_instance_id = params["vnfInstanceId"]
        # "create" and "delete" are used as the state given to get_subscribers instead of operationState
        if command == "create" or command == "delete":
            op_state = command
        else:
            op_state = params["operationState"]
        event_details = {
            "topic": topic,
            "command": command.upper(),
            "params": params,
        }
        subscribers = self.vnflcm.get_subscribers(
            vnfd_id,
            vnf_instance_id,
            command.upper(),
            op_state,
            event_details,
        )
        if subscribers:
            self._spawn_notification(self.vnflcm.send_notifications(subscribers))

    def _process_nsi(self, command, params, msg_to_send):
        """
        Process a message of topic "nsi": removes the terminated NSI when autoremove is set
        :param command: command received
        :param params: rest of parameters
        :param msg_to_send: list filled by the engine with the messages to be written to kafka
        :return: None
        """
        if command == "terminated" and params["operationState"] in (
            "COMPLETED",
            "PARTIALLY_COMPLETED",
        ):
            self.logger.debug("received nsi terminated {}".format(params))
            if params.get("autoremove"):
                self.engine.del_item(
                    self.internal_session,
                    "nsis",
                    _id=params["nsir_id"],
                    not_send_msg=msg_to_send,
                )
                self.logger.debug(
                    "nsis={} deleted from database".format(params["nsir_id"])
                )

    def _process_admin(self, topic, command, params):
        """
        Process a message of topic "admin": revokes one token or clears the whole token cache
        :param topic: topic received
        :param command: command received
        :param params: rest of parameters
        :return: None
        """
        self.logger.debug("received {} {} {}".format(topic, command, params))
        if command in ["echo", "ping"]:  # ignored commands
            return
        if command != "revoke_token":
            self.logger.debug("unrecognized command '{} {}'".format(topic, command))
            return
        if not params:
            # revoke_token without params removes every cached token
            self.engine.authenticator.tokens_cache.clear()
            self.logger.debug("token_cache cleared")
            return
        if isinstance(params, dict) and "_id" in params:
            tid = params.get("_id")
            self.engine.authenticator.tokens_cache.pop(tid, None)
            self.logger.debug("token '{}' removed from token_cache".format(tid))
        else:
            self.logger.debug(
                "unrecognized params in command '{} {}': {}".format(
                    topic, command, params
                )
            )

    def _stop(self):
        """
        Close all connections. Both the database and the messaging disconnections are attempted even if
        the first one fails
        :return: None
        :raises SubscriptionException: with the first error found while disconnecting
        """
        first_error = None
        if self.db:
            try:
                self.db.db_disconnect()
            except (DbException, MsgException) as e:
                first_error = e
        if self.msg:
            try:
                self.msg.disconnect()
            except (DbException, MsgException) as e:
                if first_error is None:
                    first_error = e
        if first_error is not None:
            raise SubscriptionException(
                str(first_error), http_code=first_error.http_code
            ) from first_error

    def terminate(self):
        """
        This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards,
        but not immediately.
        :return: None
        """
        self.to_terminate = True
        # Snapshot the references: the thread may set them to None at any moment.
        for task in (self.aiomain_task, self.aiomain_task_admin):
            if not task:
                continue
            try:
                # The cancellation must be scheduled at the loop that runs the task (the one created by
                # asyncio.run() inside this thread), not at the event loop of the calling thread.
                task.get_loop().call_soon_threadsafe(task.cancel)
            except RuntimeError as e:
                # The loop is already closed, so the task is not running any more
                self.logger.debug(
                    "subscription task not cancelled at terminate: {}".format(e)
                )