5af766c70fa9df14233a2ac1d264a6db4f850b5c
[osm/RO.git] / NG-RO / osm_ng_ro / vim_admin.py
1 # -*- coding: utf-8 -*-
2
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 """
17 This module implements a thread that reads from kafka bus reading VIM messages.
18 It is based on asyncio.
19 It is in charge of loading the tasks assigned to VIMs that nobody is in charge of
20 """
21
22 import logging
23 import threading
24 import asyncio
25 from http import HTTPStatus
26
27 from osm_common import dbmongo, dbmemory, msglocal, msgkafka
28 from osm_common.dbbase import DbException
29 from osm_common.msgbase import MsgException
30 from osm_ng_ro.ns import NsException
31 from time import time
32
33 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
34
35
class VimAdminException(Exception):
    """Error raised by the VIM admin thread; carries an HTTP status code in ``http_code``."""

    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
        """
        :param message: text of the error, passed to the base Exception
        :param http_code: status code stored on the instance, BAD_REQUEST by default
        """
        super().__init__(message)
        self.http_code = http_code
41
42
class VimAdminThread(threading.Thread):
    """
    Thread that owns an asyncio loop and keeps two asyncio tasks running on it:
    - a subscription to the topics listed in kafka_topics (see _msg_callback)
    - vim_watcher, which polls the database for ro_tasks of targets that nobody is processing
    The resulting load/reload/unload/check orders are forwarded to the engine received at construction.
    """
    MAX_TIME_LOCKED = 3600  # 1h
    MAX_TIME_UNATTENDED = 600  # 10min
    TIME_CHECK_UNUSED_VIM = 3600 * 2  # 2h
    kafka_topics = ("vim_account", "wim_account", "sdn")

    def __init__(self, config, engine):
        """
        Constructor of class
        :param config: configuration parameters of database and messaging
        :param engine: an instance of Engine class, used for deleting instances
        """
        threading.Thread.__init__(self)
        self.to_terminate = False
        self.config = config
        self.db = None
        self.msg = None
        self.engine = engine
        self.loop = None
        self.last_rotask_time = 0
        self.next_check_unused_vim = time() + self.TIME_CHECK_UNUSED_VIM
        self.logger = logging.getLogger("ro.vimadmin")
        self.aiomain_task_kafka = None  # asyncio task for receiving vim actions from kafka bus
        self.aiomain_task_vim = None  # asyncio task for watching ro_tasks not processed by nobody

    async def vim_watcher(self):
        """
        Reads database periodically looking for tasks not processed by nobody because of a reboot
        in order to load this vim.
        Runs until self.to_terminate is set, sleeping MAX_TIME_UNATTENDED seconds between iterations.
        NOTE(review): self.db.get_list is called synchronously, so the loop does not run other tasks meanwhile.
        :return: None
        """
        # firstly read VIMS not processed
        for target_database in ("vim_accounts", "wim_accounts", "sdns"):
            unattended_targets = self.db.get_list(
                target_database, q_filter={"_admin.operations.operationState": "PROCESSING"})
            for target in unattended_targets:
                # "vim_accounts" -> "vim:<_id>", "wim_accounts" -> "wim:<_id>", "sdns" -> "sdn:<_id>"
                target_id = "{}:{}".format(target_database[:3], target["_id"])
                self.logger.info("ordered to check {}".format(target_id))
                self.engine.check_vim(target_id)

        while not self.to_terminate:
            now = time()
            processed_vims = []
            if not self.last_rotask_time:
                self.last_rotask_time = 0
            # tasks of targets not assigned to this engine, not locked for MAX_TIME_LOCKED and whose
            # to_check_at is inside the window (last_rotask_time, now - MAX_TIME_UNATTENDED].
            # The key suffixes (.ncont, .lt, .gt, .lte) are operators of the osm_common database filter
            # (presumably: not contained, less than, greater than, less or equal) - TODO confirm
            ro_tasks = self.db.get_list(
                "ro_tasks",
                q_filter={"target_id.ncont": self.engine.get_assigned_vims(),
                          "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
                          "locked_at.lt": now - self.MAX_TIME_LOCKED,
                          "to_check_at.gt": self.last_rotask_time,
                          "to_check_at.lte": now - self.MAX_TIME_UNATTENDED})
            # next iteration only looks at tasks newer than the window just read
            self.last_rotask_time = now - self.MAX_TIME_UNATTENDED
            for ro_task in ro_tasks:
                target_id = ro_task["target_id"]
                # if already checked ignore
                if target_id in processed_vims:
                    continue
                processed_vims.append(target_id)
                # if already assigned ignore
                if target_id in self.engine.get_assigned_vims():
                    continue
                # if there is some task locked on this VIM, there is an RO working on it, so ignore
                if self.db.get_list("ro_tasks",
                                    q_filter={"target_id": target_id,
                                              "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
                                              "locked_at.gt": now - self.MAX_TIME_LOCKED}):
                    continue
                # unattended, assign vim
                self.engine.assign_vim(target_id)
                self.logger.debug("ordered to load {}. Inactivity detected".format(target_id))

            # every 2 hours check if there are vims without any ro_task and unload it
            if now > self.next_check_unused_vim:
                self.next_check_unused_vim = now + self.TIME_CHECK_UNUSED_VIM
                self.engine.unload_unused_vims()
            # no 'loop' argument: asyncio.sleep stopped accepting it in python 3.10; the running loop is used
            await asyncio.sleep(self.MAX_TIME_UNATTENDED)

    def _report_finished_task(self, task, description):
        """
        Logs how a finished asyncio task ended. It never raises for a cancelled task
        :param task: asyncio task already done
        :param description: text used at the beginning of the log message
        :return: None
        """
        if task.cancelled():
            # Task.exception() would raise CancelledError for a cancelled task
            self.logger.debug("{} task cancelled".format(description))
            return
        self.logger.error("{} task exception: {}".format(description, task.exception()))

    async def aiomain(self):
        """
        Keeps the kafka subscription task and the vim_watcher task running until self.to_terminate is set.
        Whenever one of them finishes it is logged and created again at next iteration. Any exception
        (e.g. the bus is not reachable) is retried every 10 seconds; it is logged only the first time.
        :return: None
        """
        kafka_working = True
        while not self.to_terminate:
            try:
                if not self.aiomain_task_kafka:
                    # an "echo" message is written before subscribing; presumably to check that the bus is
                    # reachable. "echo" commands are ignored at _msg_callback
                    await self.msg.aiowrite("vim_account", "echo", "dummy message", loop=self.loop)
                    kafka_working = True
                    self.logger.debug("Starting vim_account subscription task")
                    self.aiomain_task_kafka = asyncio.ensure_future(
                        self.msg.aioread(self.kafka_topics, loop=self.loop, group_id=False,
                                         aiocallback=self._msg_callback),
                        loop=self.loop)
                if not self.aiomain_task_vim:
                    self.aiomain_task_vim = asyncio.ensure_future(self.vim_watcher(), loop=self.loop)
                # no 'loop' argument: asyncio.wait stopped accepting it in python 3.10
                done, _ = await asyncio.wait([self.aiomain_task_kafka, self.aiomain_task_vim],
                                             timeout=None, return_when=asyncio.FIRST_COMPLETED)
                # the reference of a finished task is always cleared, also when it was cancelled; otherwise
                # asyncio.wait would return immediately at every iteration
                if self.aiomain_task_kafka in done:
                    self._report_finished_task(self.aiomain_task_kafka, "kafka subscription")
                    self.aiomain_task_kafka = None
                if self.aiomain_task_vim in done:
                    self._report_finished_task(self.aiomain_task_vim, "vim_account watcher")
                    self.aiomain_task_vim = None

            except Exception as e:
                if self.to_terminate:
                    return
                if kafka_working:
                    # logging only first time
                    self.logger.critical("Error accessing kafka '{}'. Retrying ...".format(e))
                    kafka_working = False
            await asyncio.sleep(10)

    def run(self):
        """
        Start of the thread. Connects to database and message bus unless self.db / self.msg are already
        set, runs aiomain until termination is requested and finally closes connections and the loop
        :return: None
        :raises VimAdminException: on an invalid driver or when connecting/disconnecting fails
        """
        self.loop = asyncio.new_event_loop()
        try:
            if not self.db:
                db_driver = self.config["database"]["driver"]
                if db_driver == "mongo":
                    self.db = dbmongo.DbMongo()
                    self.db.db_connect(self.config["database"])
                elif db_driver == "memory":
                    self.db = dbmemory.DbMemory()
                    self.db.db_connect(self.config["database"])
                else:
                    raise VimAdminException("Invalid configuration param '{}' at '[database]':'driver'".format(
                        db_driver))
            if not self.msg:
                config_msg = self.config["message"].copy()
                config_msg["loop"] = self.loop
                if config_msg["driver"] == "local":
                    self.msg = msglocal.MsgLocal()
                    self.msg.connect(config_msg)
                elif config_msg["driver"] == "kafka":
                    self.msg = msgkafka.MsgKafka()
                    self.msg.connect(config_msg)
                else:
                    raise VimAdminException("Invalid configuration param '{}' at '[message]':'driver'".format(
                        config_msg["driver"]))
        except (DbException, MsgException) as e:
            raise VimAdminException(str(e), http_code=e.http_code) from e

        self.logger.info("Starting")
        while not self.to_terminate:
            try:
                self.loop.run_until_complete(asyncio.ensure_future(self.aiomain(), loop=self.loop))
            except Exception as e:
                if not self.to_terminate:
                    self.logger.exception("Exception '{}' at messaging read loop".format(e))

        self.logger.info("Finishing")
        try:
            self._close_connections()
        finally:
            # the loop is closed even if disconnecting fails
            self.loop.close()

    async def _msg_callback(self, topic, command, params):
        """
        Callback to process a received message from kafka
        :param topic: topic received
        :param command: command received
        :param params: rest of parameters. Its "_id" is used to compose the target identifier
        :return: None. Exceptions are logged, not propagated
        """
        try:
            if command == "echo":
                return
            if topic in self.kafka_topics:
                target = topic[0:3]  # vim, wim or sdn
                target_id = target + ":" + params["_id"]
                if command in ("edited", "edit"):
                    self.engine.reload_vim(target_id)
                    self.logger.debug("ordered to reload {}".format(target_id))
                elif command in ("deleted", "delete"):
                    self.engine.unload_vim(target_id)
                    self.logger.debug("ordered to unload {}".format(target_id))
                elif command in ("create", "created"):
                    self.engine.check_vim(target_id)
                    self.logger.debug("ordered to check {}".format(target_id))

        except (NsException, DbException, MsgException) as e:
            self.logger.error("Error while processing topic={} command={}: {}".format(topic, command, e))
        except Exception as e:
            self.logger.exception("Exception while processing topic={} command={}: {}".format(topic, command, e))

    def _close_connections(self):
        """
        Close all connections.
        It is deliberately not called '_stop': threading.Thread has its own private _stop() method, used
        internally by join() and is_alive() in CPython versions that define it, which must not be overridden
        :return: None
        :raises VimAdminException: when database or message bus fail at disconnecting
        """
        try:
            if self.db:
                self.db.db_disconnect()
            if self.msg:
                self.msg.disconnect()
        except (DbException, MsgException) as e:
            raise VimAdminException(str(e), http_code=e.http_code) from e

    def terminate(self):
        """
        This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards,
        but not immediately.
        :return: None
        """
        self.to_terminate = True
        loop = self.loop
        if loop is None or loop.is_closed():
            # thread not started yet or already finished: there is nothing to cancel
            return
        for task in (self.aiomain_task_kafka, self.aiomain_task_vim):
            if not task:
                continue
            try:
                loop.call_soon_threadsafe(task.cancel)
            except RuntimeError:
                # the loop has been closed by the thread in the meanwhile
                break