1 |
|
# -*- coding: utf-8 -*- |
2 |
|
|
3 |
|
# Licensed under the Apache License, Version 2.0 (the "License"); |
4 |
|
# you may not use this file except in compliance with the License. |
5 |
|
# You may obtain a copy of the License at |
6 |
|
# |
7 |
|
# http://www.apache.org/licenses/LICENSE-2.0 |
8 |
|
# |
9 |
|
# Unless required by applicable law or agreed to in writing, software |
10 |
|
# distributed under the License is distributed on an "AS IS" BASIS, |
11 |
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
12 |
|
# implied. |
13 |
|
# See the License for the specific language governing permissions and |
14 |
|
# limitations under the License. |
15 |
|
|
16 |
0 |
""" |
17 |
|
This module implements a thread that reads from kafka bus implementing all the subscriptions. |
18 |
|
It is based on asyncio. |
19 |
|
To avoid race conditions it uses same engine class as the main module for database changes |
20 |
|
Besides deleting NS and NSI instances that are terminated with the autoremove flag, it sends NS/VNF LCM notifications to subscribers and processes token revocation commands received on the admin topic |
21 |
|
""" |
22 |
|
|
23 |
0 |
import logging |
24 |
0 |
import threading |
25 |
0 |
import asyncio |
26 |
0 |
from http import HTTPStatus |
27 |
|
|
28 |
0 |
from osm_common import dbmongo, dbmemory, msglocal, msgkafka |
29 |
0 |
from osm_common.dbbase import DbException |
30 |
0 |
from osm_common.msgbase import MsgException |
31 |
0 |
from osm_nbi.engine import EngineException |
32 |
0 |
from osm_nbi.notifications import NsLcmNotification, VnfLcmNotification |
33 |
|
|
34 |
0 |
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>" |
35 |
|
|
36 |
|
|
37 |
0 |
class SubscriptionException(Exception):
    """
    Error raised by the subscription thread.

    Carries, besides the message, the HTTP status code associated to the failure.
    """

    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
        """
        :param message: human readable description of the error
        :param http_code: HTTP status associated to the error, BAD_REQUEST by default
        """
        super().__init__(message)
        self.http_code = http_code
41 |
|
|
42 |
|
|
43 |
0 |
class SubscriptionThread(threading.Thread):
    """
    Thread that subscribes to the "admin", "ns", "nsi" and "vnf" topics of the message bus.

    The thread runs its own asyncio event loop (created by asyncio.run() inside run()) and
    processes every received message at _msg_callback().
    """

    def __init__(self, config, engine):
        """
        Constructor of class
        :param config: configuration parameters of database and messaging
        :param engine: an instance of Engine class, used for deleting instances
        """
        threading.Thread.__init__(self)
        self.to_terminate = False
        self.config = config
        self.db = None
        self.msg = None
        self.engine = engine
        self.logger = logging.getLogger("nbi.subscriptions")
        self.aiomain_task_admin = (
            None  # asyncio task for receiving admin actions from kafka bus
        )
        self.aiomain_task = (
            None  # asyncio task for receiving normal actions from kafka bus
        )
        self.internal_session = {  # used for a session to the engine methods
            "project_id": (),
            "set_project": (),
            "admin": True,
            "force": False,
            "public": None,
            "method": "delete",
        }
        self.nslcm = None
        self.vnflcm = None
        # Strong references to fire-and-forget notification tasks. The event loop only keeps weak
        # references to tasks, so an unreferenced task can be garbage collected before it finishes.
        self._background_tasks = set()

    async def start_kafka(self):
        """
        Keep the admin and non-admin subscription tasks running until termination is requested.

        Every iteration writes a dummy "echo" message to each topic, (re)creates the subscription
        tasks that are not running and waits until one of them finishes. Errors are logged and the
        whole sequence is retried after 10 seconds.
        :return: None
        """
        # timeout_wait_for_kafka = 3*60
        kafka_working = True
        while not self.to_terminate:
            try:
                # bug 710 635. The library aiokafka does not receive anything when the topic at kafka has not
                # been created.
                # Before subscribe, send dummy messages
                await self.msg.aiowrite(
                    "admin",
                    "echo",
                    "dummy message",
                )
                await self.msg.aiowrite("ns", "echo", "dummy message")
                await self.msg.aiowrite("nsi", "echo", "dummy message")
                await self.msg.aiowrite("vnf", "echo", "dummy message")
                if not kafka_working:
                    self.logger.critical("kafka is working again")
                    kafka_working = True
                if not self.aiomain_task_admin:
                    await asyncio.sleep(10)
                    self.logger.debug("Starting admin subscription task")
                    self.aiomain_task_admin = asyncio.ensure_future(
                        self.msg.aioread(
                            ("admin",),
                            group_id=False,
                            aiocallback=self._msg_callback,
                        ),
                    )
                if not self.aiomain_task:
                    await asyncio.sleep(10)
                    self.logger.debug("Starting non-admin subscription task")
                    self.aiomain_task = asyncio.ensure_future(
                        self.msg.aioread(
                            ("ns", "nsi", "vnf"),
                            aiocallback=self._msg_callback,
                        ),
                    )
                if self.to_terminate:
                    # terminate() may have been called while sleeping above, when there was no task
                    # to cancel yet. Waiting without timeout would then block forever.
                    return
                done, _ = await asyncio.wait(
                    [self.aiomain_task, self.aiomain_task_admin],
                    timeout=None,
                    return_when=asyncio.FIRST_COMPLETED,
                )
                # Each finished task is handled on its own, so that a cancelled task does not prevent
                # the other one from being reported and restarted.
                if self.aiomain_task_admin in done:
                    self._report_finished_task("admin", self.aiomain_task_admin)
                    self.aiomain_task_admin = None
                if self.aiomain_task in done:
                    self._report_finished_task("non-admin", self.aiomain_task)
                    self.aiomain_task = None
            except Exception as e:
                if self.to_terminate:
                    return
                if kafka_working:
                    # logging only first time
                    self.logger.critical(
                        "Error accessing kafka '{}'. Retrying ...".format(e)
                    )
                    kafka_working = False
            await asyncio.sleep(10)

    def _report_finished_task(self, label, task):
        """
        Log how a finished subscription task ended
        :param label: "admin" or "non-admin", used as prefix of the log message
        :param task: the finished asyncio task
        :return: None
        """
        if task.cancelled():
            # task.exception() would raise CancelledError for a cancelled task
            self.logger.debug("{} subscription task cancelled".format(label))
            return
        self.logger.error(
            "{} subscription task exception: {}".format(label, task.exception())
        )

    def run(self):
        """
        Start of the thread. Connects to database and message bus if not already connected, and
        runs start_kafka() on a new event loop until termination is requested.
        :return: None
        :raises SubscriptionException: on invalid driver configuration, or on database/messaging
            errors while connecting or disconnecting
        """
        try:
            if not self.db:
                if self.config["database"]["driver"] == "mongo":
                    self.db = dbmongo.DbMongo()
                    self.db.db_connect(self.config["database"])
                elif self.config["database"]["driver"] == "memory":
                    self.db = dbmemory.DbMemory()
                    self.db.db_connect(self.config["database"])
                else:
                    raise SubscriptionException(
                        "Invalid configuration param '{}' at '[database]':'driver'".format(
                            self.config["database"]["driver"]
                        )
                    )
            if not self.msg:
                config_msg = self.config["message"].copy()
                if config_msg["driver"] == "local":
                    self.msg = msglocal.MsgLocal()
                    self.msg.connect(config_msg)
                elif config_msg["driver"] == "kafka":
                    self.msg = msgkafka.MsgKafka()
                    self.msg.connect(config_msg)
                else:
                    raise SubscriptionException(
                        "Invalid configuration param '{}' at '[message]':'driver'".format(
                            config_msg["driver"]
                        )
                    )
            self.nslcm = NsLcmNotification(self.db)
            self.vnflcm = VnfLcmNotification(self.db)
        except (DbException, MsgException) as e:
            raise SubscriptionException(str(e), http_code=e.http_code) from e

        self.logger.debug("Starting")
        while not self.to_terminate:
            try:
                asyncio.run(self.start_kafka())
            except Exception as e:
                if not self.to_terminate:
                    self.logger.exception(
                        "Exception '{}' at messaging read loop".format(e), exc_info=True
                    )

        self.logger.debug("Finishing")
        self._close_connections()

    async def _msg_callback(self, topic, command, params):
        """
        Callback to process a received message from kafka
        :param topic: topic received
        :param command: command received
        :param params: rest of parameters
        :return: None
        """
        msg_to_send = []
        try:
            if topic == "ns":
                self._on_ns_message(topic, command, params, msg_to_send)
            elif topic == "vnf":
                self._on_vnf_message(topic, command, params)
            elif topic == "nsi":
                self._on_nsi_message(command, params, msg_to_send)
            elif topic == "admin":
                self._on_admin_message(topic, command, params)
            # writing to kafka must be done with our own loop. For this reason it is not allowed Engine to do that,
            # but content to be written is stored at msg_to_send
            for msg in msg_to_send:
                await self.msg.aiowrite(*msg)
        except (EngineException, DbException, MsgException) as e:
            self.logger.error(
                "Error while processing topic={} command={}: {}".format(
                    topic, command, e
                )
            )
        except Exception as e:
            self.logger.exception(
                "Exception while processing topic={} command={}: {}".format(
                    topic, command, e
                ),
                exc_info=True,
            )

    def _spawn_background(self, coro):
        """
        Schedule a coroutine on the running loop keeping a reference to its task until it is done
        :param coro: coroutine to run
        :return: the created asyncio task
        """
        task = asyncio.ensure_future(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    def _on_ns_message(self, topic, command, params, msg_to_send):
        """
        Process a message of topic "ns": autoremove of terminated instances and nslcm notifications
        :param topic: topic received, stored at the event details
        :param command: command received
        :param params: rest of parameters
        :param msg_to_send: list where the engine stores the messages to be written afterwards
        :return: None
        """
        if command == "terminated" and params["operationState"] in (
            "COMPLETED",
            "PARTIALLY_COMPLETED",
        ):
            self.logger.debug("received ns terminated {}".format(params))
            if params.get("autoremove"):
                self.engine.del_item(
                    self.internal_session,
                    "nsrs",
                    _id=params["nsr_id"],
                    not_send_msg=msg_to_send,
                )
                self.logger.debug(
                    "ns={} deleted from database".format(params["nsr_id"])
                )
        # Check for nslcm notification
        if not isinstance(params, dict):
            self.logger.debug("Message can not be used for notification of nslcm")
            return
        # Check availability of operationState and command
        if (
            (not params.get("operationState"))
            or (not command)
            or (not params.get("operationParams"))
        ):
            self.logger.debug("Message can not be used for notification of nslcm")
            return
        nsd_id = params["operationParams"].get("nsdId")
        ns_instance_id = params["operationParams"].get("nsInstanceId")
        # Any one among nsd_id, ns_instance_id should be present.
        if not (nsd_id or ns_instance_id):
            self.logger.debug("Message can not be used for notification of nslcm")
            return
        op_state = params["operationState"]
        event_details = {
            "topic": topic,
            "command": command.upper(),
            "params": params,
        }
        subscribers = self.nslcm.get_subscribers(
            nsd_id,
            ns_instance_id,
            command.upper(),
            op_state,
            event_details,
        )
        if subscribers:
            self._spawn_background(self.nslcm.send_notifications(subscribers))

    def _on_vnf_message(self, topic, command, params):
        """
        Process a message of topic "vnf": vnflcm notifications
        :param topic: topic received, stored at the event details
        :param command: command received
        :param params: rest of parameters; ignored when it is not a dictionary
        :return: None
        """
        if not isinstance(params, dict):
            return
        vnfd_id = params["vnfdId"]
        vnf_instance_id = params["vnfInstanceId"]
        if command == "create" or command == "delete":
            # these commands carry no operationState; the command itself is used as state
            op_state = command
        else:
            op_state = params["operationState"]
        event_details = {
            "topic": topic,
            "command": command.upper(),
            "params": params,
        }
        subscribers = self.vnflcm.get_subscribers(
            vnfd_id,
            vnf_instance_id,
            command.upper(),
            op_state,
            event_details,
        )
        if subscribers:
            self._spawn_background(self.vnflcm.send_notifications(subscribers))

    def _on_nsi_message(self, command, params, msg_to_send):
        """
        Process a message of topic "nsi": autoremove of terminated instances
        :param command: command received
        :param params: rest of parameters
        :param msg_to_send: list where the engine stores the messages to be written afterwards
        :return: None
        """
        if command == "terminated" and params["operationState"] in (
            "COMPLETED",
            "PARTIALLY_COMPLETED",
        ):
            self.logger.debug("received nsi terminated {}".format(params))
            if params.get("autoremove"):
                self.engine.del_item(
                    self.internal_session,
                    "nsis",
                    _id=params["nsir_id"],
                    not_send_msg=msg_to_send,
                )
                self.logger.debug(
                    "nsis={} deleted from database".format(params["nsir_id"])
                )

    def _on_admin_message(self, topic, command, params):
        """
        Process a message of topic "admin": token revocation
        :param topic: topic received, only used for logging
        :param command: command received
        :param params: rest of parameters
        :return: None
        """
        self.logger.debug("received {} {} {}".format(topic, command, params))
        if command in ["echo", "ping"]:  # ignored commands
            return
        if command != "revoke_token":
            self.logger.debug("unrecognized command '{} {}'".format(topic, command))
            return
        if not params:
            # no token indicated: the whole cache is invalidated
            self.engine.authenticator.tokens_cache.clear()
            self.logger.debug("token_cache cleared")
        elif isinstance(params, dict) and "_id" in params:
            tid = params.get("_id")
            self.engine.authenticator.tokens_cache.pop(tid, None)
            self.logger.debug("token '{}' removed from token_cache".format(tid))
        else:
            self.logger.debug(
                "unrecognized params in command '{} {}': {}".format(
                    topic, command, params
                )
            )

    def _close_connections(self):
        """
        Close all connections

        This method is deliberately not called "_stop": threading.Thread has a private method with
        that name which CPython (up to 3.12) calls from join() and is_alive(); overriding it leaves
        is_alive() returning True after the thread has finished.
        :return: None
        :raises SubscriptionException: on database or messaging errors while disconnecting
        """
        try:
            if self.db:
                self.db.db_disconnect()
            if self.msg:
                self.msg.disconnect()
        except (DbException, MsgException) as e:
            raise SubscriptionException(str(e), http_code=e.http_code) from e

    def terminate(self):
        """
        This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards,
        but not immediately.
        :return: None
        """
        self.to_terminate = True
        # The tasks belong to the event loop running inside this thread, not to the loop (if any) of
        # the thread calling terminate(), so the cancellation is scheduled on the loop of each task.
        for task in (self.aiomain_task, self.aiomain_task_admin):
            if not task:
                continue
            try:
                task.get_loop().call_soon_threadsafe(task.cancel)
            except RuntimeError:
                # the loop is already closed, so there is nothing left to cancel
                self.logger.debug("event loop already closed at terminate")