1 # -*- coding: utf-8 -*-
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 This module implements a thread that reads VIM messages from the kafka bus.
18 It is based on asyncio.
19 It is in charge of loading tasks assigned to VIMs that nobody is in charge of
23 from http
import HTTPStatus
28 from osm_common
import dbmemory
, dbmongo
, msgkafka
, msglocal
29 from osm_common
.dbbase
import DbException
30 from osm_common
.msgbase
import MsgException
32 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
class VimAdminException(Exception):
    """
    Exception raised by the VIM admin module; carries an HTTP status code.

    :param message: text describing the error, passed on to Exception
    :param http_code: HTTP status stored on the instance. HTTPStatus.BAD_REQUEST by default
    """

    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
        super().__init__(message)
        # status code kept on the instance so that it can be read back from the exception
        self.http_code = http_code
44 # ^ static attribute, common for all RO. Time-ordered list of dictionaries with information about the locks that need to
45 # be renewed. The time order is achieved because items are always appended at the end
47 def __init__(self
, config
, logger
):
50 :param config: configuration parameters of database and messaging
54 self
.to_terminate
= False
57 self
.task_locked_time
= config
["global"]["task_locked_time"]
58 self
.task_relock_time
= config
["global"]["task_relock_time"]
59 self
.task_max_locked_time
= config
["global"]["task_max_locked_time"]
61 def start(self
, db
, loop
):
def add_lock_object(database_table, database_object, thread_object):
    """
    Insert a task to renew the locking
    :param database_table: database collection where the lock is maintained
    :param database_object: database object. '_id' and 'locked_at' are mandatory keys
    :param thread_object: Thread object that holds the lock, used to check whether it is alive
    :return: a lock_object needed for calling remove_lock_object. It will contain the up-to-date
        database 'locked_at'
    """
    # mandatory keys are read first, '_id' before 'locked_at'
    object_id = database_object["_id"]
    locked_at = database_object["locked_at"]

    lock_object = dict(
        table=database_table,
        _id=object_id,
        initial_lock_time=locked_at,
        locked_at=locked_at,
        thread=thread_object,
        unlocked=False,  # True when it is not needed any more
    )

    # new entries always go to the end of the shared list
    LockRenew.renew_list.append(lock_object)

    return lock_object
def remove_lock_object(lock_object):
    """
    Mark a lock entry as not needed any more.
    :param lock_object: dictionary describing the lock; its "unlocked" key is set to True
    :return: None
    """
    # the entry is only flagged here, it is not taken out of any list
    lock_object.update(unlocked=True)
90 async def renew_locks(self
):
91 while not self
.to_terminate
:
92 if not self
.renew_list
:
94 self
.task_locked_time
- self
.task_relock_time
, loop
=self
.loop
98 lock_object
= self
.renew_list
[0]
101 lock_object
["unlocked"]
102 or not lock_object
["thread"]
103 or not lock_object
["thread"].is_alive()
105 # task has finished or the locker thread is dead, so there is no need to re-lock it.
106 self
.renew_list
.pop(0)
109 locked_at
= lock_object
["locked_at"]
112 locked_at
+ self
.task_locked_time
- self
.task_relock_time
- now
115 if time_to_relock
< 1:
116 if lock_object
["initial_lock_time"] + self
.task_max_locked_time
< now
:
117 self
.renew_list
.pop(0)
119 new_locked_at
= locked_at
+ self
.task_locked_time
123 lock_object
["table"],
125 "locked_at": new_locked_at
,
129 "_id": lock_object
["_id"],
130 "locked_at": locked_at
,
135 "Renew lock for {}.{}".format(
136 lock_object
["table"], lock_object
["_id"]
139 lock_object
["locked_at"] = new_locked_at
140 self
.renew_list
.append(lock_object
)
143 "Cannot renew lock for {}.{}".format(
144 lock_object
["table"], lock_object
["_id"]
147 except Exception as e
:
149 "Exception when trying to renew lock for {}.{}: {}".format(
150 lock_object
["table"], lock_object
["_id"], e
154 # wait until it is time to re-lock it
155 await asyncio
.sleep(time_to_relock
, loop
=self
.loop
)
158 # unlock all locked items
161 for lock_object
in self
.renew_list
:
162 locked_at
= lock_object
["locked_at"]
164 if not lock_object
["unlocked"] or locked_at
+ self
.task_locked_time
>= now
:
166 lock_object
["table"],
167 update_dict
={"locked_at": 0},
168 q_filter
={"_id": lock_object
["_id"], "locked_at": locked_at
},
173 class VimAdminThread(threading
.Thread
):
174 MAX_TIME_UNATTENDED
= 600 # 10min
175 TIME_CHECK_UNUSED_VIM
= 3600 * 2 # 2h
176 kafka_topics
= ("vim_account", "wim_account", "sdn")
178 def __init__(self
, config
, engine
):
181 :param config: configuration parameters of database and messaging
182 :param engine: an instance of Engine class, used for deleting instances
184 threading
.Thread
.__init
__(self
)
185 self
.to_terminate
= False
191 self
.last_rotask_time
= 0
192 self
.next_check_unused_vim
= time() + self
.TIME_CHECK_UNUSED_VIM
193 self
.logger
= logging
.getLogger("ro.vimadmin")
194 # asyncio task for receiving vim actions from kafka bus
195 self
.aiomain_task_kafka
= None
196 # asyncio task for watching ro_tasks not processed by anybody
197 self
.aiomain_task_vim
= None
198 self
.aiomain_task_renew_lock
= None
199 # ^ asyncio task for keeping an ro_task locked when the VIM plugin takes too much time processing an order
200 self
.lock_renew
= LockRenew(config
, self
.logger
)
201 self
.task_locked_time
= config
["global"]["task_locked_time"]
203 async def vim_watcher(self
):
204 """Reads the database periodically looking for tasks not processed by anybody because of a reboot,
205 in order to load this vim"""
206 # firstly read VIMS not processed
207 for target_database
in ("vim_accounts", "wim_accounts", "sdns"):
208 unattended_targets
= self
.db
.get_list(
210 q_filter
={"_admin.operations.operationState": "PROCESSING"},
213 for target
in unattended_targets
:
214 target_id
= "{}:{}".format(target_database
[:3], target
["_id"])
215 self
.logger
.info("ordered to check {}".format(target_id
))
216 self
.engine
.check_vim(target_id
)
218 while not self
.to_terminate
:
222 if not self
.last_rotask_time
:
223 self
.last_rotask_time
= 0
225 ro_tasks
= self
.db
.get_list(
228 "target_id.ncont": self
.engine
.get_assigned_vims(),
229 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
230 "locked_at.lt": now
- self
.task_locked_time
,
231 "to_check_at.gt": self
.last_rotask_time
,
232 "to_check_at.lte": now
- self
.MAX_TIME_UNATTENDED
,
235 self
.last_rotask_time
= now
- self
.MAX_TIME_UNATTENDED
237 for ro_task
in ro_tasks
:
238 # if already checked ignore
239 if ro_task
["target_id"] in processed_vims
:
242 processed_vims
.append(ro_task
["target_id"])
244 # if already assigned ignore
245 if ro_task
["target_id"] in self
.engine
.get_assigned_vims():
248 # if there is some task locked on this VIM, there is an RO working on it, so ignore
252 "target_id": ro_task
["target_id"],
253 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
254 "locked_at.gt": now
- self
.task_locked_time
,
259 # unattended, assign vim
260 self
.engine
.assign_vim(ro_task
["target_id"])
262 "ordered to load {}. Inactivity detected".format(
267 # every 2 hours check if there are vims without any ro_task and unload them
268 if now
> self
.next_check_unused_vim
:
269 self
.next_check_unused_vim
= now
+ self
.TIME_CHECK_UNUSED_VIM
270 self
.engine
.unload_unused_vims()
272 await asyncio
.sleep(self
.MAX_TIME_UNATTENDED
, loop
=self
.loop
)
274 async def aiomain(self
):
276 while not self
.to_terminate
:
278 if not self
.aiomain_task_kafka
:
279 # await self.msg.aiowrite("admin", "echo", "dummy message", loop=self.loop)
280 for kafka_topic
in self
.kafka_topics
:
281 await self
.msg
.aiowrite(
282 kafka_topic
, "echo", "dummy message", loop
=self
.loop
285 self
.logger
.debug("Starting vim_account subscription task")
286 self
.aiomain_task_kafka
= asyncio
.ensure_future(
291 aiocallback
=self
._msg
_callback
,
296 if not self
.aiomain_task_vim
:
297 self
.aiomain_task_vim
= asyncio
.ensure_future(
298 self
.vim_watcher(), loop
=self
.loop
301 if not self
.aiomain_task_renew_lock
:
302 self
.aiomain_task_renew_lock
= asyncio
.ensure_future(
303 self
.lock_renew
.renew_locks(), loop
=self
.loop
306 done
, _
= await asyncio
.wait(
308 self
.aiomain_task_kafka
,
309 self
.aiomain_task_vim
,
310 self
.aiomain_task_renew_lock
,
314 return_when
=asyncio
.FIRST_COMPLETED
,
318 if self
.aiomain_task_kafka
in done
:
319 exc
= self
.aiomain_task_kafka
.exception()
321 "kafka subscription task exception: {}".format(exc
)
323 self
.aiomain_task_kafka
= None
325 if self
.aiomain_task_vim
in done
:
326 exc
= self
.aiomain_task_vim
.exception()
328 "vim_account watcher task exception: {}".format(exc
)
330 self
.aiomain_task_vim
= None
332 if self
.aiomain_task_renew_lock
in done
:
333 exc
= self
.aiomain_task_renew_lock
.exception()
334 self
.logger
.error("renew_locks task exception: {}".format(exc
))
335 self
.aiomain_task_renew_lock
= None
336 except asyncio
.CancelledError
:
339 except Exception as e
:
340 if self
.to_terminate
:
344 # logging only first time
345 self
.logger
.critical(
346 "Error accessing kafka '{}'. Retrying ...".format(e
)
348 kafka_working
= False
350 await asyncio
.sleep(10, loop
=self
.loop
)
357 self
.loop
= asyncio
.new_event_loop()
360 if self
.config
["database"]["driver"] == "mongo":
361 self
.db
= dbmongo
.DbMongo()
362 self
.db
.db_connect(self
.config
["database"])
363 elif self
.config
["database"]["driver"] == "memory":
364 self
.db
= dbmemory
.DbMemory()
365 self
.db
.db_connect(self
.config
["database"])
367 raise VimAdminException(
368 "Invalid configuration param '{}' at '[database]':'driver'".format(
369 self
.config
["database"]["driver"]
373 self
.lock_renew
.start(self
.db
, self
.loop
)
376 config_msg
= self
.config
["message"].copy()
377 config_msg
["loop"] = self
.loop
379 if config_msg
["driver"] == "local":
380 self
.msg
= msglocal
.MsgLocal()
381 self
.msg
.connect(config_msg
)
382 elif config_msg
["driver"] == "kafka":
383 self
.msg
= msgkafka
.MsgKafka()
384 self
.msg
.connect(config_msg
)
386 raise VimAdminException(
387 "Invalid configuration param '{}' at '[message]':'driver'".format(
391 except (DbException
, MsgException
) as e
:
392 raise VimAdminException(str(e
), http_code
=e
.http_code
)
394 self
.logger
.info("Starting")
395 while not self
.to_terminate
:
397 self
.loop
.run_until_complete(
398 asyncio
.ensure_future(self
.aiomain(), loop
=self
.loop
)
400 # except asyncio.CancelledError:
401 # break # if cancelled it should end, breaking loop
402 except Exception as e
:
403 if not self
.to_terminate
:
404 self
.logger
.exception(
405 "Exception '{}' at messaging read loop".format(e
), exc_info
=True
408 self
.logger
.info("Finishing")
412 async def _msg_callback(self
, topic
, command
, params
):
414 Callback to process a received message from kafka
415 :param topic: topic received
416 :param command: command received
417 :param params: rest of parameters
421 if command
== "echo":
424 if topic
in self
.kafka_topics
:
425 target
= topic
[0:3] # vim, wim or sdn
426 target_id
= target
+ ":" + params
["_id"]
428 if command
in ("edited", "edit"):
429 self
.engine
.reload_vim(target_id
)
430 self
.logger
.debug("ordered to reload {}".format(target_id
))
431 elif command
in ("deleted", "delete"):
432 self
.engine
.unload_vim(target_id
)
433 self
.logger
.debug("ordered to unload {}".format(target_id
))
434 elif command
in ("create", "created"):
435 self
.engine
.check_vim(target_id
)
436 self
.logger
.debug("ordered to check {}".format(target_id
))
437 except (DbException
, MsgException
) as e
:
439 "Error while processing topic={} command={}: {}".format(
443 except Exception as e
:
444 self
.logger
.exception(
445 "Exception while processing topic={} command={}: {}".format(
453 Close all connections
458 self
.db
.db_disconnect()
461 self
.msg
.disconnect()
462 except (DbException
, MsgException
) as e
:
463 raise VimAdminException(str(e
), http_code
=e
.http_code
)
467 This is a thread-safe method to terminate this thread. Termination is done asynchronously afterwards,
471 self
.to_terminate
= True
472 self
.lock_renew
.to_terminate
= True
474 if self
.aiomain_task_kafka
:
475 self
.loop
.call_soon_threadsafe(self
.aiomain_task_kafka
.cancel
)
477 if self
.aiomain_task_vim
:
478 self
.loop
.call_soon_threadsafe(self
.aiomain_task_vim
.cancel
)
480 if self
.aiomain_task_renew_lock
:
481 self
.loop
.call_soon_threadsafe(self
.aiomain_task_renew_lock
.cancel
)
483 self
.lock_renew
.stop()