blob: 2582ee2a480e38bcb195b69edb7942b3b7c96fd0 [file] [log] [blame]
tierno70eeb182020-10-19 16:38:00 +00001# -*- coding: utf-8 -*-
2
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12# implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16"""
This module implements a thread that reads VIM messages from the kafka bus.
It is based on asyncio.
It is in charge of loading the tasks assigned to VIMs that nobody is in charge of.
20"""
21
tierno70eeb182020-10-19 16:38:00 +000022import asyncio
23from http import HTTPStatus
sousaedu049cbb12022-01-05 11:39:35 +000024import logging
25import threading
26from time import time
tierno70eeb182020-10-19 16:38:00 +000027
sousaedu049cbb12022-01-05 11:39:35 +000028from osm_common import dbmemory, dbmongo, msgkafka, msglocal
tierno70eeb182020-10-19 16:38:00 +000029from osm_common.dbbase import DbException
30from osm_common.msgbase import MsgException
tierno70eeb182020-10-19 16:38:00 +000031
32__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
33
34
class VimAdminException(Exception):
    """Error raised by this module; carries an HTTP status code next to the message."""

    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
        """
        :param message: human readable description of the error
        :param http_code: HTTP status associated to the error, BAD_REQUEST by default
        """
        super().__init__(message)
        self.http_code = http_code
39
40
class LockRenew:
    """
    Keeps database locks alive while the thread that took them is still working.

    Locks are registered with add_lock_object() and released with remove_lock_object(). The coroutine
    renew_locks() walks the shared list and pushes 'locked_at' forward in the database before it expires.
    """

    renew_list = []
    # ^ static attribute, common for all RO. Time ordered list of dictionaries with information of locks that
    # need to be renewed. The time order is achieved as entries are always appended at the end

    def __init__(self, config, logger):
        """
        Constructor of class
        :param config: configuration parameters. config["global"] must contain 'task_locked_time',
            'task_relock_time' and 'task_max_locked_time' (seconds, they are combined with time())
        :param logger: logger used to report renewals and failures
        """
        self.config = config
        self.logger = logger
        self.to_terminate = False
        self.loop = None
        self.db = None
        self.task_locked_time = config["global"]["task_locked_time"]
        self.task_relock_time = config["global"]["task_relock_time"]
        self.task_max_locked_time = config["global"]["task_max_locked_time"]

    def start(self, db, loop):
        """
        Provide the resources needed by renew_locks() and stop()
        :param db: connected database object, its set_one() method is used to update the locks
        :param loop: asyncio event loop where renew_locks() is going to run. Only stored
        :return: None
        """
        self.db = db
        self.loop = loop

    @staticmethod
    def add_lock_object(database_table, database_object, thread_object):
        """
        Insert a task to renew the locking
        :param database_table: database collection where to maintain the lock
        :param database_object: database object. '_id' and 'locked_at' are mandatory keys
        :param thread_object: Thread object that has locked to check if it is alive
        :return: a locked_object needed for calling remove_lock_object. It will contain the up-to-date
            database 'locked_at'
        """
        lock_object = {
            "table": database_table,
            "_id": database_object["_id"],
            "initial_lock_time": database_object["locked_at"],
            "locked_at": database_object["locked_at"],
            "thread": thread_object,
            "unlocked": False,  # True when it is not needed any more
        }
        LockRenew.renew_list.append(lock_object)

        return lock_object

    @staticmethod
    def remove_lock_object(lock_object):
        """
        Mark a lock as not needed any more. It is discarded by renew_locks() when it reaches the list head
        :param lock_object: object returned by add_lock_object
        :return: None
        """
        lock_object["unlocked"] = True

    async def renew_locks(self):
        """
        Coroutine that renews the registered locks until 'to_terminate' is set.
        Only the head of renew_list (the oldest entry) is inspected on every iteration:
          - it is discarded if it was unlocked, or its thread is missing or dead
          - if it is not yet time to renew it, the coroutine sleeps until then
          - otherwise it is taken out of the list; it is dropped if it has been locked for longer than
            'task_max_locked_time', else it is renewed at database and appended again at the end
        :return: None
        """
        while not self.to_terminate:
            if not self.renew_list:
                # NOTE: the 'loop' argument of asyncio.sleep was removed in python 3.10; the running loop is used
                await asyncio.sleep(self.task_locked_time - self.task_relock_time)
                continue

            lock_object = self.renew_list[0]

            if (
                lock_object["unlocked"]
                or not lock_object["thread"]
                or not lock_object["thread"].is_alive()
            ):
                # task has been finished or locker thread is dead, not needed to re-locked.
                self.renew_list.pop(0)
                continue

            locked_at = lock_object["locked_at"]
            now = time()
            time_to_relock = (
                locked_at + self.task_locked_time - self.task_relock_time - now
            )

            if time_to_relock >= 1:
                # wait until it is time to re-lock it
                await asyncio.sleep(time_to_relock)
                continue

            # The head is always removed here, so the loop cannot spin over the same entry
            # and a renewed entry is never present twice in the list
            self.renew_list.pop(0)

            if lock_object["initial_lock_time"] + self.task_max_locked_time < now:
                # locked for too long: stop renewing and let the database lock expire
                self.logger.info(
                    "Lock for {}.{} exceeded the maximum locked time. Not renewed".format(
                        lock_object["table"], lock_object["_id"]
                    )
                )
                continue

            self._renew_lock(lock_object, locked_at, now)

    def _renew_lock(self, lock_object, locked_at, now):
        """
        Push 'locked_at' forward at database for one lock; on success re-queue it at the end of renew_list.
        On failure (nothing updated or exception) the lock is just logged and not queued again.
        :param lock_object: entry already removed from renew_list
        :param locked_at: current 'locked_at' value, used as filter so that only our own lock is updated
        :param now: current time, stored as 'modified_at'
        :return: None
        """
        new_locked_at = locked_at + self.task_locked_time

        try:
            if self.db.set_one(
                lock_object["table"],
                update_dict={
                    "locked_at": new_locked_at,
                    "modified_at": now,
                },
                q_filter={
                    "_id": lock_object["_id"],
                    "locked_at": locked_at,
                },
                fail_on_empty=False,
            ):
                self.logger.debug(
                    "Renew lock for {}.{}".format(
                        lock_object["table"], lock_object["_id"]
                    )
                )
                lock_object["locked_at"] = new_locked_at
                self.renew_list.append(lock_object)
            else:
                self.logger.info(
                    "Cannot renew lock for {}.{}".format(
                        lock_object["table"], lock_object["_id"]
                    )
                )
        except Exception as e:
            self.logger.error(
                "Exception when trying to renew lock for {}.{}: {}".format(
                    lock_object["table"], lock_object["_id"], e
                )
            )

    def stop(self):
        """
        Unlock at database all the items that are still in renew_list, setting 'locked_at' to 0.
        Does nothing if start() has not provided a database yet.
        :return: None
        """
        if not self.db:
            # start() was never called, there is no database to update
            return

        # unlock all locked items
        now = time()

        for lock_object in self.renew_list:
            locked_at = lock_object["locked_at"]

            # NOTE(review): 'or' also matches entries already marked as unlocked whose lock has not
            # expired yet; kept as it was, confirm whether 'and' was intended
            if not lock_object["unlocked"] or locked_at + self.task_locked_time >= now:
                self.db.set_one(
                    lock_object["table"],
                    update_dict={"locked_at": 0},
                    q_filter={"_id": lock_object["_id"], "locked_at": locked_at},
                    fail_on_empty=False,
                )
tiernof1b640f2020-12-09 15:06:01 +0000171
172
class VimAdminThread(threading.Thread):
    """
    Thread that runs its own asyncio event loop with three tasks:
      - a kafka subscription that reacts to vim_account/wim_account/sdn messages
      - a watcher that looks for ro_tasks not attended by anybody and orders to load their target
      - the renewal of database locks (see LockRenew)
    """

    MAX_TIME_UNATTENDED = 600  # 10min
    TIME_CHECK_UNUSED_VIM = 3600 * 2  # 2h
    kafka_topics = ("vim_account", "wim_account", "sdn")

    def __init__(self, config, engine):
        """
        Constructor of class
        :param config: configuration parameters of database and messaging
        :param engine: an instance of Engine class, used for deleting instances
        """
        threading.Thread.__init__(self)
        self.to_terminate = False
        self.config = config
        self.db = None
        self.msg = None
        self.engine = engine
        self.loop = None
        self.last_rotask_time = 0
        self.next_check_unused_vim = time() + self.TIME_CHECK_UNUSED_VIM
        self.logger = logging.getLogger("ro.vimadmin")
        # asyncio task for receiving vim actions from kafka bus
        self.aiomain_task_kafka = None
        # asyncio task for watching ro_tasks not processed by nobody
        self.aiomain_task_vim = None
        self.aiomain_task_renew_lock = None
        # ^asyncio task for maintain an ro_task locked when VIM plugin takes too much time processing an order
        self.lock_renew = LockRenew(config, self.logger)
        self.task_locked_time = config["global"]["task_locked_time"]

    async def vim_watcher(self):
        """
        Reads database periodically looking for tasks not processed by nobody because of a reboot
        in order to load this vim.
        At start it orders to check every vim/wim/sdn that has an operation in PROCESSING state. Then,
        every MAX_TIME_UNATTENDED seconds and until 'to_terminate' is set, it assigns the targets of
        unattended ro_tasks and, every TIME_CHECK_UNUSED_VIM seconds, orders to unload unused vims.
        :return: None
        """
        # firstly read VIMS not processed
        for target_database in ("vim_accounts", "wim_accounts", "sdns"):
            unattended_targets = self.db.get_list(
                target_database,
                q_filter={"_admin.operations.operationState": "PROCESSING"},
            )

            for target in unattended_targets:
                target_id = "{}:{}".format(target_database[:3], target["_id"])
                self.logger.info("ordered to check {}".format(target_id))
                self.engine.check_vim(target_id)

        while not self.to_terminate:
            now = time()
            processed_vims = set()  # targets already evaluated in this round

            if not self.last_rotask_time:
                self.last_rotask_time = 0

            ro_tasks = self.db.get_list(
                "ro_tasks",
                q_filter={
                    "target_id.ncont": self.engine.get_assigned_vims(),
                    "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
                    "locked_at.lt": now - self.task_locked_time,
                    "to_check_at.gt": self.last_rotask_time,
                    "to_check_at.lte": now - self.MAX_TIME_UNATTENDED,
                },
            )
            self.last_rotask_time = now - self.MAX_TIME_UNATTENDED

            for ro_task in ro_tasks:
                target_id = ro_task["target_id"]

                # if already checked ignore
                if target_id in processed_vims:
                    continue

                processed_vims.add(target_id)

                # if already assigned ignore
                if target_id in self.engine.get_assigned_vims():
                    continue

                # if there is some task locked on this VIM, there is an RO working on it, so ignore
                if self.db.get_list(
                    "ro_tasks",
                    q_filter={
                        "target_id": target_id,
                        "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
                        "locked_at.gt": now - self.task_locked_time,
                    },
                ):
                    continue

                # unattended, assign vim
                self.engine.assign_vim(target_id)
                self.logger.debug(
                    "ordered to load {}. Inactivity detected".format(target_id)
                )

            # every 2 hours check if there are vims without any ro_task and unload it
            if now > self.next_check_unused_vim:
                self.next_check_unused_vim = now + self.TIME_CHECK_UNUSED_VIM
                self.engine.unload_unused_vims()

            # NOTE: the 'loop' argument of asyncio.sleep was removed in python 3.10; the running loop is used
            await asyncio.sleep(self.MAX_TIME_UNATTENDED)

    async def aiomain(self):
        """
        Main coroutine. Creates the kafka, vim watcher and lock renewal tasks when they do not exist,
        waits until any of them finishes, logs its exception and clears it so that it is created again
        in the next iteration. Any other error is considered a kafka access problem and retried after
        10 seconds. Ends when 'to_terminate' is set.
        :return: None
        """
        kafka_working = True
        while not self.to_terminate:
            try:
                if not self.aiomain_task_kafka:
                    # a dummy message is written to every topic before subscribing
                    for kafka_topic in self.kafka_topics:
                        await self.msg.aiowrite(
                            kafka_topic, "echo", "dummy message", loop=self.loop
                        )
                    kafka_working = True
                    self.logger.debug("Starting vim_account subscription task")
                    self.aiomain_task_kafka = asyncio.ensure_future(
                        self.msg.aioread(
                            self.kafka_topics,
                            loop=self.loop,
                            group_id=False,
                            aiocallback=self._msg_callback,
                        ),
                        loop=self.loop,
                    )

                if not self.aiomain_task_vim:
                    self.aiomain_task_vim = asyncio.ensure_future(
                        self.vim_watcher(), loop=self.loop
                    )

                if not self.aiomain_task_renew_lock:
                    self.aiomain_task_renew_lock = asyncio.ensure_future(
                        self.lock_renew.renew_locks(), loop=self.loop
                    )

                # NOTE: the 'loop' argument of asyncio.wait was removed in python 3.10
                done, _ = await asyncio.wait(
                    [
                        self.aiomain_task_kafka,
                        self.aiomain_task_vim,
                        self.aiomain_task_renew_lock,
                    ],
                    timeout=None,
                    return_when=asyncio.FIRST_COMPLETED,
                )

                try:
                    if self.aiomain_task_kafka in done:
                        exc = self.aiomain_task_kafka.exception()
                        self.logger.error(
                            "kafka subscription task exception: {}".format(exc)
                        )
                        self.aiomain_task_kafka = None

                    if self.aiomain_task_vim in done:
                        exc = self.aiomain_task_vim.exception()
                        self.logger.error(
                            "vim_account watcher task exception: {}".format(exc)
                        )
                        self.aiomain_task_vim = None

                    if self.aiomain_task_renew_lock in done:
                        exc = self.aiomain_task_renew_lock.exception()
                        self.logger.error("renew_locks task exception: {}".format(exc))
                        self.aiomain_task_renew_lock = None
                except asyncio.CancelledError:
                    # Task.exception() raises CancelledError when the task was cancelled
                    pass

            except Exception as e:
                if self.to_terminate:
                    return

                if kafka_working:
                    # logging only first time
                    self.logger.critical(
                        "Error accessing kafka '{}'. Retrying ...".format(e)
                    )
                    kafka_working = False

                await asyncio.sleep(10)

    def run(self):
        """
        Start of the thread. Creates the event loop, connects to database and messaging if they are not
        already provided, and runs aiomain() until 'to_terminate' is set. Finally closes connections.
        :return: None
        :raises VimAdminException: on invalid driver configuration or database/messaging connection error
        """
        self.loop = asyncio.new_event_loop()
        try:
            if not self.db:
                if self.config["database"]["driver"] == "mongo":
                    self.db = dbmongo.DbMongo()
                    self.db.db_connect(self.config["database"])
                elif self.config["database"]["driver"] == "memory":
                    self.db = dbmemory.DbMemory()
                    self.db.db_connect(self.config["database"])
                else:
                    raise VimAdminException(
                        "Invalid configuration param '{}' at '[database]':'driver'".format(
                            self.config["database"]["driver"]
                        )
                    )

            self.lock_renew.start(self.db, self.loop)

            if not self.msg:
                config_msg = self.config["message"].copy()
                config_msg["loop"] = self.loop

                if config_msg["driver"] == "local":
                    self.msg = msglocal.MsgLocal()
                    self.msg.connect(config_msg)
                elif config_msg["driver"] == "kafka":
                    self.msg = msgkafka.MsgKafka()
                    self.msg.connect(config_msg)
                else:
                    raise VimAdminException(
                        "Invalid configuration param '{}' at '[message]':'driver'".format(
                            config_msg["driver"]
                        )
                    )
        except (DbException, MsgException) as e:
            # keep the original exception as the cause for easier debugging
            raise VimAdminException(str(e), http_code=e.http_code) from e

        self.logger.info("Starting")
        while not self.to_terminate:
            try:
                self.loop.run_until_complete(
                    asyncio.ensure_future(self.aiomain(), loop=self.loop)
                )
            except Exception as e:
                if not self.to_terminate:
                    # logger.exception already includes the traceback
                    self.logger.exception(
                        "Exception '{}' at messaging read loop".format(e)
                    )

        self.logger.info("Finishing")
        self._stop()
        self.loop.close()

    async def _msg_callback(self, topic, command, params):
        """
        Callback to process a received message from kafka
        :param topic: topic received
        :param command: command received
        :param params: rest of parameters. The '_id' key is used to build the target identifier
        :return: None
        """
        try:
            if command == "echo":
                return

            if topic in self.kafka_topics:
                target = topic[0:3]  # vim, wim or sdn
                target_id = target + ":" + params["_id"]

                if command in ("edited", "edit"):
                    self.engine.reload_vim(target_id)
                    self.logger.debug("ordered to reload {}".format(target_id))
                elif command in ("deleted", "delete"):
                    self.engine.unload_vim(target_id)
                    self.logger.debug("ordered to unload {}".format(target_id))
                elif command in ("create", "created"):
                    self.engine.check_vim(target_id)
                    self.logger.debug("ordered to check {}".format(target_id))
        except (DbException, MsgException) as e:
            self.logger.error(
                "Error while processing topic={} command={}: {}".format(
                    topic, command, e
                )
            )
        except Exception as e:
            # logger.exception already includes the traceback
            self.logger.exception(
                "Exception while processing topic={} command={}: {}".format(
                    topic, command, e
                )
            )

    def _stop(self):
        """
        Close all connections
        :return: None
        :raises VimAdminException: if the database or messaging disconnection fails
        """
        try:
            if self.db:
                self.db.db_disconnect()

            if self.msg:
                self.msg.disconnect()
        except (DbException, MsgException) as e:
            raise VimAdminException(str(e), http_code=e.http_code) from e

    def terminate(self):
        """
        This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards,
        but not immediately.
        :return: None
        """
        self.to_terminate = True
        self.lock_renew.to_terminate = True

        # tasks belong to the loop of this thread, so they are cancelled through call_soon_threadsafe
        if self.aiomain_task_kafka:
            self.loop.call_soon_threadsafe(self.aiomain_task_kafka.cancel)

        if self.aiomain_task_vim:
            self.loop.call_soon_threadsafe(self.aiomain_task_vim.cancel)

        if self.aiomain_task_renew_lock:
            self.loop.call_soon_threadsafe(self.aiomain_task_renew_lock.cancel)

        self.lock_renew.stop()