blob: 21e62817321b68f6ebb1b6859199b8e360f9c1b0 [file] [log] [blame]
tierno70eeb182020-10-19 16:38:00 +00001# -*- coding: utf-8 -*-
2
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12# implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
"""
This module implements a thread that reads VIM messages from the kafka bus.
It is based on asyncio.
It is also in charge of loading the tasks assigned to VIMs that nobody is currently in charge of.
"""
21
tierno70eeb182020-10-19 16:38:00 +000022import asyncio
23from http import HTTPStatus
sousaedu049cbb12022-01-05 11:39:35 +000024import logging
25import threading
26from time import time
tierno70eeb182020-10-19 16:38:00 +000027
sousaedu049cbb12022-01-05 11:39:35 +000028from osm_common import dbmemory, dbmongo, msgkafka, msglocal
tierno70eeb182020-10-19 16:38:00 +000029from osm_common.dbbase import DbException
30from osm_common.msgbase import MsgException
tierno70eeb182020-10-19 16:38:00 +000031
32__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
33
34
class VimAdminException(Exception):
    """Error raised by the VIM administration thread.

    :param message: human readable description of the error
    :param http_code: HTTP status associated with the error; it is stored in
        the ``http_code`` attribute. Defaults to ``HTTPStatus.BAD_REQUEST``
    """

    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
        super().__init__(message)
        self.http_code = http_code
39
40
class LockRenew:
    """Periodically renew the 'locked_at' mark of database objects.

    Objects are registered with add_lock_object() and released with
    remove_lock_object(). The coroutine renew_locks() pushes 'locked_at'
    forward in the database while the registering thread is alive, the object
    has not been released and the lock is not older than
    'task_max_locked_time'.
    """

    # Class attribute (not per instance): it is shared by every LockRenew
    # object. Time ordered list of dictionaries with the information of the
    # locks that need to be renewed. The time order is achieved because
    # entries are always appended at the end.
    renew_list = []

    def __init__(self, config, logger):
        """
        Constructor of class
        :param config: configuration parameters. The keys 'task_locked_time',
            'task_relock_time' and 'task_max_locked_time' are read from
            config["global"]
        :param logger: logger used to report the renewals
        """
        self.config = config
        self.logger = logger
        self.to_terminate = False
        self.db = None
        self.task_locked_time = config["global"]["task_locked_time"]
        self.task_relock_time = config["global"]["task_relock_time"]
        self.task_max_locked_time = config["global"]["task_max_locked_time"]

    def start(self, db):
        """
        Provide the database connection used to renew and release the locks
        :param db: database object; its 'set_one' method is the only one used
        :return: None
        """
        self.db = db

    @staticmethod
    def add_lock_object(database_table, database_object, thread_object):
        """
        Insert a task to renew the locking
        :param database_table: database collection where to maintain the lock
        :param database_object: database object. '_id' and 'locked_at' are mandatory keys
        :param thread_object: Thread object that has locked to check if it is alive
        :return: a locked_object needed for calling remove_lock_object. Its 'locked_at' is kept up to date with
            the value written to the database on every renewal
        """
        lock_object = {
            "table": database_table,
            "_id": database_object["_id"],
            "initial_lock_time": database_object["locked_at"],
            "locked_at": database_object["locked_at"],
            "thread": thread_object,
            "unlocked": False,  # True when it is not needed any more
        }
        LockRenew.renew_list.append(lock_object)

        return lock_object

    @staticmethod
    def remove_lock_object(lock_object):
        """
        Mark a lock as not needed any more. The entry is discarded from
        renew_list the next time renew_locks() finds it at the head
        :param lock_object: object returned by add_lock_object
        :return: None
        """
        lock_object["unlocked"] = True

    async def renew_locks(self):
        """
        Renew the registered locks until 'to_terminate' is set.

        Only the head of renew_list is inspected on each iteration:
        - released entries, or entries whose thread is missing or dead, are discarded
        - when the renewal is not due yet, it sleeps until it is
        - a due entry is always taken out of the list; it is renewed and appended again at the end unless it
          has been locked for longer than 'task_max_locked_time' or the database update does not succeed
        :return: None
        """
        while not self.to_terminate:
            if not self.renew_list:
                await asyncio.sleep(self.task_locked_time - self.task_relock_time)
                continue

            lock_object = self.renew_list[0]
            owner = lock_object["thread"]

            if lock_object["unlocked"] or not owner or not owner.is_alive():
                # task has been finished or locker thread is dead, not needed to re-locked.
                self.renew_list.pop(0)
                continue

            now = time()
            time_to_relock = (
                lock_object["locked_at"]
                + self.task_locked_time
                - self.task_relock_time
                - now
            )

            if time_to_relock >= 1:
                # wait until it is time to re-lock it
                await asyncio.sleep(time_to_relock)
                continue

            # A due entry always leaves the head of the list. Otherwise the
            # loop would inspect the same entry again and again without ever
            # awaiting, which blocks the whole event loop.
            self.renew_list.pop(0)

            if lock_object["initial_lock_time"] + self.task_max_locked_time < now:
                self.logger.info(
                    "Lock for {}.{} exceeded the maximum locked time, not renewed".format(
                        lock_object["table"], lock_object["_id"]
                    )
                )
                continue

            self._renew_lock(lock_object, now)
            # give other tasks a chance to run when many locks are due at once
            await asyncio.sleep(0)

    def _renew_lock(self, lock_object, now):
        """Move 'locked_at' forward in the database; re-append the entry to renew_list on success."""
        table = lock_object["table"]
        object_id = lock_object["_id"]
        locked_at = lock_object["locked_at"]
        new_locked_at = locked_at + self.task_locked_time

        try:
            # the filter on the previous 'locked_at' makes the update a no-op
            # when somebody else has modified the lock in the meantime
            renewed = self.db.set_one(
                table,
                update_dict={"locked_at": new_locked_at, "modified_at": now},
                q_filter={"_id": object_id, "locked_at": locked_at},
                fail_on_empty=False,
            )
        except Exception as e:
            self.logger.error(
                "Exception when trying to renew lock for {}.{}: {}".format(
                    table, object_id, e
                )
            )
            return

        if renewed:
            self.logger.debug("Renew lock for {}.{}".format(table, object_id))
            lock_object["locked_at"] = new_locked_at
            self.renew_list.append(lock_object)
        else:
            self.logger.info("Cannot renew lock for {}.{}".format(table, object_id))

    def stop(self):
        """
        Unlock all locked items, writing 'locked_at' = 0 in the database.
        It does nothing when no database has been provided with start()
        :return: None
        """
        if self.db is None:
            return

        now = time()

        # iterate over a snapshot: renew_locks() may pop/append concurrently
        for lock_object in list(self.renew_list):
            locked_at = lock_object["locked_at"]

            # NOTE(review): 'or' is kept from the original code; 'and' may have
            # been intended. The q_filter on 'locked_at' limits the update to
            # objects that still hold the lock value known here.
            if not lock_object["unlocked"] or locked_at + self.task_locked_time >= now:
                self.db.set_one(
                    lock_object["table"],
                    update_dict={"locked_at": 0},
                    q_filter={"_id": lock_object["_id"], "locked_at": locked_at},
                    fail_on_empty=False,
                )
tiernof1b640f2020-12-09 15:06:01 +0000166
167
class VimAdminThread(threading.Thread):
    """Thread that runs an asyncio loop with three long-lived tasks.

    - a kafka subscription to 'kafka_topics' that asks the engine to
      check/reload/unload the target (see _msg_callback)
    - a watcher that looks in the database for ro_tasks of targets not
      assigned to this engine and asks the engine to load them (see vim_watcher)
    - the renewal of the database locks (LockRenew.renew_locks)
    """

    MAX_TIME_UNATTENDED = 600  # 10min
    TIME_CHECK_UNUSED_VIM = 3600 * 2  # 2h
    TASK_CANCEL_TIMEOUT = 10  # seconds waiting for cancelled tasks at shutdown
    kafka_topics = ("vim_account", "wim_account", "sdn")

    def __init__(self, config, engine):
        """
        Constructor of class
        :param config: configuration parameters of database and messaging
        :param engine: an instance of Engine class, used for deleting instances
        """
        super().__init__()
        self.to_terminate = False
        self.config = config
        self.db = None
        self.msg = None
        self.engine = engine
        self.loop = None
        self.last_rotask_time = 0
        self.next_check_unused_vim = time() + self.TIME_CHECK_UNUSED_VIM
        self.logger = logging.getLogger("ro.vimadmin")
        # asyncio task for receiving vim actions from kafka bus
        self.aiomain_task_kafka = None
        # asyncio task for watching ro_tasks not processed by nobody
        self.aiomain_task_vim = None
        # asyncio task for maintain an ro_task locked when VIM plugin takes too much time processing an order
        self.aiomain_task_renew_lock = None
        self.lock_renew = LockRenew(config, self.logger)
        self.task_locked_time = config["global"]["task_locked_time"]

    async def vim_watcher(self):
        """Reads database periodically looking for tasks not processed by nobody because of a reboot
        in order to load this vim"""
        # firstly read VIMS not processed
        for target_database in ("vim_accounts", "wim_accounts", "sdns"):
            unattended_targets = self.db.get_list(
                target_database,
                q_filter={"_admin.operations.operationState": "PROCESSING"},
            )

            for target in unattended_targets:
                # target ids are "<first 3 letters of the collection>:<_id>"
                target_id = "{}:{}".format(target_database[:3], target["_id"])
                self.logger.info("ordered to check {}".format(target_id))
                self.engine.check_vim(target_id)

        while not self.to_terminate:
            now = time()
            processed_vims = []

            if not self.last_rotask_time:
                self.last_rotask_time = 0

            # ro_tasks of targets not assigned to this engine, whose lock is
            # older than task_locked_time and whose 'to_check_at' falls in the
            # window (last check, now - MAX_TIME_UNATTENDED]
            ro_tasks = self.db.get_list(
                "ro_tasks",
                q_filter={
                    "target_id.ncont": self.engine.get_assigned_vims(),
                    "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
                    "locked_at.lt": now - self.task_locked_time,
                    "to_check_at.gt": self.last_rotask_time,
                    "to_check_at.lte": now - self.MAX_TIME_UNATTENDED,
                },
            )
            self.last_rotask_time = now - self.MAX_TIME_UNATTENDED

            for ro_task in ro_tasks:
                # if already checked ignore
                if ro_task["target_id"] in processed_vims:
                    continue

                processed_vims.append(ro_task["target_id"])

                # if already assigned ignore
                if ro_task["target_id"] in self.engine.get_assigned_vims():
                    continue

                # if there is some task locked on this VIM, there is an RO working on it, so ignore
                if self.db.get_list(
                    "ro_tasks",
                    q_filter={
                        "target_id": ro_task["target_id"],
                        "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
                        "locked_at.gt": now - self.task_locked_time,
                    },
                ):
                    continue

                # unattended, assign vim
                self.engine.assign_vim(ro_task["target_id"])
                self.logger.debug(
                    "ordered to load {}. Inactivity detected".format(
                        ro_task["target_id"]
                    )
                )

            # every 2 hours check if there are vims without any ro_task and unload it
            if now > self.next_check_unused_vim:
                self.next_check_unused_vim = now + self.TIME_CHECK_UNUSED_VIM
                self.engine.unload_unused_vims()

            await asyncio.sleep(self.MAX_TIME_UNATTENDED)

    def _log_finished_task(self, task, description):
        """Report why a finished asyncio task ended without raising on cancelled tasks."""
        # Task.exception() raises CancelledError on a cancelled task, so the
        # cancellation has to be checked first
        if task.cancelled():
            self.logger.warning("{} task was cancelled".format(description))
            return

        exc = task.exception()

        if exc is None:
            self.logger.info("{} task finished".format(description))
        else:
            self.logger.error("{} task exception: {}".format(description, exc))

    async def aiomain(self):
        """
        Create the three tasks (kafka subscription, vim watcher and lock
        renewal) and create them again whenever one of them finishes, until
        'to_terminate' is set. Any exception, for instance when kafka cannot
        be reached, is retried after 10 seconds.
        :return: None
        """
        kafka_working = True
        while not self.to_terminate:
            try:
                if not self.aiomain_task_kafka:
                    # a dummy message per topic; a failure here is handled by
                    # the 'except' below, which retries later
                    for kafka_topic in self.kafka_topics:
                        await self.msg.aiowrite(kafka_topic, "echo", "dummy message")
                    kafka_working = True
                    self.logger.debug("Starting vim_account subscription task")
                    self.aiomain_task_kafka = asyncio.ensure_future(
                        self.msg.aioread(
                            self.kafka_topics,
                            group_id=False,
                            aiocallback=self._msg_callback,
                        ),
                    )

                if not self.aiomain_task_vim:
                    self.aiomain_task_vim = asyncio.ensure_future(self.vim_watcher())

                if not self.aiomain_task_renew_lock:
                    self.aiomain_task_renew_lock = asyncio.ensure_future(
                        self.lock_renew.renew_locks()
                    )

                done, _ = await asyncio.wait(
                    [
                        self.aiomain_task_kafka,
                        self.aiomain_task_vim,
                        self.aiomain_task_renew_lock,
                    ],
                    timeout=None,
                    return_when=asyncio.FIRST_COMPLETED,
                )

                # Every finished task is handled on its own and its reference
                # is always cleared, so a cancelled task cannot hide the
                # others nor be waited for again in the next iteration
                if self.aiomain_task_kafka in done:
                    self._log_finished_task(
                        self.aiomain_task_kafka, "kafka subscription"
                    )
                    self.aiomain_task_kafka = None

                if self.aiomain_task_vim in done:
                    self._log_finished_task(
                        self.aiomain_task_vim, "vim_account watcher"
                    )
                    self.aiomain_task_vim = None

                if self.aiomain_task_renew_lock in done:
                    self._log_finished_task(
                        self.aiomain_task_renew_lock, "renew_locks"
                    )
                    self.aiomain_task_renew_lock = None

            except Exception as e:
                if self.to_terminate:
                    return

                if kafka_working:
                    # logging only first time
                    self.logger.critical(
                        "Error accessing kafka '{}'. Retrying ...".format(e)
                    )
                    kafka_working = False

                await asyncio.sleep(10)

    def _connect(self):
        """Create the database and messaging connections that have not been provided yet."""
        try:
            if not self.db:
                if self.config["database"]["driver"] == "mongo":
                    self.db = dbmongo.DbMongo()
                    self.db.db_connect(self.config["database"])
                elif self.config["database"]["driver"] == "memory":
                    self.db = dbmemory.DbMemory()
                    self.db.db_connect(self.config["database"])
                else:
                    raise VimAdminException(
                        "Invalid configuration param '{}' at '[database]':'driver'".format(
                            self.config["database"]["driver"]
                        )
                    )

            self.lock_renew.start(self.db)

            if not self.msg:
                config_msg = self.config["message"].copy()

                if config_msg["driver"] == "local":
                    self.msg = msglocal.MsgLocal()
                    self.msg.connect(config_msg)
                elif config_msg["driver"] == "kafka":
                    self.msg = msgkafka.MsgKafka()
                    self.msg.connect(config_msg)
                else:
                    raise VimAdminException(
                        "Invalid configuration param '{}' at '[message]':'driver'".format(
                            config_msg["driver"]
                        )
                    )
        except (DbException, MsgException) as e:
            raise VimAdminException(str(e), http_code=e.http_code) from e

    def _cancel_pending_tasks(self):
        """Cancel the tasks still alive in the loop and wait, with a timeout, until they finish."""
        pending = list(asyncio.all_tasks(self.loop))

        if not pending:
            return

        for task in pending:
            task.cancel()

        done, _ = self.loop.run_until_complete(
            asyncio.wait(pending, timeout=self.TASK_CANCEL_TIMEOUT)
        )

        for task in done:
            if not task.cancelled():
                # retrieve the exception so that asyncio does not report it as never retrieved
                task.exception()

    def run(self):
        """
        Start of the thread. Connects to database and messaging, runs the
        asyncio loop until 'to_terminate' is set and finally closes the
        connections and the loop
        :return: None
        :raise VimAdminException: if the connections cannot be established or closed
        """
        self._connect()
        # the loop is created once the connections are ready, so that it is
        # not left open when connecting fails
        self.loop = asyncio.new_event_loop()

        self.logger.info("Starting")
        try:
            while not self.to_terminate:
                try:
                    # Run the coroutine in this thread until it finishes.
                    # run_coroutine_threadsafe() only queues the coroutine for a loop that is
                    # already running in another thread, so it would never be executed here
                    self.loop.run_until_complete(self.main_task())
                except Exception as e:
                    if not self.to_terminate:
                        self.logger.exception(
                            "Exception '{}' at messaging read loop".format(e), exc_info=True
                        )
        finally:
            self.logger.info("Finishing")
            try:
                self._cancel_pending_tasks()
                self._stop()
            finally:
                self.loop.close()

    async def main_task(self):
        """
        Run aiomain() as an asyncio task and wait for it
        :return: None
        """
        task = asyncio.ensure_future(self.aiomain())
        await task

    async def _msg_callback(self, topic, command, params):
        """
        Callback to process a received message from kafka
        :param topic: topic received
        :param command: command received
        :param params: rest of parameters
        :return: None
        """
        try:
            if command == "echo":
                return

            if topic in self.kafka_topics:
                target = topic[0:3]  # vim, wim or sdn
                target_id = target + ":" + params["_id"]

                if command in ("edited", "edit"):
                    self.engine.reload_vim(target_id)
                    self.logger.debug("ordered to reload {}".format(target_id))
                elif command in ("deleted", "delete"):
                    self.engine.unload_vim(target_id)
                    self.logger.debug("ordered to unload {}".format(target_id))
                elif command in ("create", "created"):
                    self.engine.check_vim(target_id)
                    self.logger.debug("ordered to check {}".format(target_id))
        except (DbException, MsgException) as e:
            self.logger.error(
                "Error while processing topic={} command={}: {}".format(
                    topic, command, e
                )
            )
        except Exception as e:
            self.logger.exception(
                "Exception while processing topic={} command={}: {}".format(
                    topic, command, e
                ),
                exc_info=True,
            )

    def _stop(self):
        """
        Close all connections
        :return: None
        :raise VimAdminException: if closing a connection fails
        """
        try:
            try:
                if self.db:
                    self.db.db_disconnect()
            finally:
                # the messaging connection is closed even if the database one fails
                if self.msg:
                    self.msg.disconnect()
        except (DbException, MsgException) as e:
            raise VimAdminException(str(e), http_code=e.http_code) from e

    def _cancel_task(self, task):
        """Request the cancellation of an asyncio task from any thread."""
        if not task:
            return

        loop = self.loop

        try:
            if loop is not None and not loop.is_closed():
                # asyncio tasks are not thread safe: the cancellation has to
                # be executed by the thread that runs the loop
                loop.call_soon_threadsafe(task.cancel)
            else:
                task.cancel()
        except RuntimeError:
            # the loop has been closed in the meantime; nothing left to cancel
            pass

    def terminate(self):
        """
        This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards,
        but not immediately.
        :return: None
        """
        self.to_terminate = True
        self.lock_renew.to_terminate = True

        self._cancel_task(self.aiomain_task_kafka)
        self._cancel_task(self.aiomain_task_vim)
        self._cancel_task(self.aiomain_task_renew_lock)

        self.lock_renew.stop()