blob: 91d2f2d1954f3af9902388d4dc3778f0019184d4 [file] [log] [blame]
tierno932499c2019-01-28 17:28:10 +00001# -*- coding: utf-8 -*-
2
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12# implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16"""
This module implements a thread that reads from the kafka bus, implementing all the subscriptions.
It is based on asyncio.
To avoid race conditions it uses the same engine class as the main module for database changes.
It deletes NS and NSI instances when they are terminated with the autoremove flag, sends NS and VNF
LCM notifications to their subscribers, and handles the token revocation commands of the admin topic.
21"""
22
23import logging
24import threading
25import asyncio
26from http import HTTPStatus
K Sai Kiranbb70c812020-04-28 14:48:31 +053027
tierno932499c2019-01-28 17:28:10 +000028from osm_common import dbmongo, dbmemory, msglocal, msgkafka
29from osm_common.dbbase import DbException
30from osm_common.msgbase import MsgException
tierno23acf402019-08-28 13:36:34 +000031from osm_nbi.engine import EngineException
selvi.jf1004592022-04-29 05:42:35 +000032from osm_nbi.notifications import NsLcmNotification, VnfLcmNotification
tierno932499c2019-01-28 17:28:10 +000033
# Module authorship metadata.
34__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
35
36
class SubscriptionException(Exception):
    """
    Error raised by the subscription thread.
    It carries the HTTP status code associated with the failure in 'http_code'.
    """

    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
        """
        Constructor of class
        :param message: text describing the error
        :param http_code: HTTP status associated with the error. BAD_REQUEST by default
        """
        super().__init__(message)
        self.http_code = http_code
41
42
class SubscriptionThread(threading.Thread):
    """
    Thread that owns an asyncio event loop and uses it to read the message bus.
    Received messages are handled by '_msg_callback', which deletes terminated NS/NSI records flagged with
    'autoremove', triggers NS/VNF LCM notifications and processes the 'admin' token revocation commands.
    """

    # Seconds waited before launching each subscription task
    SUBSCRIBE_DELAY = 10
    # Seconds waited at the end of every iteration of the kafka loop (also the retry pause after an error)
    RETRY_DELAY = 10

    def __init__(self, config, engine):
        """
        Constructor of class
        :param config: configuration parameters of database and messaging
        :param engine: an instance of Engine class, used for deleting instances
        """
        super().__init__()
        self.to_terminate = False
        self.config = config
        self.db = None
        self.msg = None
        self.engine = engine
        self.loop = None
        self.logger = logging.getLogger("nbi.subscriptions")
        # asyncio task for receiving admin actions from kafka bus
        self.aiomain_task_admin = None
        # asyncio task for receiving normal actions from kafka bus
        self.aiomain_task = None
        self.internal_session = {  # used for a session to the engine methods
            "project_id": (),
            "set_project": (),
            "admin": True,
            "force": False,
            "public": None,
            "method": "delete",
        }
        self.nslcm = None
        self.vnflcm = None

    def _log_finished_task(self, task, label):
        """
        Log why a finished subscription task ended
        :param task: an asyncio task that is already done
        :param label: text identifying the task in the log ("admin" or "non-admin")
        :return: None
        """
        if task.cancelled():
            # calling exception() on a cancelled task raises CancelledError, so it is checked first
            self.logger.debug("{} subscription task cancelled".format(label))
            return
        self.logger.error(
            "{} subscription task exception: {}".format(label, task.exception())
        )

    async def start_kafka(self):
        """
        Keep the two subscription tasks (admin and non-admin topics) running until termination is requested.
        Any error is logged once and the whole sequence is retried after a pause.
        :return: None
        """
        # timeout_wait_for_kafka = 3*60
        kafka_working = True
        while not self.to_terminate:
            try:
                # bug 710 635. The library aiokafka does not receive anything when the topic at kafka has not
                # been created.
                # Before subscribe, send dummy messages
                for topic in ("admin", "ns", "nsi", "vnf"):
                    await self.msg.aiowrite(
                        topic, "echo", "dummy message", loop=self.loop
                    )
                if not kafka_working:
                    self.logger.critical("kafka is working again")
                    kafka_working = True
                # asyncio.sleep/asyncio.wait lost their 'loop' argument in python 3.10. It is not needed because
                # this coroutine already runs inside self.loop
                if not self.aiomain_task_admin:
                    await asyncio.sleep(self.SUBSCRIBE_DELAY)
                    self.logger.debug("Starting admin subscription task")
                    self.aiomain_task_admin = asyncio.ensure_future(
                        self.msg.aioread(
                            ("admin",),
                            loop=self.loop,
                            group_id=False,
                            aiocallback=self._msg_callback,
                        ),
                        loop=self.loop,
                    )
                if not self.aiomain_task:
                    await asyncio.sleep(self.SUBSCRIBE_DELAY)
                    self.logger.debug("Starting non-admin subscription task")
                    self.aiomain_task = asyncio.ensure_future(
                        self.msg.aioread(
                            ("ns", "nsi", "vnf"),
                            loop=self.loop,
                            aiocallback=self._msg_callback,
                        ),
                        loop=self.loop,
                    )
                done, _ = await asyncio.wait(
                    [self.aiomain_task, self.aiomain_task_admin],
                    timeout=None,
                    return_when=asyncio.FIRST_COMPLETED,
                )
                # Each finished task is handled on its own, so that a cancelled one does not prevent the other
                # from being examined. Its reference is cleared to get it re-created at next iteration
                if self.aiomain_task_admin in done:
                    self._log_finished_task(self.aiomain_task_admin, "admin")
                    self.aiomain_task_admin = None
                if self.aiomain_task in done:
                    self._log_finished_task(self.aiomain_task, "non-admin")
                    self.aiomain_task = None
            except Exception as e:
                if self.to_terminate:
                    return
                if kafka_working:
                    # logging only first time
                    self.logger.critical(
                        "Error accessing kafka '{}'. Retrying ...".format(e)
                    )
                    kafka_working = False
            # pause before the next iteration
            await asyncio.sleep(self.RETRY_DELAY)

    def run(self):
        """
        Start of the thread. Creates the event loop, connects to database and message bus if they are not
        already provided, and runs 'start_kafka' until termination is requested.
        :return: None
        :raises SubscriptionException: on invalid driver configuration or connection errors
        """
        self.loop = asyncio.new_event_loop()
        try:
            if not self.db:
                db_driver = self.config["database"]["driver"]
                if db_driver == "mongo":
                    self.db = dbmongo.DbMongo()
                elif db_driver == "memory":
                    self.db = dbmemory.DbMemory()
                else:
                    raise SubscriptionException(
                        "Invalid configuration param '{}' at '[database]':'driver'".format(
                            db_driver
                        )
                    )
                self.db.db_connect(self.config["database"])
            if not self.msg:
                config_msg = self.config["message"].copy()
                config_msg["loop"] = self.loop
                if config_msg["driver"] == "local":
                    self.msg = msglocal.MsgLocal()
                elif config_msg["driver"] == "kafka":
                    self.msg = msgkafka.MsgKafka()
                else:
                    raise SubscriptionException(
                        "Invalid configuration param '{}' at '[message]':'driver'".format(
                            config_msg["driver"]
                        )
                    )
                self.msg.connect(config_msg)
            self.nslcm = NsLcmNotification(self.db)
            self.vnflcm = VnfLcmNotification(self.db)
        except (DbException, MsgException) as e:
            # the loop is not going to be used, release it before leaving
            self.loop.close()
            raise SubscriptionException(str(e), http_code=e.http_code) from e
        except SubscriptionException:
            self.loop.close()
            raise

        self.logger.debug("Starting")
        try:
            while not self.to_terminate:
                try:
                    self.loop.run_until_complete(self.start_kafka())
                # except asyncio.CancelledError:
                #     break  # if cancelled it should end, breaking loop
                except Exception as e:
                    if not self.to_terminate:
                        self.logger.exception(
                            "Exception '{}' at messaging read loop".format(e)
                        )

            self.logger.debug("Finishing")
            self._stop()
        finally:
            # close the loop even when closing the connections fails
            self.loop.close()

    async def _msg_callback(self, topic, command, params):
        """
        Callback to process a received message from kafka
        :param topic: topic received
        :param command: command received
        :param params: rest of parameters
        :return: None
        """
        msg_to_send = []
        try:
            if topic == "ns":
                self._process_ns(command, params, msg_to_send)
            elif topic == "vnf":
                self._process_vnf(command, params)
            elif topic == "nsi":
                self._process_nsi(command, params, msg_to_send)
            elif topic == "admin":
                self._process_admin(command, params)
            # writing to kafka must be done with our own loop. For this reason it is not allowed Engine to do
            # that, but content to be written is stored at msg_to_send
            for msg in msg_to_send:
                await self.msg.aiowrite(*msg, loop=self.loop)
        except (EngineException, DbException, MsgException) as e:
            self.logger.error(
                "Error while processing topic={} command={}: {}".format(
                    topic, command, e
                )
            )
        except Exception as e:
            self.logger.exception(
                "Exception while processing topic={} command={}: {}".format(
                    topic, command, e
                )
            )

    def _process_ns(self, command, params, msg_to_send):
        """
        Process a message of topic 'ns': autoremove of terminated instances and nslcm notifications
        :param command: command received
        :param params: rest of parameters
        :param msg_to_send: list where the engine stores the messages to be written afterwards
        :return: None
        """
        if command == "terminated" and params["operationState"] in (
            "COMPLETED",
            "PARTIALLY_COMPLETED",
        ):
            self.logger.debug("received ns terminated {}".format(params))
            if params.get("autoremove"):
                self.engine.del_item(
                    self.internal_session,
                    "nsrs",
                    _id=params["nsr_id"],
                    not_send_msg=msg_to_send,
                )
                self.logger.debug(
                    "ns={} deleted from database".format(params["nsr_id"])
                )

        # Check for nslcm notification
        not_usable = "Message can not be used for notification of nslcm"
        if not isinstance(params, dict):
            self.logger.debug(not_usable)
            return
        # Check availability of operationState and command
        if not (
            params.get("operationState") and command and params.get("operationParams")
        ):
            self.logger.debug(not_usable)
            return
        nsd_id = params["operationParams"].get("nsdId")
        ns_instance_id = params["operationParams"].get("nsInstanceId")
        # Any one among nsd_id, ns_instance_id should be present.
        if not (nsd_id or ns_instance_id):
            self.logger.debug(not_usable)
            return
        event_details = {
            "topic": "ns",
            "command": command.upper(),
            "params": params,
        }
        subscribers = self.nslcm.get_subscribers(
            nsd_id,
            ns_instance_id,
            command.upper(),
            params["operationState"],
            event_details,
        )
        if subscribers:
            asyncio.ensure_future(
                self.nslcm.send_notifications(subscribers, loop=self.loop),
                loop=self.loop,
            )

    def _process_vnf(self, command, params):
        """
        Process a message of topic 'vnf': vnflcm notifications
        :param command: command received
        :param params: rest of parameters. Ignored when it is not a dictionary
        :return: None
        """
        if not isinstance(params, dict):
            return
        vnfd_id = params["vnfdId"]
        vnf_instance_id = params["vnfInstanceId"]
        # for 'create' and 'delete' the command itself is used as operation state
        if command in ("create", "delete"):
            op_state = command
        else:
            op_state = params["operationState"]
        event_details = {
            "topic": "vnf",
            "command": command.upper(),
            "params": params,
        }
        subscribers = self.vnflcm.get_subscribers(
            vnfd_id,
            vnf_instance_id,
            command.upper(),
            op_state,
            event_details,
        )
        if subscribers:
            asyncio.ensure_future(
                self.vnflcm.send_notifications(subscribers, loop=self.loop),
                loop=self.loop,
            )

    def _process_nsi(self, command, params, msg_to_send):
        """
        Process a message of topic 'nsi': autoremove of terminated instances
        :param command: command received
        :param params: rest of parameters
        :param msg_to_send: list where the engine stores the messages to be written afterwards
        :return: None
        """
        if command == "terminated" and params["operationState"] in (
            "COMPLETED",
            "PARTIALLY_COMPLETED",
        ):
            self.logger.debug("received nsi terminated {}".format(params))
            if params.get("autoremove"):
                self.engine.del_item(
                    self.internal_session,
                    "nsis",
                    _id=params["nsir_id"],
                    not_send_msg=msg_to_send,
                )
                self.logger.debug(
                    "nsis={} deleted from database".format(params["nsir_id"])
                )

    def _process_admin(self, command, params):
        """
        Process a message of topic 'admin': revocation of cached tokens
        :param command: command received
        :param params: rest of parameters. For 'revoke_token', a dictionary with '_id' removes that token from
            the cache, and empty params clear the whole cache
        :return: None
        """
        topic = "admin"
        self.logger.debug("received {} {} {}".format(topic, command, params))
        if command in ["echo", "ping"]:  # ignored commands
            return
        if command != "revoke_token":
            self.logger.debug("unrecognized command '{} {}'".format(topic, command))
            return
        tokens_cache = self.engine.authenticator.tokens_cache
        if not params:
            tokens_cache.clear()
            self.logger.debug("token_cache cleared")
        elif isinstance(params, dict) and "_id" in params:
            tid = params.get("_id")
            tokens_cache.pop(tid, None)
            self.logger.debug("token '{}' removed from token_cache".format(tid))
        else:
            self.logger.debug(
                "unrecognized params in command '{} {}': {}".format(
                    topic, command, params
                )
            )

    def _stop(self):
        """
        Close all connections
        :return: None
        :raises SubscriptionException: if database or message bus fails at disconnecting
        """
        try:
            if self.db:
                self.db.db_disconnect()
            if self.msg:
                self.msg.disconnect()
        except (DbException, MsgException) as e:
            raise SubscriptionException(str(e), http_code=e.http_code) from e

    def terminate(self):
        """
        This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards,
        but not immediately.
        :return: None
        """
        self.to_terminate = True
        # take local references, because the thread may replace the attributes at any moment
        loop = self.loop
        for task in (self.aiomain_task, self.aiomain_task_admin):
            if task and loop and not loop.is_closed():
                loop.call_soon_threadsafe(task.cancel)
385 self.loop.call_soon_threadsafe(self.aiomain_task_admin.cancel)