1 |
|
# -*- coding: utf-8 -*- |
2 |
|
|
3 |
|
# Licensed under the Apache License, Version 2.0 (the "License"); |
4 |
|
# you may not use this file except in compliance with the License. |
5 |
|
# You may obtain a copy of the License at |
6 |
|
# |
7 |
|
# http://www.apache.org/licenses/LICENSE-2.0 |
8 |
|
# |
9 |
|
# Unless required by applicable law or agreed to in writing, software |
10 |
|
# distributed under the License is distributed on an "AS IS" BASIS, |
11 |
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
12 |
|
# implied. |
13 |
|
# See the License for the specific language governing permissions and |
14 |
|
# limitations under the License. |
15 |
|
|
16 |
1 |
""" |
17 |
|
This module implements a thread that reads VIM messages from the kafka bus. |
18 |
|
It is based on asyncio. |
19 |
|
It is in charge of loading tasks assigned to VIMs that nobody is in charge of |
20 |
|
""" |
21 |
|
|
22 |
1 |
import asyncio |
23 |
1 |
from http import HTTPStatus |
24 |
1 |
import logging |
25 |
1 |
import threading |
26 |
1 |
from time import time |
27 |
|
|
28 |
1 |
from osm_common import dbmemory, dbmongo, msgkafka, msglocal |
29 |
1 |
from osm_common.dbbase import DbException |
30 |
1 |
from osm_common.msgbase import MsgException |
31 |
|
|
32 |
1 |
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>" |
33 |
|
|
34 |
|
|
35 |
1 |
class VimAdminException(Exception):
    """Error raised by the VIM admin module, carrying an HTTP status code.

    :param message: human readable description of the error
    :param http_code: HTTP status associated with the error, BAD_REQUEST by default
    """

    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
        super().__init__(message)
        # kept as an attribute so callers can map the error to an HTTP response
        self.http_code = http_code
39 |
|
|
40 |
|
|
41 |
1 |
class LockRenew:
    """Keeps the 'locked_at' stamp of database items fresh while their locker thread is alive.

    Lock entries are registered with add_lock_object(), flagged as finished with
    remove_lock_object() and periodically renewed by the renew_locks() coroutine.
    """

    renew_list = []
    # ^ class attribute, common for all RO. Time ordered list of dictionaries with information of locks that need to
    # be renewed. The time order is achieved as entries are always appended at the end

    def __init__(self, config, logger):
        """
        Constructor of class
        :param config: configuration parameters of database and messaging. The keys 'task_locked_time',
            'task_relock_time' and 'task_max_locked_time' are read from config["global"]
        :param logger: logger used to report renewals and failures
        """
        self.config = config
        self.logger = logger
        self.to_terminate = False
        self.db = None
        self.task_locked_time = config["global"]["task_locked_time"]
        self.task_relock_time = config["global"]["task_relock_time"]
        self.task_max_locked_time = config["global"]["task_max_locked_time"]

    def start(self, db):
        """
        Provide the database connection used to renew and release locks
        :param db: database object; its 'set_one' method is used by renew_locks() and stop()
        :return: None
        """
        self.db = db

    @staticmethod
    def add_lock_object(database_table, database_object, thread_object):
        """
        Insert a task to renew the locking
        :param database_table: database collection where to maintain the lock
        :param database_object: database object. '_id' and 'locked_at' are mandatory keys
        :param thread_object: Thread object that has locked to check if it is alive
        :return: a locked_object needed for calling remove_lock_object. It will contain up-to-date database 'locked_at'
        """
        lock_object = {
            "table": database_table,
            "_id": database_object["_id"],
            "initial_lock_time": database_object["locked_at"],
            "locked_at": database_object["locked_at"],
            "thread": thread_object,
            "unlocked": False,  # True when it is not needed any more
        }
        LockRenew.renew_list.append(lock_object)

        return lock_object

    @staticmethod
    def remove_lock_object(lock_object):
        """
        Flag a lock entry as no longer needed; renew_locks() discards it when it reaches the head of the list
        :param lock_object: the dictionary returned by add_lock_object
        :return: None
        """
        lock_object["unlocked"] = True

    async def renew_locks(self):
        """
        Coroutine that renews the registered locks until 'to_terminate' is set.

        Only the head of renew_list (the oldest entry) is inspected on each iteration:
        - finished entries, or entries whose locker thread is missing or dead, are discarded;
        - entries not yet due make the coroutine sleep until they are;
        - due entries are removed from the head and, unless 'task_max_locked_time' has elapsed since the
          initial lock, renewed in the database and appended again at the end of the list.
        :return: None
        """
        while not self.to_terminate:
            if not self.renew_list:
                await asyncio.sleep(self.task_locked_time - self.task_relock_time)
                continue

            lock_object = self.renew_list[0]

            if (
                lock_object["unlocked"]
                or not lock_object["thread"]
                or not lock_object["thread"].is_alive()
            ):
                # task has been finished or locker thread is dead, not needed to re-locked.
                self.renew_list.pop(0)
                continue

            locked_at = lock_object["locked_at"]
            now = time()
            time_to_relock = (
                locked_at + self.task_locked_time - self.task_relock_time - now
            )

            if time_to_relock >= 1:
                # wait until it is time to re-lock it
                await asyncio.sleep(time_to_relock)
                continue

            # The entry is due. It always leaves the head of the list, so this loop can never spin on it;
            # it is appended again only after a successful renewal.
            self.renew_list.pop(0)

            if lock_object["initial_lock_time"] + self.task_max_locked_time < now:
                # locked for longer than allowed: stop renewing and let the lock expire
                self.logger.info(
                    "Lock for {}.{} not renewed: maximum locked time exceeded".format(
                        lock_object["table"], lock_object["_id"]
                    )
                )
                continue

            # re-lock
            new_locked_at = locked_at + self.task_locked_time

            try:
                # the filter on the previous 'locked_at' makes the update a no-op if the stored stamp changed
                if self.db.set_one(
                    lock_object["table"],
                    update_dict={
                        "locked_at": new_locked_at,
                        "modified_at": now,
                    },
                    q_filter={
                        "_id": lock_object["_id"],
                        "locked_at": locked_at,
                    },
                    fail_on_empty=False,
                ):
                    self.logger.debug(
                        "Renew lock for {}.{}".format(
                            lock_object["table"], lock_object["_id"]
                        )
                    )
                    lock_object["locked_at"] = new_locked_at
                    self.renew_list.append(lock_object)
                else:
                    self.logger.info(
                        "Cannot renew lock for {}.{}".format(
                            lock_object["table"], lock_object["_id"]
                        )
                    )
            except Exception as e:
                self.logger.error(
                    "Exception when trying to renew lock for {}.{}: {}".format(
                        lock_object["table"], lock_object["_id"], e
                    )
                )

    def stop(self):
        """
        Release in the database the locks still registered in renew_list
        :return: None
        """
        if not self.db:
            # start() was never called, so there is no connection to release anything with
            return

        # unlock all locked items
        now = time()

        for lock_object in self.renew_list:
            locked_at = lock_object["locked_at"]

            if not lock_object["unlocked"] or locked_at + self.task_locked_time >= now:
                # the filter on 'locked_at' leaves untouched any item whose stored stamp differs
                self.db.set_one(
                    lock_object["table"],
                    update_dict={"locked_at": 0},
                    q_filter={"_id": lock_object["_id"], "locked_at": locked_at},
                    fail_on_empty=False,
                )
166 |
|
|
167 |
|
|
168 |
1 |
class VimAdminThread(threading.Thread):
    """Thread that runs three asyncio tasks: the message bus subscription for VIM/WIM/SDN
    account events, the watcher of unattended ro_tasks and the lock renewal task."""

    MAX_TIME_UNATTENDED = 600  # 10min
    TIME_CHECK_UNUSED_VIM = 3600 * 2  # 2h
    kafka_topics = ("vim_account", "wim_account", "sdn")

    def __init__(self, config, engine):
        """
        Constructor of class
        :param config: configuration parameters of database and messaging
        :param engine: an instance of Engine class, used for deleting instances
        """
        threading.Thread.__init__(self)
        self.to_terminate = False
        self.config = config
        self.db = None
        self.msg = None
        self.engine = engine
        self.loop = None
        self.last_rotask_time = 0
        self.next_check_unused_vim = time() + self.TIME_CHECK_UNUSED_VIM
        self.logger = logging.getLogger("ro.vimadmin")
        # asyncio task for receiving vim actions from kafka bus
        self.aiomain_task_kafka = None
        # asyncio task for watching ro_tasks not processed by nobody
        self.aiomain_task_vim = None
        self.aiomain_task_renew_lock = None
        # ^asyncio task for maintain an ro_task locked when VIM plugin takes too much time processing an order
        self.lock_renew = LockRenew(config, self.logger)
        self.task_locked_time = config["global"]["task_locked_time"]

    async def vim_watcher(self):
        """Reads database periodically looking for tasks not processed by nobody because of a reboot
        in order to load this vim"""
        # firstly read VIMS not processed
        for target_database in ("vim_accounts", "wim_accounts", "sdns"):
            unattended_targets = self.db.get_list(
                target_database,
                q_filter={"_admin.operations.operationState": "PROCESSING"},
            )

            for target in unattended_targets:
                # the first three letters of the collection name (vim, wim, sdn) prefix the target id
                target_id = "{}:{}".format(target_database[:3], target["_id"])
                self.logger.info("ordered to check {}".format(target_id))
                self.engine.check_vim(target_id)

        while not self.to_terminate:
            now = time()
            processed_vims = []

            if not self.last_rotask_time:
                self.last_rotask_time = 0

            ro_tasks = self.db.get_list(
                "ro_tasks",
                q_filter={
                    "target_id.ncont": self.engine.get_assigned_vims(),
                    "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
                    "locked_at.lt": now - self.task_locked_time,
                    "to_check_at.gt": self.last_rotask_time,
                    "to_check_at.lte": now - self.MAX_TIME_UNATTENDED,
                },
            )
            # the next query only considers tasks newer than the window just inspected
            self.last_rotask_time = now - self.MAX_TIME_UNATTENDED

            for ro_task in ro_tasks:
                # if already checked ignore
                if ro_task["target_id"] in processed_vims:
                    continue

                processed_vims.append(ro_task["target_id"])

                # if already assigned ignore
                # (queried on every iteration because assign_vim below is expected to change the answer)
                if ro_task["target_id"] in self.engine.get_assigned_vims():
                    continue

                # if there is some task locked on this VIM, there is an RO working on it, so ignore
                if self.db.get_list(
                    "ro_tasks",
                    q_filter={
                        "target_id": ro_task["target_id"],
                        "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
                        "locked_at.gt": now - self.task_locked_time,
                    },
                ):
                    continue

                # unattended, assign vim
                self.engine.assign_vim(ro_task["target_id"])
                self.logger.debug(
                    "ordered to load {}. Inactivity detected".format(
                        ro_task["target_id"]
                    )
                )

            # every 2 hours check if there are vims without any ro_task and unload it
            if now > self.next_check_unused_vim:
                self.next_check_unused_vim = now + self.TIME_CHECK_UNUSED_VIM
                self.engine.unload_unused_vims()

            await asyncio.sleep(self.MAX_TIME_UNATTENDED)

    def _log_finished_task(self, task, description):
        """Report why a finished background task ended, without raising if it was cancelled."""
        if task.cancelled():
            # Task.exception() raises CancelledError on a cancelled task, so it is checked first
            self.logger.warning("{} was cancelled".format(description))
            return

        self.logger.error("{} exception: {}".format(description, task.exception()))

    async def aiomain(self):
        """
        Supervise the three background tasks: create the ones that are missing, wait until any of them
        finishes, log the reason and clear its handle so that it is created again on the next iteration.
        :return: None
        """
        kafka_working = True

        while not self.to_terminate:
            try:
                if not self.aiomain_task_kafka:
                    # a dummy message per topic checks that the bus is reachable before subscribing
                    for kafka_topic in self.kafka_topics:
                        await self.msg.aiowrite(kafka_topic, "echo", "dummy message")

                    kafka_working = True
                    self.logger.debug("Starting vim_account subscription task")
                    self.aiomain_task_kafka = asyncio.ensure_future(
                        self.msg.aioread(
                            self.kafka_topics,
                            group_id=False,
                            aiocallback=self._msg_callback,
                        ),
                    )

                if not self.aiomain_task_vim:
                    self.aiomain_task_vim = asyncio.ensure_future(self.vim_watcher())

                if not self.aiomain_task_renew_lock:
                    self.aiomain_task_renew_lock = asyncio.ensure_future(
                        self.lock_renew.renew_locks()
                    )

                done, _ = await asyncio.wait(
                    [
                        self.aiomain_task_kafka,
                        self.aiomain_task_vim,
                        self.aiomain_task_renew_lock,
                    ],
                    timeout=None,
                    return_when=asyncio.FIRST_COMPLETED,
                )

                # every finished task is handled on its own, so a cancelled one can neither hide the
                # others nor keep its stale handle
                if self.aiomain_task_kafka in done:
                    self._log_finished_task(
                        self.aiomain_task_kafka, "kafka subscription task"
                    )
                    self.aiomain_task_kafka = None

                if self.aiomain_task_vim in done:
                    self._log_finished_task(
                        self.aiomain_task_vim, "vim_account watcher task"
                    )
                    self.aiomain_task_vim = None

                if self.aiomain_task_renew_lock in done:
                    self._log_finished_task(
                        self.aiomain_task_renew_lock, "renew_locks task"
                    )
                    self.aiomain_task_renew_lock = None

            except Exception as e:
                if self.to_terminate:
                    return

                if kafka_working:
                    # logging only first time
                    self.logger.critical(
                        "Error accessing kafka '{}'. Retrying ...".format(e)
                    )
                    kafka_working = False

            if not self.to_terminate:
                # back off before creating again whatever finished or failed
                await asyncio.sleep(10)

    def run(self):
        """
        Start of the thread
        :return: None
        :raises VimAdminException: if the configured driver is unknown or the connection fails
        """
        self.loop = asyncio.new_event_loop()
        try:
            if not self.db:
                if self.config["database"]["driver"] == "mongo":
                    self.db = dbmongo.DbMongo()
                    self.db.db_connect(self.config["database"])
                elif self.config["database"]["driver"] == "memory":
                    self.db = dbmemory.DbMemory()
                    self.db.db_connect(self.config["database"])
                else:
                    raise VimAdminException(
                        "Invalid configuration param '{}' at '[database]':'driver'".format(
                            self.config["database"]["driver"]
                        )
                    )

            # the lock renewal helper shares this thread's database connection
            self.lock_renew.start(self.db)

            if not self.msg:
                config_msg = self.config["message"].copy()

                if config_msg["driver"] == "local":
                    self.msg = msglocal.MsgLocal()
                    self.msg.connect(config_msg)
                elif config_msg["driver"] == "kafka":
                    self.msg = msgkafka.MsgKafka()
                    self.msg.connect(config_msg)
                else:
                    raise VimAdminException(
                        "Invalid configuration param '{}' at '[message]':'driver'".format(
                            config_msg["driver"]
                        )
                    )
        except (DbException, MsgException) as e:
            raise VimAdminException(str(e), http_code=e.http_code) from e

        self.logger.info("Starting")
        while not self.to_terminate:
            try:
                # NOTE: asyncio.run() creates and runs its own event loop, so the tasks created by
                # aiomain() do not belong to self.loop
                asyncio.run(self.main_task())
            except Exception as e:
                if not self.to_terminate:
                    self.logger.exception(
                        "Exception '{}' at messaging read loop".format(e), exc_info=True
                    )

        self.logger.info("Finishing")
        self._stop()
        self.loop.close()

    async def main_task(self):
        """
        Entry coroutine executed by run(): schedules aiomain() and waits for it to finish
        :return: None
        """
        task = asyncio.ensure_future(self.aiomain())
        await task

    async def _msg_callback(self, topic, command, params):
        """
        Callback to process a received message from kafka
        :param topic: topic received
        :param command: command received
        :param params: rest of parameters
        :return: None
        """
        try:
            if command == "echo":
                return

            if topic in self.kafka_topics:
                target = topic[0:3]  # vim, wim or sdn
                target_id = target + ":" + params["_id"]

                if command in ("edited", "edit"):
                    self.engine.reload_vim(target_id)
                    self.logger.debug("ordered to reload {}".format(target_id))
                elif command in ("deleted", "delete"):
                    self.engine.unload_vim(target_id)
                    self.logger.debug("ordered to unload {}".format(target_id))
                elif command in ("create", "created"):
                    self.engine.check_vim(target_id)
                    self.logger.debug("ordered to check {}".format(target_id))
        except (DbException, MsgException) as e:
            self.logger.error(
                "Error while processing topic={} command={}: {}".format(
                    topic, command, e
                )
            )
        except Exception as e:
            # nothing is propagated to the caller: any other error is only logged with its traceback
            self.logger.exception(
                "Exception while processing topic={} command={}: {}".format(
                    topic, command, e
                ),
                exc_info=True,
            )

    def _stop(self):
        """
        Close all connections
        :return: None
        :raises VimAdminException: if closing the database or the messaging connection fails
        """
        try:
            if self.db:
                self.db.db_disconnect()

            if self.msg:
                self.msg.disconnect()
        except (DbException, MsgException) as e:
            raise VimAdminException(str(e), http_code=e.http_code) from e

    def terminate(self):
        """
        This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards,
        but not immediately.
        :return: None
        """
        self.to_terminate = True
        self.lock_renew.to_terminate = True

        for task in (
            self.aiomain_task_kafka,
            self.aiomain_task_vim,
            self.aiomain_task_renew_lock,
        ):
            if not task:
                continue

            try:
                # Hand the bound method (not the result of calling it) to the loop that owns the task,
                # so the cancellation is executed inside that loop's own thread
                task.get_loop().call_soon_threadsafe(task.cancel)
            except RuntimeError:
                # the owning loop is already closed, nothing is left to cancel
                pass

        self.lock_renew.stop()