Replace yaml.load by yaml.safe_load
[osm/NBI.git] / osm_nbi / subscriptions.py
1 # -*- coding: utf-8 -*-
2
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 """
17 This module implements a thread that reads from kafka bus implementing all the subscriptions.
18 It is based on asyncio.
19 To avoid race conditions it uses same engine class as the main module for database changes
20 For the moment this module only deletes NS instances when they are terminated with the autoremove flag
21 """
22
23 import logging
24 import threading
25 import asyncio
26 from http import HTTPStatus
27
28 from osm_common import dbmongo, dbmemory, msglocal, msgkafka
29 from osm_common.dbbase import DbException
30 from osm_common.msgbase import MsgException
31 from osm_nbi.engine import EngineException
32 from osm_nbi.notifications import NsLcmNotification, VnfLcmNotification
33
34 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
35
36
37 class SubscriptionException(Exception):
38 def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
39 self.http_code = http_code
40 Exception.__init__(self, message)
41
42
43 class SubscriptionThread(threading.Thread):
44 def __init__(self, config, engine):
45 """
46 Constructor of class
47 :param config: configuration parameters of database and messaging
48 :param engine: an instance of Engine class, used for deleting instances
49 """
50 threading.Thread.__init__(self)
51 self.to_terminate = False
52 self.config = config
53 self.db = None
54 self.msg = None
55 self.engine = engine
56 self.loop = None
57 self.logger = logging.getLogger("nbi.subscriptions")
58 self.aiomain_task_admin = (
59 None # asyncio task for receiving admin actions from kafka bus
60 )
61 self.aiomain_task = (
62 None # asyncio task for receiving normal actions from kafka bus
63 )
64 self.internal_session = { # used for a session to the engine methods
65 "project_id": (),
66 "set_project": (),
67 "admin": True,
68 "force": False,
69 "public": None,
70 "method": "delete",
71 }
72 self.nslcm = None
73 self.vnflcm = None
74
75 async def start_kafka(self):
76 # timeout_wait_for_kafka = 3*60
77 kafka_working = True
78 while not self.to_terminate:
79 try:
80 # bug 710 635. The library aiokafka does not recieve anything when the topci at kafka has not been
81 # created.
82 # Before subscribe, send dummy messages
83 await self.msg.aiowrite(
84 "admin", "echo", "dummy message", loop=self.loop
85 )
86 await self.msg.aiowrite("ns", "echo", "dummy message", loop=self.loop)
87 await self.msg.aiowrite("nsi", "echo", "dummy message", loop=self.loop)
88 await self.msg.aiowrite("vnf", "echo", "dummy message", loop=self.loop)
89 if not kafka_working:
90 self.logger.critical("kafka is working again")
91 kafka_working = True
92 if not self.aiomain_task_admin:
93 await asyncio.sleep(10, loop=self.loop)
94 self.logger.debug("Starting admin subscription task")
95 self.aiomain_task_admin = asyncio.ensure_future(
96 self.msg.aioread(
97 ("admin",),
98 loop=self.loop,
99 group_id=False,
100 aiocallback=self._msg_callback,
101 ),
102 loop=self.loop,
103 )
104 if not self.aiomain_task:
105 await asyncio.sleep(10, loop=self.loop)
106 self.logger.debug("Starting non-admin subscription task")
107 self.aiomain_task = asyncio.ensure_future(
108 self.msg.aioread(
109 ("ns", "nsi", "vnf"),
110 loop=self.loop,
111 aiocallback=self._msg_callback,
112 ),
113 loop=self.loop,
114 )
115 done, _ = await asyncio.wait(
116 [self.aiomain_task, self.aiomain_task_admin],
117 timeout=None,
118 loop=self.loop,
119 return_when=asyncio.FIRST_COMPLETED,
120 )
121 try:
122 if self.aiomain_task_admin in done:
123 exc = self.aiomain_task_admin.exception()
124 self.logger.error(
125 "admin subscription task exception: {}".format(exc)
126 )
127 self.aiomain_task_admin = None
128 if self.aiomain_task in done:
129 exc = self.aiomain_task.exception()
130 self.logger.error(
131 "non-admin subscription task exception: {}".format(exc)
132 )
133 self.aiomain_task = None
134 except asyncio.CancelledError:
135 pass
136 except Exception as e:
137 if self.to_terminate:
138 return
139 if kafka_working:
140 # logging only first time
141 self.logger.critical(
142 "Error accessing kafka '{}'. Retrying ...".format(e)
143 )
144 kafka_working = False
145 await asyncio.sleep(10, loop=self.loop)
146
147 def run(self):
148 """
149 Start of the thread
150 :return: None
151 """
152 self.loop = asyncio.new_event_loop()
153 try:
154 if not self.db:
155 if self.config["database"]["driver"] == "mongo":
156 self.db = dbmongo.DbMongo()
157 self.db.db_connect(self.config["database"])
158 elif self.config["database"]["driver"] == "memory":
159 self.db = dbmemory.DbMemory()
160 self.db.db_connect(self.config["database"])
161 else:
162 raise SubscriptionException(
163 "Invalid configuration param '{}' at '[database]':'driver'".format(
164 self.config["database"]["driver"]
165 )
166 )
167 if not self.msg:
168 config_msg = self.config["message"].copy()
169 config_msg["loop"] = self.loop
170 if config_msg["driver"] == "local":
171 self.msg = msglocal.MsgLocal()
172 self.msg.connect(config_msg)
173 elif config_msg["driver"] == "kafka":
174 self.msg = msgkafka.MsgKafka()
175 self.msg.connect(config_msg)
176 else:
177 raise SubscriptionException(
178 "Invalid configuration param '{}' at '[message]':'driver'".format(
179 config_msg["driver"]
180 )
181 )
182 self.nslcm = NsLcmNotification(self.db)
183 self.vnflcm = VnfLcmNotification(self.db)
184 except (DbException, MsgException) as e:
185 raise SubscriptionException(str(e), http_code=e.http_code)
186
187 self.logger.debug("Starting")
188 while not self.to_terminate:
189 try:
190 self.loop.run_until_complete(
191 asyncio.ensure_future(self.start_kafka(), loop=self.loop)
192 )
193 # except asyncio.CancelledError:
194 # break # if cancelled it should end, breaking loop
195 except Exception as e:
196 if not self.to_terminate:
197 self.logger.exception(
198 "Exception '{}' at messaging read loop".format(e), exc_info=True
199 )
200
201 self.logger.debug("Finishing")
202 self._stop()
203 self.loop.close()
204
205 async def _msg_callback(self, topic, command, params):
206 """
207 Callback to process a received message from kafka
208 :param topic: topic received
209 :param command: command received
210 :param params: rest of parameters
211 :return: None
212 """
213 msg_to_send = []
214 try:
215 if topic == "ns":
216 if command == "terminated" and params["operationState"] in (
217 "COMPLETED",
218 "PARTIALLY_COMPLETED",
219 ):
220 self.logger.debug("received ns terminated {}".format(params))
221 if params.get("autoremove"):
222 self.engine.del_item(
223 self.internal_session,
224 "nsrs",
225 _id=params["nsr_id"],
226 not_send_msg=msg_to_send,
227 )
228 self.logger.debug(
229 "ns={} deleted from database".format(params["nsr_id"])
230 )
231 # Check for nslcm notification
232 if isinstance(params, dict):
233 # Check availability of operationState and command
234 if (
235 (not params.get("operationState"))
236 or (not command)
237 or (not params.get("operationParams"))
238 ):
239 self.logger.debug(
240 "Message can not be used for notification of nslcm"
241 )
242 else:
243 nsd_id = params["operationParams"].get("nsdId")
244 ns_instance_id = params["operationParams"].get("nsInstanceId")
245 # Any one among nsd_id, ns_instance_id should be present.
246 if not (nsd_id or ns_instance_id):
247 self.logger.debug(
248 "Message can not be used for notification of nslcm"
249 )
250 else:
251 op_state = params["operationState"]
252 event_details = {
253 "topic": topic,
254 "command": command.upper(),
255 "params": params,
256 }
257 subscribers = self.nslcm.get_subscribers(
258 nsd_id,
259 ns_instance_id,
260 command.upper(),
261 op_state,
262 event_details,
263 )
264 # self.logger.debug("subscribers list: ")
265 # self.logger.debug(subscribers)
266 if subscribers:
267 asyncio.ensure_future(
268 self.nslcm.send_notifications(
269 subscribers, loop=self.loop
270 ),
271 loop=self.loop,
272 )
273 else:
274 self.logger.debug(
275 "Message can not be used for notification of nslcm"
276 )
277 elif topic == "vnf":
278 if isinstance(params, dict):
279 vnfd_id = params["vnfdId"]
280 vnf_instance_id = params["vnfInstanceId"]
281 if command == "create" or command == "delete":
282 op_state = command
283 else:
284 op_state = params["operationState"]
285 event_details = {
286 "topic": topic,
287 "command": command.upper(),
288 "params": params,
289 }
290 subscribers = self.vnflcm.get_subscribers(
291 vnfd_id,
292 vnf_instance_id,
293 command.upper(),
294 op_state,
295 event_details,
296 )
297 if subscribers:
298 asyncio.ensure_future(
299 self.vnflcm.send_notifications(subscribers, loop=self.loop),
300 loop=self.loop,
301 )
302 elif topic == "nsi":
303 if command == "terminated" and params["operationState"] in (
304 "COMPLETED",
305 "PARTIALLY_COMPLETED",
306 ):
307 self.logger.debug("received nsi terminated {}".format(params))
308 if params.get("autoremove"):
309 self.engine.del_item(
310 self.internal_session,
311 "nsis",
312 _id=params["nsir_id"],
313 not_send_msg=msg_to_send,
314 )
315 self.logger.debug(
316 "nsis={} deleted from database".format(params["nsir_id"])
317 )
318 elif topic == "admin":
319 self.logger.debug("received {} {} {}".format(topic, command, params))
320 if command in ["echo", "ping"]: # ignored commands
321 pass
322 elif command == "revoke_token":
323 if params:
324 if isinstance(params, dict) and "_id" in params:
325 tid = params.get("_id")
326 self.engine.authenticator.tokens_cache.pop(tid, None)
327 self.logger.debug(
328 "token '{}' removed from token_cache".format(tid)
329 )
330 else:
331 self.logger.debug(
332 "unrecognized params in command '{} {}': {}".format(
333 topic, command, params
334 )
335 )
336 else:
337 self.engine.authenticator.tokens_cache.clear()
338 self.logger.debug("token_cache cleared")
339 else:
340 self.logger.debug(
341 "unrecognized command '{} {}'".format(topic, command)
342 )
343 # writing to kafka must be done with our own loop. For this reason it is not allowed Engine to do that,
344 # but content to be written is stored at msg_to_send
345 for msg in msg_to_send:
346 await self.msg.aiowrite(*msg, loop=self.loop)
347 except (EngineException, DbException, MsgException) as e:
348 self.logger.error(
349 "Error while processing topic={} command={}: {}".format(
350 topic, command, e
351 )
352 )
353 except Exception as e:
354 self.logger.exception(
355 "Exception while processing topic={} command={}: {}".format(
356 topic, command, e
357 ),
358 exc_info=True,
359 )
360
361 def _stop(self):
362 """
363 Close all connections
364 :return: None
365 """
366 try:
367 if self.db:
368 self.db.db_disconnect()
369 if self.msg:
370 self.msg.disconnect()
371 except (DbException, MsgException) as e:
372 raise SubscriptionException(str(e), http_code=e.http_code)
373
374 def terminate(self):
375 """
376 This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards,
377 but not immediately.
378 :return: None
379 """
380 self.to_terminate = True
381 if self.aiomain_task:
382 self.loop.call_soon_threadsafe(self.aiomain_task.cancel)
383 if self.aiomain_task_admin:
384 self.loop.call_soon_threadsafe(self.aiomain_task_admin.cancel)