17bfb20238d0a6984f5988d8dcd55b6af9d9aefc
[osm/RO.git] / NG-RO / osm_ng_ro / vim_admin.py
1 # -*- coding: utf-8 -*-
2
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 """
17 This module implements a thread that reads VIM messages from the kafka bus.
18 It is based on asyncio.
19 It is in charge of loading tasks assigned to VIMs that nobody is in charge of.
20 """
21
22 import logging
23 import threading
24 import asyncio
25 from http import HTTPStatus
26
27 from osm_common import dbmongo, dbmemory, msglocal, msgkafka
28 from osm_common.dbbase import DbException
29 from osm_common.msgbase import MsgException
30 from time import time
31
32 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
33
34
class VimAdminException(Exception):
    """Error raised by the VIM admin thread; carries an HTTP status code."""

    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
        """
        Build the exception.
        :param message: human readable description of the failure
        :param http_code: HTTP status stored in 'http_code'. BAD_REQUEST by default
        """
        super().__init__(message)
        self.http_code = http_code
39
40
class LockRenew:
    """
    Periodically renews the 'locked_at' mark of database objects while the thread that locked them
    is still alive, so that a long running operation does not lose its lock.
    """

    renew_list = []
    # ^ class attribute, shared by every LockRenew instance of this process. Time ordered list of
    # dictionaries with information of locks that need to be renewed. The time order is achieved as
    # entries are always appended at the end

    def __init__(self, config, logger):
        """
        Constructor of class
        :param config: configuration parameters. 'task_locked_time', 'task_relock_time' and
            'task_max_locked_time' are read from config["global"]
        :param logger: logger used to report renewals and failures
        """
        self.config = config
        self.logger = logger
        self.to_terminate = False
        self.loop = None
        self.db = None
        self.task_locked_time = config["global"]["task_locked_time"]
        self.task_relock_time = config["global"]["task_relock_time"]
        self.task_max_locked_time = config["global"]["task_max_locked_time"]

    def start(self, db, loop):
        """
        Provide the resources that are only available once the owner thread is running
        :param db: connected database object, used to update the locks
        :param loop: asyncio event loop where renew_locks is scheduled
        :return: None
        """
        self.db = db
        self.loop = loop

    @staticmethod
    def add_lock_object(database_table, database_object, thread_object):
        """
        Insert a task to renew the locking
        :param database_table: database collection where to maintain the lock
        :param database_object: database object. '_id' and 'locked_at' are mandatory keys
        :param thread_object: Thread object that has locked to check if it is alive
        :return: a locked_object needed for calling remove_lock_object. It will contain the up-to-date
            database 'locked_at'
        """
        lock_object = {
            "table": database_table,
            "_id": database_object["_id"],
            "initial_lock_time": database_object["locked_at"],
            "locked_at": database_object["locked_at"],
            "thread": thread_object,
            "unlocked": False,  # True when it is not needed any more
        }
        LockRenew.renew_list.append(lock_object)

        return lock_object

    @staticmethod
    def remove_lock_object(lock_object):
        """
        Mark a lock as not needed any more. The entry is discarded by renew_locks when it is reached
        :param lock_object: object returned by add_lock_object
        :return: None
        """
        lock_object["unlocked"] = True

    async def renew_locks(self):
        """
        Coroutine that runs until 'to_terminate' is set. It looks at the oldest entry of renew_list and:
        - discards it if it was released or its locker thread is missing/dead
        - sleeps until 'task_relock_time' seconds before the lock expires
        - discards it if it has been locked for more than 'task_max_locked_time'
        - otherwise pushes 'locked_at' forward by 'task_locked_time' at database and re-queues the entry
        :return: None
        """
        while not self.to_terminate:
            if not self.renew_list:
                # nothing to renew. No 'loop' argument: it was removed from asyncio.sleep in python 3.10
                # and the running loop is used anyway
                await asyncio.sleep(self.task_locked_time - self.task_relock_time)
                continue

            lock_object = self.renew_list[0]

            if (
                lock_object["unlocked"]
                or not lock_object["thread"]
                or not lock_object["thread"].is_alive()
            ):
                # task has been finished or locker thread is dead, not needed to re-locked.
                self.renew_list.pop(0)
                continue

            locked_at = lock_object["locked_at"]
            now = time()
            time_to_relock = (
                locked_at + self.task_locked_time - self.task_relock_time - now
            )

            if time_to_relock >= 1:
                # wait until it is time to re-lock it
                await asyncio.sleep(time_to_relock)
                continue

            # The entry is due. Always take it out of the head: leaving it there would make this loop
            # spin without awaiting, and re-appending it without popping would duplicate it
            self.renew_list.pop(0)

            if lock_object["initial_lock_time"] + self.task_max_locked_time < now:
                # locked for longer than allowed: stop renewing and let the lock expire
                self.logger.info(
                    "Max locked time exceeded for {}.{}. Lock is not renewed".format(
                        lock_object["table"], lock_object["_id"]
                    )
                )
                continue

            self._relock(lock_object, locked_at, now)

    def _relock(self, lock_object, locked_at, now):
        """
        Push forward the database lock of one entry and, if it succeeds, queue it again at the end
        of renew_list. On failure the entry is not queued again.
        :param lock_object: entry already removed from renew_list
        :param locked_at: 'locked_at' value expected at database; the update is filtered by it
        :param now: current time, stored as 'modified_at'
        :return: None
        """
        new_locked_at = locked_at + self.task_locked_time

        try:
            if self.db.set_one(
                lock_object["table"],
                update_dict={
                    "locked_at": new_locked_at,
                    "modified_at": now,
                },
                q_filter={
                    "_id": lock_object["_id"],
                    "locked_at": locked_at,
                },
                fail_on_empty=False,
            ):
                self.logger.debug(
                    "Renew lock for {}.{}".format(
                        lock_object["table"], lock_object["_id"]
                    )
                )
                lock_object["locked_at"] = new_locked_at
                self.renew_list.append(lock_object)
            else:
                self.logger.info(
                    "Cannot renew lock for {}.{}".format(
                        lock_object["table"], lock_object["_id"]
                    )
                )
        except Exception as e:
            # best effort: a database failure must not stop the renewal of the other entries
            self.logger.error(
                "Exception when trying to renew lock for {}.{}: {}".format(
                    lock_object["table"], lock_object["_id"], e
                )
            )

    def stop(self):
        """
        Unlock all locked items: 'locked_at' is set to 0 at database for every entry of renew_list
        that has not been released or whose lock period has not expired yet. The update is filtered
        by the known 'locked_at', so that an object re-locked by somebody else is not modified.
        :return: None
        """
        now = time()

        for lock_object in self.renew_list:
            locked_at = lock_object["locked_at"]

            if not lock_object["unlocked"] or locked_at + self.task_locked_time >= now:
                self.db.set_one(
                    lock_object["table"],
                    update_dict={"locked_at": 0},
                    q_filter={"_id": lock_object["_id"], "locked_at": locked_at},
                    fail_on_empty=False,
                )
171
172
class VimAdminThread(threading.Thread):
    """
    Thread that runs its own asyncio loop with three tasks: a kafka subscription for
    vim_account/wim_account/sdn messages, a watcher of ro_tasks that nobody is processing, and the
    renewal of database locks (LockRenew).
    """

    MAX_TIME_UNATTENDED = 600  # 10min
    TIME_CHECK_UNUSED_VIM = 3600 * 2  # 2h
    kafka_topics = ("vim_account", "wim_account", "sdn")

    def __init__(self, config, engine):
        """
        Constructor of class
        :param config: configuration parameters of database and messaging
        :param engine: an instance of Engine class, used for deleting instances
        """
        threading.Thread.__init__(self)
        self.to_terminate = False
        self.config = config
        self.db = None
        self.msg = None
        self.engine = engine
        self.loop = None
        self.last_rotask_time = 0
        self.next_check_unused_vim = time() + self.TIME_CHECK_UNUSED_VIM
        self.logger = logging.getLogger("ro.vimadmin")
        # asyncio task for receiving vim actions from kafka bus
        self.aiomain_task_kafka = None
        # asyncio task for watching ro_tasks not processed by nobody
        self.aiomain_task_vim = None
        self.aiomain_task_renew_lock = None
        # ^asyncio task for maintain an ro_task locked when VIM plugin takes too much time processing an order
        self.lock_renew = LockRenew(config, self.logger)
        self.task_locked_time = config["global"]["task_locked_time"]

    async def vim_watcher(self):
        """Reads database periodically looking for tasks not processed by nobody because of a reboot
        in order to load this vim"""
        # firstly read VIMS not processed
        for target_database in ("vim_accounts", "wim_accounts", "sdns"):
            unattended_targets = self.db.get_list(
                target_database,
                q_filter={"_admin.operations.operationState": "PROCESSING"},
            )

            for target in unattended_targets:
                # first three letters of the collection name give the prefix: vim, wim or sdn
                target_id = "{}:{}".format(target_database[:3], target["_id"])
                self.logger.info("ordered to check {}".format(target_id))
                self.engine.check_vim(target_id)

        while not self.to_terminate:
            now = time()
            # targets already evaluated in this round; a set gives constant time membership test
            processed_vims = set()

            if not self.last_rotask_time:
                self.last_rotask_time = 0

            ro_tasks = self.db.get_list(
                "ro_tasks",
                q_filter={
                    "target_id.ncont": self.engine.get_assigned_vims(),
                    "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
                    "locked_at.lt": now - self.task_locked_time,
                    "to_check_at.gt": self.last_rotask_time,
                    "to_check_at.lte": now - self.MAX_TIME_UNATTENDED,
                },
            )
            self.last_rotask_time = now - self.MAX_TIME_UNATTENDED

            for ro_task in ro_tasks:
                target_id = ro_task["target_id"]

                # if already checked ignore
                if target_id in processed_vims:
                    continue

                processed_vims.add(target_id)

                # if already assigned ignore
                if target_id in self.engine.get_assigned_vims():
                    continue

                # if there is some task locked on this VIM, there is an RO working on it, so ignore
                if self.db.get_list(
                    "ro_tasks",
                    q_filter={
                        "target_id": target_id,
                        "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
                        "locked_at.gt": now - self.task_locked_time,
                    },
                ):
                    continue

                # unattended, assign vim
                self.engine.assign_vim(target_id)
                self.logger.debug(
                    "ordered to load {}. Inactivity detected".format(target_id)
                )

            # every 2 hours check if there are vims without any ro_task and unload it
            if now > self.next_check_unused_vim:
                self.next_check_unused_vim = now + self.TIME_CHECK_UNUSED_VIM
                self.engine.unload_unused_vims()

            # no 'loop' argument: it was removed from asyncio.sleep in python 3.10; this coroutine
            # already runs on self.loop
            await asyncio.sleep(self.MAX_TIME_UNATTENDED)

    async def aiomain(self):
        """
        Main coroutine. Creates (and re-creates when they finish) the kafka subscription task, the
        vim watcher task and the lock renewal task, until 'to_terminate' is set. If an exception
        is raised, e.g. because kafka is not reachable, it waits 10 seconds and retries.
        :return: None
        """
        kafka_working = True
        while not self.to_terminate:
            try:
                if not self.aiomain_task_kafka:
                    # writing a dummy message fails if the bus is not available
                    await self.msg.aiowrite(
                        "vim_account", "echo", "dummy message", loop=self.loop
                    )
                    kafka_working = True
                    self.logger.debug("Starting vim_account subscription task")
                    self.aiomain_task_kafka = asyncio.ensure_future(
                        self.msg.aioread(
                            self.kafka_topics,
                            loop=self.loop,
                            group_id=False,
                            aiocallback=self._msg_callback,
                        ),
                        loop=self.loop,
                    )

                if not self.aiomain_task_vim:
                    self.aiomain_task_vim = asyncio.ensure_future(
                        self.vim_watcher(), loop=self.loop
                    )

                if not self.aiomain_task_renew_lock:
                    self.aiomain_task_renew_lock = asyncio.ensure_future(
                        self.lock_renew.renew_locks(), loop=self.loop
                    )

                # no 'loop' argument: it was removed from asyncio.wait in python 3.10
                done, _ = await asyncio.wait(
                    [
                        self.aiomain_task_kafka,
                        self.aiomain_task_vim,
                        self.aiomain_task_renew_lock,
                    ],
                    timeout=None,
                    return_when=asyncio.FIRST_COMPLETED,
                )

                try:
                    # a finished task is reported and forgotten, so that next iteration creates it again
                    if self.aiomain_task_kafka in done:
                        exc = self.aiomain_task_kafka.exception()
                        self.logger.error(
                            "kafka subscription task exception: {}".format(exc)
                        )
                        self.aiomain_task_kafka = None

                    if self.aiomain_task_vim in done:
                        exc = self.aiomain_task_vim.exception()
                        self.logger.error(
                            "vim_account watcher task exception: {}".format(exc)
                        )
                        self.aiomain_task_vim = None

                    if self.aiomain_task_renew_lock in done:
                        exc = self.aiomain_task_renew_lock.exception()
                        self.logger.error("renew_locks task exception: {}".format(exc))
                        self.aiomain_task_renew_lock = None
                except asyncio.CancelledError:
                    # Task.exception() raises CancelledError for a cancelled task
                    pass

            except Exception as e:
                if self.to_terminate:
                    return

                if kafka_working:
                    # logging only first time
                    self.logger.critical(
                        "Error accessing kafka '{}'. Retrying ...".format(e)
                    )
                    kafka_working = False

                await asyncio.sleep(10)

    def run(self):
        """
        Start of the thread. Creates the event loop, connects to database and message bus if they
        are not already provided, and runs aiomain until termination is requested.
        :return: None
        :raises VimAdminException: on invalid driver configuration or connection failure
        """
        self.loop = asyncio.new_event_loop()
        try:
            if not self.db:
                if self.config["database"]["driver"] == "mongo":
                    self.db = dbmongo.DbMongo()
                    self.db.db_connect(self.config["database"])
                elif self.config["database"]["driver"] == "memory":
                    self.db = dbmemory.DbMemory()
                    self.db.db_connect(self.config["database"])
                else:
                    raise VimAdminException(
                        "Invalid configuration param '{}' at '[database]':'driver'".format(
                            self.config["database"]["driver"]
                        )
                    )

            self.lock_renew.start(self.db, self.loop)

            if not self.msg:
                config_msg = self.config["message"].copy()
                config_msg["loop"] = self.loop

                if config_msg["driver"] == "local":
                    self.msg = msglocal.MsgLocal()
                    self.msg.connect(config_msg)
                elif config_msg["driver"] == "kafka":
                    self.msg = msgkafka.MsgKafka()
                    self.msg.connect(config_msg)
                else:
                    raise VimAdminException(
                        "Invalid configuration param '{}' at '[message]':'driver'".format(
                            config_msg["driver"]
                        )
                    )
        except (DbException, MsgException) as e:
            # keep the original error as the cause for easier troubleshooting
            raise VimAdminException(str(e), http_code=e.http_code) from e

        self.logger.info("Starting")
        while not self.to_terminate:
            try:
                self.loop.run_until_complete(
                    asyncio.ensure_future(self.aiomain(), loop=self.loop)
                )
            except Exception as e:
                if not self.to_terminate:
                    self.logger.exception(
                        "Exception '{}' at messaging read loop".format(e), exc_info=True
                    )

        self.logger.info("Finishing")
        self._stop()
        self.loop.close()

    async def _msg_callback(self, topic, command, params):
        """
        Callback to process a received message from kafka
        :param topic: topic received
        :param command: command received
        :param params: rest of parameters
        :return: None
        """
        try:
            if command == "echo":
                return

            if topic in self.kafka_topics:
                target = topic[0:3]  # vim, wim or sdn
                target_id = target + ":" + params["_id"]

                if command in ("edited", "edit"):
                    self.engine.reload_vim(target_id)
                    self.logger.debug("ordered to reload {}".format(target_id))
                elif command in ("deleted", "delete"):
                    self.engine.unload_vim(target_id)
                    self.logger.debug("ordered to unload {}".format(target_id))
                elif command in ("create", "created"):
                    self.engine.check_vim(target_id)
                    self.logger.debug("ordered to check {}".format(target_id))
        except (DbException, MsgException) as e:
            self.logger.error(
                "Error while processing topic={} command={}: {}".format(
                    topic, command, e
                )
            )
        except Exception as e:
            # never propagate: an exception here would finish the subscription task
            self.logger.exception(
                "Exception while processing topic={} command={}: {}".format(
                    topic, command, e
                ),
                exc_info=True,
            )

    def _stop(self):
        """
        Close all connections
        :return: None
        :raises VimAdminException: if the database or the message bus fails to disconnect
        """
        try:
            if self.db:
                self.db.db_disconnect()

            if self.msg:
                self.msg.disconnect()
        except (DbException, MsgException) as e:
            raise VimAdminException(str(e), http_code=e.http_code) from e

    def terminate(self):
        """
        This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards,
        but not immediately.
        :return: None
        """
        self.to_terminate = True
        self.lock_renew.to_terminate = True

        # tasks belong to the loop of this thread, so cancellation is requested through the loop
        if self.aiomain_task_kafka:
            self.loop.call_soon_threadsafe(self.aiomain_task_kafka.cancel)

        if self.aiomain_task_vim:
            self.loop.call_soon_threadsafe(self.aiomain_task_vim.cancel)

        if self.aiomain_task_renew_lock:
            self.loop.call_soon_threadsafe(self.aiomain_task_renew_lock.cancel)

        self.lock_renew.stop()