| tierno | 70eeb18 | 2020-10-19 16:38:00 +0000 | [diff] [blame] | 1 | # -*- coding: utf-8 -*- |
| 2 | |
| 3 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | # you may not use this file except in compliance with the License. |
| 5 | # You may obtain a copy of the License at |
| 6 | # |
| 7 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | # |
| 9 | # Unless required by applicable law or agreed to in writing, software |
| 10 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
| 12 | # implied. |
| 13 | # See the License for the specific language governing permissions and |
| 14 | # limitations under the License. |
| 15 | |
| 16 | """ |
| 17 | This module implements a thread that reads from kafka bus reading VIM messages. |
| 18 | It is based on asyncio. |
| 19 | It is in charge of load tasks assigned to VIMs that nobody is in chage of it |
| 20 | """ |
| 21 | |
| 22 | import logging |
| 23 | import threading |
| 24 | import asyncio |
| 25 | from http import HTTPStatus |
| 26 | |
| 27 | from osm_common import dbmongo, dbmemory, msglocal, msgkafka |
| 28 | from osm_common.dbbase import DbException |
| 29 | from osm_common.msgbase import MsgException |
| 30 | from osm_ng_ro.ns import NsException |
| 31 | from time import time |
| 32 | |
| 33 | __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>" |
| 34 | |
| 35 | |
| 36 | class VimAdminException(Exception): |
| 37 | |
| 38 | def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST): |
| 39 | self.http_code = http_code |
| 40 | Exception.__init__(self, message) |
| 41 | |
| 42 | |
| 43 | class VimAdminThread(threading.Thread): |
| 44 | MAX_TIME_LOCKED = 3600 # 1h |
| tierno | bc891ce | 2020-12-06 18:27:16 +0000 | [diff] [blame^] | 45 | MAX_TIME_UNATTENDED = 600 # 10min |
| 46 | TIME_CHECK_UNUSED_VIM = 3600 * 2 # 2h |
| tierno | 70eeb18 | 2020-10-19 16:38:00 +0000 | [diff] [blame] | 47 | kafka_topics = ("vim_account", "wim_account", "sdn") |
| 48 | |
| 49 | def __init__(self, config, engine): |
| 50 | """ |
| 51 | Constructor of class |
| 52 | :param config: configuration parameters of database and messaging |
| 53 | :param engine: an instance of Engine class, used for deleting instances |
| 54 | """ |
| 55 | threading.Thread.__init__(self) |
| 56 | self.to_terminate = False |
| 57 | self.config = config |
| 58 | self.db = None |
| 59 | self.msg = None |
| 60 | self.engine = engine |
| 61 | self.loop = None |
| 62 | self.last_rotask_time = 0 |
| tierno | bc891ce | 2020-12-06 18:27:16 +0000 | [diff] [blame^] | 63 | self.next_check_unused_vim = time() + self.TIME_CHECK_UNUSED_VIM |
| tierno | 70eeb18 | 2020-10-19 16:38:00 +0000 | [diff] [blame] | 64 | self.logger = logging.getLogger("ro.vimadmin") |
| 65 | self.aiomain_task_kafka = None # asyncio task for receiving vim actions from kafka bus |
| 66 | self.aiomain_task_vim = None # asyncio task for watching ro_tasks not processed by nobody |
| 67 | |
| 68 | async def vim_watcher(self): |
| tierno | bc891ce | 2020-12-06 18:27:16 +0000 | [diff] [blame^] | 69 | """ Reads database periodically looking for tasks not processed by nobody because of a reboot |
| tierno | 70eeb18 | 2020-10-19 16:38:00 +0000 | [diff] [blame] | 70 | in order to load this vim""" |
| tierno | bc891ce | 2020-12-06 18:27:16 +0000 | [diff] [blame^] | 71 | # firstly read VIMS not processed |
| 72 | for target_database in ("vim_accounts", "wim_accounts", "sdns"): |
| 73 | unattended_targets = self.db.get_list(target_database, |
| 74 | q_filter={"_admin.operations.operationState": "PROCESSING"}) |
| 75 | for target in unattended_targets: |
| 76 | target_id = "{}:{}".format(target_database[:3], target["_id"]) |
| 77 | self.logger.info("ordered to check {}".format(target_id)) |
| 78 | self.engine.check_vim(target_id) |
| 79 | |
| tierno | 70eeb18 | 2020-10-19 16:38:00 +0000 | [diff] [blame] | 80 | while not self.to_terminate: |
| 81 | now = time() |
| tierno | bc891ce | 2020-12-06 18:27:16 +0000 | [diff] [blame^] | 82 | processed_vims = [] |
| tierno | 70eeb18 | 2020-10-19 16:38:00 +0000 | [diff] [blame] | 83 | if not self.last_rotask_time: |
| 84 | self.last_rotask_time = 0 |
| 85 | ro_tasks = self.db.get_list("ro_tasks", |
| tierno | bc891ce | 2020-12-06 18:27:16 +0000 | [diff] [blame^] | 86 | q_filter={"target_id.ncont": self.engine.get_assigned_vims(), |
| tierno | 70eeb18 | 2020-10-19 16:38:00 +0000 | [diff] [blame] | 87 | "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'], |
| 88 | "locked_at.lt": now - self.MAX_TIME_LOCKED, |
| 89 | "to_check_at.gt": self.last_rotask_time, |
| 90 | "to_check_at.lte": now - self.MAX_TIME_UNATTENDED}) |
| 91 | self.last_rotask_time = now - self.MAX_TIME_UNATTENDED |
| 92 | for ro_task in ro_tasks: |
| tierno | bc891ce | 2020-12-06 18:27:16 +0000 | [diff] [blame^] | 93 | # if already checked ignore |
| 94 | if ro_task["target_id"] in processed_vims: |
| 95 | continue |
| 96 | processed_vims.append(ro_task["target_id"]) |
| 97 | # if already assigned ignore |
| 98 | if ro_task["target_id"] in self.engine.get_assigned_vims(): |
| 99 | continue |
| 100 | # if there is some task locked on this VIM, there is an RO working on it, so ignore |
| 101 | if self.db.get_list("ro_tasks", |
| 102 | q_filter={"target_id": ro_task["target_id"], |
| 103 | "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'], |
| 104 | "locked_at.gt": now - self.MAX_TIME_LOCKED}): |
| 105 | continue |
| 106 | # unattended, assign vim |
| 107 | self.engine.assign_vim(ro_task["target_id"]) |
| 108 | self.logger.debug("ordered to load {}. Inactivity detected".format(ro_task["target_id"])) |
| tierno | 70eeb18 | 2020-10-19 16:38:00 +0000 | [diff] [blame] | 109 | |
| tierno | bc891ce | 2020-12-06 18:27:16 +0000 | [diff] [blame^] | 110 | # every 2 hours check if there are vims without any ro_task and unload it |
| 111 | if now > self.next_check_unused_vim: |
| 112 | self.next_check_unused_vim = now + self.TIME_CHECK_UNUSED_VIM |
| 113 | self.engine.unload_unused_vims() |
| 114 | await asyncio.sleep(self.MAX_TIME_UNATTENDED, loop=self.loop) |
| tierno | 70eeb18 | 2020-10-19 16:38:00 +0000 | [diff] [blame] | 115 | |
| 116 | async def aiomain(self): |
| 117 | kafka_working = True |
| 118 | while not self.to_terminate: |
| 119 | try: |
| 120 | if not self.aiomain_task_kafka: |
| 121 | # await self.msg.aiowrite("admin", "echo", "dummy message", loop=self.loop) |
| 122 | await self.msg.aiowrite("vim_account", "echo", "dummy message", loop=self.loop) |
| 123 | kafka_working = True |
| 124 | self.logger.debug("Starting vim_account subscription task") |
| 125 | self.aiomain_task_kafka = asyncio.ensure_future( |
| 126 | self.msg.aioread(self.kafka_topics, loop=self.loop, group_id=False, |
| 127 | aiocallback=self._msg_callback), |
| 128 | loop=self.loop) |
| 129 | if not self.aiomain_task_vim: |
| 130 | self.aiomain_task_vim = asyncio.ensure_future( |
| 131 | self.vim_watcher(), |
| 132 | loop=self.loop) |
| 133 | done, _ = await asyncio.wait([self.aiomain_task_kafka, self.aiomain_task_vim], |
| 134 | timeout=None, loop=self.loop, return_when=asyncio.FIRST_COMPLETED) |
| 135 | try: |
| 136 | if self.aiomain_task_kafka in done: |
| 137 | exc = self.aiomain_task_kafka.exception() |
| 138 | self.logger.error("kafka subscription task exception: {}".format(exc)) |
| 139 | self.aiomain_task_kafka = None |
| 140 | if self.aiomain_task_vim in done: |
| 141 | exc = self.aiomain_task_vim.exception() |
| 142 | self.logger.error("vim_account watcher task exception: {}".format(exc)) |
| 143 | self.aiomain_task_vim = None |
| 144 | except asyncio.CancelledError: |
| 145 | pass |
| 146 | |
| 147 | except Exception as e: |
| 148 | if self.to_terminate: |
| 149 | return |
| 150 | if kafka_working: |
| 151 | # logging only first time |
| 152 | self.logger.critical("Error accessing kafka '{}'. Retrying ...".format(e)) |
| 153 | kafka_working = False |
| 154 | await asyncio.sleep(10, loop=self.loop) |
| 155 | |
| 156 | def run(self): |
| 157 | """ |
| 158 | Start of the thread |
| 159 | :return: None |
| 160 | """ |
| 161 | self.loop = asyncio.new_event_loop() |
| 162 | try: |
| 163 | if not self.db: |
| 164 | if self.config["database"]["driver"] == "mongo": |
| 165 | self.db = dbmongo.DbMongo() |
| 166 | self.db.db_connect(self.config["database"]) |
| 167 | elif self.config["database"]["driver"] == "memory": |
| 168 | self.db = dbmemory.DbMemory() |
| 169 | self.db.db_connect(self.config["database"]) |
| 170 | else: |
| 171 | raise VimAdminException("Invalid configuration param '{}' at '[database]':'driver'".format( |
| 172 | self.config["database"]["driver"])) |
| 173 | if not self.msg: |
| 174 | config_msg = self.config["message"].copy() |
| 175 | config_msg["loop"] = self.loop |
| 176 | if config_msg["driver"] == "local": |
| 177 | self.msg = msglocal.MsgLocal() |
| 178 | self.msg.connect(config_msg) |
| 179 | elif config_msg["driver"] == "kafka": |
| 180 | self.msg = msgkafka.MsgKafka() |
| 181 | self.msg.connect(config_msg) |
| 182 | else: |
| 183 | raise VimAdminException("Invalid configuration param '{}' at '[message]':'driver'".format( |
| 184 | config_msg["driver"])) |
| 185 | except (DbException, MsgException) as e: |
| 186 | raise VimAdminException(str(e), http_code=e.http_code) |
| 187 | |
| tierno | bc891ce | 2020-12-06 18:27:16 +0000 | [diff] [blame^] | 188 | self.logger.info("Starting") |
| tierno | 70eeb18 | 2020-10-19 16:38:00 +0000 | [diff] [blame] | 189 | while not self.to_terminate: |
| 190 | try: |
| 191 | self.loop.run_until_complete(asyncio.ensure_future(self.aiomain(), loop=self.loop)) |
| 192 | # except asyncio.CancelledError: |
| 193 | # break # if cancelled it should end, breaking loop |
| 194 | except Exception as e: |
| 195 | if not self.to_terminate: |
| 196 | self.logger.exception("Exception '{}' at messaging read loop".format(e), exc_info=True) |
| 197 | |
| tierno | bc891ce | 2020-12-06 18:27:16 +0000 | [diff] [blame^] | 198 | self.logger.info("Finishing") |
| tierno | 70eeb18 | 2020-10-19 16:38:00 +0000 | [diff] [blame] | 199 | self._stop() |
| 200 | self.loop.close() |
| 201 | |
| 202 | async def _msg_callback(self, topic, command, params): |
| 203 | """ |
| 204 | Callback to process a received message from kafka |
| 205 | :param topic: topic received |
| 206 | :param command: command received |
| 207 | :param params: rest of parameters |
| 208 | :return: None |
| 209 | """ |
| 210 | try: |
| 211 | if command == "echo": |
| 212 | return |
| 213 | if topic in self.kafka_topics: |
| 214 | target = topic[0:3] # vim, wim or sdn |
| 215 | target_id = target + ":" + params["_id"] |
| 216 | if command in ("edited", "edit"): |
| 217 | self.engine.reload_vim(target_id) |
| 218 | self.logger.debug("ordered to reload {}".format(target_id)) |
| 219 | elif command in ("deleted", "delete"): |
| 220 | self.engine.unload_vim(target_id) |
| 221 | self.logger.debug("ordered to unload {}".format(target_id)) |
| 222 | elif command in ("create", "created"): |
| 223 | self.engine.check_vim(target_id) |
| 224 | self.logger.debug("ordered to check {}".format(target_id)) |
| 225 | |
| 226 | except (NsException, DbException, MsgException) as e: |
| 227 | self.logger.error("Error while processing topic={} command={}: {}".format(topic, command, e)) |
| 228 | except Exception as e: |
| 229 | self.logger.exception("Exception while processing topic={} command={}: {}".format(topic, command, e), |
| 230 | exc_info=True) |
| 231 | |
| 232 | def _stop(self): |
| 233 | """ |
| 234 | Close all connections |
| 235 | :return: None |
| 236 | """ |
| 237 | try: |
| 238 | if self.db: |
| 239 | self.db.db_disconnect() |
| 240 | if self.msg: |
| 241 | self.msg.disconnect() |
| 242 | except (DbException, MsgException) as e: |
| 243 | raise VimAdminException(str(e), http_code=e.http_code) |
| 244 | |
| 245 | def terminate(self): |
| 246 | """ |
| 247 | This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards, |
| 248 | but not immediately. |
| 249 | :return: None |
| 250 | """ |
| 251 | self.to_terminate = True |
| 252 | if self.aiomain_task_kafka: |
| 253 | self.loop.call_soon_threadsafe(self.aiomain_task_kafka.cancel) |
| 254 | if self.aiomain_task_vim: |
| 255 | self.loop.call_soon_threadsafe(self.aiomain_task_vim.cancel) |