Code Coverage

Cobertura Coverage Report > osm_nbi >

subscriptions.py

Trend

Classes0%
 
Lines0%
 
Conditionals100%
 

File Coverage summary

NameClassesLinesConditionals
subscriptions.py
0%
0/1
0%
0/162
100%
0/0

Coverage Breakdown by Class

NameLinesConditionals
subscriptions.py
0%
0/162
N/A

Source

osm_nbi/subscriptions.py
1 # -*- coding: utf-8 -*-
2
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #    http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 0 """
17 This module implements a thread that reads from kafka bus implementing all the subscriptions.
18 It is based on asyncio.
19 To avoid race conditions it uses the same engine class as the main module for database changes.
20 It deletes NS and NSI instances terminated with the autoremove flag, sends NS LCM notifications to subscribers and handles admin commands such as token revocation.
21 """
22
23 0 import logging
24 0 import threading
25 0 import asyncio
26 0 from http import HTTPStatus
27
28 0 from osm_common import dbmongo, dbmemory, msglocal, msgkafka
29 0 from osm_common.dbbase import DbException
30 0 from osm_common.msgbase import MsgException
31 0 from osm_nbi.engine import EngineException
32 0 from osm_nbi.notifications import NsLcmNotification
33
34 0 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
35
36
class SubscriptionException(Exception):
    """
    Error raised by the subscription thread; it carries an HTTP status code next to the message.
    """

    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
        """
        :param message: human readable description of the error
        :param http_code: HTTP status associated to the error, BAD_REQUEST when not provided
        """
        super().__init__(message)
        self.http_code = http_code
42
43
class SubscriptionThread(threading.Thread):
    """
    Thread that reads the "admin", "ns" and "nsi" topics of the message bus using its own asyncio loop.

    Received messages are processed by _msg_callback, which deletes terminated NS/NSI instances flagged with
    autoremove, triggers NS LCM notifications and maintains the token cache of the engine authenticator.
    """

    def __init__(self, config, engine):
        """
        Constructor of class
        :param config: configuration parameters of database and messaging
        :param engine: an instance of Engine class, used for deleting instances
        """
        super().__init__()
        self.to_terminate = False
        self.config = config
        self.db = None
        self.msg = None
        self.engine = engine
        self.loop = None
        self.logger = logging.getLogger("nbi.subscriptions")
        self.aiomain_task_admin = None  # asyncio task for receiving admin actions from kafka bus
        self.aiomain_task = None  # asyncio task for receiving normal actions from kafka bus
        self.internal_session = {  # used for a session to the engine methods
            "project_id": (),
            "set_project": (),
            "admin": True,
            "force": False,
            "public": None,
            "method": "delete",
        }
        self.nslcm = None

    def _reap_finished_task(self, task, done, label):
        """
        Log the outcome of a reading task that has finished and forget it, so that it is created again
        :param task: asyncio task to inspect
        :param done: set of finished tasks, as returned by asyncio.wait
        :param label: task name used at the log message, "admin" or "non-admin"
        :return: the same task when it is still pending, None when it has finished
        """
        if task not in done:
            return task
        if task.cancelled():
            # task.exception() would raise CancelledError here; a cancelled task is simply dropped
            self.logger.debug("{} subscription task cancelled".format(label))
        else:
            self.logger.error("{} subscription task exception: {}".format(label, task.exception()))
        return None

    async def start_kafka(self):
        """
        Keep the two reading tasks (admin topic, and ns/nsi topics) alive until termination is requested.
        Every 10 seconds it writes dummy messages, (re)creates the tasks that are missing and waits until
        one of them finishes. Errors are logged only the first time, until the bus works again.
        :return: None
        """
        # timeout_wait_for_kafka = 3*60
        kafka_working = True
        while not self.to_terminate:
            try:
                # bug 710 635. The library aiokafka does not receive anything when the topic at kafka has not been
                # created.
                # Before subscribe, send dummy messages
                await self.msg.aiowrite("admin", "echo", "dummy message", loop=self.loop)
                await self.msg.aiowrite("ns", "echo", "dummy message", loop=self.loop)
                await self.msg.aiowrite("nsi", "echo", "dummy message", loop=self.loop)
                if not kafka_working:
                    self.logger.critical("kafka is working again")
                    kafka_working = True
                # asyncio.sleep and asyncio.wait use the running loop (self.loop). Their explicit 'loop' argument
                # is not passed because it was removed in python 3.10
                if not self.aiomain_task_admin:
                    await asyncio.sleep(10)
                    self.logger.debug("Starting admin subscription task")
                    self.aiomain_task_admin = asyncio.ensure_future(
                        self.msg.aioread(("admin",), loop=self.loop, group_id=False,
                                         aiocallback=self._msg_callback),
                        loop=self.loop)
                if not self.aiomain_task:
                    await asyncio.sleep(10)
                    self.logger.debug("Starting non-admin subscription task")
                    self.aiomain_task = asyncio.ensure_future(
                        self.msg.aioread(("ns", "nsi"), loop=self.loop, aiocallback=self._msg_callback),
                        loop=self.loop)
                done, _ = await asyncio.wait([self.aiomain_task, self.aiomain_task_admin],
                                             timeout=None, return_when=asyncio.FIRST_COMPLETED)
                # each task is checked on its own, so that the state of one does not hide the other
                self.aiomain_task_admin = self._reap_finished_task(self.aiomain_task_admin, done, "admin")
                self.aiomain_task = self._reap_finished_task(self.aiomain_task, done, "non-admin")
            except Exception as e:
                if self.to_terminate:
                    return
                if kafka_working:
                    # logging only first time
                    self.logger.critical("Error accessing kafka '{}'. Retrying ...".format(e))
                    kafka_working = False
            await asyncio.sleep(10)

    def _connect(self):
        """
        Create and connect the database and messaging drivers that are not set yet, and the notification helper
        :return: None
        :raises SubscriptionException: if a configured driver is unknown or a connection fails
        """
        try:
            if not self.db:
                config_db = self.config["database"]
                if config_db["driver"] == "mongo":
                    self.db = dbmongo.DbMongo()
                elif config_db["driver"] == "memory":
                    self.db = dbmemory.DbMemory()
                else:
                    raise SubscriptionException("Invalid configuration param '{}' at '[database]':'driver'".format(
                        config_db["driver"]))
                self.db.db_connect(config_db)
            if not self.msg:
                # work on a copy, the loop of this thread must not be stored at the shared configuration
                config_msg = self.config["message"].copy()
                config_msg["loop"] = self.loop
                if config_msg["driver"] == "local":
                    self.msg = msglocal.MsgLocal()
                elif config_msg["driver"] == "kafka":
                    self.msg = msgkafka.MsgKafka()
                else:
                    raise SubscriptionException("Invalid configuration param '{}' at '[message]':'driver'".format(
                        config_msg["driver"]))
                self.msg.connect(config_msg)
            self.nslcm = NsLcmNotification(self.db)
        except (DbException, MsgException) as e:
            raise SubscriptionException(str(e), http_code=e.http_code) from e

    def run(self):
        """
        Start of the thread
        :return: None
        :raises SubscriptionException: if connecting to, or disconnecting from, database or messaging fails
        """
        self.loop = asyncio.new_event_loop()
        try:
            self._connect()
            self.logger.debug("Starting")
            while not self.to_terminate:
                try:
                    self.loop.run_until_complete(asyncio.ensure_future(self.start_kafka(), loop=self.loop))
                # except asyncio.CancelledError:
                #     break  # if cancelled it should end, breaking loop
                except Exception as e:
                    if not self.to_terminate:
                        self.logger.exception("Exception '{}' at messaging read loop".format(e))

            self.logger.debug("Finishing")
            self._stop()
        finally:
            # the loop is closed also when connecting or disconnecting fails
            self.loop.close()

    def _process_nslcm_notification(self, topic, command, params):
        """
        Send a notification to the nslcm subscribers of this event, if the message contains enough information
        :param topic:  topic received
        :param command:  command received
        :param params: dictionary with the rest of parameters
        :return: None
        """
        # Check availability of operationState and command
        if not (params.get("operationState") and command and params.get("operationParams")):
            self.logger.debug("Message can not be used for notification of nslcm")
            return
        nsd_id = params["operationParams"].get("nsdId")
        ns_instance_id = params["operationParams"].get("nsInstanceId")
        # Any one among nsd_id, ns_instance_id should be present.
        if not (nsd_id or ns_instance_id):
            self.logger.debug("Message can not be used for notification of nslcm")
            return
        op_state = params["operationState"]
        event_details = {"topic": topic, "command": command.upper(), "params": params}
        subscribers = self.nslcm.get_subscribers(nsd_id, ns_instance_id, command.upper(), op_state,
                                                 event_details)
        if subscribers:
            # notifications are sent in background, the callback does not wait for them
            asyncio.ensure_future(self.nslcm.send_notifications(subscribers, loop=self.loop),
                                  loop=self.loop)

    def _process_ns(self, topic, command, params, msg_to_send):
        """
        Process a message of topic "ns": autoremove of terminated instances and nslcm notifications
        :param topic:  topic received
        :param command:  command received
        :param params: rest of parameters
        :param msg_to_send: list where the engine stores the messages to be written afterwards
        :return: None
        """
        if not isinstance(params, dict):
            # e.g. the dummy echo messages; checked first so that params is never indexed when it is not a dict
            self.logger.debug("Message can not be used for notification of nslcm")
            return
        if command == "terminated" and params.get("operationState") in ("COMPLETED", "PARTIALLY_COMPLETED"):
            self.logger.debug("received ns terminated {}".format(params))
            if params.get("autoremove"):
                self.engine.del_item(self.internal_session, "nsrs", _id=params["nsr_id"],
                                     not_send_msg=msg_to_send)
                self.logger.debug("ns={} deleted from database".format(params["nsr_id"]))
        # Check for nslcm notification
        self._process_nslcm_notification(topic, command, params)

    def _process_nsi(self, command, params, msg_to_send):
        """
        Process a message of topic "nsi": autoremove of terminated slice instances
        :param command:  command received
        :param params: rest of parameters
        :param msg_to_send: list where the engine stores the messages to be written afterwards
        :return: None
        """
        if command != "terminated" or not isinstance(params, dict):
            return
        if params.get("operationState") in ("COMPLETED", "PARTIALLY_COMPLETED"):
            self.logger.debug("received nsi terminated {}".format(params))
            if params.get("autoremove"):
                self.engine.del_item(self.internal_session, "nsis", _id=params["nsir_id"],
                                     not_send_msg=msg_to_send)
                self.logger.debug("nsis={} deleted from database".format(params["nsir_id"]))

    def _process_admin(self, topic, command, params):
        """
        Process a message of topic "admin": token revocation at the token cache of the engine authenticator
        :param topic:  topic received
        :param command:  command received
        :param params: rest of parameters
        :return: None
        """
        self.logger.debug("received {} {} {}".format(topic, command, params))
        if command in ("echo", "ping"):   # ignored commands
            return
        if command != "revoke_token":
            self.logger.debug("unrecognized command '{} {}'".format(topic, command))
            return
        if not params:
            # no token provided: revoke all of them
            self.engine.authenticator.tokens_cache.clear()
            self.logger.debug("token_cache cleared")
        elif isinstance(params, dict) and "_id" in params:
            tid = params.get("_id")
            self.engine.authenticator.tokens_cache.pop(tid, None)
            self.logger.debug("token '{}' removed from token_cache".format(tid))
        else:
            self.logger.debug("unrecognized params in command '{} {}': {}"
                              .format(topic, command, params))

    async def _msg_callback(self, topic, command, params):
        """
        Callback to process a received message from kafka
        Exceptions are logged and not propagated. Topics different from "ns", "nsi" and "admin" are ignored.
        :param topic:  topic received
        :param command:  command received
        :param params: rest of parameters
        :return: None
        """
        msg_to_send = []
        try:
            if topic == "ns":
                self._process_ns(topic, command, params, msg_to_send)
            elif topic == "nsi":
                self._process_nsi(command, params, msg_to_send)
            elif topic == "admin":
                self._process_admin(topic, command, params)
            # writing to kafka must be done with our own loop. For this reason it is not allowed Engine to do that,
            # but content to be written is stored at msg_to_send
            for msg in msg_to_send:
                await self.msg.aiowrite(*msg, loop=self.loop)
        except (EngineException, DbException, MsgException) as e:
            self.logger.error("Error while processing topic={} command={}: {}".format(topic, command, e))
        except Exception as e:
            self.logger.exception("Exception while processing topic={} command={}: {}".format(topic, command, e))

    def _stop(self):
        """
        Close all connections
        :return: None
        :raises SubscriptionException: if disconnecting from database or messaging fails
        """
        try:
            if self.db:
                self.db.db_disconnect()
            if self.msg:
                self.msg.disconnect()
        except (DbException, MsgException) as e:
            raise SubscriptionException(str(e), http_code=e.http_code) from e

    def terminate(self):
        """
        This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards,
        but not immediately.
        :return: None
        """
        self.to_terminate = True
        # read the attributes only once, the thread can modify them at any moment
        loop = self.loop
        for task in (self.aiomain_task, self.aiomain_task_admin):
            if not task or not loop:
                continue
            try:
                loop.call_soon_threadsafe(task.cancel)
            except RuntimeError:
                # the loop is already closed: the thread has finished and there is nothing to cancel
                pass