blob: 6810ccd6a0a37a159e37cbda9d5618f0c8480643 [file] [log] [blame]
tierno932499c2019-01-28 17:28:10 +00001# -*- coding: utf-8 -*-
2
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12# implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
"""
This module implements a thread that reads from the kafka bus and serves all the subscriptions.
It is based on asyncio.
To avoid race conditions it uses the same engine class as the main module for database changes.
It deletes NS and NSI instances that are terminated with the autoremove flag, sends NS LCM
notifications to the registered subscribers and removes revoked tokens from the token cache.
"""
22
23import logging
24import threading
25import asyncio
26from http import HTTPStatus
K Sai Kiranbb70c812020-04-28 14:48:31 +053027
tierno932499c2019-01-28 17:28:10 +000028from osm_common import dbmongo, dbmemory, msglocal, msgkafka
29from osm_common.dbbase import DbException
30from osm_common.msgbase import MsgException
tierno23acf402019-08-28 13:36:34 +000031from osm_nbi.engine import EngineException
K Sai Kiranbb70c812020-04-28 14:48:31 +053032from osm_nbi.notifications import NsLcmNotification
tierno932499c2019-01-28 17:28:10 +000033
34__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
35
36
class SubscriptionException(Exception):
    """Error raised by the subscription thread.

    It carries the HTTP status code that best describes the failure so that callers can map it
    to an HTTP response.
    """

    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
        """
        Constructor of class
        :param message: human readable description of the error
        :param http_code: HTTP status associated to the error. By default BAD_REQUEST
        """
        self.http_code = http_code
        super().__init__(message)
41
42
class SubscriptionThread(threading.Thread):
    """Thread that owns an asyncio loop and processes the messages read from the message bus.

    Two reading tasks are kept alive: one for the "admin" topic and another one for the "ns" and
    "nsi" topics. Every received message is handled by `_msg_callback`.
    """

    # seconds to wait before (re)starting a subscription task or retrying after a failure
    _RETRY_SECONDS = 10
    # operation states for which a terminated instance can be automatically removed
    _TERMINATED_STATES = ("COMPLETED", "PARTIALLY_COMPLETED")
    _NOT_FOR_NSLCM = "Message can not be used for notification of nslcm"

    def __init__(self, config, engine):
        """
        Constructor of class
        :param config: configuration parameters of database and messaging
        :param engine: an instance of Engine class, used for deleting instances
        """
        super().__init__()
        self.to_terminate = False
        self.config = config
        self.db = None
        self.msg = None
        self.engine = engine
        self.loop = None
        self.logger = logging.getLogger("nbi.subscriptions")
        # asyncio task for receiving admin actions from kafka bus
        self.aiomain_task_admin = None
        # asyncio task for receiving normal actions from kafka bus
        self.aiomain_task = None
        self.internal_session = {  # used for a session to the engine methods
            "project_id": (),
            "set_project": (),
            "admin": True,
            "force": False,
            "public": None,
            "method": "delete",
        }
        self.nslcm = None

    async def start_kafka(self):
        """
        Keep the admin and non-admin subscription tasks running until termination is requested.
        When a task ends it is logged and started again. Errors accessing the bus are logged only
        the first time and retried periodically.
        :return: None
        """
        kafka_working = True
        while not self.to_terminate:
            try:
                # bug 710 635. The library aiokafka does not receive anything when the topic at
                # kafka has not been created.
                # Before subscribe, send dummy messages
                for topic in ("admin", "ns", "nsi"):
                    await self.msg.aiowrite(
                        topic, "echo", "dummy message", loop=self.loop
                    )
                if not kafka_working:
                    self.logger.critical("kafka is working again")
                    kafka_working = True
                if not self.aiomain_task_admin:
                    await asyncio.sleep(self._RETRY_SECONDS)
                    if self.to_terminate:
                        # terminate() could not cancel a task that did not exist yet
                        return
                    self.logger.debug("Starting admin subscription task")
                    self.aiomain_task_admin = asyncio.ensure_future(
                        self.msg.aioread(
                            ("admin",),
                            loop=self.loop,
                            group_id=False,
                            aiocallback=self._msg_callback,
                        ),
                        loop=self.loop,
                    )
                if not self.aiomain_task:
                    await asyncio.sleep(self._RETRY_SECONDS)
                    if self.to_terminate:
                        return
                    self.logger.debug("Starting non-admin subscription task")
                    self.aiomain_task = asyncio.ensure_future(
                        self.msg.aioread(
                            ("ns", "nsi"),
                            loop=self.loop,
                            aiocallback=self._msg_callback,
                        ),
                        loop=self.loop,
                    )
                done, _ = await asyncio.wait(
                    [self.aiomain_task, self.aiomain_task_admin],
                    timeout=None,
                    return_when=asyncio.FIRST_COMPLETED,
                )
                # each task is checked on its own, so that a cancelled one does not prevent
                # resetting the other
                if self.aiomain_task_admin in done:
                    self._log_finished_task(self.aiomain_task_admin, "admin")
                    self.aiomain_task_admin = None
                if self.aiomain_task in done:
                    self._log_finished_task(self.aiomain_task, "non-admin")
                    self.aiomain_task = None
            except Exception as e:
                if self.to_terminate:
                    return
                if kafka_working:
                    # logging only first time
                    self.logger.critical(
                        "Error accessing kafka '{}'. Retrying ...".format(e)
                    )
                    kafka_working = False
            await asyncio.sleep(self._RETRY_SECONDS)

    def _log_finished_task(self, task, label):
        """
        Log the result of a subscription task that has ended. Cancelled tasks are not logged
        :param task: finished asyncio task
        :param label: name of the subscription used at the log message
        :return: None
        """
        if task.cancelled():
            # asking a cancelled task for its exception would raise CancelledError
            return
        self.logger.error(
            "{} subscription task exception: {}".format(label, task.exception())
        )

    def _connect(self):
        """
        Create and connect the database and messaging drivers, unless they are already provided,
        and create the NS LCM notification helper.
        :return: None. Raises SubscriptionException on wrong configuration or connection error
        """
        try:
            if not self.db:
                db_driver = self.config["database"]["driver"]
                if db_driver == "mongo":
                    self.db = dbmongo.DbMongo()
                elif db_driver == "memory":
                    self.db = dbmemory.DbMemory()
                else:
                    raise SubscriptionException(
                        "Invalid configuration param '{}' at '[database]':'driver'".format(
                            db_driver
                        )
                    )
                self.db.db_connect(self.config["database"])
            if not self.msg:
                config_msg = self.config["message"].copy()
                config_msg["loop"] = self.loop
                if config_msg["driver"] == "local":
                    self.msg = msglocal.MsgLocal()
                elif config_msg["driver"] == "kafka":
                    self.msg = msgkafka.MsgKafka()
                else:
                    raise SubscriptionException(
                        "Invalid configuration param '{}' at '[message]':'driver'".format(
                            config_msg["driver"]
                        )
                    )
                self.msg.connect(config_msg)
            self.nslcm = NsLcmNotification(self.db)
        except (DbException, MsgException) as e:
            raise SubscriptionException(str(e), http_code=e.http_code) from e

    def run(self):
        """
        Start of the thread
        :return: None
        """
        self.loop = asyncio.new_event_loop()
        try:
            self._connect()
            self.logger.debug("Starting")
            while not self.to_terminate:
                try:
                    self.loop.run_until_complete(
                        asyncio.ensure_future(self.start_kafka(), loop=self.loop)
                    )
                except Exception as e:
                    if not self.to_terminate:
                        self.logger.exception(
                            "Exception '{}' at messaging read loop".format(e)
                        )
            self.logger.debug("Finishing")
        finally:
            # connections and loop are released even if the initialization or the closing fails
            try:
                self._stop()
            finally:
                self.loop.close()

    async def _msg_callback(self, topic, command, params):
        """
        Callback to process a received message from kafka
        :param topic: topic received
        :param command: command received
        :param params: rest of parameters
        :return: None
        """
        msg_to_send = []
        try:
            if topic == "ns":
                self._autoremove_terminated(
                    "ns", "nsrs", "nsr_id", "ns", command, params, msg_to_send
                )
                self._notify_nslcm(topic, command, params)
            elif topic == "nsi":
                self._autoremove_terminated(
                    "nsi", "nsis", "nsir_id", "nsis", command, params, msg_to_send
                )
            elif topic == "admin":
                self._process_admin(topic, command, params)
            # writing to kafka must be done with our own loop. For this reason it is not allowed
            # Engine to do that, but content to be written is stored at msg_to_send
            for msg in msg_to_send:
                await self.msg.aiowrite(*msg, loop=self.loop)
        except (EngineException, DbException, MsgException) as e:
            self.logger.error(
                "Error while processing topic={} command={}: {}".format(
                    topic, command, e
                )
            )
        except Exception as e:
            self.logger.exception(
                "Exception while processing topic={} command={}: {}".format(
                    topic, command, e
                )
            )

    def _autoremove_terminated(
        self, kind, item, id_key, deleted_label, command, params, msg_to_send
    ):
        """
        Delete from database a terminated instance when the message carries the autoremove flag
        :param kind: text used at the log for the received message: "ns" or "nsi"
        :param item: engine item to delete: "nsrs" or "nsis"
        :param id_key: key of params that contains the identity of the instance
        :param deleted_label: text used at the log for the deleted instance
        :param command: command received
        :param params: content of the received message
        :param msg_to_send: list where the engine stores the messages to be written to the bus
        :return: None
        """
        if (
            command != "terminated"
            or params["operationState"] not in self._TERMINATED_STATES
        ):
            return
        self.logger.debug("received {} terminated {}".format(kind, params))
        if params.get("autoremove"):
            self.engine.del_item(
                self.internal_session,
                item,
                _id=params[id_key],
                not_send_msg=msg_to_send,
            )
            self.logger.debug(
                "{}={} deleted from database".format(deleted_label, params[id_key])
            )

    def _notify_nslcm(self, topic, command, params):
        """
        Schedule the NS LCM notifications for the subscribers interested in the received message
        :param topic: topic received
        :param command: command received
        :param params: content of the received message
        :return: None
        """
        if not isinstance(params, dict):
            self.logger.debug(self._NOT_FOR_NSLCM)
            return
        op_state = params.get("operationState")
        op_params = params.get("operationParams")
        # operationState, command and operationParams are all needed
        if not op_state or not command or not op_params:
            self.logger.debug(self._NOT_FOR_NSLCM)
            return
        nsd_id = op_params.get("nsdId")
        ns_instance_id = op_params.get("nsInstanceId")
        # Any one among nsd_id, ns_instance_id should be present.
        if not (nsd_id or ns_instance_id):
            self.logger.debug(self._NOT_FOR_NSLCM)
            return
        event = command.upper()
        event_details = {"topic": topic, "command": event, "params": params}
        subscribers = self.nslcm.get_subscribers(
            nsd_id, ns_instance_id, event, op_state, event_details
        )
        if subscribers:
            asyncio.ensure_future(
                self.nslcm.send_notifications(subscribers, loop=self.loop),
                loop=self.loop,
            )

    def _process_admin(self, topic, command, params):
        """
        Process a message of the admin topic. Only "revoke_token" produces an action: it removes
        the token indicated at params "_id" from the token cache, or the whole cache without params
        :param topic: topic received
        :param command: command received
        :param params: content of the received message
        :return: None
        """
        self.logger.debug("received {} {} {}".format(topic, command, params))
        if command in ("echo", "ping"):  # ignored commands
            return
        if command != "revoke_token":
            self.logger.debug("unrecognized command '{} {}'".format(topic, command))
            return
        if not params:
            self.engine.authenticator.tokens_cache.clear()
            self.logger.debug("token_cache cleared")
        elif isinstance(params, dict) and "_id" in params:
            tid = params.get("_id")
            self.engine.authenticator.tokens_cache.pop(tid, None)
            self.logger.debug("token '{}' removed from token_cache".format(tid))
        else:
            self.logger.debug(
                "unrecognized params in command '{} {}': {}".format(
                    topic, command, params
                )
            )

    def _stop(self):
        """
        Close all connections
        :return: None
        """
        try:
            try:
                if self.db:
                    self.db.db_disconnect()
            finally:
                # messaging is disconnected even if the database disconnection fails
                if self.msg:
                    self.msg.disconnect()
        except (DbException, MsgException) as e:
            raise SubscriptionException(str(e), http_code=e.http_code) from e

    def _cancel_tasks(self):
        """
        Cancel the subscription tasks that exist at this moment. It must run at the loop thread
        :return: None
        """
        for task in (self.aiomain_task, self.aiomain_task_admin):
            if task:
                task.cancel()

    def terminate(self):
        """
        This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards,
        but not immediately.
        :return: None
        """
        self.to_terminate = True
        loop = self.loop
        if loop is None or loop.is_closed():
            # thread not started or already finished: there is nothing to cancel
            return
        try:
            # tasks are inspected and cancelled at the loop thread to avoid racing with start_kafka
            loop.call_soon_threadsafe(self._cancel_tasks)
        except RuntimeError:
            # the loop has been closed in the meantime; the thread is already finishing
            pass