| tierno | 70eeb18 | 2020-10-19 16:38:00 +0000 | [diff] [blame] | 1 | # -*- coding: utf-8 -*- |
| 2 | |
| 3 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | # you may not use this file except in compliance with the License. |
| 5 | # You may obtain a copy of the License at |
| 6 | # |
| 7 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | # |
| 9 | # Unless required by applicable law or agreed to in writing, software |
| 10 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
| 12 | # implied. |
| 13 | # See the License for the specific language governing permissions and |
| 14 | # limitations under the License. |
| 15 | |
| 16 | """ |
| 17 | This module implements a thread that reads VIM messages from the kafka bus. |
| 18 | It is based on asyncio. |
| 19 | It is in charge of loading tasks assigned to VIMs that nobody is in charge of |
| 20 | """ |
| 21 | |
| tierno | 70eeb18 | 2020-10-19 16:38:00 +0000 | [diff] [blame] | 22 | import asyncio |
| 23 | from http import HTTPStatus |
| sousaedu | 049cbb1 | 2022-01-05 11:39:35 +0000 | [diff] [blame] | 24 | import logging |
| 25 | import threading |
| 26 | from time import time |
| tierno | 70eeb18 | 2020-10-19 16:38:00 +0000 | [diff] [blame] | 27 | |
| sousaedu | 049cbb1 | 2022-01-05 11:39:35 +0000 | [diff] [blame] | 28 | from osm_common import dbmemory, dbmongo, msgkafka, msglocal |
| tierno | 70eeb18 | 2020-10-19 16:38:00 +0000 | [diff] [blame] | 29 | from osm_common.dbbase import DbException |
| 30 | from osm_common.msgbase import MsgException |
| tierno | 70eeb18 | 2020-10-19 16:38:00 +0000 | [diff] [blame] | 31 | |
| 32 | __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>" |
| 33 | |
| 34 | |
class VimAdminException(Exception):
    """Error raised by the VIM admin thread, carrying an HTTP status code.

    :param message: human readable description of the error
    :param http_code: HTTP status associated to the error, BAD_REQUEST by default
    """

    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
        super().__init__(message)
        self.http_code = http_code
| 39 | |
| 40 | |
class LockRenew:
    """Periodically renews the 'locked_at' mark of database objects while the thread that locked them is alive.

    Objects are registered with add_lock_object() and released with remove_lock_object(). The coroutine
    renew_locks() walks the shared renew_list and pushes 'locked_at' forward in the database shortly before
    the lock would expire.
    """

    renew_list = []
    # ^ class-level (static) attribute, common for all RO. Time ordered list of dictionaries with information of
    # locks that need to be renewed. The time order is achieved as entries are appended at the end

    def __init__(self, config, logger):
        """
        Constructor of class
        :param config: configuration parameters of database and messaging. The keys 'task_locked_time',
            'task_relock_time' and 'task_max_locked_time' are read from config["global"]; they are added to
            time() values, so they are expressed in seconds
        :param logger: logger used to report renewals and failures
        """
        self.config = config
        self.logger = logger
        self.to_terminate = False
        self.db = None  # provided later by start()
        self.task_locked_time = config["global"]["task_locked_time"]
        self.task_relock_time = config["global"]["task_relock_time"]
        self.task_max_locked_time = config["global"]["task_max_locked_time"]

    def start(self, db):
        """
        Provide the database connection used to renew and release the locks
        :param db: database object; 'set_one' is the only method used by this class
        :return: None
        """
        self.db = db

    @staticmethod
    def add_lock_object(database_table, database_object, thread_object):
        """
        Insert a task to renew the locking
        :param database_table: database collection where to maintain the lock
        :param database_object: database object. '_id' and 'locked_at' are mandatory keys
        :param thread_object: Thread object that has locked to check if it is alive
        :return: a locked_object needed for calling remove_lock_object. It will contain up-to-date database
            'locked_at'
        """
        lock_object = {
            "table": database_table,
            "_id": database_object["_id"],
            "initial_lock_time": database_object["locked_at"],
            "locked_at": database_object["locked_at"],
            "thread": thread_object,
            "unlocked": False,  # True when it is not needed any more
        }
        LockRenew.renew_list.append(lock_object)

        return lock_object

    @staticmethod
    def remove_lock_object(lock_object):
        """
        Mark a lock as not needed any more. The entry is discarded from renew_list the next time that
        renew_locks() finds it at the head of the list
        :param lock_object: object returned by add_lock_object
        :return: None
        """
        lock_object["unlocked"] = True

    async def renew_locks(self):
        """
        Coroutine that runs until 'to_terminate' is set, renewing the lock of the oldest entry of renew_list
        when less than one second is left to its re-lock time (locked_at + task_locked_time - task_relock_time).
        An entry is dropped when it is released, when its thread is missing or dead, when the database update
        fails, or when it has been locked for longer than task_max_locked_time since its initial lock.
        :return: None
        """
        while not self.to_terminate:
            if not self.renew_list:
                # nothing to renew; no lock can become due before this interval
                await asyncio.sleep(self.task_locked_time - self.task_relock_time)
                continue

            lock_object = self.renew_list[0]

            if (
                lock_object["unlocked"]
                or not lock_object["thread"]
                or not lock_object["thread"].is_alive()
            ):
                # task has been finished or locker thread is dead, not needed to re-locked.
                self.renew_list.pop(0)
                continue

            locked_at = lock_object["locked_at"]
            now = time()
            time_to_relock = (
                locked_at + self.task_locked_time - self.task_relock_time - now
            )

            if time_to_relock >= 1:
                # wait until it is time to re-lock it; the head is evaluated again afterwards
                await asyncio.sleep(time_to_relock)
                continue

            # The entry is due. It is always taken out of the list here, so that the loop can never spin on it
            # without awaiting; it is appended again only if the renewal succeeds
            self.renew_list.pop(0)

            if lock_object["initial_lock_time"] + self.task_max_locked_time < now:
                # locked for too long: stop renewing and let the lock expire
                self.logger.info(
                    "Maximum locked time exceeded for {}.{}. Lock is not renewed".format(
                        lock_object["table"], lock_object["_id"]
                    )
                )
                continue

            # re-lock
            new_locked_at = locked_at + self.task_locked_time

            try:
                # the filter on the previous 'locked_at' makes the update apply only if nobody changed the lock
                if self.db.set_one(
                    lock_object["table"],
                    update_dict={
                        "locked_at": new_locked_at,
                        "modified_at": now,
                    },
                    q_filter={
                        "_id": lock_object["_id"],
                        "locked_at": locked_at,
                    },
                    fail_on_empty=False,
                ):
                    self.logger.debug(
                        "Renew lock for {}.{}".format(
                            lock_object["table"], lock_object["_id"]
                        )
                    )
                    lock_object["locked_at"] = new_locked_at
                    self.renew_list.append(lock_object)
                else:
                    self.logger.info(
                        "Cannot renew lock for {}.{}".format(
                            lock_object["table"], lock_object["_id"]
                        )
                    )
            except Exception as e:
                self.logger.error(
                    "Exception when trying to renew lock for {}.{}: {}".format(
                        lock_object["table"], lock_object["_id"], e
                    )
                )

    def stop(self):
        """
        Release in the database (setting 'locked_at' to 0) the locks that are still registered.
        Does nothing when start() has not provided a database yet.
        :return: None
        """
        if self.db is None:
            # start() was never called, so there is no database to update
            return

        # unlock all locked items
        now = time()

        for lock_object in self.renew_list:
            locked_at = lock_object["locked_at"]

            # NOTE(review): with 'or' only the entries that are both released and expired are skipped;
            # confirm whether 'and' (still needed and not expired) was the intention
            if not lock_object["unlocked"] or locked_at + self.task_locked_time >= now:
                self.db.set_one(
                    lock_object["table"],
                    update_dict={"locked_at": 0},
                    q_filter={"_id": lock_object["_id"], "locked_at": locked_at},
                    fail_on_empty=False,
                )
| tierno | f1b640f | 2020-12-09 15:06:01 +0000 | [diff] [blame] | 166 | |
| 167 | |
class VimAdminThread(threading.Thread):
    """Thread that runs an asyncio loop with three tasks: the subscription to the VIM/WIM/SDN topics of the
    message bus, the watcher of ro_tasks that nobody is processing, and the renewal of database locks.
    """

    MAX_TIME_UNATTENDED = 600  # 10min
    TIME_CHECK_UNUSED_VIM = 3600 * 2  # 2h
    kafka_topics = ("vim_account", "wim_account", "sdn")

    def __init__(self, config, engine):
        """
        Constructor of class
        :param config: configuration parameters of database and messaging
        :param engine: an instance of Engine class, used for deleting instances
        """
        threading.Thread.__init__(self)
        self.to_terminate = False
        self.config = config
        self.db = None
        self.msg = None
        self.engine = engine
        # event loop where the asyncio tasks run; set by main_task() once the loop is running
        self.loop = None
        self.last_rotask_time = 0
        self.next_check_unused_vim = time() + self.TIME_CHECK_UNUSED_VIM
        self.logger = logging.getLogger("ro.vimadmin")
        # asyncio task for receiving vim actions from kafka bus
        self.aiomain_task_kafka = None
        # asyncio task for watching ro_tasks not processed by nobody
        self.aiomain_task_vim = None
        self.aiomain_task_renew_lock = None
        # ^asyncio task for maintain an ro_task locked when VIM plugin takes too much time processing an order
        self.lock_renew = LockRenew(config, self.logger)
        self.task_locked_time = config["global"]["task_locked_time"]

    async def vim_watcher(self):
        """Reads database periodically looking for tasks not processed by nobody because of a reboot
        in order to load this vim"""
        # firstly read VIMS not processed
        for target_database in ("vim_accounts", "wim_accounts", "sdns"):
            unattended_targets = self.db.get_list(
                target_database,
                q_filter={"_admin.operations.operationState": "PROCESSING"},
            )

            for target in unattended_targets:
                # the three first letters of the collection name (vim, wim, sdn) prefix the target id
                target_id = "{}:{}".format(target_database[:3], target["_id"])
                self.logger.info("ordered to check {}".format(target_id))
                self.engine.check_vim(target_id)

        while not self.to_terminate:
            now = time()
            processed_vims = set()

            if not self.last_rotask_time:
                self.last_rotask_time = 0

            ro_tasks = self.db.get_list(
                "ro_tasks",
                q_filter={
                    "target_id.ncont": self.engine.get_assigned_vims(),
                    "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
                    "locked_at.lt": now - self.task_locked_time,
                    "to_check_at.gt": self.last_rotask_time,
                    "to_check_at.lte": now - self.MAX_TIME_UNATTENDED,
                },
            )
            # the next round only looks at tasks with 'to_check_at' newer than the window just inspected
            self.last_rotask_time = now - self.MAX_TIME_UNATTENDED

            for ro_task in ro_tasks:
                # if already checked ignore
                if ro_task["target_id"] in processed_vims:
                    continue

                processed_vims.add(ro_task["target_id"])

                # if already assigned ignore
                if ro_task["target_id"] in self.engine.get_assigned_vims():
                    continue

                # if there is some task locked on this VIM, there is an RO working on it, so ignore
                if self.db.get_list(
                    "ro_tasks",
                    q_filter={
                        "target_id": ro_task["target_id"],
                        "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
                        "locked_at.gt": now - self.task_locked_time,
                    },
                ):
                    continue

                # unattended, assign vim
                self.engine.assign_vim(ro_task["target_id"])
                self.logger.debug(
                    "ordered to load {}. Inactivity detected".format(
                        ro_task["target_id"]
                    )
                )

            # every 2 hours check if there are vims without any ro_task and unload it
            if now > self.next_check_unused_vim:
                self.next_check_unused_vim = now + self.TIME_CHECK_UNUSED_VIM
                self.engine.unload_unused_vims()

            await asyncio.sleep(self.MAX_TIME_UNATTENDED)

    def _reap_task(self, task, done, error_message):
        """
        Report the outcome of a supervised task when it is among the finished ones
        :param task: asyncio task to inspect
        :param done: set of finished tasks returned by asyncio.wait
        :param error_message: format string with one '{}' placeholder for the task exception
        :return: None when the task has finished, so that aiomain creates it again; otherwise the same task
        """
        if task not in done:
            return task

        if task.cancelled():
            # exception() would raise CancelledError on a cancelled task
            self.logger.debug(error_message.format("cancelled"))
        else:
            self.logger.error(error_message.format(task.exception()))

        return None

    async def aiomain(self):
        """
        Supervisor coroutine: creates the kafka subscription, vim watcher and lock renewal tasks, waits until
        any of them finishes, logs why and creates it again, until 'to_terminate' is set. When an exception
        is raised (e.g. writing to the message bus fails) it waits 10 seconds and retries.
        :return: None
        """
        kafka_working = True
        while not self.to_terminate:
            try:
                if not self.aiomain_task_kafka:
                    # write an 'echo' message on every topic before subscribing; it raises if the bus fails
                    for kafka_topic in self.kafka_topics:
                        await self.msg.aiowrite(kafka_topic, "echo", "dummy message")
                    kafka_working = True
                    self.logger.debug("Starting vim_account subscription task")
                    self.aiomain_task_kafka = asyncio.ensure_future(
                        self.msg.aioread(
                            self.kafka_topics,
                            group_id=False,
                            aiocallback=self._msg_callback,
                        ),
                    )

                if not self.aiomain_task_vim:
                    self.aiomain_task_vim = asyncio.ensure_future(self.vim_watcher())

                if not self.aiomain_task_renew_lock:
                    self.aiomain_task_renew_lock = asyncio.ensure_future(
                        self.lock_renew.renew_locks()
                    )

                done, _ = await asyncio.wait(
                    [
                        self.aiomain_task_kafka,
                        self.aiomain_task_vim,
                        self.aiomain_task_renew_lock,
                    ],
                    timeout=None,
                    return_when=asyncio.FIRST_COMPLETED,
                )

                # a finished task is always cleared, even if it was cancelled, so that it is created again
                self.aiomain_task_kafka = self._reap_task(
                    self.aiomain_task_kafka,
                    done,
                    "kafka subscription task exception: {}",
                )
                self.aiomain_task_vim = self._reap_task(
                    self.aiomain_task_vim,
                    done,
                    "vim_account watcher task exception: {}",
                )
                self.aiomain_task_renew_lock = self._reap_task(
                    self.aiomain_task_renew_lock,
                    done,
                    "renew_locks task exception: {}",
                )

            except Exception as e:
                if self.to_terminate:
                    return

                if kafka_working:
                    # logging only first time
                    self.logger.critical(
                        "Error accessing kafka '{}'. Retrying ...".format(e)
                    )
                    kafka_working = False

                await asyncio.sleep(10)

    def run(self):
        """
        Start of the thread
        :return: None
        :raises VimAdminException: if the database or message driver is invalid or the connection fails
        """
        try:
            if not self.db:
                if self.config["database"]["driver"] == "mongo":
                    self.db = dbmongo.DbMongo()
                    self.db.db_connect(self.config["database"])
                elif self.config["database"]["driver"] == "memory":
                    self.db = dbmemory.DbMemory()
                    self.db.db_connect(self.config["database"])
                else:
                    raise VimAdminException(
                        "Invalid configuration param '{}' at '[database]':'driver'".format(
                            self.config["database"]["driver"]
                        )
                    )

            self.lock_renew.start(self.db)

            if not self.msg:
                config_msg = self.config["message"].copy()

                if config_msg["driver"] == "local":
                    self.msg = msglocal.MsgLocal()
                    self.msg.connect(config_msg)
                elif config_msg["driver"] == "kafka":
                    self.msg = msgkafka.MsgKafka()
                    self.msg.connect(config_msg)
                else:
                    raise VimAdminException(
                        "Invalid configuration param '{}' at '[message]':'driver'".format(
                            config_msg["driver"]
                        )
                    )
        except (DbException, MsgException) as e:
            raise VimAdminException(str(e), http_code=e.http_code) from e

        self.logger.info("Starting")
        while not self.to_terminate:
            try:
                # asyncio.run creates and closes its own event loop; main_task() stores it at self.loop
                asyncio.run(self.main_task())
            except Exception as e:
                if not self.to_terminate:
                    self.logger.exception(
                        "Exception '{}' at messaging read loop".format(e), exc_info=True
                    )

        self.logger.info("Finishing")
        self._stop()

    async def main_task(self):
        """
        Entry point executed by asyncio.run. Records the running loop, so that terminate() can schedule
        cancellations on it from another thread, and runs aiomain until it ends.
        :return: None
        """
        self.loop = asyncio.get_running_loop()
        # tasks left from a previous round belong to a loop that is already closed; forget them
        self.aiomain_task_kafka = None
        self.aiomain_task_vim = None
        self.aiomain_task_renew_lock = None

        task = asyncio.ensure_future(self.aiomain())
        await task

    async def _msg_callback(self, topic, command, params):
        """
        Callback to process a received message from kafka
        :param topic: topic received
        :param command: command received
        :param params: rest of parameters
        :return: None
        """
        try:
            if command == "echo":
                return

            if topic in self.kafka_topics:
                target = topic[0:3]  # vim, wim or sdn
                target_id = target + ":" + params["_id"]

                if command in ("edited", "edit"):
                    self.engine.reload_vim(target_id)
                    self.logger.debug("ordered to reload {}".format(target_id))
                elif command in ("deleted", "delete"):
                    self.engine.unload_vim(target_id)
                    self.logger.debug("ordered to unload {}".format(target_id))
                elif command in ("create", "created"):
                    self.engine.check_vim(target_id)
                    self.logger.debug("ordered to check {}".format(target_id))
        except (DbException, MsgException) as e:
            self.logger.error(
                "Error while processing topic={} command={}: {}".format(
                    topic, command, e
                )
            )
        except Exception as e:
            self.logger.exception(
                "Exception while processing topic={} command={}: {}".format(
                    topic, command, e
                ),
                exc_info=True,
            )

    def _stop(self):
        """
        Close all connections
        :return: None
        :raises VimAdminException: if disconnecting the database or the message bus fails
        """
        try:
            if self.db:
                self.db.db_disconnect()

            if self.msg:
                self.msg.disconnect()
        except (DbException, MsgException) as e:
            raise VimAdminException(str(e), http_code=e.http_code) from e

    def terminate(self):
        """
        This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards,
        but not immediately.
        :return: None
        """
        self.to_terminate = True
        self.lock_renew.to_terminate = True

        loop = self.loop
        if loop is not None and not loop.is_closed():
            for task in (
                self.aiomain_task_kafka,
                self.aiomain_task_vim,
                self.aiomain_task_renew_lock,
            ):
                if not task:
                    continue

                try:
                    # pass the bound method: the cancellation must be executed by the loop thread
                    loop.call_soon_threadsafe(task.cancel)
                except RuntimeError:
                    # the loop has been closed in the meantime; nothing is left to cancel
                    break

        self.lock_renew.stop()