1 |
|
# -*- coding: utf-8 -*- |
2 |
|
|
3 |
|
# Licensed under the Apache License, Version 2.0 (the "License"); |
4 |
|
# you may not use this file except in compliance with the License. |
5 |
|
# You may obtain a copy of the License at |
6 |
|
# |
7 |
|
# http://www.apache.org/licenses/LICENSE-2.0 |
8 |
|
# |
9 |
|
# Unless required by applicable law or agreed to in writing, software |
10 |
|
# distributed under the License is distributed on an "AS IS" BASIS, |
11 |
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
12 |
|
# implied. |
13 |
|
# See the License for the specific language governing permissions and |
14 |
|
# limitations under the License. |
15 |
|
|
16 |
1 |
""" |
17 |
|
This module implements a thread that reads VIM messages from the kafka bus. |
18 |
|
It is based on asyncio. |
19 |
|
It is in charge of loading tasks assigned to VIMs that nobody is in charge of. |
20 |
|
""" |
21 |
|
|
22 |
1 |
import asyncio |
23 |
1 |
from http import HTTPStatus |
24 |
1 |
import logging |
25 |
1 |
import threading |
26 |
1 |
from time import time |
27 |
|
|
28 |
1 |
from osm_common import dbmemory, dbmongo, msgkafka, msglocal |
29 |
1 |
from osm_common.dbbase import DbException |
30 |
1 |
from osm_common.msgbase import MsgException |
31 |
|
|
32 |
1 |
# Module author / maintainer contact.
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
33 |
|
|
34 |
|
|
35 |
1 |
class VimAdminException(Exception):
    """Error raised by the VIM admin thread; carries an HTTP status in ``http_code``."""

    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
        super().__init__(message)
        # HTTP status associated with this error (400 Bad Request unless stated otherwise)
        self.http_code = http_code
39 |
|
|
40 |
|
|
41 |
1 |
class LockRenew:
    """Keep database locks alive while the thread that took them is still working.

    Locks are registered with :meth:`add_lock_object`. The :meth:`renew_locks` coroutine pushes the
    ``locked_at`` field of each registered database record forward by ``task_locked_time`` seconds
    as long as the owner thread is alive, the lock has not been released with
    :meth:`remove_lock_object`, and no more than ``task_max_locked_time`` seconds have elapsed since
    the lock was first taken.
    """

    renew_list = []
    # ^ class attribute, shared by all LockRenew instances (common for all RO). Time ordered list of
    # dictionaries with information of locks that need to be renewed. The time order is achieved
    # because entries are always appended at the end.

    def __init__(self, config, logger):
        """
        Constructor of class
        :param config: configuration parameters of database and messaging. Its "global" section must
            provide "task_locked_time", "task_relock_time" and "task_max_locked_time" (seconds)
        :param logger: logger used to report lock renewal activity
        """
        self.config = config
        self.logger = logger
        self.to_terminate = False
        self.loop = None
        self.db = None
        self.task_locked_time = config["global"]["task_locked_time"]
        self.task_relock_time = config["global"]["task_relock_time"]
        self.task_max_locked_time = config["global"]["task_max_locked_time"]

    def start(self, db, loop):
        """
        Provide the database connection used to renew/release locks, and the event loop
        :param db: database object offering ``set_one``
        :param loop: asyncio event loop the renewal coroutine runs on (kept for reference)
        """
        self.db = db
        self.loop = loop

    @staticmethod
    def add_lock_object(database_table, database_object, thread_object):
        """
        Insert a task to renew the locking
        :param database_table: database collection where to maintain the lock
        :param database_object: database object. '_id' and 'locked_at' are mandatory keys
        :param thread_object: Thread object that has locked to check if it is alive
        :return: a lock_object needed for calling remove_lock_object. Its 'locked_at' is kept up to
            date with the value written to the database on each renewal
        """
        lock_object = {
            "table": database_table,
            "_id": database_object["_id"],
            "initial_lock_time": database_object["locked_at"],
            "locked_at": database_object["locked_at"],
            "thread": thread_object,
            "unlocked": False,  # True when it is not needed any more
        }
        LockRenew.renew_list.append(lock_object)

        return lock_object

    @staticmethod
    def remove_lock_object(lock_object):
        """
        Mark a lock as no longer needed; it is dropped from the renewal queue lazily
        :param lock_object: object returned by add_lock_object
        """
        lock_object["unlocked"] = True

    async def renew_locks(self):
        """Renew the registered locks until ``to_terminate`` is set.

        The oldest entry of ``renew_list`` is inspected on every iteration:
        - released entries, or entries whose thread is missing or dead, are dropped;
        - if it is not yet due for renewal, the coroutine sleeps until it is;
        - if it has been held longer than ``task_max_locked_time``, it is dropped without renewal;
        - otherwise its database ``locked_at`` is advanced by ``task_locked_time`` (only while the
          record still holds the value recorded here) and, on success, it is queued again at the end.
        """
        while not self.to_terminate:
            if not self.renew_list:
                await asyncio.sleep(self.task_locked_time - self.task_relock_time)
                continue

            lock_object = self.renew_list[0]

            if (
                lock_object["unlocked"]
                or not lock_object["thread"]
                or not lock_object["thread"].is_alive()
            ):
                # task has been finished or locker thread is dead, not needed to re-locked.
                self.renew_list.pop(0)
                continue

            locked_at = lock_object["locked_at"]
            now = time()
            time_to_relock = (
                locked_at + self.task_locked_time - self.task_relock_time - now
            )

            if time_to_relock >= 1:
                # wait until it is time to re-lock it
                await asyncio.sleep(time_to_relock)
                continue

            # Due for renewal: take it out of the queue; it is appended again only if renewed.
            # Popping unconditionally guarantees progress (no busy loop, no duplicated entries).
            self.renew_list.pop(0)

            if lock_object["initial_lock_time"] + self.task_max_locked_time < now:
                # held for too long: stop renewing it and let the lock expire
                self.logger.info(
                    "Max locked time exceeded for {}.{}; lock not renewed".format(
                        lock_object["table"], lock_object["_id"]
                    )
                )
                continue

            # re-lock
            new_locked_at = locked_at + self.task_locked_time

            try:
                # q_filter on the previous locked_at: only renew if nobody modified the lock meanwhile
                if self.db.set_one(
                    lock_object["table"],
                    update_dict={
                        "locked_at": new_locked_at,
                        "modified_at": now,
                    },
                    q_filter={
                        "_id": lock_object["_id"],
                        "locked_at": locked_at,
                    },
                    fail_on_empty=False,
                ):
                    self.logger.debug(
                        "Renew lock for {}.{}".format(
                            lock_object["table"], lock_object["_id"]
                        )
                    )
                    lock_object["locked_at"] = new_locked_at
                    self.renew_list.append(lock_object)
                else:
                    self.logger.info(
                        "Cannot renew lock for {}.{}".format(
                            lock_object["table"], lock_object["_id"]
                        )
                    )
            except Exception as e:
                self.logger.error(
                    "Exception when trying to renew lock for {}.{}: {}".format(
                        lock_object["table"], lock_object["_id"], e
                    )
                )

    def stop(self):
        """Release in the database the locks still registered in ``renew_list``.

        An entry is skipped only when it was released by its owner AND its lock period has already
        expired. The update matches only while the record still holds the ``locked_at`` value
        recorded here, so a lock taken meanwhile by someone else is left untouched. Does nothing if
        :meth:`start` was never called; a failure on one entry is logged and does not prevent
        releasing the others.
        """
        if self.db is None:
            return

        now = time()

        # iterate over a snapshot: renew_locks (in the event loop thread) may pop entries meanwhile
        for lock_object in list(self.renew_list):
            locked_at = lock_object["locked_at"]

            if not lock_object["unlocked"] or locked_at + self.task_locked_time >= now:
                try:
                    self.db.set_one(
                        lock_object["table"],
                        update_dict={"locked_at": 0},
                        q_filter={"_id": lock_object["_id"], "locked_at": locked_at},
                        fail_on_empty=False,
                    )
                except Exception as e:
                    self.logger.error(
                        "Exception when trying to unlock {}.{}: {}".format(
                            lock_object["table"], lock_object["_id"], e
                        )
                    )
170 |
|
|
171 |
|
|
172 |
1 |
class VimAdminThread(threading.Thread):
    """Thread that keeps this RO in sync with VIM, WIM and SDN account changes.

    It runs a private asyncio event loop (see :meth:`run`) supervising three tasks:
    - a message bus subscription to ``kafka_topics``, handled by :meth:`_msg_callback`;
    - :meth:`vim_watcher`, which loads VIMs whose ro_tasks nobody is attending;
    - the lock renewal coroutine of its ``LockRenew`` helper.
    """

    MAX_TIME_UNATTENDED = 600  # 10min
    TIME_CHECK_UNUSED_VIM = 3600 * 2  # 2h
    kafka_topics = ("vim_account", "wim_account", "sdn")

    def __init__(self, config, engine):
        """
        Constructor of class
        :param config: configuration parameters of database and messaging
        :param engine: engine instance that is told which VIMs to check, assign, reload or unload
        """
        threading.Thread.__init__(self)
        self.to_terminate = False
        self.config = config
        self.db = None
        self.msg = None
        self.engine = engine
        self.loop = None
        self.last_rotask_time = 0
        self.next_check_unused_vim = time() + self.TIME_CHECK_UNUSED_VIM
        self.logger = logging.getLogger("ro.vimadmin")
        # asyncio task for receiving vim actions from kafka bus
        self.aiomain_task_kafka = None
        # asyncio task for watching ro_tasks not processed by nobody
        self.aiomain_task_vim = None
        # asyncio task for keeping an ro_task locked when a VIM plugin takes too long processing an order
        self.aiomain_task_renew_lock = None
        self.lock_renew = LockRenew(config, self.logger)
        self.task_locked_time = config["global"]["task_locked_time"]

    async def vim_watcher(self):
        """Load VIMs whose pending work is not being processed by any RO.

        First, every vim/wim/sdn account with an operation in PROCESSING state (for instance one
        interrupted by a reboot) is passed to ``engine.check_vim``. Then, every
        ``MAX_TIME_UNATTENDED`` seconds until termination, ro_tasks of VIMs not assigned to this RO,
        whose lock has expired and whose ``to_check_at`` is overdue, are looked up; their VIM is
        assigned to this RO unless some task of that VIM still holds a live lock. Every
        ``TIME_CHECK_UNUSED_VIM`` seconds the engine is also asked to unload unused VIMs.
        """
        # firstly read VIMS not processed
        for target_database in ("vim_accounts", "wim_accounts", "sdns"):
            unattended_targets = self.db.get_list(
                target_database,
                q_filter={"_admin.operations.operationState": "PROCESSING"},
            )

            for target in unattended_targets:
                # "vim_accounts" -> "vim:<id>", "wim_accounts" -> "wim:<id>", "sdns" -> "sdn:<id>"
                target_id = "{}:{}".format(target_database[:3], target["_id"])
                self.logger.info("ordered to check {}".format(target_id))
                self.engine.check_vim(target_id)

        while not self.to_terminate:
            now = time()
            processed_vims = set()

            if not self.last_rotask_time:
                # normalize any falsy value (e.g. None) before using it in the query
                self.last_rotask_time = 0

            ro_tasks = self.db.get_list(
                "ro_tasks",
                q_filter={
                    "target_id.ncont": self.engine.get_assigned_vims(),
                    "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
                    "locked_at.lt": now - self.task_locked_time,
                    "to_check_at.gt": self.last_rotask_time,
                    "to_check_at.lte": now - self.MAX_TIME_UNATTENDED,
                },
            )
            # next query only looks at ro_tasks that became overdue after this one
            self.last_rotask_time = now - self.MAX_TIME_UNATTENDED

            for ro_task in ro_tasks:
                # if already checked ignore
                if ro_task["target_id"] in processed_vims:
                    continue

                processed_vims.add(ro_task["target_id"])

                # if already assigned ignore
                if ro_task["target_id"] in self.engine.get_assigned_vims():
                    continue

                # if there is some task locked on this VIM, there is an RO working on it, so ignore
                if self.db.get_list(
                    "ro_tasks",
                    q_filter={
                        "target_id": ro_task["target_id"],
                        "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
                        "locked_at.gt": now - self.task_locked_time,
                    },
                ):
                    continue

                # unattended, assign vim
                self.engine.assign_vim(ro_task["target_id"])
                self.logger.debug(
                    "ordered to load {}. Inactivity detected".format(
                        ro_task["target_id"]
                    )
                )

            # every 2 hours check if there are vims without any ro_task and unload it
            if now > self.next_check_unused_vim:
                self.next_check_unused_vim = now + self.TIME_CHECK_UNUSED_VIM
                self.engine.unload_unused_vims()

            # the running loop is used implicitly; the 'loop' argument was removed in Python 3.10
            await asyncio.sleep(self.MAX_TIME_UNATTENDED)

    def _log_task_end(self, task, message):
        """Log how a finished sub-task ended; ``message`` is formatted with its exception (or None)."""
        try:
            exc = task.exception()
        except asyncio.CancelledError:
            # Future.exception() raises CancelledError for a cancelled task
            self.logger.exception("asyncio.CancelledError occured.")
            return

        self.logger.error(message.format(exc))

    async def aiomain(self):
        """Main coroutine: (re)start the sub-tasks and supervise them until termination.

        Whenever a sub-task finishes, its outcome is logged and its handle is cleared, so it is
        started again on the next iteration (unless terminating). Any other error (for instance
        while writing the echo messages) is reported as a kafka access error (critical log only on
        the first consecutive failure) and retried after 10 seconds.
        """
        kafka_working = True
        while not self.to_terminate:
            try:
                if not self.aiomain_task_kafka:
                    # check the bus is reachable before subscribing
                    for kafka_topic in self.kafka_topics:
                        await self.msg.aiowrite(
                            kafka_topic, "echo", "dummy message", loop=self.loop
                        )
                    kafka_working = True
                    self.logger.debug("Starting vim_account subscription task")
                    self.aiomain_task_kafka = asyncio.ensure_future(
                        self.msg.aioread(
                            self.kafka_topics,
                            loop=self.loop,
                            group_id=False,
                            aiocallback=self._msg_callback,
                        ),
                        loop=self.loop,
                    )

                if not self.aiomain_task_vim:
                    self.aiomain_task_vim = asyncio.ensure_future(
                        self.vim_watcher(), loop=self.loop
                    )

                if not self.aiomain_task_renew_lock:
                    self.aiomain_task_renew_lock = asyncio.ensure_future(
                        self.lock_renew.renew_locks(), loop=self.loop
                    )

                done, _ = await asyncio.wait(
                    [
                        self.aiomain_task_kafka,
                        self.aiomain_task_vim,
                        self.aiomain_task_renew_lock,
                    ],
                    timeout=None,
                    return_when=asyncio.FIRST_COMPLETED,
                )

                # Each finished task is inspected and cleared independently; a cancelled task that
                # stayed set would make asyncio.wait return immediately forever.
                if self.aiomain_task_kafka in done:
                    self._log_task_end(
                        self.aiomain_task_kafka,
                        "kafka subscription task exception: {}",
                    )
                    self.aiomain_task_kafka = None

                if self.aiomain_task_vim in done:
                    self._log_task_end(
                        self.aiomain_task_vim,
                        "vim_account watcher task exception: {}",
                    )
                    self.aiomain_task_vim = None

                if self.aiomain_task_renew_lock in done:
                    self._log_task_end(
                        self.aiomain_task_renew_lock,
                        "renew_locks task exception: {}",
                    )
                    self.aiomain_task_renew_lock = None

            except Exception as e:
                if self.to_terminate:
                    return

                if kafka_working:
                    # logging only first time
                    self.logger.critical(
                        "Error accessing kafka '{}'. Retrying ...".format(e)
                    )
                    kafka_working = False

            await asyncio.sleep(10)

    def run(self):
        """
        Start of the thread: connect to the database and message bus (unless already set), then run
        aiomain on a new event loop until terminate() is called
        :return: None
        :raises VimAdminException: on an invalid driver configuration or a connection error
        """
        self.loop = asyncio.new_event_loop()
        try:
            if not self.db:
                if self.config["database"]["driver"] == "mongo":
                    self.db = dbmongo.DbMongo()
                    self.db.db_connect(self.config["database"])
                elif self.config["database"]["driver"] == "memory":
                    self.db = dbmemory.DbMemory()
                    self.db.db_connect(self.config["database"])
                else:
                    raise VimAdminException(
                        "Invalid configuration param '{}' at '[database]':'driver'".format(
                            self.config["database"]["driver"]
                        )
                    )

            self.lock_renew.start(self.db, self.loop)

            if not self.msg:
                config_msg = self.config["message"].copy()
                config_msg["loop"] = self.loop

                if config_msg["driver"] == "local":
                    self.msg = msglocal.MsgLocal()
                    self.msg.connect(config_msg)
                elif config_msg["driver"] == "kafka":
                    self.msg = msgkafka.MsgKafka()
                    self.msg.connect(config_msg)
                else:
                    raise VimAdminException(
                        "Invalid configuration param '{}' at '[message]':'driver'".format(
                            config_msg["driver"]
                        )
                    )
        except (DbException, MsgException) as e:
            raise VimAdminException(str(e), http_code=e.http_code) from e

        self.logger.info("Starting")
        while not self.to_terminate:
            try:
                self.loop.run_until_complete(
                    asyncio.ensure_future(self.aiomain(), loop=self.loop)
                )
            except Exception as e:
                if not self.to_terminate:
                    self.logger.exception(
                        "Exception '{}' at messaging read loop".format(e)
                    )

        self.logger.info("Finishing")
        try:
            self._stop()
        finally:
            # close the loop even if disconnecting fails
            self.loop.close()

    async def _msg_callback(self, topic, command, params):
        """
        Callback to process a received message from kafka
        :param topic: topic received
        :param command: command received
        :param params: rest of parameters; '_id' identifies the account
        :return: None
        """
        try:
            if command == "echo":
                return

            if topic in self.kafka_topics:
                target = topic[0:3]  # vim, wim or sdn
                target_id = target + ":" + params["_id"]

                if command in ("edited", "edit"):
                    self.engine.reload_vim(target_id)
                    self.logger.debug("ordered to reload {}".format(target_id))
                elif command in ("deleted", "delete"):
                    self.engine.unload_vim(target_id)
                    self.logger.debug("ordered to unload {}".format(target_id))
                elif command in ("create", "created"):
                    self.engine.check_vim(target_id)
                    self.logger.debug("ordered to check {}".format(target_id))
        except (DbException, MsgException) as e:
            self.logger.error(
                "Error while processing topic={} command={}: {}".format(
                    topic, command, e
                )
            )
        except Exception as e:
            self.logger.exception(
                "Exception while processing topic={} command={}: {}".format(
                    topic, command, e
                )
            )

    def _stop(self):
        """
        Close all connections
        :return: None
        :raises VimAdminException: if disconnecting from the database or message bus fails
        """
        try:
            if self.db:
                self.db.db_disconnect()

            if self.msg:
                self.msg.disconnect()
        except (DbException, MsgException) as e:
            raise VimAdminException(str(e), http_code=e.http_code) from e

    def terminate(self):
        """
        This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards,
        but not immediately.
        :return: None
        """
        self.to_terminate = True
        self.lock_renew.to_terminate = True

        # cancellations are scheduled on the thread's own loop from the caller's thread
        if self.aiomain_task_kafka:
            self.loop.call_soon_threadsafe(self.aiomain_task_kafka.cancel)

        if self.aiomain_task_vim:
            self.loop.call_soon_threadsafe(self.aiomain_task_vim.cancel)

        if self.aiomain_task_renew_lock:
            self.loop.call_soon_threadsafe(self.aiomain_task_renew_lock.cancel)

        self.lock_renew.stop()