Bug 1830 fixed: maps completed operations to original operation types
[osm/NBI.git] / osm_nbi / subscriptions.py
1 # -*- coding: utf-8 -*-
2
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 """
This module implements a thread that reads from the kafka bus and serves all the subscriptions.
It is based on asyncio.
To avoid race conditions it uses the same engine class as the main module for database changes.
It deletes NS and NSI instances when they are terminated with the autoremove flag, sends the NS and VNF
LCM notifications to their subscribers and removes revoked tokens from the token cache.
21 """
22
23 import logging
24 import threading
25 import asyncio
26 from http import HTTPStatus
27
28 from osm_common import dbmongo, dbmemory, msglocal, msgkafka
29 from osm_common.dbbase import DbException
30 from osm_common.msgbase import MsgException
31 from osm_nbi.engine import EngineException
32 from osm_nbi.notifications import NsLcmNotification, VnfLcmNotification
33
34 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
35
36
class SubscriptionException(Exception):
    """
    Error raised by the subscription thread.
    It carries the HTTP status code that best describes the failure in the attribute 'http_code'.
    """

    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
        """
        Constructor of class
        :param message: human readable description of the error
        :param http_code: HTTP status associated to the error, BAD_REQUEST by default
        """
        super().__init__(message)
        self.http_code = http_code
41
42
class SubscriptionThread(threading.Thread):
    """
    Thread that reads from the kafka bus and serves all the subscriptions.

    The thread runs its own asyncio loop (created by asyncio.run at 'run') with two reading tasks: one
    for the "admin" topic and another one for the "ns", "nsi" and "vnf" topics. Every received message
    is processed by '_msg_callback'.
    """

    def __init__(self, config, engine):
        """
        Constructor of class
        :param config: configuration parameters of database and messaging
        :param engine: an instance of Engine class, used for deleting instances
        """
        threading.Thread.__init__(self)
        self.to_terminate = False
        self.config = config
        self.db = None
        self.msg = None
        self.engine = engine
        self.logger = logging.getLogger("nbi.subscriptions")
        # asyncio task for receiving admin actions from kafka bus
        self.aiomain_task_admin = None
        # asyncio task for receiving normal actions from kafka bus
        self.aiomain_task = None
        self.internal_session = {  # used for a session to the engine methods
            "project_id": (),
            "set_project": (),
            "admin": True,
            "force": False,
            "public": None,
            "method": "delete",
        }
        self.nslcm = None
        self.vnflcm = None

    def _report_finished_task(self, name, task):
        """
        Log the outcome of a reading task that has finished
        :param name: "admin" or "non-admin", used at the log message
        :param task: the finished asyncio task
        :return: None
        """
        # a cancelled task raises CancelledError at 'exception()'; there is nothing to report about it
        if task.cancelled():
            return
        self.logger.error(
            "{} subscription task exception: {}".format(name, task.exception())
        )

    async def start_kafka(self):
        """
        Keep the kafka reading tasks running until the termination is requested.
        A task that finishes (normally, with an exception or cancelled) is discarded and created again at
        the next iteration. Errors accessing kafka are logged only the first time and retried every 10
        seconds.
        :return: None
        """
        # timeout_wait_for_kafka = 3*60
        kafka_working = True
        while not self.to_terminate:
            try:
                # bug 710 635. The library aiokafka does not receive anything when the topic at kafka has not been
                # created.
                # Before subscribe, send dummy messages
                for topic in ("admin", "ns", "nsi", "vnf"):
                    await self.msg.aiowrite(topic, "echo", "dummy message")
                if not kafka_working:
                    self.logger.critical("kafka is working again")
                    kafka_working = True
                if not self.aiomain_task_admin:
                    await asyncio.sleep(10)
                    self.logger.debug("Starting admin subscription task")
                    # NOTE(review): group_id=False presumably makes every NBI instance receive the admin
                    # messages; confirm against osm_common msgkafka
                    self.aiomain_task_admin = asyncio.ensure_future(
                        self.msg.aioread(
                            ("admin",),
                            group_id=False,
                            aiocallback=self._msg_callback,
                        ),
                    )
                if not self.aiomain_task:
                    await asyncio.sleep(10)
                    self.logger.debug("Starting non-admin subscription task")
                    self.aiomain_task = asyncio.ensure_future(
                        self.msg.aioread(
                            ("ns", "nsi", "vnf"),
                            aiocallback=self._msg_callback,
                        ),
                    )
                done, _ = await asyncio.wait(
                    [self.aiomain_task, self.aiomain_task_admin],
                    timeout=None,
                    return_when=asyncio.FIRST_COMPLETED,
                )
                # each finished task is handled on its own, so that a cancelled one neither hides the result
                # of the other nor is kept as if it were still running
                if self.aiomain_task_admin in done:
                    self._report_finished_task("admin", self.aiomain_task_admin)
                    self.aiomain_task_admin = None
                if self.aiomain_task in done:
                    self._report_finished_task("non-admin", self.aiomain_task)
                    self.aiomain_task = None
            except Exception as e:
                if self.to_terminate:
                    return
                if kafka_working:
                    # logging only first time
                    self.logger.critical(
                        "Error accessing kafka '{}'. Retrying ...".format(e)
                    )
                    kafka_working = False
            await asyncio.sleep(10)

    def run(self):
        """
        Start of the thread. It connects to the database and to the message bus (unless already provided),
        creates the notification handlers and runs the reading loop until the termination is requested.
        :return: None
        :raises SubscriptionException: on a wrong driver at the configuration or a connection error
        """
        try:
            if not self.db:
                db_driver = self.config["database"]["driver"]
                if db_driver == "mongo":
                    self.db = dbmongo.DbMongo()
                elif db_driver == "memory":
                    self.db = dbmemory.DbMemory()
                else:
                    raise SubscriptionException(
                        "Invalid configuration param '{}' at '[database]':'driver'".format(
                            db_driver
                        )
                    )
                self.db.db_connect(self.config["database"])
            if not self.msg:
                config_msg = self.config["message"].copy()
                if config_msg["driver"] == "local":
                    self.msg = msglocal.MsgLocal()
                elif config_msg["driver"] == "kafka":
                    self.msg = msgkafka.MsgKafka()
                else:
                    raise SubscriptionException(
                        "Invalid configuration param '{}' at '[message]':'driver'".format(
                            config_msg["driver"]
                        )
                    )
                self.msg.connect(config_msg)
            self.nslcm = NsLcmNotification(self.db)
            self.vnflcm = VnfLcmNotification(self.db)
        except (DbException, MsgException) as e:
            raise SubscriptionException(str(e), http_code=e.http_code) from e

        self.logger.debug("Starting")
        while not self.to_terminate:
            try:
                # asyncio.run creates a new loop, owned by this thread, at every call
                asyncio.run(self.start_kafka())
            except Exception as e:
                if not self.to_terminate:
                    self.logger.exception(
                        "Exception '{}' at messaging read loop".format(e)
                    )

        self.logger.debug("Finishing")
        self._stop()

    def _process_ns(self, topic, command, params, msg_to_send):
        """
        Process a message of topic "ns": autoremove of terminated NS and nslcm notifications
        :param topic: topic received
        :param command: command received
        :param params: rest of parameters
        :param msg_to_send: list where the engine stores the messages to be written later to kafka
        :return: None
        """
        if command == "terminated" and params["operationState"] in (
            "COMPLETED",
            "PARTIALLY_COMPLETED",
        ):
            self.logger.debug("received ns terminated {}".format(params))
            if params.get("autoremove"):
                self.engine.del_item(
                    self.internal_session,
                    "nsrs",
                    _id=params["nsr_id"],
                    not_send_msg=msg_to_send,
                )
                self.logger.debug(
                    "ns={} deleted from database".format(params["nsr_id"])
                )
        # Check for nslcm notification
        # Check availability of operationState and command
        if (
            not isinstance(params, dict)
            or not params.get("operationState")
            or not command
            or not params.get("operationParams")
        ):
            self.logger.debug("Message can not be used for notification of nslcm")
            return
        nsd_id = params["operationParams"].get("nsdId")
        ns_instance_id = params["operationParams"].get("nsInstanceId")
        # Any one among nsd_id, ns_instance_id should be present.
        if not (nsd_id or ns_instance_id):
            self.logger.debug("Message can not be used for notification of nslcm")
            return
        event_details = {
            "topic": topic,
            "command": command.upper(),
            "params": params,
        }
        subscribers = self.nslcm.get_subscribers(
            nsd_id,
            ns_instance_id,
            command.upper(),
            params["operationState"],
            event_details,
        )
        if subscribers:
            # notifications are sent in background, not blocking the reading of the bus
            asyncio.ensure_future(self.nslcm.send_notifications(subscribers))

    def _process_vnf(self, topic, command, params):
        """
        Process a message of topic "vnf": vnflcm notifications
        :param topic: topic received
        :param command: command received
        :param params: rest of parameters
        :return: None
        """
        if not isinstance(params, dict):
            return
        vnfd_id = params["vnfdId"]
        vnf_instance_id = params["vnfInstanceId"]
        # "create" and "delete" carry no operationState; the command itself is used instead
        if command in ("create", "delete"):
            op_state = command
        else:
            op_state = params["operationState"]
        event_details = {
            "topic": topic,
            "command": command.upper(),
            "params": params,
        }
        subscribers = self.vnflcm.get_subscribers(
            vnfd_id,
            vnf_instance_id,
            command.upper(),
            op_state,
            event_details,
        )
        if subscribers:
            asyncio.ensure_future(self.vnflcm.send_notifications(subscribers))

    def _process_nsi(self, command, params, msg_to_send):
        """
        Process a message of topic "nsi": autoremove of terminated NSI
        :param command: command received
        :param params: rest of parameters
        :param msg_to_send: list where the engine stores the messages to be written later to kafka
        :return: None
        """
        if command == "terminated" and params["operationState"] in (
            "COMPLETED",
            "PARTIALLY_COMPLETED",
        ):
            self.logger.debug("received nsi terminated {}".format(params))
            if params.get("autoremove"):
                self.engine.del_item(
                    self.internal_session,
                    "nsis",
                    _id=params["nsir_id"],
                    not_send_msg=msg_to_send,
                )
                self.logger.debug(
                    "nsis={} deleted from database".format(params["nsir_id"])
                )

    def _process_admin(self, topic, command, params):
        """
        Process a message of topic "admin": revocation of tokens
        :param topic: topic received
        :param command: command received
        :param params: rest of parameters
        :return: None
        """
        self.logger.debug("received {} {} {}".format(topic, command, params))
        if command in ["echo", "ping"]:  # ignored commands
            return
        if command != "revoke_token":
            self.logger.debug("unrecognized command '{} {}'".format(topic, command))
            return
        if not params:
            # no token provided: all the cached tokens are discarded
            self.engine.authenticator.tokens_cache.clear()
            self.logger.debug("token_cache cleared")
        elif isinstance(params, dict) and "_id" in params:
            tid = params.get("_id")
            self.engine.authenticator.tokens_cache.pop(tid, None)
            self.logger.debug("token '{}' removed from token_cache".format(tid))
        else:
            self.logger.debug(
                "unrecognized params in command '{} {}': {}".format(
                    topic, command, params
                )
            )

    async def _msg_callback(self, topic, command, params):
        """
        Callback to process a received message from kafka
        Errors are logged and never propagated, so that a wrong message does not stop the reading task.
        :param topic: topic received
        :param command: command received
        :param params: rest of parameters
        :return: None
        """
        msg_to_send = []
        try:
            if topic == "ns":
                self._process_ns(topic, command, params, msg_to_send)
            elif topic == "vnf":
                self._process_vnf(topic, command, params)
            elif topic == "nsi":
                self._process_nsi(command, params, msg_to_send)
            elif topic == "admin":
                self._process_admin(topic, command, params)
            # writing to kafka must be done with our own loop. For this reason it is not allowed Engine to do that,
            # but content to be written is stored at msg_to_send
            for msg in msg_to_send:
                await self.msg.aiowrite(*msg)
        except (EngineException, DbException, MsgException) as e:
            self.logger.error(
                "Error while processing topic={} command={}: {}".format(
                    topic, command, e
                )
            )
        except Exception as e:
            self.logger.exception(
                "Exception while processing topic={} command={}: {}".format(
                    topic, command, e
                )
            )

    def _stop(self):
        """
        Close all connections
        :return: None
        :raises SubscriptionException: if the database or the message bus fails at disconnecting
        """
        try:
            try:
                if self.db:
                    self.db.db_disconnect()
            finally:
                # the message bus is disconnected even if the database disconnection fails
                if self.msg:
                    self.msg.disconnect()
        except (DbException, MsgException) as e:
            raise SubscriptionException(str(e), http_code=e.http_code) from e

    def terminate(self):
        """
        This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards,
        but not immediately.
        :return: None
        """
        self.to_terminate = True
        # tasks are copied to a local variable because the thread can reset the attributes at any moment
        for task in (self.aiomain_task, self.aiomain_task_admin):
            if not task:
                continue
            try:
                # the cancel must be scheduled at the loop that owns the task (the one running at this
                # thread), which is not the current loop of the thread calling this method
                task.get_loop().call_soon_threadsafe(task.cancel)
            except RuntimeError:
                # the loop is already closed, so the task is not running anymore
                self.logger.debug("subscription task already finished")