1 |
|
# -*- coding: utf-8 -*- |
2 |
|
|
3 |
|
# Licensed under the Apache License, Version 2.0 (the "License"); |
4 |
|
# you may not use this file except in compliance with the License. |
5 |
|
# You may obtain a copy of the License at |
6 |
|
# |
7 |
|
# http://www.apache.org/licenses/LICENSE-2.0 |
8 |
|
# |
9 |
|
# Unless required by applicable law or agreed to in writing, software |
10 |
|
# distributed under the License is distributed on an "AS IS" BASIS, |
11 |
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
12 |
|
# implied. |
13 |
|
# See the License for the specific language governing permissions and |
14 |
|
# limitations under the License. |
15 |
|
|
16 |
0 |
""" |
17 |
|
This module implements a thread that reads from kafka bus implementing all the subscriptions. |
18 |
|
It is based on asyncio. |
19 |
|
To avoid race conditions it uses same engine class as the main module for database changes |
20 |
|
For the moment this module only deletes NS instances when they are terminated with the autoremove flag |
21 |
|
""" |
22 |
|
|
23 |
0 |
import logging |
24 |
0 |
import threading |
25 |
0 |
import asyncio |
26 |
0 |
from http import HTTPStatus |
27 |
|
|
28 |
0 |
from osm_common import dbmongo, dbmemory, msglocal, msgkafka |
29 |
0 |
from osm_common.dbbase import DbException |
30 |
0 |
from osm_common.msgbase import MsgException |
31 |
0 |
from osm_nbi.engine import EngineException |
32 |
0 |
from osm_nbi.notifications import NsLcmNotification, VnfLcmNotification |
33 |
|
|
34 |
0 |
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>" |
35 |
|
|
36 |
|
|
37 |
0 |
class SubscriptionException(Exception): |
38 |
0 |
def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST): |
39 |
0 |
self.http_code = http_code |
40 |
0 |
Exception.__init__(self, message) |
41 |
|
|
42 |
|
|
43 |
0 |
class SubscriptionThread(threading.Thread): |
44 |
0 |
def __init__(self, config, engine): |
45 |
|
""" |
46 |
|
Constructor of class |
47 |
|
:param config: configuration parameters of database and messaging |
48 |
|
:param engine: an instance of Engine class, used for deleting instances |
49 |
|
""" |
50 |
0 |
threading.Thread.__init__(self) |
51 |
0 |
self.to_terminate = False |
52 |
0 |
self.config = config |
53 |
0 |
self.db = None |
54 |
0 |
self.msg = None |
55 |
0 |
self.engine = engine |
56 |
0 |
self.loop = None |
57 |
0 |
self.logger = logging.getLogger("nbi.subscriptions") |
58 |
0 |
self.aiomain_task_admin = ( |
59 |
|
None # asyncio task for receiving admin actions from kafka bus |
60 |
|
) |
61 |
0 |
self.aiomain_task = ( |
62 |
|
None # asyncio task for receiving normal actions from kafka bus |
63 |
|
) |
64 |
0 |
self.internal_session = { # used for a session to the engine methods |
65 |
|
"project_id": (), |
66 |
|
"set_project": (), |
67 |
|
"admin": True, |
68 |
|
"force": False, |
69 |
|
"public": None, |
70 |
|
"method": "delete", |
71 |
|
} |
72 |
0 |
self.nslcm = None |
73 |
0 |
self.vnflcm = None |
74 |
|
|
75 |
0 |
async def start_kafka(self): |
76 |
|
# timeout_wait_for_kafka = 3*60 |
77 |
0 |
kafka_working = True |
78 |
0 |
while not self.to_terminate: |
79 |
0 |
try: |
80 |
|
# bug 710 635. The library aiokafka does not recieve anything when the topci at kafka has not been |
81 |
|
# created. |
82 |
|
# Before subscribe, send dummy messages |
83 |
0 |
await self.msg.aiowrite( |
84 |
|
"admin", "echo", "dummy message", loop=self.loop |
85 |
|
) |
86 |
0 |
await self.msg.aiowrite("ns", "echo", "dummy message", loop=self.loop) |
87 |
0 |
await self.msg.aiowrite("nsi", "echo", "dummy message", loop=self.loop) |
88 |
0 |
await self.msg.aiowrite("vnf", "echo", "dummy message", loop=self.loop) |
89 |
0 |
if not kafka_working: |
90 |
0 |
self.logger.critical("kafka is working again") |
91 |
0 |
kafka_working = True |
92 |
0 |
if not self.aiomain_task_admin: |
93 |
0 |
await asyncio.sleep(10, loop=self.loop) |
94 |
0 |
self.logger.debug("Starting admin subscription task") |
95 |
0 |
self.aiomain_task_admin = asyncio.ensure_future( |
96 |
|
self.msg.aioread( |
97 |
|
("admin",), |
98 |
|
loop=self.loop, |
99 |
|
group_id=False, |
100 |
|
aiocallback=self._msg_callback, |
101 |
|
), |
102 |
|
loop=self.loop, |
103 |
|
) |
104 |
0 |
if not self.aiomain_task: |
105 |
0 |
await asyncio.sleep(10, loop=self.loop) |
106 |
0 |
self.logger.debug("Starting non-admin subscription task") |
107 |
0 |
self.aiomain_task = asyncio.ensure_future( |
108 |
|
self.msg.aioread( |
109 |
|
("ns", "nsi", "vnf"), |
110 |
|
loop=self.loop, |
111 |
|
aiocallback=self._msg_callback, |
112 |
|
), |
113 |
|
loop=self.loop, |
114 |
|
) |
115 |
0 |
done, _ = await asyncio.wait( |
116 |
|
[self.aiomain_task, self.aiomain_task_admin], |
117 |
|
timeout=None, |
118 |
|
loop=self.loop, |
119 |
|
return_when=asyncio.FIRST_COMPLETED, |
120 |
|
) |
121 |
0 |
try: |
122 |
0 |
if self.aiomain_task_admin in done: |
123 |
0 |
exc = self.aiomain_task_admin.exception() |
124 |
0 |
self.logger.error( |
125 |
|
"admin subscription task exception: {}".format(exc) |
126 |
|
) |
127 |
0 |
self.aiomain_task_admin = None |
128 |
0 |
if self.aiomain_task in done: |
129 |
0 |
exc = self.aiomain_task.exception() |
130 |
0 |
self.logger.error( |
131 |
|
"non-admin subscription task exception: {}".format(exc) |
132 |
|
) |
133 |
0 |
self.aiomain_task = None |
134 |
0 |
except asyncio.CancelledError: |
135 |
0 |
pass |
136 |
0 |
except Exception as e: |
137 |
0 |
if self.to_terminate: |
138 |
0 |
return |
139 |
0 |
if kafka_working: |
140 |
|
# logging only first time |
141 |
0 |
self.logger.critical( |
142 |
|
"Error accessing kafka '{}'. Retrying ...".format(e) |
143 |
|
) |
144 |
0 |
kafka_working = False |
145 |
0 |
await asyncio.sleep(10, loop=self.loop) |
146 |
|
|
147 |
0 |
def run(self): |
148 |
|
""" |
149 |
|
Start of the thread |
150 |
|
:return: None |
151 |
|
""" |
152 |
0 |
self.loop = asyncio.new_event_loop() |
153 |
0 |
try: |
154 |
0 |
if not self.db: |
155 |
0 |
if self.config["database"]["driver"] == "mongo": |
156 |
0 |
self.db = dbmongo.DbMongo() |
157 |
0 |
self.db.db_connect(self.config["database"]) |
158 |
0 |
elif self.config["database"]["driver"] == "memory": |
159 |
0 |
self.db = dbmemory.DbMemory() |
160 |
0 |
self.db.db_connect(self.config["database"]) |
161 |
|
else: |
162 |
0 |
raise SubscriptionException( |
163 |
|
"Invalid configuration param '{}' at '[database]':'driver'".format( |
164 |
|
self.config["database"]["driver"] |
165 |
|
) |
166 |
|
) |
167 |
0 |
if not self.msg: |
168 |
0 |
config_msg = self.config["message"].copy() |
169 |
0 |
config_msg["loop"] = self.loop |
170 |
0 |
if config_msg["driver"] == "local": |
171 |
0 |
self.msg = msglocal.MsgLocal() |
172 |
0 |
self.msg.connect(config_msg) |
173 |
0 |
elif config_msg["driver"] == "kafka": |
174 |
0 |
self.msg = msgkafka.MsgKafka() |
175 |
0 |
self.msg.connect(config_msg) |
176 |
|
else: |
177 |
0 |
raise SubscriptionException( |
178 |
|
"Invalid configuration param '{}' at '[message]':'driver'".format( |
179 |
|
config_msg["driver"] |
180 |
|
) |
181 |
|
) |
182 |
0 |
self.nslcm = NsLcmNotification(self.db) |
183 |
0 |
self.vnflcm = VnfLcmNotification(self.db) |
184 |
0 |
except (DbException, MsgException) as e: |
185 |
0 |
raise SubscriptionException(str(e), http_code=e.http_code) |
186 |
|
|
187 |
0 |
self.logger.debug("Starting") |
188 |
0 |
while not self.to_terminate: |
189 |
0 |
try: |
190 |
|
|
191 |
0 |
self.loop.run_until_complete( |
192 |
|
asyncio.ensure_future(self.start_kafka(), loop=self.loop) |
193 |
|
) |
194 |
|
# except asyncio.CancelledError: |
195 |
|
# break # if cancelled it should end, breaking loop |
196 |
0 |
except Exception as e: |
197 |
0 |
if not self.to_terminate: |
198 |
0 |
self.logger.exception( |
199 |
|
"Exception '{}' at messaging read loop".format(e), exc_info=True |
200 |
|
) |
201 |
|
|
202 |
0 |
self.logger.debug("Finishing") |
203 |
0 |
self._stop() |
204 |
0 |
self.loop.close() |
205 |
|
|
206 |
0 |
async def _msg_callback(self, topic, command, params): |
207 |
|
""" |
208 |
|
Callback to process a received message from kafka |
209 |
|
:param topic: topic received |
210 |
|
:param command: command received |
211 |
|
:param params: rest of parameters |
212 |
|
:return: None |
213 |
|
""" |
214 |
0 |
msg_to_send = [] |
215 |
0 |
try: |
216 |
0 |
if topic == "ns": |
217 |
0 |
if command == "terminated" and params["operationState"] in ( |
218 |
|
"COMPLETED", |
219 |
|
"PARTIALLY_COMPLETED", |
220 |
|
): |
221 |
0 |
self.logger.debug("received ns terminated {}".format(params)) |
222 |
0 |
if params.get("autoremove"): |
223 |
0 |
self.engine.del_item( |
224 |
|
self.internal_session, |
225 |
|
"nsrs", |
226 |
|
_id=params["nsr_id"], |
227 |
|
not_send_msg=msg_to_send, |
228 |
|
) |
229 |
0 |
self.logger.debug( |
230 |
|
"ns={} deleted from database".format(params["nsr_id"]) |
231 |
|
) |
232 |
|
# Check for nslcm notification |
233 |
0 |
if isinstance(params, dict): |
234 |
|
# Check availability of operationState and command |
235 |
0 |
if ( |
236 |
|
(not params.get("operationState")) |
237 |
|
or (not command) |
238 |
|
or (not params.get("operationParams")) |
239 |
|
): |
240 |
0 |
self.logger.debug( |
241 |
|
"Message can not be used for notification of nslcm" |
242 |
|
) |
243 |
|
else: |
244 |
0 |
nsd_id = params["operationParams"].get("nsdId") |
245 |
0 |
ns_instance_id = params["operationParams"].get("nsInstanceId") |
246 |
|
# Any one among nsd_id, ns_instance_id should be present. |
247 |
0 |
if not (nsd_id or ns_instance_id): |
248 |
0 |
self.logger.debug( |
249 |
|
"Message can not be used for notification of nslcm" |
250 |
|
) |
251 |
|
else: |
252 |
0 |
op_state = params["operationState"] |
253 |
0 |
event_details = { |
254 |
|
"topic": topic, |
255 |
|
"command": command.upper(), |
256 |
|
"params": params, |
257 |
|
} |
258 |
0 |
subscribers = self.nslcm.get_subscribers( |
259 |
|
nsd_id, |
260 |
|
ns_instance_id, |
261 |
|
command.upper(), |
262 |
|
op_state, |
263 |
|
event_details, |
264 |
|
) |
265 |
|
# self.logger.debug("subscribers list: ") |
266 |
|
# self.logger.debug(subscribers) |
267 |
0 |
if subscribers: |
268 |
0 |
asyncio.ensure_future( |
269 |
|
self.nslcm.send_notifications( |
270 |
|
subscribers, loop=self.loop |
271 |
|
), |
272 |
|
loop=self.loop, |
273 |
|
) |
274 |
|
else: |
275 |
0 |
self.logger.debug( |
276 |
|
"Message can not be used for notification of nslcm" |
277 |
|
) |
278 |
0 |
elif topic == "vnf": |
279 |
0 |
if isinstance(params, dict): |
280 |
0 |
vnfd_id = params["vnfdId"] |
281 |
0 |
vnf_instance_id = params["vnfInstanceId"] |
282 |
0 |
if command == "create" or command == "delete": |
283 |
0 |
op_state = command |
284 |
|
else: |
285 |
0 |
op_state = params["operationState"] |
286 |
0 |
event_details = { |
287 |
|
"topic": topic, |
288 |
|
"command": command.upper(), |
289 |
|
"params": params, |
290 |
|
} |
291 |
0 |
subscribers = self.vnflcm.get_subscribers( |
292 |
|
vnfd_id, |
293 |
|
vnf_instance_id, |
294 |
|
command.upper(), |
295 |
|
op_state, |
296 |
|
event_details |
297 |
|
) |
298 |
0 |
if subscribers: |
299 |
0 |
asyncio.ensure_future( |
300 |
|
self.vnflcm.send_notifications( |
301 |
|
subscribers, loop=self.loop |
302 |
|
), |
303 |
|
loop=self.loop |
304 |
|
) |
305 |
0 |
elif topic == "nsi": |
306 |
0 |
if command == "terminated" and params["operationState"] in ( |
307 |
|
"COMPLETED", |
308 |
|
"PARTIALLY_COMPLETED", |
309 |
|
): |
310 |
0 |
self.logger.debug("received nsi terminated {}".format(params)) |
311 |
0 |
if params.get("autoremove"): |
312 |
0 |
self.engine.del_item( |
313 |
|
self.internal_session, |
314 |
|
"nsis", |
315 |
|
_id=params["nsir_id"], |
316 |
|
not_send_msg=msg_to_send, |
317 |
|
) |
318 |
0 |
self.logger.debug( |
319 |
|
"nsis={} deleted from database".format(params["nsir_id"]) |
320 |
|
) |
321 |
0 |
elif topic == "admin": |
322 |
0 |
self.logger.debug("received {} {} {}".format(topic, command, params)) |
323 |
0 |
if command in ["echo", "ping"]: # ignored commands |
324 |
0 |
pass |
325 |
0 |
elif command == "revoke_token": |
326 |
0 |
if params: |
327 |
0 |
if isinstance(params, dict) and "_id" in params: |
328 |
0 |
tid = params.get("_id") |
329 |
0 |
self.engine.authenticator.tokens_cache.pop(tid, None) |
330 |
0 |
self.logger.debug( |
331 |
|
"token '{}' removed from token_cache".format(tid) |
332 |
|
) |
333 |
|
else: |
334 |
0 |
self.logger.debug( |
335 |
|
"unrecognized params in command '{} {}': {}".format( |
336 |
|
topic, command, params |
337 |
|
) |
338 |
|
) |
339 |
|
else: |
340 |
0 |
self.engine.authenticator.tokens_cache.clear() |
341 |
0 |
self.logger.debug("token_cache cleared") |
342 |
|
else: |
343 |
0 |
self.logger.debug( |
344 |
|
"unrecognized command '{} {}'".format(topic, command) |
345 |
|
) |
346 |
|
# writing to kafka must be done with our own loop. For this reason it is not allowed Engine to do that, |
347 |
|
# but content to be written is stored at msg_to_send |
348 |
0 |
for msg in msg_to_send: |
349 |
0 |
await self.msg.aiowrite(*msg, loop=self.loop) |
350 |
0 |
except (EngineException, DbException, MsgException) as e: |
351 |
0 |
self.logger.error( |
352 |
|
"Error while processing topic={} command={}: {}".format( |
353 |
|
topic, command, e |
354 |
|
) |
355 |
|
) |
356 |
0 |
except Exception as e: |
357 |
0 |
self.logger.exception( |
358 |
|
"Exception while processing topic={} command={}: {}".format( |
359 |
|
topic, command, e |
360 |
|
), |
361 |
|
exc_info=True, |
362 |
|
) |
363 |
|
|
364 |
0 |
def _stop(self): |
365 |
|
""" |
366 |
|
Close all connections |
367 |
|
:return: None |
368 |
|
""" |
369 |
0 |
try: |
370 |
0 |
if self.db: |
371 |
0 |
self.db.db_disconnect() |
372 |
0 |
if self.msg: |
373 |
0 |
self.msg.disconnect() |
374 |
0 |
except (DbException, MsgException) as e: |
375 |
0 |
raise SubscriptionException(str(e), http_code=e.http_code) |
376 |
|
|
377 |
0 |
def terminate(self): |
378 |
|
""" |
379 |
|
This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards, |
380 |
|
but not immediately. |
381 |
|
:return: None |
382 |
|
""" |
383 |
0 |
self.to_terminate = True |
384 |
0 |
if self.aiomain_task: |
385 |
0 |
self.loop.call_soon_threadsafe(self.aiomain_task.cancel) |
386 |
0 |
if self.aiomain_task_admin: |
387 |
0 |
self.loop.call_soon_threadsafe(self.aiomain_task_admin.cancel) |