Code Coverage

Cobertura Coverage Report > NG-RO.osm_ng_ro >

vim_admin.py

Trend

Classes: 100%

Lines: 17%

Conditionals: 100%
 

File Coverage summary

| Name | Classes | Lines | Conditionals |
|------|---------|-------|--------------|
| vim_admin.py | 100% (1/1) | 17% (37/224) | 100% (0/0) |

Coverage Breakdown by Class

| Name | Lines | Conditionals |
|------|-------|--------------|
| vim_admin.py | 17% (37/224) | N/A |

Source

NG-RO/osm_ng_ro/vim_admin.py
1 # -*- coding: utf-8 -*-
2
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #    http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 1 """
17 This module implements a thread that reads from kafka bus reading VIM messages.
18 It is based on asyncio.
19 It is in charge of loading tasks assigned to VIMs that nobody is in charge of.
20 """
21
22 1 import asyncio
23 1 from http import HTTPStatus
24 1 import logging
25 1 import threading
26 1 from time import time
27
28 1 from osm_common import dbmemory, dbmongo, msgkafka, msglocal
29 1 from osm_common.dbbase import DbException
30 1 from osm_common.msgbase import MsgException
31
32 1 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
33
34
class VimAdminException(Exception):
    """Error raised by the VIM admin thread; carries the HTTP status code to report."""

    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
        """
        :param message: human readable description of the error
        :param http_code: HTTP status associated with the error (BAD_REQUEST by default)
        """
        super().__init__(message)
        self.http_code = http_code
39
40
class LockRenew:
    """Keep database locks alive while the thread that took them is still working.

    Locks are registered with add_lock_object() and released with remove_lock_object().
    renew_locks() (an asyncio coroutine) periodically moves forward the "locked_at" field of
    each registered object in the database, for at most task_max_locked_time seconds after
    the lock was first taken.
    """

    renew_list = []
    # ^ class attribute, common for all LockRenew instances of this RO. Time ordered list of
    # dictionaries with information of locks that need to be renewed. The time order is
    # achieved because entries are always appended at the end.

    def __init__(self, config, logger):
        """
        Constructor of class
        :param config: configuration parameters. config["global"] must contain
            "task_locked_time", "task_relock_time" and "task_max_locked_time" (seconds)
        :param logger: logger used to report lock renewal activity
        """
        self.config = config
        self.logger = logger
        self.to_terminate = False  # set to True to make renew_locks() finish
        self.loop = None
        self.db = None
        self.task_locked_time = config["global"]["task_locked_time"]
        self.task_relock_time = config["global"]["task_relock_time"]
        self.task_max_locked_time = config["global"]["task_max_locked_time"]

    def start(self, db, loop):
        """
        Provide the runtime dependencies. Must be called before renew_locks()
        :param db: database object; its set_one() method is used to renew/release locks
        :param loop: asyncio event loop where renew_locks() runs (stored for reference)
        """
        self.db = db
        self.loop = loop

    @staticmethod
    def add_lock_object(database_table, database_object, thread_object):
        """
        Insert a task to renew the locking
        :param database_table: database collection where to maintain the lock
        :param database_object: database object. '_id' and 'locked_at' are mandatory keys
        :param thread_object: Thread object that has locked, used to check if it is alive
        :return: a lock_object needed for calling remove_lock_object. Its 'locked_at' key is
            kept up to date with the value stored in the database
        """
        lock_object = {
            "table": database_table,
            "_id": database_object["_id"],
            "initial_lock_time": database_object["locked_at"],
            "locked_at": database_object["locked_at"],
            "thread": thread_object,
            "unlocked": False,  # True when it is not needed any more
        }
        LockRenew.renew_list.append(lock_object)

        return lock_object

    @staticmethod
    def remove_lock_object(lock_object):
        """
        Mark a lock as not needed any more; renew_locks() will discard it
        :param lock_object: value returned by add_lock_object
        """
        lock_object["unlocked"] = True

    async def renew_locks(self):
        """
        Renew the registered locks until self.to_terminate is set.
        The head of renew_list (the entry expiring first) is processed as follows:
        - entries already unlocked, or whose locker thread is missing or dead, are dropped
        - entries not yet due for renewal are waited for
        - due entries locked for longer than task_max_locked_time are dropped unrenewed
        - other due entries get 'locked_at' advanced by task_locked_time in the database
          and are appended again at the end of the list
        """
        while not self.to_terminate:
            if not self.renew_list:
                await asyncio.sleep(self.task_locked_time - self.task_relock_time)
                continue

            lock_object = self.renew_list[0]

            if (
                lock_object["unlocked"]
                or not lock_object["thread"]
                or not lock_object["thread"].is_alive()
            ):
                # task has been finished or locker thread is dead, no need to re-lock it.
                self.renew_list.pop(0)
                continue

            locked_at = lock_object["locked_at"]
            now = time()
            time_to_relock = (
                locked_at + self.task_locked_time - self.task_relock_time - now
            )

            if time_to_relock >= 1:
                # wait until it is time to re-lock it
                await asyncio.sleep(time_to_relock)
                continue

            # Due now: remove it from the head; it is appended again only if renewed.
            # Popping unconditionally guarantees the loop always makes progress.
            self.renew_list.pop(0)

            if lock_object["initial_lock_time"] + self.task_max_locked_time < now:
                # Locked for too long: stop renewing, the lock will expire after
                # task_locked_time seconds.
                self.logger.info(
                    "Max locked time exceeded, not renewing lock for {}.{}".format(
                        lock_object["table"], lock_object["_id"]
                    )
                )
                continue

            # re-lock
            new_locked_at = locked_at + self.task_locked_time

            try:
                # the 'locked_at' filter ensures nobody else has taken the lock meanwhile
                if self.db.set_one(
                    lock_object["table"],
                    update_dict={
                        "locked_at": new_locked_at,
                        "modified_at": now,
                    },
                    q_filter={
                        "_id": lock_object["_id"],
                        "locked_at": locked_at,
                    },
                    fail_on_empty=False,
                ):
                    self.logger.debug(
                        "Renew lock for {}.{}".format(
                            lock_object["table"], lock_object["_id"]
                        )
                    )
                    lock_object["locked_at"] = new_locked_at
                    self.renew_list.append(lock_object)
                else:
                    self.logger.info(
                        "Cannot renew lock for {}.{}".format(
                            lock_object["table"], lock_object["_id"]
                        )
                    )
            except Exception as e:
                self.logger.error(
                    "Exception when trying to renew lock for {}.{}: {}".format(
                        lock_object["table"], lock_object["_id"], e
                    )
                )

    def stop(self):
        """
        Release the locks still held: entries not marked as unlocked whose lock has not
        expired get 'locked_at' set to 0 in the database. The 'locked_at' filter avoids
        touching a lock that was taken by someone else. Errors are logged and do not stop
        the release of the remaining locks. Does nothing if start() was never called.
        """
        if self.db is None:
            return

        now = time()

        # iterate over a snapshot: the list may be modified by renew_locks/add_lock_object
        for lock_object in list(self.renew_list):
            locked_at = lock_object["locked_at"]

            if lock_object["unlocked"] or locked_at + self.task_locked_time < now:
                # already released by its owner, or already expired
                continue

            try:
                self.db.set_one(
                    lock_object["table"],
                    update_dict={"locked_at": 0},
                    q_filter={"_id": lock_object["_id"], "locked_at": locked_at},
                    fail_on_empty=False,
                )
            except Exception as e:
                self.logger.error(
                    "Exception when trying to unlock {}.{}: {}".format(
                        lock_object["table"], lock_object["_id"], e
                    )
                )
171
172
class VimAdminThread(threading.Thread):
    """Thread that decides which VIM/WIM/SDN targets this RO instance must handle.

    It runs its own asyncio event loop with three tasks, re-created when they finish:
    - a subscription to the message bus topics in kafka_topics, handled by _msg_callback()
    - vim_watcher(), which loads targets whose ro_tasks nobody is processing
    - LockRenew.renew_locks(), which keeps ro_task locks alive
    """

    MAX_TIME_UNATTENDED = 600  # 10min
    TIME_CHECK_UNUSED_VIM = 3600 * 2  # 2h
    kafka_topics = ("vim_account", "wim_account", "sdn")

    def __init__(self, config, engine):
        """
        Constructor of class
        :param config: configuration parameters of database and messaging
        :param engine: an instance of Engine class, used to check, assign, reload and
            unload VIM targets
        """
        threading.Thread.__init__(self)
        self.to_terminate = False
        self.config = config
        self.db = None
        self.msg = None
        self.engine = engine
        self.loop = None
        self.last_rotask_time = 0
        self.next_check_unused_vim = time() + self.TIME_CHECK_UNUSED_VIM
        self.logger = logging.getLogger("ro.vimadmin")
        # asyncio task for receiving vim actions from kafka bus
        self.aiomain_task_kafka = None
        # asyncio task for watching ro_tasks not processed by nobody
        self.aiomain_task_vim = None
        self.aiomain_task_renew_lock = None
        # ^asyncio task for maintain an ro_task locked when VIM plugin takes too much time processing an order
        self.lock_renew = LockRenew(config, self.logger)
        self.task_locked_time = config["global"]["task_locked_time"]

    async def vim_watcher(self):
        """
        Read the database periodically looking for tasks not processed by anybody (for
        example because of a reboot) in order to load their VIM.
        First, every vim/wim/sdn account with an operation in PROCESSING state is passed to
        engine.check_vim(). Then, every MAX_TIME_UNATTENDED seconds until to_terminate:
        - targets not assigned to this RO with ro_tasks whose lock expired and that have been
          waiting at least MAX_TIME_UNATTENDED are assigned here, unless another ro_task of
          the same target is currently locked (some RO is already working on it)
        - every TIME_CHECK_UNUSED_VIM seconds, engine.unload_unused_vims() is called
        """
        # firstly read VIMs not processed
        for target_database in ("vim_accounts", "wim_accounts", "sdns"):
            unattended_targets = self.db.get_list(
                target_database,
                q_filter={"_admin.operations.operationState": "PROCESSING"},
            )

            for target in unattended_targets:
                # "vim_accounts" -> "vim:<id>", "wim_accounts" -> "wim:<id>", "sdns" -> "sdn:<id>"
                target_id = "{}:{}".format(target_database[:3], target["_id"])
                self.logger.info("ordered to check {}".format(target_id))
                self.engine.check_vim(target_id)

        while not self.to_terminate:
            now = time()
            processed_vims = set()

            ro_tasks = self.db.get_list(
                "ro_tasks",
                q_filter={
                    "target_id.ncont": self.engine.get_assigned_vims(),
                    "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
                    "locked_at.lt": now - self.task_locked_time,
                    "to_check_at.gt": self.last_rotask_time,
                    "to_check_at.lte": now - self.MAX_TIME_UNATTENDED,
                },
            )
            # the next query only looks at ro_tasks that became unattended after this one
            self.last_rotask_time = now - self.MAX_TIME_UNATTENDED

            for ro_task in ro_tasks:
                target_id = ro_task["target_id"]

                # if already checked ignore
                if target_id in processed_vims:
                    continue

                processed_vims.add(target_id)

                # if already assigned ignore (may have changed by assign_vim in this loop)
                if target_id in self.engine.get_assigned_vims():
                    continue

                # if there is some task locked on this VIM, there is an RO working on it, so ignore
                if self.db.get_list(
                    "ro_tasks",
                    q_filter={
                        "target_id": target_id,
                        "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
                        "locked_at.gt": now - self.task_locked_time,
                    },
                ):
                    continue

                # unattended, assign vim
                self.engine.assign_vim(target_id)
                self.logger.debug(
                    "ordered to load {}. Inactivity detected".format(target_id)
                )

            # every 2 hours check if there are vims without any ro_task and unload it
            if now > self.next_check_unused_vim:
                self.next_check_unused_vim = now + self.TIME_CHECK_UNUSED_VIM
                self.engine.unload_unused_vims()

            # 'loop' argument of asyncio.sleep was removed in Python 3.10; the running
            # loop is used implicitly
            await asyncio.sleep(self.MAX_TIME_UNATTENDED)

    def _reap_task(self, task, done, description):
        """
        Log the end of an asyncio task if it has finished
        :param task: asyncio task to check
        :param done: set of finished tasks returned by asyncio.wait
        :param description: task name used in the log message
        :return: None if the task has finished (so that the caller re-creates it), else task
        """
        if task not in done:
            return task

        # exception() raises CancelledError on a cancelled task, so check that first
        exc = "cancelled" if task.cancelled() else task.exception()
        self.logger.error("{} task exception: {}".format(description, exc))
        return None

    async def aiomain(self):
        """
        Create the message bus subscription, vim watcher and lock renewal tasks, wait until
        any of them finishes, log why, and re-create it. Runs until to_terminate is set.
        Errors while doing so are logged once (as kafka access errors) and retried every
        10 seconds.
        """
        kafka_working = True
        while not self.to_terminate:
            try:
                if not self.aiomain_task_kafka:
                    # publish an "echo" message to every topic (ignored by _msg_callback)
                    for kafka_topic in self.kafka_topics:
                        await self.msg.aiowrite(
                            kafka_topic, "echo", "dummy message", loop=self.loop
                        )
                    kafka_working = True
                    self.logger.debug("Starting vim_account subscription task")
                    self.aiomain_task_kafka = asyncio.ensure_future(
                        self.msg.aioread(
                            self.kafka_topics,
                            loop=self.loop,
                            group_id=False,
                            aiocallback=self._msg_callback,
                        ),
                        loop=self.loop,
                    )

                if not self.aiomain_task_vim:
                    self.aiomain_task_vim = asyncio.ensure_future(
                        self.vim_watcher(), loop=self.loop
                    )

                if not self.aiomain_task_renew_lock:
                    self.aiomain_task_renew_lock = asyncio.ensure_future(
                        self.lock_renew.renew_locks(), loop=self.loop
                    )

                # 'loop' argument of asyncio.wait was removed in Python 3.10
                done, _ = await asyncio.wait(
                    [
                        self.aiomain_task_kafka,
                        self.aiomain_task_vim,
                        self.aiomain_task_renew_lock,
                    ],
                    timeout=None,
                    return_when=asyncio.FIRST_COMPLETED,
                )

                # forget finished tasks so that they are re-created on the next iteration
                self.aiomain_task_kafka = self._reap_task(
                    self.aiomain_task_kafka, done, "kafka subscription"
                )
                self.aiomain_task_vim = self._reap_task(
                    self.aiomain_task_vim, done, "vim_account watcher"
                )
                self.aiomain_task_renew_lock = self._reap_task(
                    self.aiomain_task_renew_lock, done, "renew_locks"
                )

            except Exception as e:
                if self.to_terminate:
                    return

                if kafka_working:
                    # logging only first time
                    self.logger.critical(
                        "Error accessing kafka '{}'. Retrying ...".format(e)
                    )
                    kafka_working = False

            await asyncio.sleep(10)

    def run(self):
        """
        Start of the thread. Connects to the database and the message bus (unless they were
        already provided) and runs aiomain() in a new event loop until terminate() is called
        :return: None
        :raises VimAdminException: invalid database/message driver in the configuration, or
            a database/messaging error while connecting
        """
        self.loop = asyncio.new_event_loop()
        try:
            if not self.db:
                if self.config["database"]["driver"] == "mongo":
                    self.db = dbmongo.DbMongo()
                    self.db.db_connect(self.config["database"])
                elif self.config["database"]["driver"] == "memory":
                    self.db = dbmemory.DbMemory()
                    self.db.db_connect(self.config["database"])
                else:
                    raise VimAdminException(
                        "Invalid configuration param '{}' at '[database]':'driver'".format(
                            self.config["database"]["driver"]
                        )
                    )

            self.lock_renew.start(self.db, self.loop)

            if not self.msg:
                config_msg = self.config["message"].copy()
                config_msg["loop"] = self.loop

                if config_msg["driver"] == "local":
                    self.msg = msglocal.MsgLocal()
                    self.msg.connect(config_msg)
                elif config_msg["driver"] == "kafka":
                    self.msg = msgkafka.MsgKafka()
                    self.msg.connect(config_msg)
                else:
                    raise VimAdminException(
                        "Invalid configuration param '{}' at '[message]':'driver'".format(
                            config_msg["driver"]
                        )
                    )
        except (DbException, MsgException) as e:
            raise VimAdminException(str(e), http_code=e.http_code) from e

        self.logger.info("Starting")
        while not self.to_terminate:
            try:
                self.loop.run_until_complete(
                    asyncio.ensure_future(self.aiomain(), loop=self.loop)
                )
            except Exception as e:
                if not self.to_terminate:
                    self.logger.exception(
                        "Exception '{}' at messaging read loop".format(e), exc_info=True
                    )

        self.logger.info("Finishing")
        self._stop()
        self.loop.close()

    async def _msg_callback(self, topic, command, params):
        """
        Callback to process a received message from kafka
        :param topic: topic received; one of kafka_topics is mapped to the "vim", "wim" or
            "sdn" target prefix
        :param command: command received: edit(ed) -> reload, delete(d) -> unload,
            create(d) -> check; "echo" is ignored
        :param params: rest of parameters; must contain "_id"
        :return: None
        """
        try:
            if command == "echo":
                return

            if topic in self.kafka_topics:
                target = topic[0:3]  # vim, wim or sdn
                target_id = target + ":" + params["_id"]

                if command in ("edited", "edit"):
                    self.engine.reload_vim(target_id)
                    self.logger.debug("ordered to reload {}".format(target_id))
                elif command in ("deleted", "delete"):
                    self.engine.unload_vim(target_id)
                    self.logger.debug("ordered to unload {}".format(target_id))
                elif command in ("create", "created"):
                    self.engine.check_vim(target_id)
                    self.logger.debug("ordered to check {}".format(target_id))
        except (DbException, MsgException) as e:
            self.logger.error(
                "Error while processing topic={} command={}: {}".format(
                    topic, command, e
                )
            )
        except Exception as e:
            self.logger.exception(
                "Exception while processing topic={} command={}: {}".format(
                    topic, command, e
                ),
                exc_info=True,
            )

    def _stop(self):
        """
        Close all connections
        :return: None
        :raises VimAdminException: on a database/messaging error while disconnecting
        """
        try:
            if self.db:
                self.db.db_disconnect()

            if self.msg:
                self.msg.disconnect()
        except (DbException, MsgException) as e:
            raise VimAdminException(str(e), http_code=e.http_code) from e

    def terminate(self):
        """
        This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards,
        but not immediately.
        :return: None
        """
        self.to_terminate = True
        self.lock_renew.to_terminate = True

        # cancel the running tasks from the event loop thread
        if self.aiomain_task_kafka:
            self.loop.call_soon_threadsafe(self.aiomain_task_kafka.cancel)

        if self.aiomain_task_vim:
            self.loop.call_soon_threadsafe(self.aiomain_task_vim.cancel)

        if self.aiomain_task_renew_lock:
            self.loop.call_soon_threadsafe(self.aiomain_task_renew_lock.cancel)

        # NOTE(review): runs in the caller's thread, concurrently with the event loop thread
        self.lock_renew.stop()