Code Coverage

Cobertura Coverage Report > NG-RO.osm_ng_ro >

vim_admin.py

Trend

Classes100%
 
Lines17%
   
Conditionals100%
 

File Coverage summary

NameClassesLinesConditionals
vim_admin.py
100%
1/1
17%
37/224
100%
0/0

Coverage Breakdown by Class

NameLinesConditionals
vim_admin.py
17%
37/224
N/A

Source

NG-RO/osm_ng_ro/vim_admin.py
1 # -*- coding: utf-8 -*-
2
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #    http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 1 """
17 This module implements a thread that reads from kafka bus reading VIM messages.
18 It is based on asyncio.
19 It is in charge of loading tasks assigned to VIMs that nobody is in charge of
20 """
21
22 1 import asyncio
23 1 from http import HTTPStatus
24 1 import logging
25 1 import threading
26 1 from time import time
27
28 1 from osm_common import dbmemory, dbmongo, msgkafka, msglocal
29 1 from osm_common.dbbase import DbException
30 1 from osm_common.msgbase import MsgException
31
32 1 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
33
34
class VimAdminException(Exception):
    """Error raised by the VIM admin thread; it carries an HTTP status code."""

    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
        """
        Build the exception
        :param message: human readable description, used as the exception text
        :param http_code: status code stored in 'http_code'. By default BAD_REQUEST
        """
        super().__init__(message)
        self.http_code = http_code
39
40
class LockRenew:
    """
    Periodically renews the 'locked_at' mark of database objects registered with add_lock_object, while the
    thread that registered them is alive and the lock has not been released with remove_lock_object.
    """

    renew_list = []
    # ^ class attribute, common for all RO. Time ordered list of dictionaries with information of locks that needs to
    # be renewed. The time order is achieved as it is appended at the end

    def __init__(self, config, logger):
        """
        Constructor of class
        :param config: configuration parameters. config["global"] must contain 'task_locked_time',
            'task_relock_time' and 'task_max_locked_time'
        :param logger: logger object used to report renewals and errors
        """
        self.config = config
        self.logger = logger
        self.to_terminate = False
        self.loop = None
        self.db = None
        self.task_locked_time = config["global"]["task_locked_time"]
        self.task_relock_time = config["global"]["task_relock_time"]
        self.task_max_locked_time = config["global"]["task_max_locked_time"]

    def start(self, db, loop):
        """
        Provide the database connection and asyncio loop to use
        :param db: database object; its 'set_one' method is used to renew and release locks
        :param loop: asyncio event loop, stored for reference
        :return: None
        """
        self.db = db
        self.loop = loop

    @staticmethod
    def add_lock_object(database_table, database_object, thread_object):
        """
        Insert a task to renew the locking
        :param database_table: database collection where to maintain the lock
        :param database_object: database object. '_id' and 'locked_at' are mandatory keys
        :param thread_object: Thread object that has locked to check if it is alive
        :return: a locked_object needed for calling remove_lock_object. It will contain up-to-date database 'locked_at'
        """
        lock_object = {
            "table": database_table,
            "_id": database_object["_id"],
            "initial_lock_time": database_object["locked_at"],
            "locked_at": database_object["locked_at"],
            "thread": thread_object,
            "unlocked": False,  # True when it is not needed any more
        }
        LockRenew.renew_list.append(lock_object)

        return lock_object

    @staticmethod
    def remove_lock_object(lock_object):
        """
        Mark a lock as not needed any more. It is discarded from the list the next time renew_locks inspects it
        :param lock_object: object returned by add_lock_object
        :return: None
        """
        lock_object["unlocked"] = True

    def _relock(self, lock_object, now):
        """
        Extend the database lock of one item by 'task_locked_time' and, on success, queue it again for renewal
        :param lock_object: item already removed from renew_list
        :param now: current time, stored at database as 'modified_at'
        :return: None. Database errors are logged, not raised
        """
        locked_at = lock_object["locked_at"]
        new_locked_at = locked_at + self.task_locked_time

        try:
            # the filter on the old 'locked_at' makes the update fail if somebody else has modified the lock
            if self.db.set_one(
                lock_object["table"],
                update_dict={
                    "locked_at": new_locked_at,
                    "modified_at": now,
                },
                q_filter={
                    "_id": lock_object["_id"],
                    "locked_at": locked_at,
                },
                fail_on_empty=False,
            ):
                self.logger.debug(
                    "Renew lock for {}.{}".format(
                        lock_object["table"], lock_object["_id"]
                    )
                )
                lock_object["locked_at"] = new_locked_at
                self.renew_list.append(lock_object)
            else:
                self.logger.info(
                    "Cannot renew lock for {}.{}".format(
                        lock_object["table"], lock_object["_id"]
                    )
                )
        except Exception as e:
            self.logger.error(
                "Exception when trying to renew lock for {}.{}: {}".format(
                    lock_object["table"], lock_object["_id"], e
                )
            )

    async def renew_locks(self):
        """
        Coroutine that runs until 'to_terminate' is set. It inspects the oldest item of renew_list and:
        - discards it if it is unlocked or its thread is missing or dead
        - sleeps until it is time to renew it
        - renews it while 'task_max_locked_time' since the initial lock has not been exceeded, otherwise discards it
        :return: None
        """
        while not self.to_terminate:
            if not self.renew_list:
                # 'loop' argument is not passed: it was removed from asyncio.sleep in python 3.10
                await asyncio.sleep(self.task_locked_time - self.task_relock_time)
                continue

            lock_object = self.renew_list[0]

            if (
                lock_object["unlocked"]
                or not lock_object["thread"]
                or not lock_object["thread"].is_alive()
            ):
                # task has been finished or locker thread is dead, not needed to re-locked.
                self.renew_list.pop(0)
                continue

            now = time()
            time_to_relock = (
                lock_object["locked_at"]
                + self.task_locked_time
                - self.task_relock_time
                - now
            )

            if time_to_relock >= 1:
                # wait until it is time to re-lock it
                await asyncio.sleep(time_to_relock)
                continue

            # Always take the item out of the list here: leaving it at the head without awaiting would make
            # this loop spin without yielding control to the event loop
            self.renew_list.pop(0)

            if lock_object["initial_lock_time"] + self.task_max_locked_time < now:
                self.logger.info(
                    "Lock for {}.{} exceeded the maximum locked time, not renewed".format(
                        lock_object["table"], lock_object["_id"]
                    )
                )
                continue

            self._relock(lock_object, now)

    def stop(self):
        """
        Release at database (set 'locked_at' to 0) the items of renew_list that are not marked as unlocked or
        whose lock has not expired yet
        :return: None
        """
        if self.db is None:
            # start() was never called, there is no database connection to release anything
            return

        # unlock all locked items
        now = time()

        for lock_object in self.renew_list:
            locked_at = lock_object["locked_at"]

            if not lock_object["unlocked"] or locked_at + self.task_locked_time >= now:
                self.db.set_one(
                    lock_object["table"],
                    update_dict={"locked_at": 0},
                    q_filter={"_id": lock_object["_id"], "locked_at": locked_at},
                    fail_on_empty=False,
                )
170
171
class VimAdminThread(threading.Thread):
    """
    Thread that runs its own asyncio loop with three tasks: reading VIM/WIM/SDN messages from the message bus,
    watching the database for ro_tasks that nobody is processing, and renewing task locks (LockRenew).
    """

    MAX_TIME_UNATTENDED = 600  # 10min
    TIME_CHECK_UNUSED_VIM = 3600 * 2  # 2h
    kafka_topics = ("vim_account", "wim_account", "sdn")

    def __init__(self, config, engine):
        """
        Constructor of class
        :param config: configuration parameters of database and messaging
        :param engine: an instance of Engine class, used for deleting instances
        """
        threading.Thread.__init__(self)
        self.to_terminate = False
        self.config = config
        self.db = None
        self.msg = None
        self.engine = engine
        self.loop = None
        self.last_rotask_time = 0
        self.next_check_unused_vim = time() + self.TIME_CHECK_UNUSED_VIM
        self.logger = logging.getLogger("ro.vimadmin")
        # asyncio task for receiving vim actions from kafka bus
        self.aiomain_task_kafka = None
        # asyncio task for watching ro_tasks not processed by nobody
        self.aiomain_task_vim = None
        self.aiomain_task_renew_lock = None
        # ^asyncio task for maintain an ro_task locked when VIM plugin takes too much time processing an order
        self.lock_renew = LockRenew(config, self.logger)
        self.task_locked_time = config["global"]["task_locked_time"]

    async def vim_watcher(self):
        """Reads database periodically looking for tasks not processed by nobody because of a reboot
        in order to load this vim"""
        # firstly read VIMS not processed
        for target_database in ("vim_accounts", "wim_accounts", "sdns"):
            unattended_targets = self.db.get_list(
                target_database,
                q_filter={"_admin.operations.operationState": "PROCESSING"},
            )

            for target in unattended_targets:
                # target_id prefix is the first three letters of the collection: vim, wim or sdn
                target_id = "{}:{}".format(target_database[:3], target["_id"])
                self.logger.info("ordered to check {}".format(target_id))
                self.engine.check_vim(target_id)

        while not self.to_terminate:
            now = time()
            # targets already inspected at this iteration
            processed_vims = set()

            ro_tasks = self.db.get_list(
                "ro_tasks",
                q_filter={
                    "target_id.ncont": self.engine.get_assigned_vims(),
                    "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
                    "locked_at.lt": now - self.task_locked_time,
                    "to_check_at.gt": self.last_rotask_time,
                    "to_check_at.lte": now - self.MAX_TIME_UNATTENDED,
                },
            )
            # next iteration only looks for tasks newer than the window inspected now
            self.last_rotask_time = now - self.MAX_TIME_UNATTENDED

            for ro_task in ro_tasks:
                target_id = ro_task["target_id"]

                # if already checked ignore
                if target_id in processed_vims:
                    continue

                processed_vims.add(target_id)

                # if already assigned ignore
                if target_id in self.engine.get_assigned_vims():
                    continue

                # if there is some task locked on this VIM, there is an RO working on it, so ignore
                if self.db.get_list(
                    "ro_tasks",
                    q_filter={
                        "target_id": target_id,
                        "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
                        "locked_at.gt": now - self.task_locked_time,
                    },
                ):
                    continue

                # unattended, assign vim
                self.engine.assign_vim(target_id)
                self.logger.debug(
                    "ordered to load {}. Inactivity detected".format(target_id)
                )

            # every 2 hours check if there are vims without any ro_task and unload it
            if now > self.next_check_unused_vim:
                self.next_check_unused_vim = now + self.TIME_CHECK_UNUSED_VIM
                self.engine.unload_unused_vims()

            # 'loop' argument is not passed: it was removed from asyncio.sleep in python 3.10
            await asyncio.sleep(self.MAX_TIME_UNATTENDED)

    async def aiomain(self):
        """
        Main coroutine. Creates (or re-creates when finished) the kafka reader, the vim watcher and the lock
        renewer tasks, and waits until any of them ends. It iterates every 10 seconds until 'to_terminate' is set
        :return: None
        """
        kafka_working = True
        while not self.to_terminate:
            try:
                if not self.aiomain_task_kafka:
                    # await self.msg.aiowrite("admin", "echo", "dummy message", loop=self.loop)
                    for kafka_topic in self.kafka_topics:
                        await self.msg.aiowrite(
                            kafka_topic, "echo", "dummy message", loop=self.loop
                        )
                    kafka_working = True
                    self.logger.debug("Starting vim_account subscription task")
                    self.aiomain_task_kafka = asyncio.ensure_future(
                        self.msg.aioread(
                            self.kafka_topics,
                            loop=self.loop,
                            group_id=False,
                            aiocallback=self._msg_callback,
                        ),
                        loop=self.loop,
                    )

                if not self.aiomain_task_vim:
                    self.aiomain_task_vim = asyncio.ensure_future(
                        self.vim_watcher(), loop=self.loop
                    )

                if not self.aiomain_task_renew_lock:
                    self.aiomain_task_renew_lock = asyncio.ensure_future(
                        self.lock_renew.renew_locks(), loop=self.loop
                    )

                # 'loop' argument is not passed: it was removed from asyncio.wait in python 3.10
                done, _ = await asyncio.wait(
                    [
                        self.aiomain_task_kafka,
                        self.aiomain_task_vim,
                        self.aiomain_task_renew_lock,
                    ],
                    timeout=None,
                    return_when=asyncio.FIRST_COMPLETED,
                )

                try:
                    # a finished task is logged and forgotten, so that it is created again at next iteration
                    if self.aiomain_task_kafka in done:
                        exc = self.aiomain_task_kafka.exception()
                        self.logger.error(
                            "kafka subscription task exception: {}".format(exc)
                        )
                        self.aiomain_task_kafka = None

                    if self.aiomain_task_vim in done:
                        exc = self.aiomain_task_vim.exception()
                        self.logger.error(
                            "vim_account watcher task exception: {}".format(exc)
                        )
                        self.aiomain_task_vim = None

                    if self.aiomain_task_renew_lock in done:
                        exc = self.aiomain_task_renew_lock.exception()
                        self.logger.error("renew_locks task exception: {}".format(exc))
                        self.aiomain_task_renew_lock = None
                except asyncio.CancelledError:
                    # Task.exception() raises CancelledError when the task was cancelled
                    self.logger.exception("asyncio.CancelledError occured.")

            except Exception as e:
                if self.to_terminate:
                    return

                if kafka_working:
                    # logging only first time
                    self.logger.critical(
                        "Error accessing kafka '{}'. Retrying ...".format(e)
                    )
                    kafka_working = False

            await asyncio.sleep(10)

    def run(self):
        """
        Start of the thread. Creates the asyncio loop, connects to database and message bus if not already
        provided, and runs aiomain until 'to_terminate' is set. Finally it closes connections and the loop
        :return: None
        :raises VimAdminException: on invalid 'driver' configuration or database/messaging connection error
        """
        self.loop = asyncio.new_event_loop()
        try:
            if not self.db:
                if self.config["database"]["driver"] == "mongo":
                    self.db = dbmongo.DbMongo()
                    self.db.db_connect(self.config["database"])
                elif self.config["database"]["driver"] == "memory":
                    self.db = dbmemory.DbMemory()
                    self.db.db_connect(self.config["database"])
                else:
                    raise VimAdminException(
                        "Invalid configuration param '{}' at '[database]':'driver'".format(
                            self.config["database"]["driver"]
                        )
                    )

            self.lock_renew.start(self.db, self.loop)

            if not self.msg:
                config_msg = self.config["message"].copy()
                config_msg["loop"] = self.loop

                if config_msg["driver"] == "local":
                    self.msg = msglocal.MsgLocal()
                    self.msg.connect(config_msg)
                elif config_msg["driver"] == "kafka":
                    self.msg = msgkafka.MsgKafka()
                    self.msg.connect(config_msg)
                else:
                    raise VimAdminException(
                        "Invalid configuration param '{}' at '[message]':'driver'".format(
                            config_msg["driver"]
                        )
                    )
        except (DbException, MsgException) as e:
            raise VimAdminException(str(e), http_code=e.http_code)

        self.logger.info("Starting")
        while not self.to_terminate:
            try:
                self.loop.run_until_complete(
                    asyncio.ensure_future(self.aiomain(), loop=self.loop)
                )
            # except asyncio.CancelledError:
            #     break  # if cancelled it should end, breaking loop
            except Exception as e:
                if not self.to_terminate:
                    self.logger.exception(
                        "Exception '{}' at messaging read loop".format(e), exc_info=True
                    )

        self.logger.info("Finishing")
        self._stop()
        self.loop.close()

    async def _msg_callback(self, topic, command, params):
        """
        Callback to process a received message from kafka
        :param topic:  topic received
        :param command:  command received
        :param params: rest of parameters. Its '_id' is used to compose the target identifier
        :return: None
        """
        try:
            if command == "echo":
                return

            if topic in self.kafka_topics:
                target = topic[0:3]  # vim, wim or sdn
                target_id = target + ":" + params["_id"]

                if command in ("edited", "edit"):
                    self.engine.reload_vim(target_id)
                    self.logger.debug("ordered to reload {}".format(target_id))
                elif command in ("deleted", "delete"):
                    self.engine.unload_vim(target_id)
                    self.logger.debug("ordered to unload {}".format(target_id))
                elif command in ("create", "created"):
                    self.engine.check_vim(target_id)
                    self.logger.debug("ordered to check {}".format(target_id))
        except (DbException, MsgException) as e:
            self.logger.error(
                "Error while processing topic={} command={}: {}".format(
                    topic, command, e
                )
            )
        except Exception as e:
            self.logger.exception(
                "Exception while processing topic={} command={}: {}".format(
                    topic, command, e
                ),
                exc_info=True,
            )

    def _stop(self):
        """
        Close all connections
        :return: None
        :raises VimAdminException: if database or messaging disconnection fails
        """
        try:
            if self.db:
                self.db.db_disconnect()

            if self.msg:
                self.msg.disconnect()
        except (DbException, MsgException) as e:
            raise VimAdminException(str(e), http_code=e.http_code)

    def terminate(self):
        """
        This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards,
        but not immediately.
        :return: None
        """
        self.to_terminate = True
        self.lock_renew.to_terminate = True

        # tasks belong to the loop of this thread, so cancellation is scheduled on it instead of called directly
        if self.aiomain_task_kafka:
            self.loop.call_soon_threadsafe(self.aiomain_task_kafka.cancel)

        if self.aiomain_task_vim:
            self.loop.call_soon_threadsafe(self.aiomain_task_vim.cancel)

        if self.aiomain_task_renew_lock:
            self.loop.call_soon_threadsafe(self.aiomain_task_renew_lock.cancel)

        self.lock_renew.stop()