Code Coverage

Cobertura Coverage Report > osm_nbi >

subscriptions.py

Trend

Classes0%
 
Lines0%
 
Conditionals100%
 

File Coverage summary

NameClassesLinesConditionals
subscriptions.py
0%
0/1
0%
0/176
100%
0/0

Coverage Breakdown by Class

NameLinesConditionals
subscriptions.py
0%
0/176
N/A

Source

osm_nbi/subscriptions.py
1 # -*- coding: utf-8 -*-
2
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #    http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 0 """
17 This module implements a thread that reads from kafka bus implementing all the subscriptions.
18 It is based on asyncio.
19 To avoid race conditions it uses same engine class as the main module for database changes
20 For the moment this module only deletes NS instances when they are terminated with the autoremove flag
21 """
22
23 0 import logging
24 0 import threading
25 0 import asyncio
26 0 from http import HTTPStatus
27
28 0 from osm_common import dbmongo, dbmemory, msglocal, msgkafka
29 0 from osm_common.dbbase import DbException
30 0 from osm_common.msgbase import MsgException
31 0 from osm_nbi.engine import EngineException
32 0 from osm_nbi.notifications import NsLcmNotification, VnfLcmNotification
33
34 0 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
35
36
37 0 class SubscriptionException(Exception):
38 0     def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
39 0         self.http_code = http_code
40 0         Exception.__init__(self, message)
41
42
43 0 class SubscriptionThread(threading.Thread):
44 0     def __init__(self, config, engine):
45         """
46         Constructor of class
47         :param config: configuration parameters of database and messaging
48         :param engine: an instance of Engine class, used for deleting instances
49         """
50 0         threading.Thread.__init__(self)
51 0         self.to_terminate = False
52 0         self.config = config
53 0         self.db = None
54 0         self.msg = None
55 0         self.engine = engine
56 0         self.loop = None
57 0         self.logger = logging.getLogger("nbi.subscriptions")
58 0         self.aiomain_task_admin = (
59             None  # asyncio task for receiving admin actions from kafka bus
60         )
61 0         self.aiomain_task = (
62             None  # asyncio task for receiving normal actions from kafka bus
63         )
64 0         self.internal_session = {  # used for a session to the engine methods
65             "project_id": (),
66             "set_project": (),
67             "admin": True,
68             "force": False,
69             "public": None,
70             "method": "delete",
71         }
72 0         self.nslcm = None
73 0         self.vnflcm = None
74
75 0     async def start_kafka(self):
76         # timeout_wait_for_kafka = 3*60
77 0         kafka_working = True
78 0         while not self.to_terminate:
79 0             try:
80                 # bug 710 635. The library aiokafka does not recieve anything when the topci at kafka has not been
81                 # created.
82                 # Before subscribe, send dummy messages
83 0                 await self.msg.aiowrite(
84                     "admin", "echo", "dummy message", loop=self.loop
85                 )
86 0                 await self.msg.aiowrite("ns", "echo", "dummy message", loop=self.loop)
87 0                 await self.msg.aiowrite("nsi", "echo", "dummy message", loop=self.loop)
88 0                 await self.msg.aiowrite("vnf", "echo", "dummy message", loop=self.loop)
89 0                 if not kafka_working:
90 0                     self.logger.critical("kafka is working again")
91 0                     kafka_working = True
92 0                 if not self.aiomain_task_admin:
93 0                     await asyncio.sleep(10, loop=self.loop)
94 0                     self.logger.debug("Starting admin subscription task")
95 0                     self.aiomain_task_admin = asyncio.ensure_future(
96                         self.msg.aioread(
97                             ("admin",),
98                             loop=self.loop,
99                             group_id=False,
100                             aiocallback=self._msg_callback,
101                         ),
102                         loop=self.loop,
103                     )
104 0                 if not self.aiomain_task:
105 0                     await asyncio.sleep(10, loop=self.loop)
106 0                     self.logger.debug("Starting non-admin subscription task")
107 0                     self.aiomain_task = asyncio.ensure_future(
108                         self.msg.aioread(
109                             ("ns", "nsi", "vnf"),
110                             loop=self.loop,
111                             aiocallback=self._msg_callback,
112                         ),
113                         loop=self.loop,
114                     )
115 0                 done, _ = await asyncio.wait(
116                     [self.aiomain_task, self.aiomain_task_admin],
117                     timeout=None,
118                     loop=self.loop,
119                     return_when=asyncio.FIRST_COMPLETED,
120                 )
121 0                 try:
122 0                     if self.aiomain_task_admin in done:
123 0                         exc = self.aiomain_task_admin.exception()
124 0                         self.logger.error(
125                             "admin subscription task exception: {}".format(exc)
126                         )
127 0                         self.aiomain_task_admin = None
128 0                     if self.aiomain_task in done:
129 0                         exc = self.aiomain_task.exception()
130 0                         self.logger.error(
131                             "non-admin subscription task exception: {}".format(exc)
132                         )
133 0                         self.aiomain_task = None
134 0                 except asyncio.CancelledError:
135 0                     pass
136 0             except Exception as e:
137 0                 if self.to_terminate:
138 0                     return
139 0                 if kafka_working:
140                     # logging only first time
141 0                     self.logger.critical(
142                         "Error accessing kafka '{}'. Retrying ...".format(e)
143                     )
144 0                     kafka_working = False
145 0             await asyncio.sleep(10, loop=self.loop)
146
147 0     def run(self):
148         """
149         Start of the thread
150         :return: None
151         """
152 0         self.loop = asyncio.new_event_loop()
153 0         try:
154 0             if not self.db:
155 0                 if self.config["database"]["driver"] == "mongo":
156 0                     self.db = dbmongo.DbMongo()
157 0                     self.db.db_connect(self.config["database"])
158 0                 elif self.config["database"]["driver"] == "memory":
159 0                     self.db = dbmemory.DbMemory()
160 0                     self.db.db_connect(self.config["database"])
161                 else:
162 0                     raise SubscriptionException(
163                         "Invalid configuration param '{}' at '[database]':'driver'".format(
164                             self.config["database"]["driver"]
165                         )
166                     )
167 0             if not self.msg:
168 0                 config_msg = self.config["message"].copy()
169 0                 config_msg["loop"] = self.loop
170 0                 if config_msg["driver"] == "local":
171 0                     self.msg = msglocal.MsgLocal()
172 0                     self.msg.connect(config_msg)
173 0                 elif config_msg["driver"] == "kafka":
174 0                     self.msg = msgkafka.MsgKafka()
175 0                     self.msg.connect(config_msg)
176                 else:
177 0                     raise SubscriptionException(
178                         "Invalid configuration param '{}' at '[message]':'driver'".format(
179                             config_msg["driver"]
180                         )
181                     )
182 0             self.nslcm = NsLcmNotification(self.db)
183 0             self.vnflcm = VnfLcmNotification(self.db)
184 0         except (DbException, MsgException) as e:
185 0             raise SubscriptionException(str(e), http_code=e.http_code)
186
187 0         self.logger.debug("Starting")
188 0         while not self.to_terminate:
189 0             try:
190
191 0                 self.loop.run_until_complete(
192                     asyncio.ensure_future(self.start_kafka(), loop=self.loop)
193                 )
194             # except asyncio.CancelledError:
195             #     break  # if cancelled it should end, breaking loop
196 0             except Exception as e:
197 0                 if not self.to_terminate:
198 0                     self.logger.exception(
199                         "Exception '{}' at messaging read loop".format(e), exc_info=True
200                     )
201
202 0         self.logger.debug("Finishing")
203 0         self._stop()
204 0         self.loop.close()
205
206 0     async def _msg_callback(self, topic, command, params):
207         """
208         Callback to process a received message from kafka
209         :param topic:  topic received
210         :param command:  command received
211         :param params: rest of parameters
212         :return: None
213         """
214 0         msg_to_send = []
215 0         try:
216 0             if topic == "ns":
217 0                 if command == "terminated" and params["operationState"] in (
218                     "COMPLETED",
219                     "PARTIALLY_COMPLETED",
220                 ):
221 0                     self.logger.debug("received ns terminated {}".format(params))
222 0                     if params.get("autoremove"):
223 0                         self.engine.del_item(
224                             self.internal_session,
225                             "nsrs",
226                             _id=params["nsr_id"],
227                             not_send_msg=msg_to_send,
228                         )
229 0                         self.logger.debug(
230                             "ns={} deleted from database".format(params["nsr_id"])
231                         )
232                 # Check for nslcm notification
233 0                 if isinstance(params, dict):
234                     # Check availability of operationState and command
235 0                     if (
236                         (not params.get("operationState"))
237                         or (not command)
238                         or (not params.get("operationParams"))
239                     ):
240 0                         self.logger.debug(
241                             "Message can not be used for notification of nslcm"
242                         )
243                     else:
244 0                         nsd_id = params["operationParams"].get("nsdId")
245 0                         ns_instance_id = params["operationParams"].get("nsInstanceId")
246                         # Any one among nsd_id, ns_instance_id should be present.
247 0                         if not (nsd_id or ns_instance_id):
248 0                             self.logger.debug(
249                                 "Message can not be used for notification of nslcm"
250                             )
251                         else:
252 0                             op_state = params["operationState"]
253 0                             event_details = {
254                                 "topic": topic,
255                                 "command": command.upper(),
256                                 "params": params,
257                             }
258 0                             subscribers = self.nslcm.get_subscribers(
259                                 nsd_id,
260                                 ns_instance_id,
261                                 command.upper(),
262                                 op_state,
263                                 event_details,
264                             )
265                             # self.logger.debug("subscribers list: ")
266                             # self.logger.debug(subscribers)
267 0                             if subscribers:
268 0                                 asyncio.ensure_future(
269                                     self.nslcm.send_notifications(
270                                         subscribers, loop=self.loop
271                                     ),
272                                     loop=self.loop,
273                                 )
274                 else:
275 0                     self.logger.debug(
276                         "Message can not be used for notification of nslcm"
277                     )
278 0             elif topic == "vnf":
279 0                 if isinstance(params, dict):
280 0                     vnfd_id = params["vnfdId"]
281 0                     vnf_instance_id = params["vnfInstanceId"]
282 0                     if command == "create" or command == "delete":
283 0                         op_state = command
284                     else:
285 0                         op_state = params["operationState"]
286 0                     event_details = {
287                             "topic": topic,
288                             "command": command.upper(),
289                             "params": params,
290                             }
291 0                     subscribers = self.vnflcm.get_subscribers(
292                             vnfd_id,
293                             vnf_instance_id,
294                             command.upper(),
295                             op_state,
296                             event_details
297                             )
298 0                     if subscribers:
299 0                         asyncio.ensure_future(
300                                 self.vnflcm.send_notifications(
301                                     subscribers, loop=self.loop
302                                 ),
303                                 loop=self.loop
304                             )
305 0             elif topic == "nsi":
306 0                 if command == "terminated" and params["operationState"] in (
307                     "COMPLETED",
308                     "PARTIALLY_COMPLETED",
309                 ):
310 0                     self.logger.debug("received nsi terminated {}".format(params))
311 0                     if params.get("autoremove"):
312 0                         self.engine.del_item(
313                             self.internal_session,
314                             "nsis",
315                             _id=params["nsir_id"],
316                             not_send_msg=msg_to_send,
317                         )
318 0                         self.logger.debug(
319                             "nsis={} deleted from database".format(params["nsir_id"])
320                         )
321 0             elif topic == "admin":
322 0                 self.logger.debug("received {} {} {}".format(topic, command, params))
323 0                 if command in ["echo", "ping"]:  # ignored commands
324 0                     pass
325 0                 elif command == "revoke_token":
326 0                     if params:
327 0                         if isinstance(params, dict) and "_id" in params:
328 0                             tid = params.get("_id")
329 0                             self.engine.authenticator.tokens_cache.pop(tid, None)
330 0                             self.logger.debug(
331                                 "token '{}' removed from token_cache".format(tid)
332                             )
333                         else:
334 0                             self.logger.debug(
335                                 "unrecognized params in command '{} {}': {}".format(
336                                     topic, command, params
337                                 )
338                             )
339                     else:
340 0                         self.engine.authenticator.tokens_cache.clear()
341 0                         self.logger.debug("token_cache cleared")
342                 else:
343 0                     self.logger.debug(
344                         "unrecognized command '{} {}'".format(topic, command)
345                     )
346             # writing to kafka must be done with our own loop. For this reason it is not allowed Engine to do that,
347             # but content to be written is stored at msg_to_send
348 0             for msg in msg_to_send:
349 0                 await self.msg.aiowrite(*msg, loop=self.loop)
350 0         except (EngineException, DbException, MsgException) as e:
351 0             self.logger.error(
352                 "Error while processing topic={} command={}: {}".format(
353                     topic, command, e
354                 )
355             )
356 0         except Exception as e:
357 0             self.logger.exception(
358                 "Exception while processing topic={} command={}: {}".format(
359                     topic, command, e
360                 ),
361                 exc_info=True,
362             )
363
364 0     def _stop(self):
365         """
366         Close all connections
367         :return: None
368         """
369 0         try:
370 0             if self.db:
371 0                 self.db.db_disconnect()
372 0             if self.msg:
373 0                 self.msg.disconnect()
374 0         except (DbException, MsgException) as e:
375 0             raise SubscriptionException(str(e), http_code=e.http_code)
376
377 0     def terminate(self):
378         """
379         This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards,
380         but not immediately.
381         :return: None
382         """
383 0         self.to_terminate = True
384 0         if self.aiomain_task:
385 0             self.loop.call_soon_threadsafe(self.aiomain_task.cancel)
386 0         if self.aiomain_task_admin:
387 0             self.loop.call_soon_threadsafe(self.aiomain_task_admin.cancel)