Feature 7184 New Generation RO enhancement
[osm/RO.git] / NG-RO / osm_ng_ro / vim_admin.py
1 # -*- coding: utf-8 -*-
2
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 """
17 This module implements a thread that reads VIM messages from the kafka bus.
18 It is based on asyncio.
19 It is in charge of loading tasks assigned to VIMs that nobody else is in charge of
20 """
21
22 import logging
23 import threading
24 import asyncio
25 from http import HTTPStatus
26
27 from osm_common import dbmongo, dbmemory, msglocal, msgkafka
28 from osm_common.dbbase import DbException
29 from osm_common.msgbase import MsgException
30 from osm_ng_ro.ns import NsException
31 from time import time
32
33 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
34
35
class VimAdminException(Exception):
    """Error raised by the VIM admin thread, carrying an HTTP status code.

    :param message: human readable description of the error
    :param http_code: HTTP status associated with the error; defaults to BAD_REQUEST
    """

    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
        super().__init__(message)
        self.http_code = http_code
41
42
class VimAdminThread(threading.Thread):
    """Thread that reacts to VIM/WIM/SDN account messages and watches for unattended ro_tasks.

    The thread owns its own asyncio event loop (created in run()). On that loop it keeps two tasks alive:
    a subscription to the topics in `kafka_topics` and a periodic database scan (vim_watcher()).
    """
    MAX_TIME_LOCKED = 3600  # 1h
    MAX_TIME_UNATTENDED = 60  # 600  # 10min
    VIM_WATCHER_PERIOD = 300  # seconds slept by vim_watcher between two database scans
    RETRY_PERIOD = 10  # seconds slept by aiomain after an error before retrying
    kafka_topics = ("vim_account", "wim_account", "sdn")

    def __init__(self, config, engine):
        """
        Constructor of class
        :param config: configuration parameters of database and messaging
        :param engine: an instance of Engine class, used for deleting instances
        """
        super().__init__()
        self.to_terminate = False
        self.config = config
        self.db = None
        self.msg = None
        self.engine = engine
        self.loop = None
        self.last_rotask_time = 0
        self.logger = logging.getLogger("ro.vimadmin")
        self.aiomain_task_kafka = None  # asyncio task for receiving vim actions from kafka bus
        self.aiomain_task_vim = None  # asyncio task for watching ro_tasks not processed by nobody

    async def vim_watcher(self):
        """
        Reads the database periodically looking for ro_tasks that nobody is processing (e.g. because of a
        restart) and asks the engine to load the corresponding target.
        Runs until self.to_terminate is set (or the task is cancelled), sleeping VIM_WATCHER_PERIOD seconds
        between scans.
        :return: None
        """
        while not self.to_terminate:
            now = time()
            if not self.last_rotask_time:
                # normalize any falsy value (e.g. None) so the filter below always gets a number
                self.last_rotask_time = 0
            # NOTE(review): the '.ncont', '.lt', '.gt', '.lte' suffixes are interpreted by the osm_common
            # database driver; presumably 'not contained', 'less than', etc. -- confirm against osm_common
            ro_tasks = self.db.get_list(
                "ro_tasks",
                q_filter={"target_id.ncont": self.engine.assignment_list,
                          "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
                          "locked_at.lt": now - self.MAX_TIME_LOCKED,
                          "to_check_at.gt": self.last_rotask_time,
                          "to_check_at.lte": now - self.MAX_TIME_UNATTENDED})
            # next scan only looks at tasks newer than the upper bound used in this one
            self.last_rotask_time = now - self.MAX_TIME_UNATTENDED
            for ro_task in ro_tasks:
                target_id = ro_task["target_id"]
                # re-check membership: several ro_tasks may share a target assigned in this same scan
                if target_id not in self.engine.assignment_list:
                    self.engine.assign_vim(target_id)
                    self.logger.debug("ordered to load {}. Inactivity detected".format(target_id))

            # no 'loop' argument: it was removed from asyncio.sleep in python 3.10; the running loop is used
            await asyncio.sleep(self.VIM_WATCHER_PERIOD)

    async def aiomain(self):
        """
        Main coroutine of the thread. Keeps the kafka subscription task and the vim_watcher task running,
        re-creating whichever of them finishes, until self.to_terminate is set.
        :return: None
        """
        kafka_working = True
        while not self.to_terminate:
            try:
                if not self.aiomain_task_kafka:
                    # an 'echo' message is written before subscribing; _msg_callback ignores 'echo' commands
                    await self.msg.aiowrite("vim_account", "echo", "dummy message", loop=self.loop)
                    kafka_working = True
                    self.logger.debug("Starting vim_account subscription task")
                    self.aiomain_task_kafka = asyncio.ensure_future(
                        self.msg.aioread(self.kafka_topics, loop=self.loop, group_id=False,
                                         aiocallback=self._msg_callback),
                        loop=self.loop)
                if not self.aiomain_task_vim:
                    self.aiomain_task_vim = asyncio.ensure_future(self.vim_watcher(), loop=self.loop)
                # no 'loop' argument: it was removed from asyncio.wait in python 3.10
                done, _ = await asyncio.wait([self.aiomain_task_kafka, self.aiomain_task_vim],
                                             timeout=None, return_when=asyncio.FIRST_COMPLETED)
                try:
                    if self.aiomain_task_kafka in done:
                        exc = self.aiomain_task_kafka.exception()
                        self.logger.error("kafka subscription task exception: {}".format(exc))
                        self.aiomain_task_kafka = None  # forces re-creation at next iteration
                    if self.aiomain_task_vim in done:
                        exc = self.aiomain_task_vim.exception()
                        self.logger.error("vim_account watcher task exception: {}".format(exc))
                        self.aiomain_task_vim = None  # forces re-creation at next iteration
                except asyncio.CancelledError:
                    # Task.exception() raises CancelledError for a cancelled task (see terminate())
                    pass

            except Exception as e:
                if self.to_terminate:
                    return
                if kafka_working:
                    # logging only first time
                    self.logger.critical("Error accessing kafka '{}'. Retrying ...".format(e))
                    kafka_working = False
                await asyncio.sleep(self.RETRY_PERIOD)

    def run(self):
        """
        Start of the thread. Creates the event loop, connects to database and messaging if not already
        set, and runs aiomain() until termination is requested. The event loop is always closed on exit.
        :return: None
        :raises VimAdminException: on invalid driver configuration or on database/messaging errors
        """
        self.loop = asyncio.new_event_loop()
        try:
            try:
                if not self.db:
                    db_driver = self.config["database"]["driver"]
                    if db_driver == "mongo":
                        self.db = dbmongo.DbMongo()
                    elif db_driver == "memory":
                        self.db = dbmemory.DbMemory()
                    else:
                        raise VimAdminException("Invalid configuration param '{}' at '[database]':'driver'".format(
                            db_driver))
                    self.db.db_connect(self.config["database"])
                if not self.msg:
                    config_msg = self.config["message"].copy()
                    config_msg["loop"] = self.loop
                    if config_msg["driver"] == "local":
                        self.msg = msglocal.MsgLocal()
                    elif config_msg["driver"] == "kafka":
                        self.msg = msgkafka.MsgKafka()
                    else:
                        raise VimAdminException("Invalid configuration param '{}' at '[message]':'driver'".format(
                            config_msg["driver"]))
                    self.msg.connect(config_msg)
            except (DbException, MsgException) as e:
                raise VimAdminException(str(e), http_code=e.http_code) from e

            self.logger.debug("Starting")
            while not self.to_terminate:
                try:
                    self.loop.run_until_complete(self.aiomain())
                except Exception as e:
                    if not self.to_terminate:
                        self.logger.exception("Exception '{}' at messaging read loop".format(e))

            self.logger.debug("Finishing")
            self._stop()
        finally:
            # close the loop even when connecting or disconnecting fails
            self.loop.close()

    async def _msg_callback(self, topic, command, params):
        """
        Callback to process a received message from kafka
        :param topic: topic received
        :param command: command received
        :param params: rest of parameters; its "_id" entry is used to build the target identifier
        :return: None
        """
        try:
            if command == "echo":
                return
            if topic in self.kafka_topics:
                target = topic[0:3]  # vim, wim or sdn
                target_id = target + ":" + params["_id"]
                if command in ("edited", "edit"):
                    self.engine.reload_vim(target_id)
                    self.logger.debug("ordered to reload {}".format(target_id))
                elif command in ("deleted", "delete"):
                    self.engine.unload_vim(target_id)
                    self.logger.debug("ordered to unload {}".format(target_id))
                elif command in ("create", "created"):
                    self.engine.check_vim(target_id)
                    self.logger.debug("ordered to check {}".format(target_id))

        except (NsException, DbException, MsgException) as e:
            self.logger.error("Error while processing topic={} command={}: {}".format(topic, command, e))
        except Exception as e:
            # any other error (e.g. a message without "_id") is logged with traceback and swallowed
            self.logger.exception("Exception while processing topic={} command={}: {}".format(topic, command, e))

    def _stop(self):
        """
        Close all connections
        :return: None
        :raises VimAdminException: if the database or messaging driver fails to disconnect
        """
        try:
            if self.db:
                self.db.db_disconnect()
            if self.msg:
                self.msg.disconnect()
        except (DbException, MsgException) as e:
            raise VimAdminException(str(e), http_code=e.http_code) from e

    def terminate(self):
        """
        This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards,
        but not immediately.
        :return: None
        """
        self.to_terminate = True
        loop = self.loop
        if loop is None or loop.is_closed():
            # thread not started yet or already finished: nothing to cancel
            return
        # snapshot the attributes: aiomain may reset them to None from the other thread
        for task in (self.aiomain_task_kafka, self.aiomain_task_vim):
            if task:
                loop.call_soon_threadsafe(task.cancel)