fix 1386: enhance on lock procedure
[osm/RO.git] / NG-RO / osm_ng_ro / vim_admin.py
1 # -*- coding: utf-8 -*-
2
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 """
17 This module implements a thread that reads VIM messages from the kafka bus.
18 It is based on asyncio.
19 It is in charge of loading tasks assigned to VIMs that nobody is in charge of.
20 """
21
22 import logging
23 import threading
24 import asyncio
25 from http import HTTPStatus
26
27 from osm_common import dbmongo, dbmemory, msglocal, msgkafka
28 from osm_common.dbbase import DbException
29 from osm_common.msgbase import MsgException
30 from time import time
31
32 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
33
34
class VimAdminException(Exception):
    """Error raised by this module, carrying the HTTP status code associated with the failure."""

    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
        """
        :param message: text describing the error
        :param http_code: HTTP status linked to the error, BAD_REQUEST unless another one is given
        """
        super().__init__(message)
        self.http_code = http_code
40
41
class LockRenew:
    """
    Keeps database objects locked while the thread that locked them is still working on them.

    Entries are registered with add_lock_object() and flagged as finished with remove_lock_object().
    The coroutine renew_locks() walks the shared list and moves the database 'locked_at' mark forward
    when it is about to become older than 'task_locked_time'.
    """

    renew_list = []
    # ^ class attribute, common for all RO. Time ordered list of dictionaries with information of locks that need to
    # be renewed. The time order is achieved as entries are appended at the end

    def __init__(self, config, logger):
        """
        Constructor of class
        :param config: configuration parameters. config["global"] must contain 'task_locked_time',
            'task_relock_time' and 'task_max_locked_time'
        :param logger: logger used to report renewals and failures
        """
        self.config = config
        self.logger = logger
        self.to_terminate = False
        self.loop = None
        self.db = None
        self.task_locked_time = config["global"]["task_locked_time"]
        self.task_relock_time = config["global"]["task_relock_time"]
        self.task_max_locked_time = config["global"]["task_max_locked_time"]

    def start(self, db, loop):
        """
        Store the database connection and the asyncio loop to be used afterwards
        :param db: database object; its 'set_one' method is used to renew and to release locks
        :param loop: asyncio loop; it is only stored
        :return: None
        """
        self.db = db
        self.loop = loop

    @staticmethod
    def add_lock_object(database_table, database_object, thread_object):
        """
        Insert a task to renew the locking
        :param database_table: database collection where to maintain the lock
        :param database_object: database object. '_id' and 'locked_at' are mandatory keys
        :param thread_object: Thread object that has locked to check if it is alive
        :return: a locked_object needed for calling remove_lock_object. It will contain the up-to-date database
            'locked_at'
        """
        lock_object = {
            "table": database_table,
            "_id": database_object["_id"],
            "initial_lock_time": database_object["locked_at"],
            "locked_at": database_object["locked_at"],
            "thread": thread_object,
            "unlocked": False,  # True when it is not needed any more
        }
        LockRenew.renew_list.append(lock_object)
        return lock_object

    @staticmethod
    def remove_lock_object(lock_object):
        """
        Flag a lock as not needed any more. The entry is discarded by renew_locks when it reaches the list head
        :param lock_object: object returned by add_lock_object
        :return: None
        """
        lock_object["unlocked"] = True

    async def renew_locks(self):
        """
        Coroutine that runs until 'to_terminate' is set, renewing the lock of the entries of 'renew_list'.
        Only the head of the list is inspected on every iteration:
        - finished entries, or entries whose thread is missing or dead, are discarded
        - if it is not yet time to renew, it sleeps until then
        - entries locked for longer than 'task_max_locked_time' are discarded without renewing
        - otherwise 'locked_at' is moved forward 'task_locked_time' at database and the entry is queued again
        :return: None
        """
        while not self.to_terminate:
            if not self.renew_list:
                # nothing to renew. The 'loop' argument of asyncio.sleep is not passed because it was
                # removed in Python 3.10; the running loop is used
                await asyncio.sleep(self.task_locked_time - self.task_relock_time)
                continue

            lock_object = self.renew_list[0]
            owner = lock_object["thread"]
            if lock_object["unlocked"] or not owner or not owner.is_alive():
                # task has been finished or locker thread is dead, not needed to re-locked.
                self.renew_list.pop(0)
                continue

            locked_at = lock_object["locked_at"]
            now = time()
            time_to_relock = locked_at + self.task_locked_time - self.task_relock_time - now
            if time_to_relock >= 1:
                # wait until it is time to re-lock it
                await asyncio.sleep(time_to_relock)
                continue

            # The head is processed now: take it out so that it is queued again, at the end, only once
            # and only when the renewal succeeds
            self.renew_list.pop(0)
            if lock_object["initial_lock_time"] + self.task_max_locked_time < now:
                # locked for too long, stop renewing it
                continue
            self._relock(lock_object, locked_at, now)

    def _relock(self, lock_object, locked_at, now):
        """Move 'locked_at' forward at database; on success update lock_object and queue it again."""
        new_locked_at = locked_at + self.task_locked_time
        try:
            # the filter includes the current 'locked_at', so nothing is written if the database value differs
            if self.db.set_one(lock_object["table"],
                               update_dict={"locked_at": new_locked_at, "modified_at": now},
                               q_filter={"_id": lock_object["_id"], "locked_at": locked_at},
                               fail_on_empty=False):
                self.logger.debug("Renew lock for {}.{}".format(lock_object["table"], lock_object["_id"]))
                lock_object["locked_at"] = new_locked_at
                self.renew_list.append(lock_object)
            else:
                self.logger.info("Cannot renew lock for {}.{}".format(lock_object["table"],
                                                                     lock_object["_id"]))
        except Exception as e:
            self.logger.error("Exception when trying to renew lock for {}.{}: {}".format(
                lock_object["table"], lock_object["_id"], e))

    def stop(self):
        """
        Unlock all locked items, writing 'locked_at' 0 at database for the entries of 'renew_list'
        :return: None
        """
        if self.db is None:
            # start() has not been called, there is no database where to release anything
            return
        now = time()
        for lock_object in self.renew_list:
            locked_at = lock_object["locked_at"]
            # NOTE(review): with 'or', entries already flagged as unlocked are also written while their lock
            # time has not expired; the 'locked_at' filter limits the write to unchanged objects. Confirm
            # whether 'and' was intended
            if not lock_object["unlocked"] or locked_at + self.task_locked_time >= now:
                self.db.set_one(lock_object["table"], update_dict={"locked_at": 0},
                                q_filter={"_id": lock_object["_id"], "locked_at": locked_at},
                                fail_on_empty=False)
136
137
class VimAdminThread(threading.Thread):
    """
    Thread that runs its own asyncio loop with three tasks: the subscription to the kafka topics of
    'kafka_topics', the watcher of ro_tasks that nobody is processing, and the renewal of database locks.
    """
    MAX_TIME_UNATTENDED = 600  # 10min
    TIME_CHECK_UNUSED_VIM = 3600 * 2  # 2h
    kafka_topics = ("vim_account", "wim_account", "sdn")

    def __init__(self, config, engine):
        """
        Constructor of class
        :param config: configuration parameters of database and messaging
        :param engine: an instance of Engine class, used for deleting instances
        """
        super().__init__()
        self.to_terminate = False
        self.config = config
        self.db = None
        self.msg = None
        self.engine = engine
        self.loop = None
        self.last_rotask_time = 0
        self.next_check_unused_vim = time() + self.TIME_CHECK_UNUSED_VIM
        self.logger = logging.getLogger("ro.vimadmin")
        self.aiomain_task_kafka = None  # asyncio task for receiving vim actions from kafka bus
        self.aiomain_task_vim = None  # asyncio task for watching ro_tasks not processed by nobody
        self.aiomain_task_renew_lock = None
        # ^asyncio task for maintain an ro_task locked when VIM plugin takes too much time processing an order
        self.lock_renew = LockRenew(config, self.logger)
        self.task_locked_time = config["global"]["task_locked_time"]

    async def vim_watcher(self):
        """ Reads database periodically looking for tasks not processed by nobody because of a reboot
        in order to load this vim"""
        # firstly read VIMS not processed
        for target_database in ("vim_accounts", "wim_accounts", "sdns"):
            unattended_targets = self.db.get_list(target_database,
                                                  q_filter={"_admin.operations.operationState": "PROCESSING"})
            for target in unattended_targets:
                # the first three letters of the collection name (vim, wim, sdn) prefix the target id
                target_id = "{}:{}".format(target_database[:3], target["_id"])
                self.logger.info("ordered to check {}".format(target_id))
                self.engine.check_vim(target_id)

        watched_status = ['SCHEDULED', 'BUILD', 'DONE', 'FAILED']
        while not self.to_terminate:
            now = time()
            processed_vims = set()
            if not self.last_rotask_time:
                self.last_rotask_time = 0
            # ro_tasks of targets not assigned here, not locked within the last task_locked_time and that should
            # have been checked at least MAX_TIME_UNATTENDED ago
            ro_tasks = self.db.get_list("ro_tasks",
                                        q_filter={"target_id.ncont": self.engine.get_assigned_vims(),
                                                  "tasks.status": watched_status,
                                                  "locked_at.lt": now - self.task_locked_time,
                                                  "to_check_at.gt": self.last_rotask_time,
                                                  "to_check_at.lte": now - self.MAX_TIME_UNATTENDED})
            self.last_rotask_time = now - self.MAX_TIME_UNATTENDED
            for ro_task in ro_tasks:
                target_id = ro_task["target_id"]
                # if already checked ignore
                if target_id in processed_vims:
                    continue
                processed_vims.add(target_id)
                # if already assigned ignore
                if target_id in self.engine.get_assigned_vims():
                    continue
                # if there is some task locked on this VIM, there is an RO working on it, so ignore
                if self.db.get_list("ro_tasks",
                                    q_filter={"target_id": target_id,
                                              "tasks.status": watched_status,
                                              "locked_at.gt": now - self.task_locked_time}):
                    continue
                # unattended, assign vim
                self.engine.assign_vim(target_id)
                self.logger.debug("ordered to load {}. Inactivity detected".format(target_id))

            # every 2 hours check if there are vims without any ro_task and unload it
            if now > self.next_check_unused_vim:
                self.next_check_unused_vim = now + self.TIME_CHECK_UNUSED_VIM
                self.engine.unload_unused_vims()
            # the 'loop' argument of asyncio.sleep is not passed because it was removed in Python 3.10
            await asyncio.sleep(self.MAX_TIME_UNATTENDED)

    async def aiomain(self):
        """
        Creates the kafka, vim watcher and lock renewal tasks when they do not exist, waits until any of them
        finishes, logs its exception and clears it so that it is created again on the next iteration.
        On any other exception it waits 10 seconds and retries, unless termination has been ordered.
        :return: None
        """
        kafka_working = True
        while not self.to_terminate:
            try:
                if not self.aiomain_task_kafka:
                    await self.msg.aiowrite("vim_account", "echo", "dummy message", loop=self.loop)
                    kafka_working = True
                    self.logger.debug("Starting vim_account subscription task")
                    self.aiomain_task_kafka = asyncio.ensure_future(
                        self.msg.aioread(self.kafka_topics, loop=self.loop, group_id=False,
                                         aiocallback=self._msg_callback),
                        loop=self.loop)
                if not self.aiomain_task_vim:
                    self.aiomain_task_vim = asyncio.ensure_future(
                        self.vim_watcher(),
                        loop=self.loop)
                if not self.aiomain_task_renew_lock:
                    self.aiomain_task_renew_lock = asyncio.ensure_future(self.lock_renew.renew_locks(), loop=self.loop)

                # the 'loop' argument of asyncio.wait is not passed because it was removed in Python 3.10
                done, _ = await asyncio.wait(
                    [self.aiomain_task_kafka, self.aiomain_task_vim, self.aiomain_task_renew_lock],
                    timeout=None, return_when=asyncio.FIRST_COMPLETED)
                try:
                    if self.aiomain_task_kafka in done:
                        exc = self.aiomain_task_kafka.exception()
                        self.logger.error("kafka subscription task exception: {}".format(exc))
                        self.aiomain_task_kafka = None
                    if self.aiomain_task_vim in done:
                        exc = self.aiomain_task_vim.exception()
                        self.logger.error("vim_account watcher task exception: {}".format(exc))
                        self.aiomain_task_vim = None
                    if self.aiomain_task_renew_lock in done:
                        exc = self.aiomain_task_renew_lock.exception()
                        self.logger.error("renew_locks task exception: {}".format(exc))
                        self.aiomain_task_renew_lock = None
                except asyncio.CancelledError:
                    # exception() raises CancelledError for a cancelled task
                    pass

            except Exception as e:
                if self.to_terminate:
                    return
                if kafka_working:
                    # logging only first time
                    self.logger.critical("Error accessing kafka '{}'. Retrying ...".format(e))
                    kafka_working = False
                await asyncio.sleep(10)

    def run(self):
        """
        Start of the thread
        :return: None
        :raises VimAdminException: on an invalid database or message driver, or when connecting fails
        """
        self.loop = asyncio.new_event_loop()
        try:
            if not self.db:
                db_driver = self.config["database"]["driver"]
                if db_driver == "mongo":
                    self.db = dbmongo.DbMongo()
                    self.db.db_connect(self.config["database"])
                elif db_driver == "memory":
                    self.db = dbmemory.DbMemory()
                    self.db.db_connect(self.config["database"])
                else:
                    raise VimAdminException("Invalid configuration param '{}' at '[database]':'driver'".format(
                        self.config["database"]["driver"]))
            # the lock renewer always needs the database and the loop of this thread
            self.lock_renew.start(self.db, self.loop)

            if not self.msg:
                config_msg = self.config["message"].copy()
                config_msg["loop"] = self.loop
                if config_msg["driver"] == "local":
                    self.msg = msglocal.MsgLocal()
                    self.msg.connect(config_msg)
                elif config_msg["driver"] == "kafka":
                    self.msg = msgkafka.MsgKafka()
                    self.msg.connect(config_msg)
                else:
                    raise VimAdminException("Invalid configuration param '{}' at '[message]':'driver'".format(
                        config_msg["driver"]))
        except (DbException, MsgException) as e:
            raise VimAdminException(str(e), http_code=e.http_code) from e

        self.logger.info("Starting")
        while not self.to_terminate:
            try:
                self.loop.run_until_complete(asyncio.ensure_future(self.aiomain(), loop=self.loop))
            except Exception as e:
                if not self.to_terminate:
                    # logger.exception already attaches the traceback
                    self.logger.exception("Exception '{}' at messaging read loop".format(e))

        self.logger.info("Finishing")
        self._stop()
        self.loop.close()

    async def _msg_callback(self, topic, command, params):
        """
        Callback to process a received message from kafka
        :param topic: topic received
        :param command: command received
        :param params: rest of parameters
        :return: None
        """
        try:
            if command == "echo":
                return
            if topic in self.kafka_topics:
                target = topic[0:3]  # vim, wim or sdn
                target_id = target + ":" + params["_id"]
                if command in ("edited", "edit"):
                    self.engine.reload_vim(target_id)
                    self.logger.debug("ordered to reload {}".format(target_id))
                elif command in ("deleted", "delete"):
                    self.engine.unload_vim(target_id)
                    self.logger.debug("ordered to unload {}".format(target_id))
                elif command in ("create", "created"):
                    self.engine.check_vim(target_id)
                    self.logger.debug("ordered to check {}".format(target_id))

        except (DbException, MsgException) as e:
            self.logger.error("Error while processing topic={} command={}: {}".format(topic, command, e))
        except Exception as e:
            self.logger.exception("Exception while processing topic={} command={}: {}".format(topic, command, e),
                                  exc_info=True)

    def _stop(self):
        """
        Close all connections
        :return: None
        :raises VimAdminException: when disconnecting from database or messaging fails
        """
        try:
            if self.db:
                self.db.db_disconnect()
            if self.msg:
                self.msg.disconnect()
        except (DbException, MsgException) as e:
            raise VimAdminException(str(e), http_code=e.http_code) from e

    def terminate(self):
        """
        This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards,
        but not immediately.
        :return: None
        """
        self.to_terminate = True
        self.lock_renew.to_terminate = True
        # tasks belong to the loop of this thread, so cancellation is scheduled on that loop
        if self.aiomain_task_kafka:
            self.loop.call_soon_threadsafe(self.aiomain_task_kafka.cancel)
        if self.aiomain_task_vim:
            self.loop.call_soon_threadsafe(self.aiomain_task_vim.cancel)
        if self.aiomain_task_renew_lock:
            self.loop.call_soon_threadsafe(self.aiomain_task_renew_lock.cancel)
        self.lock_renew.stop()