# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module implements a thread that reads VIM messages from the kafka bus.
It is based on asyncio.
It is in charge of loading the tasks assigned to VIMs that nobody is in charge of.
"""
import asyncio
from http import HTTPStatus
import logging
import threading
from time import time

from osm_common import dbmemory, dbmongo, msgkafka, msglocal
from osm_common.dbbase import DbException
from osm_common.msgbase import MsgException
# Author and contact e-mail of this module.
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
class VimAdminException(Exception):
    """
    Error raised by the VIM admin thread.

    Besides the message it carries an HTTP status code, which defaults to
    ``HTTPStatus.BAD_REQUEST``.
    """

    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
        super().__init__(message)
        # HTTP status associated with this error
        self.http_code = http_code
class LockRenew:
    """
    Keep database locks alive while the thread that took them is still working.

    Lock holders register through add_lock_object(). renew_locks() then moves
    each record's "locked_at" forward by task_locked_time once roughly
    task_relock_time seconds remain before the lock expires. Renewal stops
    when any of these happens:
      * the holder calls remove_lock_object();
      * the holder thread is no longer alive;
      * the lock has been held for more than task_max_locked_time.
    """

    renew_list = []
    # ^ class attribute shared by every LockRenew instance of this process. Time ordered list of dictionaries with
    # information of locks that need to be renewed. The time order is achieved as entries are appended at the end

    def __init__(self, config, logger):
        """
        Constructor of class
        :param config: configuration; the "global" section must provide task_locked_time,
            task_relock_time and task_max_locked_time (seconds)
        :param logger: logger used to report renewals and failures
        """
        self.config = config
        self.logger = logger
        self.to_terminate = False
        self.db = None
        self.task_locked_time = config["global"]["task_locked_time"]
        self.task_relock_time = config["global"]["task_relock_time"]
        self.task_max_locked_time = config["global"]["task_max_locked_time"]

    def start(self, db):
        """
        Set the database connection used to renew and release the locks
        :param db: database object providing set_one()
        :return: None
        """
        self.db = db

    @staticmethod
    def add_lock_object(database_table, database_object, thread_object):
        """
        Insert a task to renew the locking
        :param database_table: database collection where to maintain the lock
        :param database_object: database object. '_id' and 'locked_at' are mandatory keys
        :param thread_object: Thread object that has locked to check if it is alive
        :return: a lock_object needed for calling remove_lock_object. Its 'locked_at' is kept up to date with
            the database value
        """
        lock_object = {
            "table": database_table,
            "_id": database_object["_id"],
            "initial_lock_time": database_object["locked_at"],
            "locked_at": database_object["locked_at"],
            "thread": thread_object,
            "unlocked": False,  # True when it is not needed any more
        }

        LockRenew.renew_list.append(lock_object)

        return lock_object

    @staticmethod
    def remove_lock_object(lock_object):
        """
        Mark a lock as no longer needed; renew_locks() will drop it
        :param lock_object: object returned by add_lock_object
        :return: None
        """
        lock_object["unlocked"] = True

    async def renew_locks(self):
        """
        Renew the registered locks until to_terminate is set.

        The head of renew_list is always the lock whose renewal is due first.
        Each iteration does one of the following with that lock:
          * drops it: released, holder thread dead, or held for longer than
            task_max_locked_time;
          * renews it and re-queues it at the tail;
          * sleeps until its renewal is due.
        :return: None
        """
        while not self.to_terminate:
            if not self.renew_list:
                await asyncio.sleep(self.task_locked_time - self.task_relock_time)
                continue

            lock_object = self.renew_list[0]

            if (
                lock_object["unlocked"]
                or not lock_object["thread"]
                or not lock_object["thread"].is_alive()
            ):
                # task has been finished or locker thread is dead, not needed to re-locked.
                self.renew_list.pop(0)
                continue

            locked_at = lock_object["locked_at"]
            now = time()
            time_to_relock = (
                locked_at + self.task_locked_time - self.task_relock_time - now
            )

            if time_to_relock >= 1:
                # wait until it is time to re-lock it
                await asyncio.sleep(time_to_relock)
                continue

            # Renewal is due. Take the entry off the head in every case so the loop always makes
            # progress (never spins without awaiting); it is re-queued only if renewed.
            self.renew_list.pop(0)

            if lock_object["initial_lock_time"] + self.task_max_locked_time < now:
                # held for longer than allowed: stop renewing, the lock expires after task_locked_time
                self.logger.info(
                    "Lock for {}.{} exceeded task_max_locked_time, not renewed".format(
                        lock_object["table"], lock_object["_id"]
                    )
                )
                continue

            # re-lock
            new_locked_at = locked_at + self.task_locked_time

            try:
                # the filter on the current locked_at makes the update fail if somebody else changed it
                renewed = self.db.set_one(
                    lock_object["table"],
                    update_dict={
                        "locked_at": new_locked_at,
                        "modified_at": now,
                    },
                    q_filter={
                        "_id": lock_object["_id"],
                        "locked_at": locked_at,
                    },
                    fail_on_empty=False,
                )
            except Exception as e:
                self.logger.error(
                    "Exception when trying to renew lock for {}.{}: {}".format(
                        lock_object["table"], lock_object["_id"], e
                    )
                )
                continue

            if renewed:
                self.logger.debug(
                    "Renew lock for {}.{}".format(
                        lock_object["table"], lock_object["_id"]
                    )
                )
                lock_object["locked_at"] = new_locked_at
                self.renew_list.append(lock_object)
            else:
                self.logger.info(
                    "Cannot renew lock for {}.{}".format(
                        lock_object["table"], lock_object["_id"]
                    )
                )

    def stop(self):
        """
        Release the locks still tracked in renew_list.

        Each entry that is still owned (not marked unlocked), or whose lock has
        not expired yet, gets "locked_at" reset to 0. The update filters on the
        current "locked_at" value, so a record re-locked by someone else is left
        untouched. Failures are logged and do not prevent releasing the others.
        :return: None
        """
        if self.db is None:
            # start() was never called: there is no database to release the locks in
            return

        # unlock all locked items
        now = time()

        # iterate over a snapshot: renew_locks() may still be modifying the list
        for lock_object in list(self.renew_list):
            locked_at = lock_object["locked_at"]

            if not lock_object["unlocked"] or locked_at + self.task_locked_time >= now:
                try:
                    self.db.set_one(
                        lock_object["table"],
                        update_dict={"locked_at": 0},
                        q_filter={"_id": lock_object["_id"], "locked_at": locked_at},
                        fail_on_empty=False,
                    )
                except Exception as e:
                    self.logger.error(
                        "Cannot unlock {}.{}: {}".format(
                            lock_object["table"], lock_object["_id"], e
                        )
                    )
class VimAdminThread(threading.Thread):
    """
    Thread that keeps the VIM, WIM and SDN targets of this RO up to date.

    Inside an asyncio event loop it runs three tasks:
      * a kafka subscription that reloads, unloads or checks a target when its
        account is edited, deleted or created;
      * vim_watcher, which loads targets whose ro_tasks no RO is processing;
      * LockRenew.renew_locks, which keeps long-running ro_task locks alive.
    """

    MAX_TIME_UNATTENDED = 600  # 10min
    TIME_CHECK_UNUSED_VIM = 3600 * 2  # 2h
    kafka_topics = ("vim_account", "wim_account", "sdn")

    def __init__(self, config, engine):
        """
        Constructor of class
        :param config: configuration; its "database", "message" and "global" sections are used
        :param engine: Engine instance asked to check, assign, reload and unload targets
        """
        threading.Thread.__init__(self)
        self.to_terminate = False
        self.config = config
        self.db = None
        self.msg = None
        self.engine = engine
        # event loop currently running main_task(); set by main_task() from inside that loop
        self.loop = None
        self.last_rotask_time = 0
        self.next_check_unused_vim = time() + self.TIME_CHECK_UNUSED_VIM
        self.logger = logging.getLogger("ro.vimadmin")
        # asyncio task for receiving vim actions from kafka bus
        self.aiomain_task_kafka = None
        # asyncio task for watching ro_tasks not processed by nobody
        self.aiomain_task_vim = None
        # asyncio task for maintaining an ro_task locked when VIM plugin takes too much time processing an order
        self.aiomain_task_renew_lock = None
        self.lock_renew = LockRenew(config, self.logger)
        self.task_locked_time = config["global"]["task_locked_time"]

    async def vim_watcher(self):
        """
        Load the targets that nobody is attending.

        On start, it asks the engine to check every VIM/WIM/SDN account whose
        operation is still PROCESSING (e.g. interrupted by a reboot).

        Then, every MAX_TIME_UNATTENDED seconds, it looks for ro_tasks that:
          * belong to targets not assigned to this RO;
          * have not been locked for task_locked_time;
          * have a to_check_at older than MAX_TIME_UNATTENDED.
        Each such target is assigned to this RO unless some RO holds a lock on
        one of its ro_tasks. When next_check_unused_vim has passed (every
        TIME_CHECK_UNUSED_VIM seconds), it also asks the engine to unload unused
        targets.
        :return: None
        """
        # firstly read VIMS not processed
        for target_database in ("vim_accounts", "wim_accounts", "sdns"):
            unattended_targets = self.db.get_list(
                target_database,
                q_filter={"_admin.operations.operationState": "PROCESSING"},
            )

            for target in unattended_targets:
                target_id = "{}:{}".format(target_database[:3], target["_id"])
                self.logger.info("ordered to check {}".format(target_id))
                self.engine.check_vim(target_id)

        while not self.to_terminate:
            processed_vims = set()
            now = time()

            ro_tasks = self.db.get_list(
                "ro_tasks",
                q_filter={
                    "target_id.ncont": self.engine.get_assigned_vims(),
                    "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
                    "locked_at.lt": now - self.task_locked_time,
                    "to_check_at.gt": self.last_rotask_time,
                    "to_check_at.lte": now - self.MAX_TIME_UNATTENDED,
                },
            )
            # the next query only needs ro_tasks that became unattended after this one
            self.last_rotask_time = now - self.MAX_TIME_UNATTENDED

            for ro_task in ro_tasks:
                target_id = ro_task["target_id"]

                # if already checked ignore
                if target_id in processed_vims:
                    continue

                processed_vims.add(target_id)

                # if already assigned ignore
                if target_id in self.engine.get_assigned_vims():
                    continue

                # if there is some task locked on this VIM, there is an RO working on it, so ignore
                if self.db.get_list(
                    "ro_tasks",
                    q_filter={
                        "target_id": target_id,
                        "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
                        "locked_at.gt": now - self.task_locked_time,
                    },
                ):
                    continue

                # unattended, assign vim
                self.engine.assign_vim(target_id)
                self.logger.info(
                    "ordered to load {}. Inactivity detected".format(target_id)
                )

            # every 2 hours check if there are vims without any ro_task and unload it
            if now > self.next_check_unused_vim:
                self.next_check_unused_vim = now + self.TIME_CHECK_UNUSED_VIM
                self.engine.unload_unused_vims()

            await asyncio.sleep(self.MAX_TIME_UNATTENDED)

    def _log_finished_task(self, task, description):
        """
        Log how a finished aiomain sub-task ended
        :param task: finished asyncio task
        :param description: task name used as message prefix
        :return: None
        """
        # exception() raises CancelledError on a cancelled task, so check that first
        if task.cancelled():
            self.logger.debug("{} cancelled".format(description))
        else:
            self.logger.error("{} exception: {}".format(description, task.exception()))

    async def aiomain(self):
        """
        Keep the kafka subscription, vim_watcher and renew_locks tasks running.

        When one of them finishes it is logged and, after a 10 seconds pause,
        started again. This repeats until to_terminate is set.
        :return: None
        """
        kafka_working = True

        while not self.to_terminate:
            try:
                if not self.aiomain_task_kafka:
                    # NOTE(review): presumably probes the bus (and creates the topics) before
                    # subscribing; _msg_callback ignores "echo" commands
                    for kafka_topic in self.kafka_topics:
                        await self.msg.aiowrite(kafka_topic, "echo", "dummy message")

                    kafka_working = True
                    self.logger.debug("Starting vim_account subscription task")
                    self.aiomain_task_kafka = asyncio.ensure_future(
                        self.msg.aioread(
                            self.kafka_topics,
                            # no consumer group, so every RO receives all account messages
                            # (osm_common msgkafka semantics for group_id=False - confirm)
                            group_id=False,
                            aiocallback=self._msg_callback,
                        ),
                    )

                if not self.aiomain_task_vim:
                    self.aiomain_task_vim = asyncio.ensure_future(self.vim_watcher())

                if not self.aiomain_task_renew_lock:
                    self.aiomain_task_renew_lock = asyncio.ensure_future(
                        self.lock_renew.renew_locks()
                    )

                done, _ = await asyncio.wait(
                    [
                        self.aiomain_task_kafka,
                        self.aiomain_task_vim,
                        self.aiomain_task_renew_lock,
                    ],
                    return_when=asyncio.FIRST_COMPLETED,
                )

                if self.aiomain_task_kafka in done:
                    self._log_finished_task(
                        self.aiomain_task_kafka, "kafka subscription task"
                    )
                    self.aiomain_task_kafka = None

                if self.aiomain_task_vim in done:
                    self._log_finished_task(
                        self.aiomain_task_vim, "vim_account watcher task"
                    )
                    self.aiomain_task_vim = None

                if self.aiomain_task_renew_lock in done:
                    self._log_finished_task(
                        self.aiomain_task_renew_lock, "renew_locks task"
                    )
                    self.aiomain_task_renew_lock = None
            except asyncio.CancelledError:
                if self.to_terminate:
                    return

                self.logger.exception("asyncio.CancelledError occured.")
            except Exception as e:
                if self.to_terminate:
                    return

                if kafka_working:
                    # logging only first time
                    self.logger.critical(
                        "Error accessing kafka '{}'. Retrying ...".format(e)
                    )
                    kafka_working = False

            # do not delay shutdown with the retry pause
            if not self.to_terminate:
                await asyncio.sleep(10)

    def run(self):
        """
        Start of the thread.

        Connects to the database and the message bus (unless already set) and
        runs main_task() in a fresh event loop until terminate() is called.
        Finally it closes the connections.
        :return: None
        :raises VimAdminException: on invalid configuration or connection errors
        """
        try:
            if not self.db:
                db_driver = self.config["database"]["driver"]

                if db_driver == "mongo":
                    self.db = dbmongo.DbMongo()
                    self.db.db_connect(self.config["database"])
                elif db_driver == "memory":
                    self.db = dbmemory.DbMemory()
                    self.db.db_connect(self.config["database"])
                else:
                    raise VimAdminException(
                        "Invalid configuration param '{}' at '[database]':'driver'".format(
                            self.config["database"]["driver"]
                        )
                    )

            self.lock_renew.start(self.db)

            if not self.msg:
                config_msg = self.config["message"].copy()

                if config_msg["driver"] == "local":
                    self.msg = msglocal.MsgLocal()
                    self.msg.connect(config_msg)
                elif config_msg["driver"] == "kafka":
                    self.msg = msgkafka.MsgKafka()
                    self.msg.connect(config_msg)
                else:
                    raise VimAdminException(
                        "Invalid configuration param '{}' at '[message]':'driver'".format(
                            config_msg["driver"]
                        )
                    )
        except (DbException, MsgException) as e:
            raise VimAdminException(str(e), http_code=e.http_code) from e

        self.logger.info("Starting")
        while not self.to_terminate:
            try:
                asyncio.run(self.main_task())
            except Exception as e:
                if not self.to_terminate:
                    self.logger.exception(
                        "Exception '{}' at messaging read loop".format(e), exc_info=True
                    )

        self.logger.info("Finishing")
        self._close()

    async def main_task(self):
        """
        Run aiomain() in the current event loop
        :return: None
        """
        # asyncio.run() creates a new loop on every call; terminate() runs in another
        # thread and must schedule its cancellations on this exact loop.
        self.loop = asyncio.get_running_loop()
        # tasks created in a previous (now closed) loop cannot be awaited in this one
        self.aiomain_task_kafka = None
        self.aiomain_task_vim = None
        self.aiomain_task_renew_lock = None
        task = asyncio.ensure_future(self.aiomain())
        await task

    async def _msg_callback(self, topic, command, params):
        """
        Callback to process a received message from kafka
        :param topic: topic received
        :param command: command received
        :param params: rest of parameters; params["_id"] identifies the account
        :return: None
        """
        try:
            if command == "echo":
                return

            if topic in self.kafka_topics:
                target = topic[0:3]  # vim, wim or sdn
                target_id = target + ":" + params["_id"]

                if command in ("edited", "edit"):
                    self.engine.reload_vim(target_id)
                    self.logger.debug("ordered to reload {}".format(target_id))
                elif command in ("deleted", "delete"):
                    self.engine.unload_vim(target_id)
                    self.logger.debug("ordered to unload {}".format(target_id))
                elif command in ("create", "created"):
                    self.engine.check_vim(target_id)
                    self.logger.debug("ordered to check {}".format(target_id))
        except (DbException, MsgException) as e:
            self.logger.error(
                "Error while processing topic={} command={}: {}".format(
                    topic, command, e
                )
            )
        except Exception as e:
            self.logger.exception(
                "Exception while processing topic={} command={}: {}".format(
                    topic, command, e
                ),
                exc_info=True,
            )

    def _close(self):
        """
        Close all connections
        :return: None
        :raises VimAdminException: if disconnecting the database or the message bus fails
        """
        # Deliberately not named _stop(): CPython's threading.Thread uses a private
        # method with that name internally, and overriding it breaks is_alive()/join().
        try:
            if self.db:
                self.db.db_disconnect()

            if self.msg:
                self.msg.disconnect()
        except (DbException, MsgException) as e:
            raise VimAdminException(str(e), http_code=e.http_code) from e

    def terminate(self):
        """
        This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards,
        but not immediately.
        :return: None
        """
        self.to_terminate = True
        self.lock_renew.to_terminate = True
        loop = self.loop

        for task in (
            self.aiomain_task_kafka,
            self.aiomain_task_vim,
            self.aiomain_task_renew_lock,
        ):
            if task and loop is not None:
                try:
                    # pass the bound method (not its result): cancel() must run in the loop thread
                    loop.call_soon_threadsafe(task.cancel)
                except RuntimeError:
                    # the loop is already closed, so the task is not running any more
                    pass

        self.lock_renew.stop()