blob: e843c80a47fe8b76395e6f31048fcb33404565be [file] [log] [blame]
tierno70eeb182020-10-19 16:38:00 +00001# -*- coding: utf-8 -*-
2
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12# implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16"""
This module implements a thread that reads VIM messages from the kafka bus.
It is based on asyncio.
It is also in charge of loading the tasks assigned to VIMs that nobody is in charge of.
20"""
21
22import logging
23import threading
24import asyncio
25from http import HTTPStatus
26
27from osm_common import dbmongo, dbmemory, msglocal, msgkafka
28from osm_common.dbbase import DbException
29from osm_common.msgbase import MsgException
tierno70eeb182020-10-19 16:38:00 +000030from time import time
31
32__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
33
34
class VimAdminException(Exception):
    """Error raised by the VIM administration thread.

    :param message: human readable description of the problem
    :param http_code: HTTP status associated to the error; BAD_REQUEST when not provided
    """

    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
        super().__init__(message)
        # kept as an attribute so that callers can translate the error into an HTTP response
        self.http_code = http_code
40
41
class LockRenew:
    """
    Periodically renews the database lock ('locked_at') of the objects registered with add_lock_object, while
    the thread that registered them is alive and the maximum locked time has not been exceeded.
    """

    renew_list = []
    # ^ static attribute, common for all RO. Time ordered list of dictionaries with information of locks that needs to
    # be renewed. The time order is achieved as it is appended at the end

    def __init__(self, config, logger):
        """
        Constructor of class
        :param config: configuration parameters of database and messaging. The keys 'task_locked_time',
            'task_relock_time' and 'task_max_locked_time' are read from config["global"]
        :param logger: logger object used to report the renewals
        """
        self.config = config
        self.logger = logger
        self.to_terminate = False
        self.loop = None
        self.db = None
        self.task_locked_time = config["global"]["task_locked_time"]
        self.task_relock_time = config["global"]["task_relock_time"]
        self.task_max_locked_time = config["global"]["task_max_locked_time"]

    def start(self, db, loop):
        """
        Provides the database connection and the asyncio loop to use
        :param db: database object; its 'set_one' method is used to write the locks
        :param loop: asyncio loop where renew_locks is going to run
        :return: None
        """
        self.db = db
        self.loop = loop

    @staticmethod
    def add_lock_object(database_table, database_object, thread_object):
        """
        Insert a task to renew the locking
        :param database_table: database collection where to maintain the lock
        :param database_object: database object. '_id' and 'locked_at' are mandatory keys
        :param thread_object: Thread object that has locked to check if it is alive
        :return: a locked_object needed for calling remove_lock_object. It will contain up-to-date database 'locked_at'
        """
        lock_object = {
            "table": database_table,
            "_id": database_object["_id"],
            "initial_lock_time": database_object["locked_at"],
            "locked_at": database_object["locked_at"],
            "thread": thread_object,
            "unlocked": False,  # True when it is not needed any more
        }
        LockRenew.renew_list.append(lock_object)
        return lock_object

    @staticmethod
    def remove_lock_object(lock_object):
        """
        Marks a lock as not needed any more. The entry is discarded by renew_locks when it reaches the list head
        :param lock_object: dictionary returned by add_lock_object
        :return: None
        """
        lock_object["unlocked"] = True

    async def renew_locks(self):
        """
        Asyncio task that renews the locks of 'renew_list' until 'to_terminate' is set.
        Only the head of the list is inspected at each iteration. It is discarded when it is marked as unlocked,
        when its thread is dead or when 'task_max_locked_time' has been exceeded. Otherwise its database
        'locked_at' is moved forward 'task_locked_time' seconds and the entry is appended again at the end.
        :return: None
        """
        while not self.to_terminate:
            if not self.renew_list:
                # nothing to renew. The 'loop' argument of asyncio.sleep was removed in python 3.10; this
                # coroutine already runs in the desired loop
                await asyncio.sleep(self.task_locked_time - self.task_relock_time)
                continue

            lock_object = self.renew_list[0]
            if lock_object["unlocked"] or not lock_object["thread"] or not lock_object["thread"].is_alive():
                # task has been finished or locker thread is dead, not needed to re-locked.
                self.renew_list.pop(0)
                continue

            locked_at = lock_object["locked_at"]
            now = time()
            time_to_relock = locked_at + self.task_locked_time - self.task_relock_time - now
            if time_to_relock >= 1:
                # wait until it is time to re-lock it
                await asyncio.sleep(time_to_relock)
                continue

            # The head is always removed here, and appended again only after a successful renewal. In this way
            # every iteration either awaits or shrinks the list, so the event loop is never starved
            self.renew_list.pop(0)
            if lock_object["initial_lock_time"] + self.task_max_locked_time < now:
                # locked for too long, the lock is left to expire
                self.logger.info("Maximum locked time exceeded for {}.{}".format(lock_object["table"],
                                                                                 lock_object["_id"]))
                continue

            # re-lock
            new_locked_at = locked_at + self.task_locked_time
            try:
                # the filter includes the previous 'locked_at', so nothing is written if somebody else changed it
                if self.db.set_one(lock_object["table"],
                                   update_dict={"locked_at": new_locked_at, "modified_at": now},
                                   q_filter={"_id": lock_object["_id"], "locked_at": locked_at},
                                   fail_on_empty=False):
                    self.logger.debug("Renew lock for {}.{}".format(lock_object["table"], lock_object["_id"]))
                    lock_object["locked_at"] = new_locked_at
                    self.renew_list.append(lock_object)
                else:
                    self.logger.info("Cannot renew lock for {}.{}".format(lock_object["table"],
                                                                          lock_object["_id"]))
            except Exception as e:
                self.logger.error("Exception when trying to renew lock for {}.{}: {}".format(
                    lock_object["table"], lock_object["_id"], e))

    def stop(self):
        """
        Unlock all locked items, writing 'locked_at' to 0 at database for the entries not marked as unlocked or
        whose lock has not expired yet
        :return: None
        """
        if self.db is None:
            # start() was never called, there is no database where to release the locks
            return
        now = time()
        for lock_object in self.renew_list:
            locked_at = lock_object["locked_at"]
            if not lock_object["unlocked"] or locked_at + self.task_locked_time >= now:
                self.db.set_one(lock_object["table"], update_dict={"locked_at": 0},
                                q_filter={"_id": lock_object["_id"], "locked_at": locked_at},
                                fail_on_empty=False)
136
137
class VimAdminThread(threading.Thread):
    """
    Thread that runs an asyncio loop with three tasks: the kafka subscription to the VIM/WIM/SDN topics, the
    watcher of ro_tasks not attended by anybody, and the renewal of database locks (see 'lock_renew' attribute).
    """
    MAX_TIME_UNATTENDED = 600  # 10min
    TIME_CHECK_UNUSED_VIM = 3600 * 2  # 2h
    kafka_topics = ("vim_account", "wim_account", "sdn")

    def __init__(self, config, engine):
        """
        Constructor of class
        :param config: configuration parameters of database and messaging
        :param engine: an instance of Engine class, used for deleting instances
        """
        threading.Thread.__init__(self)
        self.to_terminate = False
        self.config = config
        self.db = None
        self.msg = None
        self.engine = engine
        self.loop = None
        self.last_rotask_time = 0
        self.next_check_unused_vim = time() + self.TIME_CHECK_UNUSED_VIM
        self.logger = logging.getLogger("ro.vimadmin")
        self.aiomain_task_kafka = None  # asyncio task for receiving vim actions from kafka bus
        self.aiomain_task_vim = None  # asyncio task for watching ro_tasks not processed by nobody
        self.aiomain_task_renew_lock = None
        # ^asyncio task for maintain an ro_task locked when VIM plugin takes too much time processing an order
        self.lock_renew = LockRenew(config, self.logger)
        self.task_locked_time = config["global"]["task_locked_time"]

    async def vim_watcher(self):
        """ Reads database periodically looking for tasks not processed by nobody because of a reboot
        in order to load this vim"""
        # firstly read VIMS not processed
        for target_database in ("vim_accounts", "wim_accounts", "sdns"):
            unattended_targets = self.db.get_list(target_database,
                                                  q_filter={"_admin.operations.operationState": "PROCESSING"})
            for target in unattended_targets:
                # target_id is formed with the 3 first letters of the collection: vim, wim or sdn
                target_id = "{}:{}".format(target_database[:3], target["_id"])
                self.logger.info("ordered to check {}".format(target_id))
                self.engine.check_vim(target_id)

        while not self.to_terminate:
            now = time()
            processed_vims = []
            if not self.last_rotask_time:
                self.last_rotask_time = 0
            # tasks of targets not assigned here, not locked, and waiting to be checked for too long
            ro_tasks = self.db.get_list("ro_tasks",
                                        q_filter={"target_id.ncont": self.engine.get_assigned_vims(),
                                                  "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
                                                  "locked_at.lt": now - self.task_locked_time,
                                                  "to_check_at.gt": self.last_rotask_time,
                                                  "to_check_at.lte": now - self.MAX_TIME_UNATTENDED})
            self.last_rotask_time = now - self.MAX_TIME_UNATTENDED
            for ro_task in ro_tasks:
                # if already checked ignore
                if ro_task["target_id"] in processed_vims:
                    continue
                processed_vims.append(ro_task["target_id"])
                # if already assigned ignore
                if ro_task["target_id"] in self.engine.get_assigned_vims():
                    continue
                # if there is some task locked on this VIM, there is an RO working on it, so ignore
                if self.db.get_list("ro_tasks",
                                    q_filter={"target_id": ro_task["target_id"],
                                              "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
                                              "locked_at.gt": now - self.task_locked_time}):
                    continue
                # unattended, assign vim
                self.engine.assign_vim(ro_task["target_id"])
                self.logger.debug("ordered to load {}. Inactivity detected".format(ro_task["target_id"]))

            # every 2 hours check if there are vims without any ro_task and unload it
            if now > self.next_check_unused_vim:
                self.next_check_unused_vim = now + self.TIME_CHECK_UNUSED_VIM
                self.engine.unload_unused_vims()
            # the 'loop' argument of asyncio.sleep was removed in python 3.10. This coroutine runs in self.loop
            await asyncio.sleep(self.MAX_TIME_UNATTENDED)

    async def aiomain(self):
        """
        Main coroutine. Creates the kafka, vim watcher and lock renewal tasks and creates them again when
        they finish, until 'to_terminate' is set
        :return: None
        """
        kafka_working = True
        while not self.to_terminate:
            try:
                if not self.aiomain_task_kafka:
                    # await self.msg.aiowrite("admin", "echo", "dummy message", loop=self.loop)
                    await self.msg.aiowrite("vim_account", "echo", "dummy message", loop=self.loop)
                    kafka_working = True
                    self.logger.debug("Starting vim_account subscription task")
                    self.aiomain_task_kafka = asyncio.ensure_future(
                        self.msg.aioread(self.kafka_topics, loop=self.loop, group_id=False,
                                         aiocallback=self._msg_callback),
                        loop=self.loop)
                if not self.aiomain_task_vim:
                    self.aiomain_task_vim = asyncio.ensure_future(
                        self.vim_watcher(),
                        loop=self.loop)
                if not self.aiomain_task_renew_lock:
                    self.aiomain_task_renew_lock = asyncio.ensure_future(self.lock_renew.renew_locks(), loop=self.loop)

                # the 'loop' argument of asyncio.wait was removed in python 3.10. The tasks already belong to
                # self.loop, which is the loop running this coroutine
                done, _ = await asyncio.wait(
                    [self.aiomain_task_kafka, self.aiomain_task_vim, self.aiomain_task_renew_lock],
                    timeout=None, return_when=asyncio.FIRST_COMPLETED)
                try:
                    # a finished task is set to None in order to be created again at next iteration
                    if self.aiomain_task_kafka in done:
                        exc = self.aiomain_task_kafka.exception()
                        self.logger.error("kafka subscription task exception: {}".format(exc))
                        self.aiomain_task_kafka = None
                    if self.aiomain_task_vim in done:
                        exc = self.aiomain_task_vim.exception()
                        self.logger.error("vim_account watcher task exception: {}".format(exc))
                        self.aiomain_task_vim = None
                    if self.aiomain_task_renew_lock in done:
                        exc = self.aiomain_task_renew_lock.exception()
                        self.logger.error("renew_locks task exception: {}".format(exc))
                        self.aiomain_task_renew_lock = None
                except asyncio.CancelledError:
                    # exception() raises CancelledError for a cancelled task
                    pass

            except Exception as e:
                if self.to_terminate:
                    return
                if kafka_working:
                    # logging only first time
                    self.logger.critical("Error accessing kafka '{}'. Retrying ...".format(e))
                    kafka_working = False
                await asyncio.sleep(10)

    def run(self):
        """
        Start of the thread
        :return: None
        :raises VimAdminException: if the database or message driver is not valid or the connection fails
        """
        self.loop = asyncio.new_event_loop()
        try:
            if not self.db:
                if self.config["database"]["driver"] == "mongo":
                    self.db = dbmongo.DbMongo()
                    self.db.db_connect(self.config["database"])
                elif self.config["database"]["driver"] == "memory":
                    self.db = dbmemory.DbMemory()
                    self.db.db_connect(self.config["database"])
                else:
                    raise VimAdminException("Invalid configuration param '{}' at '[database]':'driver'".format(
                        self.config["database"]["driver"]))
            self.lock_renew.start(self.db, self.loop)

            if not self.msg:
                config_msg = self.config["message"].copy()
                config_msg["loop"] = self.loop
                if config_msg["driver"] == "local":
                    self.msg = msglocal.MsgLocal()
                    self.msg.connect(config_msg)
                elif config_msg["driver"] == "kafka":
                    self.msg = msgkafka.MsgKafka()
                    self.msg.connect(config_msg)
                else:
                    raise VimAdminException("Invalid configuration param '{}' at '[message]':'driver'".format(
                        config_msg["driver"]))
        except (DbException, MsgException) as e:
            # chained to keep the original traceback
            raise VimAdminException(str(e), http_code=e.http_code) from e

        self.logger.info("Starting")
        while not self.to_terminate:
            try:
                self.loop.run_until_complete(asyncio.ensure_future(self.aiomain(), loop=self.loop))
            # except asyncio.CancelledError:
            #     break  # if cancelled it should end, breaking loop
            except Exception as e:
                if not self.to_terminate:
                    self.logger.exception("Exception '{}' at messaging read loop".format(e), exc_info=True)

        self.logger.info("Finishing")
        self._stop()
        self.loop.close()

    async def _msg_callback(self, topic, command, params):
        """
        Callback to process a received message from kafka
        :param topic: topic received
        :param command: command received
        :param params: rest of parameters
        :return: None
        """
        try:
            if command == "echo":
                return
            if topic in self.kafka_topics:
                target = topic[0:3]  # vim, wim or sdn
                target_id = target + ":" + params["_id"]
                if command in ("edited", "edit"):
                    self.engine.reload_vim(target_id)
                    self.logger.debug("ordered to reload {}".format(target_id))
                elif command in ("deleted", "delete"):
                    self.engine.unload_vim(target_id)
                    self.logger.debug("ordered to unload {}".format(target_id))
                elif command in ("create", "created"):
                    self.engine.check_vim(target_id)
                    self.logger.debug("ordered to check {}".format(target_id))

        except (DbException, MsgException) as e:
            self.logger.error("Error while processing topic={} command={}: {}".format(topic, command, e))
        except Exception as e:
            self.logger.exception("Exception while processing topic={} command={}: {}".format(topic, command, e),
                                  exc_info=True)

    def _stop(self):
        """
        Close all connections
        :return: None
        :raises VimAdminException: if the database or message disconnection fails
        """
        try:
            if self.db:
                self.db.db_disconnect()
            if self.msg:
                self.msg.disconnect()
        except (DbException, MsgException) as e:
            raise VimAdminException(str(e), http_code=e.http_code) from e

    def terminate(self):
        """
        This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards,
        but not immediately.
        :return: None
        """
        self.to_terminate = True
        self.lock_renew.to_terminate = True
        # tasks belong to the loop of this thread, so they are cancelled by means of call_soon_threadsafe
        if self.aiomain_task_kafka:
            self.loop.call_soon_threadsafe(self.aiomain_task_kafka.cancel)
        if self.aiomain_task_vim:
            self.loop.call_soon_threadsafe(self.aiomain_task_vim.cancel)
        if self.aiomain_task_renew_lock:
            self.loop.call_soon_threadsafe(self.aiomain_task_renew_lock.cancel)
        self.lock_renew.stop()