Bugfix 1550: Setting a custom release name for Helm based kdus
[osm/NBI.git] / osm_nbi / subscriptions.py
1 # -*- coding: utf-8 -*-
2
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 """
17 This module implements a thread that reads from kafka bus implementing all the subscriptions.
18 It is based on asyncio.
19 To avoid race conditions it uses same engine class as the main module for database changes
20 This module deletes NS and NSI instances when they are terminated with the autoremove flag, sends NS LCM notifications to subscribers and revokes cached tokens on admin request
21 """
22
23 import logging
24 import threading
25 import asyncio
26 from http import HTTPStatus
27
28 from osm_common import dbmongo, dbmemory, msglocal, msgkafka
29 from osm_common.dbbase import DbException
30 from osm_common.msgbase import MsgException
31 from osm_nbi.engine import EngineException
32 from osm_nbi.notifications import NsLcmNotification
33
34 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
35
36
class SubscriptionException(Exception):
    """
    Error raised by the subscription thread.
    Carries the HTTP status code that describes the failure.
    """

    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
        """
        :param message: human readable description of the error
        :param http_code: HTTP status associated to the error, BAD_REQUEST by default
        """
        super().__init__(message)
        self.http_code = http_code
41
42
class SubscriptionThread(threading.Thread):
    """
    Thread that owns its own asyncio loop and reads the message bus ("admin", "ns" and "nsi" topics).
    Received messages are processed by _msg_callback:
      - "ns"/"nsi" terminated messages with the autoremove flag delete the instance through the engine
      - "ns" messages are forwarded to the NS LCM notification subscribers
      - "admin" revoke_token messages remove entries from the authenticator token cache

    NOTE: the cleanup method is deliberately NOT called '_stop'. CPython's threading.Thread uses an internal
    method with that name (up to python 3.12) to mark the thread as finished; overriding it makes is_alive()
    report a finished thread as alive and repeats the cleanup from the thread that calls join().
    """

    # Seconds waited after sending the dummy messages and before starting each subscription task
    SUBSCRIBE_DELAY = 10
    # Seconds waited before retrying when the message bus cannot be accessed
    RETRY_DELAY = 10
    # operationState values that mark a terminate operation as finished
    _TERMINATED_STATES = ("COMPLETED", "PARTIALLY_COMPLETED")

    def __init__(self, config, engine):
        """
        Constructor of class
        :param config: configuration parameters of database and messaging
        :param engine: an instance of Engine class, used for deleting instances
        """
        super().__init__()
        self.to_terminate = False
        self.config = config
        self.db = None
        self.msg = None
        self.engine = engine
        self.loop = None
        self.logger = logging.getLogger("nbi.subscriptions")
        # asyncio task for receiving admin actions from kafka bus
        self.aiomain_task_admin = None
        # asyncio task for receiving normal actions from kafka bus
        self.aiomain_task = None
        self.internal_session = {  # used for a session to the engine methods
            "project_id": (),
            "set_project": (),
            "admin": True,
            "force": False,
            "public": None,
            "method": "delete",
        }
        self.nslcm = None

    async def start_kafka(self):
        """
        Keep the admin and non-admin subscription tasks running until termination is requested.
        Any error accessing the bus is logged (only the first time) and retried after RETRY_DELAY seconds.
        :return: None
        """
        kafka_working = True
        while not self.to_terminate:
            try:
                # bug 710 635. The library aiokafka does not receive anything when the topic at kafka has not
                # been created.
                # Before subscribe, send dummy messages
                for topic in ("admin", "ns", "nsi"):
                    await self.msg.aiowrite(
                        topic, "echo", "dummy message", loop=self.loop
                    )
                if not kafka_working:
                    self.logger.critical("kafka is working again")
                    kafka_working = True
                # asyncio.sleep/asyncio.wait are called without 'loop': that argument was removed in
                # python 3.10 and inside a coroutine the running loop (self.loop) is used anyway
                if not self.aiomain_task_admin:
                    await asyncio.sleep(self.SUBSCRIBE_DELAY)
                    self.logger.debug("Starting admin subscription task")
                    self.aiomain_task_admin = asyncio.ensure_future(
                        self.msg.aioread(
                            ("admin",),
                            loop=self.loop,
                            group_id=False,
                            aiocallback=self._msg_callback,
                        ),
                        loop=self.loop,
                    )
                if not self.aiomain_task:
                    await asyncio.sleep(self.SUBSCRIBE_DELAY)
                    self.logger.debug("Starting non-admin subscription task")
                    self.aiomain_task = asyncio.ensure_future(
                        self.msg.aioread(
                            ("ns", "nsi"),
                            loop=self.loop,
                            aiocallback=self._msg_callback,
                        ),
                        loop=self.loop,
                    )
                if self.to_terminate:
                    # terminate() was called while the tasks did not exist yet (during the sleeps above), so
                    # it could not cancel them. Cancel them here, otherwise the wait below never returns
                    self.aiomain_task.cancel()
                    self.aiomain_task_admin.cancel()
                done, _ = await asyncio.wait(
                    [self.aiomain_task, self.aiomain_task_admin],
                    timeout=None,
                    return_when=asyncio.FIRST_COMPLETED,
                )
                self._reap_finished_tasks(done)
            except Exception as e:
                if self.to_terminate:
                    return
                if kafka_working:
                    # logging only first time
                    self.logger.critical(
                        "Error accessing kafka '{}'. Retrying ...".format(e)
                    )
                    kafka_working = False
                await asyncio.sleep(self.RETRY_DELAY)

    def _reap_finished_tasks(self, done):
        """
        Forget the subscription tasks that have finished, so that start_kafka creates them again.
        :param done: set of finished tasks, as returned by asyncio.wait
        :return: None
        """
        for attribute, log_template in (
            ("aiomain_task_admin", "admin subscription task exception: {}"),
            ("aiomain_task", "non-admin subscription task exception: {}"),
        ):
            task = getattr(self, attribute)
            if task not in done:
                continue
            # Always drop the reference. A finished task left in place makes asyncio.wait return at once
            # on every iteration
            setattr(self, attribute, None)
            if task.cancelled():
                # task.exception() would raise CancelledError; a cancellation is not an error to report
                continue
            self.logger.error(log_template.format(task.exception()))

    def _connect(self):
        """
        Connect to database and message bus (unless already provided) and create the notification helper
        :return: None
        :raises SubscriptionException: on invalid driver configuration or connection failure
        """
        try:
            if not self.db:
                db_driver = self.config["database"]["driver"]
                if db_driver == "mongo":
                    self.db = dbmongo.DbMongo()
                elif db_driver == "memory":
                    self.db = dbmemory.DbMemory()
                else:
                    raise SubscriptionException(
                        "Invalid configuration param '{}' at '[database]':'driver'".format(
                            db_driver
                        )
                    )
                self.db.db_connect(self.config["database"])
            if not self.msg:
                config_msg = self.config["message"].copy()
                config_msg["loop"] = self.loop
                if config_msg["driver"] == "local":
                    self.msg = msglocal.MsgLocal()
                elif config_msg["driver"] == "kafka":
                    self.msg = msgkafka.MsgKafka()
                else:
                    raise SubscriptionException(
                        "Invalid configuration param '{}' at '[message]':'driver'".format(
                            config_msg["driver"]
                        )
                    )
                self.msg.connect(config_msg)
            self.nslcm = NsLcmNotification(self.db)
        except (DbException, MsgException) as e:
            raise SubscriptionException(str(e), http_code=e.http_code) from e

    def run(self):
        """
        Start of the thread
        :return: None
        """
        self.loop = asyncio.new_event_loop()
        try:
            self._connect()
        except Exception:
            # do not leak the loop when the connections cannot be established
            self.loop.close()
            raise

        self.logger.debug("Starting")
        try:
            while not self.to_terminate:
                try:
                    self.loop.run_until_complete(
                        asyncio.ensure_future(self.start_kafka(), loop=self.loop)
                    )
                except Exception as e:
                    if not self.to_terminate:
                        self.logger.exception(
                            "Exception '{}' at messaging read loop".format(e)
                        )

            self.logger.debug("Finishing")
            self._disconnect()
        finally:
            # close the loop even if disconnecting fails
            self.loop.close()

    async def _msg_callback(self, topic, command, params):
        """
        Callback to process a received message from kafka
        :param topic: topic received
        :param command: command received
        :param params: rest of parameters
        :return: None
        """
        msg_to_send = []
        try:
            if topic == "ns":
                self._autoremove_if_terminated(
                    topic, command, params, "nsrs", "nsr_id", "ns", msg_to_send
                )
                self._notify_nslcm(topic, command, params)
            elif topic == "nsi":
                self._autoremove_if_terminated(
                    topic, command, params, "nsis", "nsir_id", "nsis", msg_to_send
                )
            elif topic == "admin":
                self._process_admin(topic, command, params)
            # writing to kafka must be done with our own loop. For this reason it is not allowed Engine to do
            # that, but content to be written is stored at msg_to_send
            for msg in msg_to_send:
                await self.msg.aiowrite(*msg, loop=self.loop)
        except (EngineException, DbException, MsgException) as e:
            self.logger.error(
                "Error while processing topic={} command={}: {}".format(
                    topic, command, e
                )
            )
        except Exception as e:
            self.logger.exception(
                "Exception while processing topic={} command={}: {}".format(
                    topic, command, e
                )
            )

    def _autoremove_if_terminated(
        self, topic, command, params, item, id_key, label, msg_to_send
    ):
        """
        Delete an instance through the engine when its termination has finished and autoremove is set
        :param topic: topic received, used for logging
        :param command: command received; only "terminated" is processed
        :param params: message content; must contain "operationState" and, to delete, the id_key entry
        :param item: engine item to delete from ("nsrs" or "nsis")
        :param id_key: key of params that holds the instance identifier
        :param label: prefix used at the log entry written after the deletion
        :param msg_to_send: list where the engine stores the messages to be written to the bus afterwards
        :return: None
        """
        if (
            command != "terminated"
            or params["operationState"] not in self._TERMINATED_STATES
        ):
            return
        self.logger.debug("received {} terminated {}".format(topic, params))
        if params.get("autoremove"):
            self.engine.del_item(
                self.internal_session,
                item,
                _id=params[id_key],
                not_send_msg=msg_to_send,
            )
            self.logger.debug(
                "{}={} deleted from database".format(label, params[id_key])
            )

    def _notify_nslcm(self, topic, command, params):
        """
        Schedule the NS LCM notifications for the subscribers interested in this message, if any
        :param topic: topic received
        :param command: command received
        :param params: message content. It is usable only when it is a dict with "operationState" and
            "operationParams", the latter holding at least one of "nsdId", "nsInstanceId"
        :return: None
        """
        not_usable = "Message can not be used for notification of nslcm"
        if not isinstance(params, dict):
            self.logger.debug(not_usable)
            return
        # Check availability of operationState and command
        if not (
            params.get("operationState")
            and command
            and params.get("operationParams")
        ):
            self.logger.debug(not_usable)
            return
        nsd_id = params["operationParams"].get("nsdId")
        ns_instance_id = params["operationParams"].get("nsInstanceId")
        # Any one among nsd_id, ns_instance_id should be present.
        if not (nsd_id or ns_instance_id):
            self.logger.debug(not_usable)
            return
        event = command.upper()
        event_details = {
            "topic": topic,
            "command": event,
            "params": params,
        }
        subscribers = self.nslcm.get_subscribers(
            nsd_id,
            ns_instance_id,
            event,
            params["operationState"],
            event_details,
        )
        if subscribers:
            # sent in background, this callback does not wait for the notifications to be delivered
            asyncio.ensure_future(
                self.nslcm.send_notifications(subscribers, loop=self.loop),
                loop=self.loop,
            )

    def _process_admin(self, topic, command, params):
        """
        Process a message of the admin topic: token revocation; "echo" and "ping" are ignored
        :param topic: topic received, used for logging
        :param command: command received
        :param params: for "revoke_token", a dict with the "_id" of the token to remove; when empty the whole
            token cache is cleared
        :return: None
        """
        self.logger.debug("received {} {} {}".format(topic, command, params))
        if command in ["echo", "ping"]:  # ignored commands
            return
        if command != "revoke_token":
            self.logger.debug("unrecognized command '{} {}'".format(topic, command))
            return
        tokens_cache = self.engine.authenticator.tokens_cache
        if not params:
            tokens_cache.clear()
            self.logger.debug("token_cache cleared")
        elif isinstance(params, dict) and "_id" in params:
            tid = params.get("_id")
            tokens_cache.pop(tid, None)
            self.logger.debug("token '{}' removed from token_cache".format(tid))
        else:
            self.logger.debug(
                "unrecognized params in command '{} {}': {}".format(
                    topic, command, params
                )
            )

    def _disconnect(self):
        """
        Close all connections
        :return: None
        :raises SubscriptionException: if the database or the message bus fails to disconnect
        """
        try:
            if self.db:
                self.db.db_disconnect()
            if self.msg:
                self.msg.disconnect()
        except (DbException, MsgException) as e:
            raise SubscriptionException(str(e), http_code=e.http_code) from e

    def terminate(self):
        """
        This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards,
        but not immediately.
        :return: None
        """
        self.to_terminate = True
        # Read each attribute only once: the loop thread can set them to None at any moment
        for task in (self.aiomain_task, self.aiomain_task_admin):
            if task:
                self.loop.call_soon_threadsafe(task.cancel)
356 if self.aiomain_task_admin:
357 self.loop.call_soon_threadsafe(self.aiomain_task_admin.cancel)