1 # -*- coding: utf-8 -*-
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
This module implements a thread that reads VIM messages from the kafka bus.
It is based on asyncio.
It is in charge of loading the tasks assigned to VIMs that nobody else is in charge of.
25 from http
import HTTPStatus
27 from osm_common
import dbmongo
, dbmemory
, msglocal
, msgkafka
28 from osm_common
.dbbase
import DbException
29 from osm_common
.msgbase
import MsgException
# Author of this module
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
class VimAdminException(Exception):
    """Error raised by the VIM admin thread, carrying an HTTP status code."""

    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
        """
        :param message: text describing the error; it becomes the exception message
        :param http_code: HTTP status associated to the error, BAD_REQUEST (400) by default
        """
        super().__init__(message)
        self.http_code = http_code
class LockRenew:
    """
    Keeps alive the database locks of objects (e.g. ro_tasks) whose processing takes longer than the lock period.

    A locker registers the locked database object with add_lock_object(). The renew_locks() coroutine then moves
    its 'locked_at' forward shortly before it expires. Renewal stops when the locker calls remove_lock_object(),
    the locker thread is no longer alive, the database update fails, or the lock has been held for more than
    task_max_locked_time.
    """

    renew_list = []
    # ^ class attribute, common for all instances (and so for all the RO). Time ordered list of dictionaries with the
    # information of the locks that need to be renewed. The time order is achieved because new entries are always
    # appended at the end

    def __init__(self, config, logger):
        """
        Constructor of class
        :param config: configuration parameters. config["global"] must contain "task_locked_time",
            "task_relock_time" and "task_max_locked_time", in seconds
        :param logger: logger used to report the renewal results
        """
        self.config = config
        self.logger = logger
        self.to_terminate = False
        self.loop = None
        self.db = None
        self.task_locked_time = config["global"]["task_locked_time"]
        self.task_relock_time = config["global"]["task_relock_time"]
        self.task_max_locked_time = config["global"]["task_max_locked_time"]

    def start(self, db, loop):
        """
        Set the database and the event loop to use. It must be called before renew_locks() and stop()
        :param db: database object, used through its set_one() method
        :param loop: asyncio event loop where renew_locks() runs
        :return: None
        """
        self.db = db
        self.loop = loop

    @staticmethod
    def add_lock_object(database_table, database_object, thread_object):
        """
        Insert a task to renew the locking
        :param database_table: database collection where to maintain the lock
        :param database_object: database object. '_id' and 'locked_at' are mandatory keys
        :param thread_object: Thread object that has locked to check if it is alive
        :return: a locked_object needed for calling remove_lock_object. It will contain the up to date database
            'locked_at'
        """
        lock_object = {
            "table": database_table,
            "_id": database_object["_id"],
            "initial_lock_time": database_object["locked_at"],
            "locked_at": database_object["locked_at"],
            "thread": thread_object,
            "unlocked": False,  # True when it is not needed any more
        }
        LockRenew.renew_list.append(lock_object)
        return lock_object

    @staticmethod
    def remove_lock_object(lock_object):
        """
        Mark a lock as not needed any more, so that renew_locks() stops renewing it
        :param lock_object: object returned by add_lock_object()
        :return: None
        """
        lock_object["unlocked"] = True

    async def renew_locks(self):
        """
        Coroutine that renews the registered locks until to_terminate is set. A lock is renewed task_relock_time
        seconds before it expires, by moving its 'locked_at' forward by task_locked_time
        :return: None
        """
        while not self.to_terminate:
            if not self.renew_list:
                await asyncio.sleep(self.task_locked_time - self.task_relock_time)
                continue

            lock_object = self.renew_list[0]
            if lock_object["unlocked"] or not lock_object["thread"] or not lock_object["thread"].is_alive():
                # task has been finished or locker thread is dead, not needed to re-lock it
                self.renew_list.pop(0)
                continue

            locked_at = lock_object["locked_at"]
            now = time()
            time_to_relock = locked_at + self.task_locked_time - self.task_relock_time - now
            if time_to_relock >= 1:
                # wait until it is time to re-lock it
                await asyncio.sleep(time_to_relock)
                continue

            # Time to re-lock. The entry always leaves the head of the list; it is appended again at the end only
            # when the lock has been renewed, which keeps the list time ordered
            self.renew_list.pop(0)
            if lock_object["initial_lock_time"] + self.task_max_locked_time < now:
                self.logger.info("Not renewing lock for {}.{}: maximum locked time exceeded".format(
                    lock_object["table"], lock_object["_id"]))
                continue

            new_locked_at = locked_at + self.task_locked_time
            try:
                # the filter on the old 'locked_at' ensures the lock is only extended if nobody has changed it
                if self.db.set_one(lock_object["table"],
                                   update_dict={"locked_at": new_locked_at, "modified_at": now},
                                   q_filter={"_id": lock_object["_id"], "locked_at": locked_at},
                                   fail_on_empty=False):
                    self.logger.debug("Renew lock for {}.{}".format(lock_object["table"], lock_object["_id"]))
                    lock_object["locked_at"] = new_locked_at
                    self.renew_list.append(lock_object)
                else:
                    self.logger.info("Cannot renew lock for {}.{}".format(lock_object["table"],
                                                                           lock_object["_id"]))
            except Exception as e:
                self.logger.error("Exception when trying to renew lock for {}.{}: {}".format(
                    lock_object["table"], lock_object["_id"], e))

    def stop(self):
        """
        Set 'locked_at' to 0 in the database for every registered object that has not been unlocked, or whose lock
        has not expired yet. The update only applies while 'locked_at' in the database still has the value stored
        in the lock object
        :return: None
        """
        if self.db is None:
            # start() was never called: there is no database to update
            return
        # unlock all locked items
        now = time()
        for lock_object in self.renew_list:
            locked_at = lock_object["locked_at"]
            if not lock_object["unlocked"] or locked_at + self.task_locked_time >= now:
                try:
                    self.db.set_one(lock_object["table"], update_dict={"locked_at": 0},
                                    q_filter={"_id": lock_object["_id"], "locked_at": locked_at},
                                    fail_on_empty=False)
                except Exception as e:
                    # best effort: keep releasing the remaining locks
                    self.logger.error("Exception when trying to unlock {}.{}: {}".format(
                        lock_object["table"], lock_object["_id"], e))
class VimAdminThread(threading.Thread):
    """
    Thread in charge of the VIM, WIM and SDN targets of this RO. It has three responsibilities:
    - it subscribes to the kafka topics in kafka_topics to check, reload or unload a target when it is created,
      edited or deleted;
    - it periodically looks for ro_tasks that nobody is processing, in order to load their targets;
    - it runs the LockRenew coroutine that keeps ro_task locks alive.
    """

    MAX_TIME_UNATTENDED = 600  # 10min
    TIME_CHECK_UNUSED_VIM = 3600 * 2  # 2h
    kafka_topics = ("vim_account", "wim_account", "sdn")

    def __init__(self, config, engine):
        """
        Constructor of class
        :param config: configuration parameters of database and messaging
        :param engine: an instance of Engine class, used to check, assign, reload and unload the VIM/WIM/SDN
            targets
        """
        threading.Thread.__init__(self)
        self.to_terminate = False
        self.config = config
        self.engine = engine
        self.db = None  # database connection, created at run()
        self.msg = None  # message bus connection, created at run()
        self.loop = None  # asyncio event loop, created at run()
        self.last_rotask_time = 0
        self.next_check_unused_vim = time() + self.TIME_CHECK_UNUSED_VIM
        self.logger = logging.getLogger("ro.vimadmin")
        self.aiomain_task_kafka = None  # asyncio task for receiving vim actions from kafka bus
        self.aiomain_task_vim = None  # asyncio task for watching ro_tasks not processed by nobody
        self.aiomain_task_renew_lock = None
        # ^asyncio task for maintain an ro_task locked when VIM plugin takes too much time processing an order
        self.lock_renew = LockRenew(config, self.logger)
        self.task_locked_time = config["global"]["task_locked_time"]

    async def vim_watcher(self):
        """
        Reads the database periodically looking for ro_tasks that nobody processes (e.g. because of a reboot), in
        order to load their VIM in this RO. It runs until to_terminate is set
        :return: None
        """
        # firstly, check the targets whose operation was left in PROCESSING state
        for target_database in ("vim_accounts", "wim_accounts", "sdns"):
            unattended_targets = self.db.get_list(target_database,
                                                  q_filter={"_admin.operations.operationState": "PROCESSING"})
            for target in unattended_targets:
                target_id = "{}:{}".format(target_database[:3], target["_id"])
                self.logger.info("ordered to check {}".format(target_id))
                self.engine.check_vim(target_id)

        while not self.to_terminate:
            processed_vims = set()
            now = time()
            if not self.last_rotask_time:
                self.last_rotask_time = 0
            ro_tasks = self.db.get_list("ro_tasks",
                                        q_filter={"target_id.ncont": self.engine.get_assigned_vims(),
                                                  "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
                                                  "locked_at.lt": now - self.task_locked_time,
                                                  "to_check_at.gt": self.last_rotask_time,
                                                  "to_check_at.lte": now - self.MAX_TIME_UNATTENDED})
            self.last_rotask_time = now - self.MAX_TIME_UNATTENDED
            for ro_task in ro_tasks:
                target_id = ro_task["target_id"]
                # if already checked ignore
                if target_id in processed_vims:
                    continue
                processed_vims.add(target_id)
                # if already assigned ignore
                if target_id in self.engine.get_assigned_vims():
                    continue
                # if there is some task locked on this VIM, there is an RO working on it, so ignore
                if self.db.get_list("ro_tasks",
                                    q_filter={"target_id": target_id,
                                              "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
                                              "locked_at.gt": now - self.task_locked_time}):
                    continue
                # unattended, assign vim
                self.engine.assign_vim(target_id)
                self.logger.debug("ordered to load {}. Inactivity detected".format(target_id))

            # every 2 hours check if there are vims without any ro_task and unload it
            if now > self.next_check_unused_vim:
                self.next_check_unused_vim = now + self.TIME_CHECK_UNUSED_VIM
                self.engine.unload_unused_vims()
            await asyncio.sleep(self.MAX_TIME_UNATTENDED)

    def _report_finished_task(self, task, description):
        """
        Log why one of the aiomain tasks has finished
        :param task: the finished asyncio task
        :param description: text used as the prefix of the log message
        :return: None
        """
        if task.cancelled():
            # task.exception() would raise CancelledError here (e.g. the task was cancelled by terminate())
            self.logger.debug("{} task cancelled".format(description))
        else:
            self.logger.error("{} task exception: {}".format(description, task.exception()))

    async def aiomain(self):
        """
        Main coroutine. It (re)starts the kafka subscription task, the vim watcher task and the lock renewal task,
        and waits until any of them finishes. The finished task is logged and started again, until to_terminate
        is set
        :return: None
        """
        kafka_working = True
        while not self.to_terminate:
            try:
                if not self.aiomain_task_kafka:
                    # write an "echo" message first; a failure here is reported below as a kafka access error
                    await self.msg.aiowrite("vim_account", "echo", "dummy message", loop=self.loop)
                    kafka_working = True
                    self.logger.debug("Starting vim_account subscription task")
                    self.aiomain_task_kafka = asyncio.ensure_future(
                        self.msg.aioread(self.kafka_topics, loop=self.loop, group_id=False,
                                         aiocallback=self._msg_callback),
                        loop=self.loop)
                if not self.aiomain_task_vim:
                    self.aiomain_task_vim = asyncio.ensure_future(self.vim_watcher(), loop=self.loop)
                if not self.aiomain_task_renew_lock:
                    self.aiomain_task_renew_lock = asyncio.ensure_future(self.lock_renew.renew_locks(),
                                                                        loop=self.loop)

                done, _ = await asyncio.wait(
                    [self.aiomain_task_kafka, self.aiomain_task_vim, self.aiomain_task_renew_lock],
                    timeout=None, return_when=asyncio.FIRST_COMPLETED)

                if self.aiomain_task_kafka in done:
                    self._report_finished_task(self.aiomain_task_kafka, "kafka subscription")
                    self.aiomain_task_kafka = None
                if self.aiomain_task_vim in done:
                    self._report_finished_task(self.aiomain_task_vim, "vim_account watcher")
                    self.aiomain_task_vim = None
                if self.aiomain_task_renew_lock in done:
                    self._report_finished_task(self.aiomain_task_renew_lock, "renew_locks")
                    self.aiomain_task_renew_lock = None
            except asyncio.CancelledError:
                break
            except Exception as e:
                if self.to_terminate:
                    break
                if kafka_working:
                    # logging only first time
                    self.logger.critical("Error accessing kafka '{}'. Retrying ...".format(e))
                    kafka_working = False
            if not self.to_terminate:
                # wait before restarting the finished tasks, to avoid a tight loop on persistent failures
                await asyncio.sleep(10)

    def run(self):
        """
        Thread entry point. It creates the event loop and connects to the database and message bus configured at
        config["database"] and config["message"]. It then runs aiomain() until terminate() is called, and closes
        the connections when finishing
        :return: None
        :raises VimAdminException: on an invalid driver configuration or a database/message connection error
        """
        self.loop = asyncio.new_event_loop()
        try:
            db_driver = self.config["database"]["driver"]
            if db_driver == "mongo":
                self.db = dbmongo.DbMongo()
            elif db_driver == "memory":
                self.db = dbmemory.DbMemory()
            else:
                raise VimAdminException("Invalid configuration param '{}' at '[database]':'driver'".format(
                    db_driver))
            self.db.db_connect(self.config["database"])
            self.lock_renew.start(self.db, self.loop)

            config_msg = self.config["message"].copy()
            config_msg["loop"] = self.loop
            if config_msg["driver"] == "local":
                self.msg = msglocal.MsgLocal()
            elif config_msg["driver"] == "kafka":
                self.msg = msgkafka.MsgKafka()
            else:
                raise VimAdminException("Invalid configuration param '{}' at '[message]':'driver'".format(
                    config_msg["driver"]))
            self.msg.connect(config_msg)
        except (DbException, MsgException) as e:
            raise VimAdminException(str(e), http_code=e.http_code) from e

        self.logger.info("Starting")
        while not self.to_terminate:
            try:
                self.loop.run_until_complete(asyncio.ensure_future(self.aiomain(), loop=self.loop))
            except Exception as e:
                if not self.to_terminate:
                    self.logger.exception("Exception '{}' at messaging read loop".format(e))

        self.logger.info("Finishing")
        self._close_connections()

    async def _msg_callback(self, topic, command, params):
        """
        Callback to process a received message from kafka
        :param topic: topic received
        :param command: command received
        :param params: rest of parameters. '_id' identifies the target
        :return: None
        """
        try:
            if command == "echo":
                return
            if topic in self.kafka_topics:
                target = topic[0:3]  # vim, wim or sdn
                target_id = target + ":" + params["_id"]
                if command in ("edited", "edit"):
                    self.engine.reload_vim(target_id)
                    self.logger.debug("ordered to reload {}".format(target_id))
                elif command in ("deleted", "delete"):
                    self.engine.unload_vim(target_id)
                    self.logger.debug("ordered to unload {}".format(target_id))
                elif command in ("create", "created"):
                    self.engine.check_vim(target_id)
                    self.logger.debug("ordered to check {}".format(target_id))
        except (DbException, MsgException) as e:
            self.logger.error("Error while processing topic={} command={}: {}".format(topic, command, e))
        except Exception as e:
            self.logger.exception("Exception while processing topic={} command={}: {}".format(topic, command, e))

    def _close_connections(self):
        """
        Close all connections (database and message bus) opened at run()
        :return: None
        :raises VimAdminException: on a database or message bus error while disconnecting
        """
        try:
            if self.db:
                self.db.db_disconnect()
            if self.msg:
                self.msg.disconnect()
        except (DbException, MsgException) as e:
            raise VimAdminException(str(e), http_code=e.http_code) from e

    def terminate(self):
        """
        This is a threading safe method to terminate this thread. Termination is done asynchronously afterwards.
        The running asyncio tasks are cancelled from the thread event loop, the lock renewal is stopped and the
        registered locks are released
        :return: None
        """
        self.to_terminate = True
        self.lock_renew.to_terminate = True
        for task in (self.aiomain_task_kafka, self.aiomain_task_vim, self.aiomain_task_renew_lock):
            if task:
                # cancel() must run in the event loop thread; call_soon_threadsafe schedules it there
                self.loop.call_soon_threadsafe(task.cancel)
        self.lock_renew.stop()