Code Coverage

Cobertura Coverage Report > NG-RO.osm_ng_ro >

vim_admin.py

Trend

File Coverage summary

Name | Classes | Lines | Conditionals
vim_admin.py
100%
1/1
17%
38/224
100%
0/0

Coverage Breakdown by Class

Name | Lines | Conditionals
vim_admin.py
17%
38/224
N/A

Source

NG-RO/osm_ng_ro/vim_admin.py
1 # -*- coding: utf-8 -*-
2
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #    http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 1 """
17 This module implements a thread that reads VIM messages from the kafka bus.
18 It is based on asyncio.
19 It is in charge of loading tasks assigned to VIMs that nobody is in charge of.
20 """
21
22 1 import asyncio
23 1 from http import HTTPStatus
24 1 import logging
25 1 import threading
26 1 from time import time
27
28 1 from osm_common import dbmemory, dbmongo, msgkafka, msglocal
29 1 from osm_common.dbbase import DbException
30 1 from osm_common.msgbase import MsgException
31
32 1 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
33
34
35 1 class VimAdminException(Exception):
36 1     def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
37 0         self.http_code = http_code
38 0         Exception.__init__(self, message)
39
40
41 1 class LockRenew:
42 1     renew_list = []
43     # ^ static (class-level) attribute, common for all RO. Time-ordered list of dictionaries with information about locks that need to
44     # be renewed. The time order is achieved because new items are appended at the end
45
46 1     def __init__(self, config, logger):
47         """
48         Constructor of class
49         :param config: configuration parameters of database and messaging
50         """
51 0         self.config = config
52 0         self.logger = logger
53 0         self.to_terminate = False
54 0         self.db = None
55 0         self.task_locked_time = config["global"]["task_locked_time"]
56 0         self.task_relock_time = config["global"]["task_relock_time"]
57 0         self.task_max_locked_time = config["global"]["task_max_locked_time"]
58
59 1     def start(self, db):
60 0         self.db = db
61
62 1     @staticmethod
63 1     def add_lock_object(database_table, database_object, thread_object):
64         """
65         Insert a task to renew the locking
66         :param database_table: database collection where to maintain the lock
67         :param database_object: database object. '_id' and 'locked_at' are mandatory keys
68         :param thread_object: Thread object that has locked to check if it is alive
69         :return: a lock_object needed for calling remove_lock_object. It will contain the up-to-date database 'locked_at'
70         """
71 1         lock_object = {
72             "table": database_table,
73             "_id": database_object["_id"],
74             "initial_lock_time": database_object["locked_at"],
75             "locked_at": database_object["locked_at"],
76             "thread": thread_object,
77             "unlocked": False,  # True when it is not needed any more
78         }
79 1         LockRenew.renew_list.append(lock_object)
80
81 1         return lock_object
82
83 1     @staticmethod
84 1     def remove_lock_object(lock_object):
85 1         lock_object["unlocked"] = True
86
87 1     async def renew_locks(self):
88 0         while not self.to_terminate:
89 0             if not self.renew_list:
90 0                 await asyncio.sleep(self.task_locked_time - self.task_relock_time)
91 0                 continue
92
93 0             lock_object = self.renew_list[0]
94
95 0             if (
96                 lock_object["unlocked"]
97                 or not lock_object["thread"]
98                 or not lock_object["thread"].is_alive()
99             ):
100                 # task has finished or the locker thread is dead, so there is no need to re-lock.
101 0                 self.renew_list.pop(0)
102 0                 continue
103
104 0             locked_at = lock_object["locked_at"]
105 0             now = time()
106 0             time_to_relock = (
107                 locked_at + self.task_locked_time - self.task_relock_time - now
108             )
109
110 0             if time_to_relock < 1:
111 0                 if lock_object["initial_lock_time"] + self.task_max_locked_time < now:
112 0                     self.renew_list.pop(0)
113                     # re-lock
114 0                     new_locked_at = locked_at + self.task_locked_time
115
116 0                     try:
117 0                         if self.db.set_one(
118                             lock_object["table"],
119                             update_dict={
120                                 "locked_at": new_locked_at,
121                                 "modified_at": now,
122                             },
123                             q_filter={
124                                 "_id": lock_object["_id"],
125                                 "locked_at": locked_at,
126                             },
127                             fail_on_empty=False,
128                         ):
129 0                             self.logger.debug(
130                                 "Renew lock for {}.{}".format(
131                                     lock_object["table"], lock_object["_id"]
132                                 )
133                             )
134 0                             lock_object["locked_at"] = new_locked_at
135 0                             self.renew_list.append(lock_object)
136                         else:
137 0                             self.logger.info(
138                                 "Cannot renew lock for {}.{}".format(
139                                     lock_object["table"], lock_object["_id"]
140                                 )
141                             )
142 0                     except Exception as e:
143 0                         self.logger.error(
144                             "Exception when trying to renew lock for {}.{}: {}".format(
145                                 lock_object["table"], lock_object["_id"], e
146                             )
147                         )
148             else:
149                 # wait until it is time to re-lock it
150 0                 await asyncio.sleep(time_to_relock)
151
152 1     def stop(self):
153         # unlock all locked items
154 0         now = time()
155
156 0         for lock_object in self.renew_list:
157 0             locked_at = lock_object["locked_at"]
158
159 0             if not lock_object["unlocked"] or locked_at + self.task_locked_time >= now:
160 0                 self.db.set_one(
161                     lock_object["table"],
162                     update_dict={"locked_at": 0},
163                     q_filter={"_id": lock_object["_id"], "locked_at": locked_at},
164                     fail_on_empty=False,
165                 )
166
167
168 1 class VimAdminThread(threading.Thread):
169 1     MAX_TIME_UNATTENDED = 600  # 10min
170 1     TIME_CHECK_UNUSED_VIM = 3600 * 2  # 2h
171 1     kafka_topics = ("vim_account", "wim_account", "sdn")
172
173 1     def __init__(self, config, engine):
174         """
175         Constructor of class
176         :param config: configuration parameters of database and messaging
177         :param engine: an instance of Engine class, used for deleting instances
178         """
179 0         threading.Thread.__init__(self)
180 0         self.to_terminate = False
181 0         self.config = config
182 0         self.db = None
183 0         self.msg = None
184 0         self.engine = engine
185 0         self.loop = None
186 0         self.last_rotask_time = 0
187 0         self.next_check_unused_vim = time() + self.TIME_CHECK_UNUSED_VIM
188 0         self.logger = logging.getLogger("ro.vimadmin")
189         # asyncio task for receiving vim actions from kafka bus
190 0         self.aiomain_task_kafka = None
191         # asyncio task for watching ro_tasks not processed by anybody
192 0         self.aiomain_task_vim = None
193 0         self.aiomain_task_renew_lock = None
194         # ^ asyncio task for keeping an ro_task locked when the VIM plugin takes too long processing an order
195 0         self.lock_renew = LockRenew(config, self.logger)
196 0         self.task_locked_time = config["global"]["task_locked_time"]
197
198 1     async def vim_watcher(self):
199         """Periodically reads the database looking for tasks not processed by anybody because of a reboot,
200         in order to load the corresponding VIM"""
201         # firstly read VIMS not processed
202 0         for target_database in ("vim_accounts", "wim_accounts", "sdns"):
203 0             unattended_targets = self.db.get_list(
204                 target_database,
205                 q_filter={"_admin.operations.operationState": "PROCESSING"},
206             )
207
208 0             for target in unattended_targets:
209 0                 target_id = "{}:{}".format(target_database[:3], target["_id"])
210 0                 self.logger.info("ordered to check {}".format(target_id))
211 0                 self.engine.check_vim(target_id)
212
213 0         while not self.to_terminate:
214 0             now = time()
215 0             processed_vims = []
216
217 0             if not self.last_rotask_time:
218 0                 self.last_rotask_time = 0
219
220 0             ro_tasks = self.db.get_list(
221                 "ro_tasks",
222                 q_filter={
223                     "target_id.ncont": self.engine.get_assigned_vims(),
224                     "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
225                     "locked_at.lt": now - self.task_locked_time,
226                     "to_check_at.gt": self.last_rotask_time,
227                     "to_check_at.lte": now - self.MAX_TIME_UNATTENDED,
228                 },
229             )
230 0             self.last_rotask_time = now - self.MAX_TIME_UNATTENDED
231
232 0             for ro_task in ro_tasks:
233                 # if already checked ignore
234 0                 if ro_task["target_id"] in processed_vims:
235 0                     continue
236
237 0                 processed_vims.append(ro_task["target_id"])
238
239                 # if already assigned ignore
240 0                 if ro_task["target_id"] in self.engine.get_assigned_vims():
241 0                     continue
242
243                 # if there is some task locked on this VIM, there is an RO working on it, so ignore
244 0                 if self.db.get_list(
245                     "ro_tasks",
246                     q_filter={
247                         "target_id": ro_task["target_id"],
248                         "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
249                         "locked_at.gt": now - self.task_locked_time,
250                     },
251                 ):
252 0                     continue
253
254                 # unattended, assign vim
255 0                 self.engine.assign_vim(ro_task["target_id"])
256 0                 self.logger.debug(
257                     "ordered to load {}. Inactivity detected".format(
258                         ro_task["target_id"]
259                     )
260                 )
261
262             # every 2 hours check if there are vims without any ro_task and unload them
263 0             if now > self.next_check_unused_vim:
264 0                 self.next_check_unused_vim = now + self.TIME_CHECK_UNUSED_VIM
265 0                 self.engine.unload_unused_vims()
266
267 0             await asyncio.sleep(self.MAX_TIME_UNATTENDED)
268
269 1     async def aiomain(self):
270 0         kafka_working = True
271 0         while not self.to_terminate:
272 0             try:
273 0                 if not self.aiomain_task_kafka:
274 0                     for kafka_topic in self.kafka_topics:
275 0                         await self.msg.aiowrite(kafka_topic, "echo", "dummy message")
276 0                     kafka_working = True
277 0                     self.logger.debug("Starting vim_account subscription task")
278 0                     self.aiomain_task_kafka = asyncio.ensure_future(
279                         self.msg.aioread(
280                             self.kafka_topics,
281                             group_id=False,
282                             aiocallback=self._msg_callback,
283                         ),
284                     )
285
286 0                 if not self.aiomain_task_vim:
287 0                     self.aiomain_task_vim = asyncio.ensure_future(self.vim_watcher())
288
289 0                 if not self.aiomain_task_renew_lock:
290 0                     self.aiomain_task_renew_lock = asyncio.ensure_future(
291                         self.lock_renew.renew_locks()
292                     )
293
294 0                 done, _ = await asyncio.wait(
295                     [
296                         self.aiomain_task_kafka,
297                         self.aiomain_task_vim,
298                         self.aiomain_task_renew_lock,
299                     ],
300                     timeout=None,
301                     return_when=asyncio.FIRST_COMPLETED,
302                 )
303
304 0                 try:
305 0                     if self.aiomain_task_kafka in done:
306 0                         exc = self.aiomain_task_kafka.exception()
307 0                         self.logger.error(
308                             "kafka subscription task exception: {}".format(exc)
309                         )
310 0                         self.aiomain_task_kafka = None
311
312 0                     if self.aiomain_task_vim in done:
313 0                         exc = self.aiomain_task_vim.exception()
314 0                         self.logger.error(
315                             "vim_account watcher task exception: {}".format(exc)
316                         )
317 0                         self.aiomain_task_vim = None
318
319 0                     if self.aiomain_task_renew_lock in done:
320 0                         exc = self.aiomain_task_renew_lock.exception()
321 0                         self.logger.error("renew_locks task exception: {}".format(exc))
322 0                         self.aiomain_task_renew_lock = None
323 0                 except asyncio.CancelledError:
324 0                     self.logger.exception("asyncio.CancelledError occured.")
325
326 0             except Exception as e:
327 0                 if self.to_terminate:
328 0                     return
329
330 0                 if kafka_working:
331                     # logging only first time
332 0                     self.logger.critical(
333                         "Error accessing kafka '{}'. Retrying ...".format(e)
334                     )
335 0                     kafka_working = False
336
337 0             await asyncio.sleep(10)
338
339 1     def run(self):
340         """
341         Start of the thread
342         :return: None
343         """
344 0         self.loop = asyncio.new_event_loop()
345 0         try:
346 0             if not self.db:
347 0                 if self.config["database"]["driver"] == "mongo":
348 0                     self.db = dbmongo.DbMongo()
349 0                     self.db.db_connect(self.config["database"])
350 0                 elif self.config["database"]["driver"] == "memory":
351 0                     self.db = dbmemory.DbMemory()
352 0                     self.db.db_connect(self.config["database"])
353                 else:
354 0                     raise VimAdminException(
355                         "Invalid configuration param '{}' at '[database]':'driver'".format(
356                             self.config["database"]["driver"]
357                         )
358                     )
359
360 0             self.lock_renew.start(self.db)
361
362 0             if not self.msg:
363 0                 config_msg = self.config["message"].copy()
364
365 0                 if config_msg["driver"] == "local":
366 0                     self.msg = msglocal.MsgLocal()
367 0                     self.msg.connect(config_msg)
368 0                 elif config_msg["driver"] == "kafka":
369 0                     self.msg = msgkafka.MsgKafka()
370 0                     self.msg.connect(config_msg)
371                 else:
372 0                     raise VimAdminException(
373                         "Invalid configuration param '{}' at '[message]':'driver'".format(
374                             config_msg["driver"]
375                         )
376                     )
377 0         except (DbException, MsgException) as e:
378 0             raise VimAdminException(str(e), http_code=e.http_code)
379
380 0         self.logger.info("Starting")
381 0         while not self.to_terminate:
382 0             try:
383 0                 asyncio.run(self.main_task())
384 0             except Exception as e:
385 0                 if not self.to_terminate:
386 0                     self.logger.exception(
387                         "Exception '{}' at messaging read loop".format(e), exc_info=True
388                     )
389
390 0         self.logger.info("Finishing")
391 0         self._stop()
392 0         self.loop.close()
393
394 1     async def main_task(self):
395 0         task = asyncio.ensure_future(self.aiomain())
396 0         await task
397
398 1     async def _msg_callback(self, topic, command, params):
399         """
400         Callback to process a received message from kafka
401         :param topic:  topic received
402         :param command:  command received
403         :param params: rest of parameters
404         :return: None
405         """
406 0         try:
407 0             if command == "echo":
408 0                 return
409
410 0             if topic in self.kafka_topics:
411 0                 target = topic[0:3]  # vim, wim or sdn
412 0                 target_id = target + ":" + params["_id"]
413
414 0                 if command in ("edited", "edit"):
415 0                     self.engine.reload_vim(target_id)
416 0                     self.logger.debug("ordered to reload {}".format(target_id))
417 0                 elif command in ("deleted", "delete"):
418 0                     self.engine.unload_vim(target_id)
419 0                     self.logger.debug("ordered to unload {}".format(target_id))
420 0                 elif command in ("create", "created"):
421 0                     self.engine.check_vim(target_id)
422 0                     self.logger.debug("ordered to check {}".format(target_id))
423 0         except (DbException, MsgException) as e:
424 0             self.logger.error(
425                 "Error while processing topic={} command={}: {}".format(
426                     topic, command, e
427                 )
428             )
429 0         except Exception as e:
430 0             self.logger.exception(
431                 "Exception while processing topic={} command={}: {}".format(
432                     topic, command, e
433                 ),
434                 exc_info=True,
435             )
436
437 1     def _stop(self):
438         """
439         Close all connections
440         :return: None
441         """
442 0         try:
443 0             if self.db:
444 0                 self.db.db_disconnect()
445
446 0             if self.msg:
447 0                 self.msg.disconnect()
448 0         except (DbException, MsgException) as e:
449 0             raise VimAdminException(str(e), http_code=e.http_code)
450
451 1     def terminate(self):
452         """
453         This is a thread-safe method to terminate this thread. Termination is done asynchronously afterwards,
454         not immediately.
455         :return: None
456         """
457 0         self.to_terminate = True
458 0         self.lock_renew.to_terminate = True
459
460 0         if self.aiomain_task_kafka:
461 0             self.loop.call_soon_threadsafe(self.aiomain_task_kafka.cancel())
462
463 0         if self.aiomain_task_vim:
464 0             self.loop.call_soon_threadsafe(self.aiomain_task_vim.cancel())
465
466 0         if self.aiomain_task_renew_lock:
467 0             self.loop.call_soon_threadsafe(self.aiomain_task_renew_lock.cancel())
468
469 0         self.lock_renew.stop()