blob: b6e34e111656cc9b5932a1f5efc744b7b525db55 [file] [log] [blame]
tierno70eeb182020-10-19 16:38:00 +00001# -*- coding: utf-8 -*-
2
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12# implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16"""
This module implements a thread that reads vim_account, wim_account and sdn messages from the kafka bus.
It is based on asyncio.
It is in charge of loading the tasks assigned to VIMs that nobody else is in charge of.
20"""
21
tierno70eeb182020-10-19 16:38:00 +000022import asyncio
23from http import HTTPStatus
sousaedu049cbb12022-01-05 11:39:35 +000024import logging
25import threading
26from time import time
tierno70eeb182020-10-19 16:38:00 +000027
sousaedu049cbb12022-01-05 11:39:35 +000028from osm_common import dbmemory, dbmongo, msgkafka, msglocal
tierno70eeb182020-10-19 16:38:00 +000029from osm_common.dbbase import DbException
30from osm_common.msgbase import MsgException
tierno70eeb182020-10-19 16:38:00 +000031
32__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
33
34
class VimAdminException(Exception):
    """Error raised by the VIM admin thread; http_code holds the HTTP status to report (400 by default)."""

    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
        super().__init__(message)
        self.http_code = http_code
39
40
class LockRenew:
    renew_list = []
    # ^ class attribute, shared by every LockRenew instance of this process. Time ordered list of dictionaries with
    # information of locks that need to be renewed. The time order is kept because entries are only appended at the end

    def __init__(self, config, logger):
        """
        Constructor of class
        :param config: configuration dictionary; config["global"] must provide "task_locked_time",
            "task_relock_time" and "task_max_locked_time" (seconds, they are compared against time())
        :param logger: logger used to report lock renewals and failures
        """
        self.config = config
        self.logger = logger
        self.to_terminate = False
        self.loop = None
        self.db = None
        self.task_locked_time = config["global"]["task_locked_time"]
        self.task_relock_time = config["global"]["task_relock_time"]
        self.task_max_locked_time = config["global"]["task_max_locked_time"]

    def start(self, db, loop):
        """
        Provide the runtime dependencies. Must be called before renew_locks() can renew anything or stop() can unlock
        :param db: database object exposing set_one()
        :param loop: asyncio event loop where renew_locks() runs (stored only; not needed by this class)
        """
        self.db = db
        self.loop = loop

    @staticmethod
    def add_lock_object(database_table, database_object, thread_object):
        """
        Insert a task to renew the locking
        :param database_table: database collection where to maintain the lock
        :param database_object: database object. '_id' and 'locked_at' are mandatory keys
        :param thread_object: Thread object that has locked, used to check if it is still alive
        :return: a lock_object needed for calling remove_lock_object. Its 'locked_at' is kept up to date with the
            value written to the database on every renewal
        """
        lock_object = {
            "table": database_table,
            "_id": database_object["_id"],
            "initial_lock_time": database_object["locked_at"],
            "locked_at": database_object["locked_at"],
            "thread": thread_object,
            "unlocked": False,  # True when it is not needed any more
        }
        LockRenew.renew_list.append(lock_object)

        return lock_object

    @staticmethod
    def remove_lock_object(lock_object):
        """
        Mark a lock as not needed any more. It is not removed here; renew_locks() drops it when it reaches it
        :param lock_object: value returned by add_lock_object
        """
        lock_object["unlocked"] = True

    async def renew_locks(self):
        """
        Renew the database locks registered with add_lock_object() until to_terminate is set.

        The head of renew_list is the entry whose renewal is due first. When it is due it is removed from the list;
        it is dropped for good if it was released, its locker thread is not alive, task_max_locked_time has elapsed
        since it was first locked, or the database update does not succeed. Otherwise its 'locked_at' is moved
        task_locked_time seconds forward and it is appended again at the end of the list.
        """
        while not self.to_terminate:
            if not self.renew_list:
                await asyncio.sleep(self.task_locked_time - self.task_relock_time)
                continue

            lock_object = self.renew_list[0]

            if (
                lock_object["unlocked"]
                or not lock_object["thread"]
                or not lock_object["thread"].is_alive()
            ):
                # task has been finished or locker thread is dead, not needed to re-lock it
                self.renew_list.pop(0)
                continue

            locked_at = lock_object["locked_at"]
            now = time()
            time_to_relock = (
                locked_at + self.task_locked_time - self.task_relock_time - now
            )

            if time_to_relock >= 1:
                # wait until it is time to re-lock it
                await asyncio.sleep(time_to_relock)
                continue

            # Remove the entry from the head unconditionally; it is appended again only if it is renewed. Leaving it
            # at the head would make this loop spin on it without ever awaiting, blocking the event loop
            self.renew_list.pop(0)

            if lock_object["initial_lock_time"] + self.task_max_locked_time < now:
                # locked for too long: stop renewing and let the lock expire
                self.logger.info(
                    "Maximum locked time exceeded, not renewing lock for {}.{}".format(
                        lock_object["table"], lock_object["_id"]
                    )
                )
                continue

            # re-lock
            new_locked_at = locked_at + self.task_locked_time

            try:
                # the locked_at filter makes the update fail if someone else changed the lock meanwhile
                if self.db.set_one(
                    lock_object["table"],
                    update_dict={
                        "locked_at": new_locked_at,
                        "modified_at": now,
                    },
                    q_filter={
                        "_id": lock_object["_id"],
                        "locked_at": locked_at,
                    },
                    fail_on_empty=False,
                ):
                    self.logger.debug(
                        "Renew lock for {}.{}".format(
                            lock_object["table"], lock_object["_id"]
                        )
                    )
                    lock_object["locked_at"] = new_locked_at
                    self.renew_list.append(lock_object)
                else:
                    self.logger.info(
                        "Cannot renew lock for {}.{}".format(
                            lock_object["table"], lock_object["_id"]
                        )
                    )
            except Exception as e:
                self.logger.error(
                    "Exception when trying to renew lock for {}.{}: {}".format(
                        lock_object["table"], lock_object["_id"], e
                    )
                )

    def stop(self):
        """
        Unlock in the database every item that is still locked: entries not released with remove_lock_object()
        whose 'locked_at' has not expired yet. 'locked_at' is set to 0 only where the database still holds the value
        last written by this object, so a lock changed by someone else is left untouched. Database errors are logged
        and do not prevent unlocking the remaining items.
        """
        if not self.db:
            # start() was never called: there is no database to unlock with
            return

        now = time()

        # iterate over a snapshot: renew_locks may be modifying the shared list from the event loop thread
        for lock_object in list(self.renew_list):
            locked_at = lock_object["locked_at"]

            if lock_object["unlocked"] or locked_at + self.task_locked_time < now:
                # already released by its owner, or already expired: nothing to unlock
                continue

            try:
                self.db.set_one(
                    lock_object["table"],
                    update_dict={"locked_at": 0},
                    q_filter={"_id": lock_object["_id"], "locked_at": locked_at},
                    fail_on_empty=False,
                )
            except Exception as e:
                self.logger.error(
                    "Exception when trying to unlock {}.{}: {}".format(
                        lock_object["table"], lock_object["_id"], e
                    )
                )
169 )
tiernof1b640f2020-12-09 15:06:01 +0000170
171
class VimAdminThread(threading.Thread):
    """
    Thread running its own asyncio loop. It subscribes to the kafka topics in kafka_topics, periodically looks for
    ro_tasks of VIMs that nobody is processing, and asks the engine to check, assign, reload or unload those VIMs.
    """

    MAX_TIME_UNATTENDED = 600  # 10min
    TIME_CHECK_UNUSED_VIM = 3600 * 2  # 2h
    kafka_topics = ("vim_account", "wim_account", "sdn")

    def __init__(self, config, engine):
        """
        Constructor of class
        :param config: configuration parameters of database and messaging; config["global"]["task_locked_time"] is
            also required
        :param engine: an instance of Engine class, used to check, assign, reload and unload VIMs
        """
        threading.Thread.__init__(self)
        self.to_terminate = False
        self.config = config
        self.db = None
        self.msg = None
        self.engine = engine
        self.loop = None
        self.last_rotask_time = 0
        self.next_check_unused_vim = time() + self.TIME_CHECK_UNUSED_VIM
        self.logger = logging.getLogger("ro.vimadmin")
        # asyncio task for receiving vim actions from kafka bus
        self.aiomain_task_kafka = None
        # asyncio task for watching ro_tasks not processed by nobody
        self.aiomain_task_vim = None
        # asyncio task for keeping an ro_task locked when the VIM plugin takes too much time processing an order
        self.aiomain_task_renew_lock = None
        self.lock_renew = LockRenew(config, self.logger)
        self.task_locked_time = config["global"]["task_locked_time"]

    async def vim_watcher(self):
        """
        Periodically read the database looking for ro_tasks not processed by anybody (e.g. because of a reboot) and
        ask the engine to load their VIMs. Runs until to_terminate is set.
        """
        # firstly check the targets whose operation is still PROCESSING
        for target_database in ("vim_accounts", "wim_accounts", "sdns"):
            unattended_targets = self.db.get_list(
                target_database,
                q_filter={"_admin.operations.operationState": "PROCESSING"},
            )

            for target in unattended_targets:
                target_id = "{}:{}".format(target_database[:3], target["_id"])
                self.logger.info("ordered to check {}".format(target_id))
                self.engine.check_vim(target_id)

        while not self.to_terminate:
            now = time()
            processed_vims = set()

            if not self.last_rotask_time:
                self.last_rotask_time = 0

            ro_tasks = self.db.get_list(
                "ro_tasks",
                q_filter={
                    "target_id.ncont": self.engine.get_assigned_vims(),
                    "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
                    "locked_at.lt": now - self.task_locked_time,
                    "to_check_at.gt": self.last_rotask_time,
                    "to_check_at.lte": now - self.MAX_TIME_UNATTENDED,
                },
            )
            self.last_rotask_time = now - self.MAX_TIME_UNATTENDED

            for ro_task in ro_tasks:
                target_id = ro_task["target_id"]

                # if already checked ignore
                if target_id in processed_vims:
                    continue

                processed_vims.add(target_id)

                # if already assigned ignore
                if target_id in self.engine.get_assigned_vims():
                    continue

                # if there is some task locked on this VIM, there is an RO working on it, so ignore
                if self.db.get_list(
                    "ro_tasks",
                    q_filter={
                        "target_id": target_id,
                        "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
                        "locked_at.gt": now - self.task_locked_time,
                    },
                ):
                    continue

                # unattended, assign vim
                self.engine.assign_vim(target_id)
                self.logger.debug(
                    "ordered to load {}. Inactivity detected".format(target_id)
                )

            # every TIME_CHECK_UNUSED_VIM check if there are vims without any ro_task and unload them
            if now > self.next_check_unused_vim:
                self.next_check_unused_vim = now + self.TIME_CHECK_UNUSED_VIM
                self.engine.unload_unused_vims()

            await asyncio.sleep(self.MAX_TIME_UNATTENDED)

    def _log_finished_task(self, task, description):
        """
        Log how an asyncio task monitored by aiomain ended
        :param task: a finished asyncio task
        :param description: task name used as log prefix
        :return: None
        """
        if task.cancelled():
            # task.exception() would raise CancelledError for a cancelled task
            self.logger.debug("{} cancelled".format(description))
            return

        self.logger.error("{} exception: {}".format(description, task.exception()))

    async def aiomain(self):
        """
        Keep the kafka subscription, the vim watcher and the lock renewal tasks running until to_terminate is set.
        A finished task is logged, forgotten and started again in the next iteration.
        """
        kafka_working = True

        while not self.to_terminate:
            try:
                if not self.aiomain_task_kafka:
                    for kafka_topic in self.kafka_topics:
                        await self.msg.aiowrite(
                            kafka_topic, "echo", "dummy message", loop=self.loop
                        )
                    kafka_working = True
                    self.logger.debug("Starting vim_account subscription task")
                    self.aiomain_task_kafka = asyncio.ensure_future(
                        self.msg.aioread(
                            self.kafka_topics,
                            loop=self.loop,
                            group_id=False,
                            aiocallback=self._msg_callback,
                        ),
                        loop=self.loop,
                    )

                if not self.aiomain_task_vim:
                    self.aiomain_task_vim = asyncio.ensure_future(
                        self.vim_watcher(), loop=self.loop
                    )

                if not self.aiomain_task_renew_lock:
                    self.aiomain_task_renew_lock = asyncio.ensure_future(
                        self.lock_renew.renew_locks(), loop=self.loop
                    )

                done, _ = await asyncio.wait(
                    [
                        self.aiomain_task_kafka,
                        self.aiomain_task_vim,
                        self.aiomain_task_renew_lock,
                    ],
                    timeout=None,
                    return_when=asyncio.FIRST_COMPLETED,
                )

                # finished tasks are always reset, also when cancelled; otherwise asyncio.wait would return at once
                # on every iteration and this loop would spin without pause
                if self.aiomain_task_kafka in done:
                    self._log_finished_task(
                        self.aiomain_task_kafka, "kafka subscription task"
                    )
                    self.aiomain_task_kafka = None

                if self.aiomain_task_vim in done:
                    self._log_finished_task(
                        self.aiomain_task_vim, "vim_account watcher task"
                    )
                    self.aiomain_task_vim = None

                if self.aiomain_task_renew_lock in done:
                    self._log_finished_task(
                        self.aiomain_task_renew_lock, "renew_locks task"
                    )
                    self.aiomain_task_renew_lock = None

            except Exception as e:
                if self.to_terminate:
                    return

                if kafka_working:
                    # logging only first time
                    self.logger.critical(
                        "Error accessing kafka '{}'. Retrying ...".format(e)
                    )
                    kafka_working = False

                await asyncio.sleep(10)

    def _connect(self):
        """
        Create and connect the database and message bus objects (unless already set) and start LockRenew
        :return: None
        :raises VimAdminException: on an invalid driver or a database/messaging error
        """
        try:
            if not self.db:
                if self.config["database"]["driver"] == "mongo":
                    self.db = dbmongo.DbMongo()
                    self.db.db_connect(self.config["database"])
                elif self.config["database"]["driver"] == "memory":
                    self.db = dbmemory.DbMemory()
                    self.db.db_connect(self.config["database"])
                else:
                    raise VimAdminException(
                        "Invalid configuration param '{}' at '[database]':'driver'".format(
                            self.config["database"]["driver"]
                        )
                    )

            self.lock_renew.start(self.db, self.loop)

            if not self.msg:
                config_msg = self.config["message"].copy()
                config_msg["loop"] = self.loop

                if config_msg["driver"] == "local":
                    self.msg = msglocal.MsgLocal()
                    self.msg.connect(config_msg)
                elif config_msg["driver"] == "kafka":
                    self.msg = msgkafka.MsgKafka()
                    self.msg.connect(config_msg)
                else:
                    raise VimAdminException(
                        "Invalid configuration param '{}' at '[message]':'driver'".format(
                            config_msg["driver"]
                        )
                    )
        except (DbException, MsgException) as e:
            raise VimAdminException(str(e), http_code=e.http_code) from e

    def run(self):
        """
        Start of the thread: connect to database and message bus, then run aiomain until terminate() is called
        :return: None
        """
        self.loop = asyncio.new_event_loop()

        try:
            self._connect()

            self.logger.info("Starting")
            while not self.to_terminate:
                try:
                    self.loop.run_until_complete(self.aiomain())
                except Exception as e:
                    if not self.to_terminate:
                        self.logger.exception(
                            "Exception '{}' at messaging read loop".format(e)
                        )

            self.logger.info("Finishing")
            self._stop()
        finally:
            # release the event loop also when connecting or disconnecting fails
            self.loop.close()

    async def _msg_callback(self, topic, command, params):
        """
        Callback to process a received message from kafka
        :param topic: topic received
        :param command: command received
        :param params: rest of parameters; '_id' is read for vim/wim/sdn topics
        :return: None
        """
        try:
            if command == "echo":
                return

            if topic in self.kafka_topics:
                target = topic[0:3]  # vim, wim or sdn
                target_id = target + ":" + params["_id"]

                if command in ("edited", "edit"):
                    self.engine.reload_vim(target_id)
                    self.logger.debug("ordered to reload {}".format(target_id))
                elif command in ("deleted", "delete"):
                    self.engine.unload_vim(target_id)
                    self.logger.debug("ordered to unload {}".format(target_id))
                elif command in ("create", "created"):
                    self.engine.check_vim(target_id)
                    self.logger.debug("ordered to check {}".format(target_id))
        except (DbException, MsgException) as e:
            self.logger.error(
                "Error while processing topic={} command={}: {}".format(
                    topic, command, e
                )
            )
        except Exception as e:
            self.logger.exception(
                "Exception while processing topic={} command={}: {}".format(
                    topic, command, e
                )
            )

    def _stop(self):
        """
        Close all connections
        :return: None
        """
        try:
            if self.db:
                self.db.db_disconnect()

            if self.msg:
                self.msg.disconnect()
        except (DbException, MsgException) as e:
            raise VimAdminException(str(e), http_code=e.http_code) from e

    def terminate(self):
        """
        This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards,
        but not immediately.
        :return: None
        """
        # unlock the held items first: once the tasks are cancelled (or notice to_terminate) run() leaves its loop
        # and disconnects the database in _stop(), which would race with these database writes
        self.lock_renew.stop()

        self.to_terminate = True
        self.lock_renew.to_terminate = True

        for task in (
            self.aiomain_task_kafka,
            self.aiomain_task_vim,
            self.aiomain_task_renew_lock,
        ):
            if task:
                self.loop.call_soon_threadsafe(task.cancel)
tiernof1b640f2020-12-09 15:06:01 +0000482 self.lock_renew.stop()