Code Coverage

Cobertura Coverage Report > osm_nbi >

subscriptions.py

Trend

Classes0%
 
Lines0%
 
Conditionals100%
 

File Coverage summary

NameClassesLinesConditionals
subscriptions.py
0%
0/1
0%
0/172
100%
0/0

Coverage Breakdown by Class

NameLinesConditionals
subscriptions.py
0%
0/172
N/A

Source

osm_nbi/subscriptions.py
1 # -*- coding: utf-8 -*-
2
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #    http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 0 """
17 This module implements a thread that reads from kafka bus implementing all the subscriptions.
18 It is based on asyncio.
19 To avoid race conditions it uses same engine class as the main module for database changes
20 For the moment this module only deletes NS instances when they are terminated with the autoremove flag
21 """
22
23 0 import logging
24 0 import threading
25 0 import asyncio
26 0 from http import HTTPStatus
27
28 0 from osm_common import dbmongo, dbmemory, msglocal, msgkafka
29 0 from osm_common.dbbase import DbException
30 0 from osm_common.msgbase import MsgException
31 0 from osm_nbi.engine import EngineException
32 0 from osm_nbi.notifications import NsLcmNotification, VnfLcmNotification
33
34 0 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
35
36
37 0 class SubscriptionException(Exception):
38 0     def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
39 0         self.http_code = http_code
40 0         Exception.__init__(self, message)
41
42
43 0 class SubscriptionThread(threading.Thread):
44 0     def __init__(self, config, engine):
45         """
46         Constructor of class
47         :param config: configuration parameters of database and messaging
48         :param engine: an instance of Engine class, used for deleting instances
49         """
50 0         threading.Thread.__init__(self)
51 0         self.to_terminate = False
52 0         self.config = config
53 0         self.db = None
54 0         self.msg = None
55 0         self.engine = engine
56 0         self.logger = logging.getLogger("nbi.subscriptions")
57 0         self.aiomain_task_admin = (
58             None  # asyncio task for receiving admin actions from kafka bus
59         )
60 0         self.aiomain_task = (
61             None  # asyncio task for receiving normal actions from kafka bus
62         )
63 0         self.internal_session = {  # used for a session to the engine methods
64             "project_id": (),
65             "set_project": (),
66             "admin": True,
67             "force": False,
68             "public": None,
69             "method": "delete",
70         }
71 0         self.nslcm = None
72 0         self.vnflcm = None
73
74 0     async def start_kafka(self):
75         # timeout_wait_for_kafka = 3*60
76 0         kafka_working = True
77 0         while not self.to_terminate:
78 0             try:
79                 # bug 710 635. The library aiokafka does not recieve anything when the topci at kafka has not been
80                 # created.
81                 # Before subscribe, send dummy messages
82 0                 await self.msg.aiowrite(
83                     "admin",
84                     "echo",
85                     "dummy message",
86                 )
87 0                 await self.msg.aiowrite("ns", "echo", "dummy message")
88 0                 await self.msg.aiowrite("nsi", "echo", "dummy message")
89 0                 await self.msg.aiowrite("vnf", "echo", "dummy message")
90 0                 if not kafka_working:
91 0                     self.logger.critical("kafka is working again")
92 0                     kafka_working = True
93 0                 if not self.aiomain_task_admin:
94 0                     await asyncio.sleep(10)
95 0                     self.logger.debug("Starting admin subscription task")
96 0                     self.aiomain_task_admin = asyncio.ensure_future(
97                         self.msg.aioread(
98                             ("admin",),
99                             group_id=False,
100                             aiocallback=self._msg_callback,
101                         ),
102                     )
103 0                 if not self.aiomain_task:
104 0                     await asyncio.sleep(10)
105 0                     self.logger.debug("Starting non-admin subscription task")
106 0                     self.aiomain_task = asyncio.ensure_future(
107                         self.msg.aioread(
108                             ("ns", "nsi", "vnf"),
109                             aiocallback=self._msg_callback,
110                         ),
111                     )
112 0                 done, _ = await asyncio.wait(
113                     [self.aiomain_task, self.aiomain_task_admin],
114                     timeout=None,
115                     return_when=asyncio.FIRST_COMPLETED,
116                 )
117 0                 try:
118 0                     if self.aiomain_task_admin in done:
119 0                         exc = self.aiomain_task_admin.exception()
120 0                         self.logger.error(
121                             "admin subscription task exception: {}".format(exc)
122                         )
123 0                         self.aiomain_task_admin = None
124 0                     if self.aiomain_task in done:
125 0                         exc = self.aiomain_task.exception()
126 0                         self.logger.error(
127                             "non-admin subscription task exception: {}".format(exc)
128                         )
129 0                         self.aiomain_task = None
130 0                 except asyncio.CancelledError:
131 0                     pass
132 0             except Exception as e:
133 0                 if self.to_terminate:
134 0                     return
135 0                 if kafka_working:
136                     # logging only first time
137 0                     self.logger.critical(
138                         "Error accessing kafka '{}'. Retrying ...".format(e)
139                     )
140 0                     kafka_working = False
141 0             await asyncio.sleep(10)
142
143 0     def run(self):
144         """
145         Start of the thread
146         :return: None
147         """
148 0         try:
149 0             if not self.db:
150 0                 if self.config["database"]["driver"] == "mongo":
151 0                     self.db = dbmongo.DbMongo()
152 0                     self.db.db_connect(self.config["database"])
153 0                 elif self.config["database"]["driver"] == "memory":
154 0                     self.db = dbmemory.DbMemory()
155 0                     self.db.db_connect(self.config["database"])
156                 else:
157 0                     raise SubscriptionException(
158                         "Invalid configuration param '{}' at '[database]':'driver'".format(
159                             self.config["database"]["driver"]
160                         )
161                     )
162 0             if not self.msg:
163 0                 config_msg = self.config["message"].copy()
164 0                 if config_msg["driver"] == "local":
165 0                     self.msg = msglocal.MsgLocal()
166 0                     self.msg.connect(config_msg)
167 0                 elif config_msg["driver"] == "kafka":
168 0                     self.msg = msgkafka.MsgKafka()
169 0                     self.msg.connect(config_msg)
170                 else:
171 0                     raise SubscriptionException(
172                         "Invalid configuration param '{}' at '[message]':'driver'".format(
173                             config_msg["driver"]
174                         )
175                     )
176 0             self.nslcm = NsLcmNotification(self.db)
177 0             self.vnflcm = VnfLcmNotification(self.db)
178 0         except (DbException, MsgException) as e:
179 0             raise SubscriptionException(str(e), http_code=e.http_code)
180
181 0         self.logger.debug("Starting")
182 0         while not self.to_terminate:
183 0             try:
184 0                 asyncio.run(self.start_kafka())
185 0             except Exception as e:
186 0                 if not self.to_terminate:
187 0                     self.logger.exception(
188                         "Exception '{}' at messaging read loop".format(e), exc_info=True
189                     )
190
191 0         self.logger.debug("Finishing")
192 0         self._stop()
193
194 0     async def _msg_callback(self, topic, command, params):
195         """
196         Callback to process a received message from kafka
197         :param topic:  topic received
198         :param command:  command received
199         :param params: rest of parameters
200         :return: None
201         """
202 0         msg_to_send = []
203 0         try:
204 0             if topic == "ns":
205 0                 if command == "terminated" and params["operationState"] in (
206                     "COMPLETED",
207                     "PARTIALLY_COMPLETED",
208                 ):
209 0                     self.logger.debug("received ns terminated {}".format(params))
210 0                     if params.get("autoremove"):
211 0                         self.engine.del_item(
212                             self.internal_session,
213                             "nsrs",
214                             _id=params["nsr_id"],
215                             not_send_msg=msg_to_send,
216                         )
217 0                         self.logger.debug(
218                             "ns={} deleted from database".format(params["nsr_id"])
219                         )
220                 # Check for nslcm notification
221 0                 if isinstance(params, dict):
222                     # Check availability of operationState and command
223 0                     if (
224                         (not params.get("operationState"))
225                         or (not command)
226                         or (not params.get("operationParams"))
227                     ):
228 0                         self.logger.debug(
229                             "Message can not be used for notification of nslcm"
230                         )
231                     else:
232 0                         nsd_id = params["operationParams"].get("nsdId")
233 0                         ns_instance_id = params["operationParams"].get("nsInstanceId")
234                         # Any one among nsd_id, ns_instance_id should be present.
235 0                         if not (nsd_id or ns_instance_id):
236 0                             self.logger.debug(
237                                 "Message can not be used for notification of nslcm"
238                             )
239                         else:
240 0                             op_state = params["operationState"]
241 0                             event_details = {
242                                 "topic": topic,
243                                 "command": command.upper(),
244                                 "params": params,
245                             }
246 0                             subscribers = self.nslcm.get_subscribers(
247                                 nsd_id,
248                                 ns_instance_id,
249                                 command.upper(),
250                                 op_state,
251                                 event_details,
252                             )
253                             # self.logger.debug("subscribers list: ")
254                             # self.logger.debug(subscribers)
255 0                             if subscribers:
256 0                                 asyncio.ensure_future(
257                                     self.nslcm.send_notifications(subscribers),
258                                 )
259                 else:
260 0                     self.logger.debug(
261                         "Message can not be used for notification of nslcm"
262                     )
263 0             elif topic == "vnf":
264 0                 if isinstance(params, dict):
265 0                     vnfd_id = params["vnfdId"]
266 0                     vnf_instance_id = params["vnfInstanceId"]
267 0                     if command == "create" or command == "delete":
268 0                         op_state = command
269                     else:
270 0                         op_state = params["operationState"]
271 0                     event_details = {
272                         "topic": topic,
273                         "command": command.upper(),
274                         "params": params,
275                     }
276 0                     subscribers = self.vnflcm.get_subscribers(
277                         vnfd_id,
278                         vnf_instance_id,
279                         command.upper(),
280                         op_state,
281                         event_details,
282                     )
283 0                     if subscribers:
284 0                         asyncio.ensure_future(
285                             self.vnflcm.send_notifications(subscribers),
286                         )
287 0             elif topic == "nsi":
288 0                 if command == "terminated" and params["operationState"] in (
289                     "COMPLETED",
290                     "PARTIALLY_COMPLETED",
291                 ):
292 0                     self.logger.debug("received nsi terminated {}".format(params))
293 0                     if params.get("autoremove"):
294 0                         self.engine.del_item(
295                             self.internal_session,
296                             "nsis",
297                             _id=params["nsir_id"],
298                             not_send_msg=msg_to_send,
299                         )
300 0                         self.logger.debug(
301                             "nsis={} deleted from database".format(params["nsir_id"])
302                         )
303 0             elif topic == "admin":
304 0                 self.logger.debug("received {} {} {}".format(topic, command, params))
305 0                 if command in ["echo", "ping"]:  # ignored commands
306 0                     pass
307 0                 elif command == "revoke_token":
308 0                     if params:
309 0                         if isinstance(params, dict) and "_id" in params:
310 0                             tid = params.get("_id")
311 0                             self.engine.authenticator.tokens_cache.pop(tid, None)
312 0                             self.logger.debug(
313                                 "token '{}' removed from token_cache".format(tid)
314                             )
315                         else:
316 0                             self.logger.debug(
317                                 "unrecognized params in command '{} {}': {}".format(
318                                     topic, command, params
319                                 )
320                             )
321                     else:
322 0                         self.engine.authenticator.tokens_cache.clear()
323 0                         self.logger.debug("token_cache cleared")
324                 else:
325 0                     self.logger.debug(
326                         "unrecognized command '{} {}'".format(topic, command)
327                     )
328             # writing to kafka must be done with our own loop. For this reason it is not allowed Engine to do that,
329             # but content to be written is stored at msg_to_send
330 0             for msg in msg_to_send:
331 0                 await self.msg.aiowrite(*msg)
332 0         except (EngineException, DbException, MsgException) as e:
333 0             self.logger.error(
334                 "Error while processing topic={} command={}: {}".format(
335                     topic, command, e
336                 )
337             )
338 0         except Exception as e:
339 0             self.logger.exception(
340                 "Exception while processing topic={} command={}: {}".format(
341                     topic, command, e
342                 ),
343                 exc_info=True,
344             )
345
346 0     def _stop(self):
347         """
348         Close all connections
349         :return: None
350         """
351 0         try:
352 0             if self.db:
353 0                 self.db.db_disconnect()
354 0             if self.msg:
355 0                 self.msg.disconnect()
356 0         except (DbException, MsgException) as e:
357 0             raise SubscriptionException(str(e), http_code=e.http_code)
358
359 0     def terminate(self):
360         """
361         This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards,
362         but not immediately.
363         :return: None
364         """
365 0         self.to_terminate = True
366 0         if self.aiomain_task:
367 0             asyncio.get_event_loop().call_soon_threadsafe(self.aiomain_task.cancel)
368 0         if self.aiomain_task_admin:
369 0             asyncio.get_event_loop().call_soon_threadsafe(
370                 self.aiomain_task_admin.cancel
371             )