blob: ec70d0c2755cb0405facbd8df0b0a19b5c4e03a0 [file] [log] [blame]
tierno932499c2019-01-28 17:28:10 +00001# -*- coding: utf-8 -*-
2
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12# implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16"""
This module implements a thread that reads from the kafka bus, implementing all the subscriptions.
It is based on asyncio.
To avoid race conditions it uses the same engine class as the main module for database changes.
It deletes NS and NSI instances when they are terminated with the autoremove flag, sends NS LCM
notifications to the matching subscribers and processes admin commands (token revocation).
21"""
22
23import logging
24import threading
25import asyncio
26from http import HTTPStatus
K Sai Kiranbb70c812020-04-28 14:48:31 +053027
tierno932499c2019-01-28 17:28:10 +000028from osm_common import dbmongo, dbmemory, msglocal, msgkafka
29from osm_common.dbbase import DbException
30from osm_common.msgbase import MsgException
tierno23acf402019-08-28 13:36:34 +000031from osm_nbi.engine import EngineException
K Sai Kiranbb70c812020-04-28 14:48:31 +053032from osm_nbi.notifications import NsLcmNotification
tierno932499c2019-01-28 17:28:10 +000033
34__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
35
36
class SubscriptionException(Exception):
    """Error raised by the subscription thread, carrying the HTTP status code that describes it."""

    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
        """
        :param message: human readable description of the error
        :param http_code: HTTP status associated with the error, BAD_REQUEST when not provided
        """
        super().__init__(message)
        self.http_code = http_code
42
43
class SubscriptionThread(threading.Thread):
    """
    Thread that runs its own asyncio event loop to read the "admin", "ns" and "nsi" topics of the
    message bus and react to them: removal of terminated NS/NSI instances flagged with autoremove,
    NS LCM notifications and token cache revocation.
    """

    # Per-topic settings used when a "terminated" message requests autoremove:
    # (engine item to delete, key of 'params' holding the id, label used in the "deleted" log message)
    _AUTOREMOVE = {
        "ns": ("nsrs", "nsr_id", "ns"),
        "nsi": ("nsis", "nsir_id", "nsis"),
    }

    def __init__(self, config, engine):
        """
        Constructor of class
        :param config: configuration parameters of database and messaging
        :param engine: an instance of Engine class, used for deleting instances
        """
        super().__init__()
        self.to_terminate = False
        self.config = config
        self.db = None
        self.msg = None
        self.engine = engine
        self.loop = None
        self.logger = logging.getLogger("nbi.subscriptions")
        self.aiomain_task_admin = None  # asyncio task for receiving admin actions from kafka bus
        self.aiomain_task = None  # asyncio task for receiving normal actions from kafka bus
        self.internal_session = {  # used for a session to the engine methods
            "project_id": (),
            "set_project": (),
            "admin": True,
            "force": False,
            "public": None,
            "method": "delete",
        }
        self.nslcm = None

    async def start_kafka(self):
        """
        Keep the two subscription tasks (admin and non-admin topics) running until termination is requested.
        A task that finishes is logged and started again on the next iteration. Any error is retried after
        10 seconds, logging only the first failure and the recovery.
        :return: None
        """
        # NOTE: asyncio.sleep/asyncio.wait are called without 'loop': that argument was removed in python 3.10
        # and this coroutine already runs on self.loop. The 'loop' given to self.msg methods is kept because it
        # belongs to the messaging library interface used by this file.
        kafka_working = True
        while not self.to_terminate:
            try:
                # bug 710 635. The library aiokafka does not receive anything when the topic at kafka has not
                # been created.
                # Before subscribe, send dummy messages
                await self.msg.aiowrite("admin", "echo", "dummy message", loop=self.loop)
                await self.msg.aiowrite("ns", "echo", "dummy message", loop=self.loop)
                await self.msg.aiowrite("nsi", "echo", "dummy message", loop=self.loop)
                if not kafka_working:
                    self.logger.critical("kafka is working again")
                    kafka_working = True
                if not self.aiomain_task_admin:
                    await asyncio.sleep(10)
                    self.logger.debug("Starting admin subscription task")
                    self.aiomain_task_admin = asyncio.ensure_future(
                        self.msg.aioread(("admin",), loop=self.loop, group_id=False,
                                         aiocallback=self._msg_callback),
                        loop=self.loop)
                if not self.aiomain_task:
                    await asyncio.sleep(10)
                    self.logger.debug("Starting non-admin subscription task")
                    self.aiomain_task = asyncio.ensure_future(
                        self.msg.aioread(("ns", "nsi"), loop=self.loop, aiocallback=self._msg_callback),
                        loop=self.loop)
                done, _ = await asyncio.wait([self.aiomain_task, self.aiomain_task_admin],
                                             timeout=None, return_when=asyncio.FIRST_COMPLETED)
                # Each finished task is handled on its own, so that a cancelled task does not prevent
                # resetting it nor checking the other one
                if self.aiomain_task_admin in done:
                    self._log_finished_task(self.aiomain_task_admin, "admin")
                    self.aiomain_task_admin = None
                if self.aiomain_task in done:
                    self._log_finished_task(self.aiomain_task, "non-admin")
                    self.aiomain_task = None
            except Exception as e:
                if self.to_terminate:
                    return
                if kafka_working:
                    # logging only first time
                    self.logger.critical("Error accessing kafka '{}'. Retrying ...".format(e))
                    kafka_working = False
                await asyncio.sleep(10)

    def _log_finished_task(self, task, label):
        """
        Log how a finished subscription task ended
        :param task: asyncio task already done
        :param label: text identifying the task at the log: "admin" or "non-admin"
        :return: None
        """
        if task.cancelled():
            # task.exception() would raise CancelledError for a cancelled task
            self.logger.debug("{} subscription task cancelled".format(label))
            return
        self.logger.error("{} subscription task exception: {}".format(label, task.exception()))

    def _connect(self):
        """
        Create and connect the database and messaging drivers selected at the configuration, when they are not
        already set, and create the NS LCM notification handler.
        :return: None
        :raises SubscriptionException: on an invalid driver name or when the connection fails
        """
        try:
            if not self.db:
                db_driver = self.config["database"]["driver"]
                if db_driver == "mongo":
                    self.db = dbmongo.DbMongo()
                elif db_driver == "memory":
                    self.db = dbmemory.DbMemory()
                else:
                    raise SubscriptionException(
                        "Invalid configuration param '{}' at '[database]':'driver'".format(db_driver))
                self.db.db_connect(self.config["database"])
            if not self.msg:
                config_msg = self.config["message"].copy()
                config_msg["loop"] = self.loop
                if config_msg["driver"] == "local":
                    self.msg = msglocal.MsgLocal()
                elif config_msg["driver"] == "kafka":
                    self.msg = msgkafka.MsgKafka()
                else:
                    raise SubscriptionException(
                        "Invalid configuration param '{}' at '[message]':'driver'".format(config_msg["driver"]))
                self.msg.connect(config_msg)
            self.nslcm = NsLcmNotification(self.db)
        except (DbException, MsgException) as e:
            raise SubscriptionException(str(e), http_code=e.http_code) from e

    def run(self):
        """
        Start of the thread
        :return: None
        :raises SubscriptionException: when connecting to, or disconnecting from, database or messaging fails
        """
        self.loop = asyncio.new_event_loop()
        try:
            self._connect()
            try:
                self.logger.debug("Starting")
                while not self.to_terminate:
                    try:
                        self.loop.run_until_complete(self.start_kafka())
                    except Exception as e:
                        if not self.to_terminate:
                            self.logger.exception("Exception '{}' at messaging read loop".format(e))
                self.logger.debug("Finishing")
            finally:
                # connections are released even if the read loop is left abnormally
                self._disconnect()
        finally:
            # the event loop is always closed, also when the connection or disconnection fails
            self.loop.close()

    async def _msg_callback(self, topic, command, params):
        """
        Callback to process a received message from kafka
        :param topic: topic received
        :param command: command received
        :param params: rest of parameters
        :return: None
        """
        msg_to_send = []
        try:
            if topic == "ns":
                self._autoremove_terminated(topic, command, params, msg_to_send)
                self._notify_nslcm(topic, command, params)
            elif topic == "nsi":
                self._autoremove_terminated(topic, command, params, msg_to_send)
            elif topic == "admin":
                self._process_admin(topic, command, params)
            # writing to kafka must be done with our own loop. For this reason it is not allowed Engine to do that,
            # but content to be written is stored at msg_to_send
            for msg in msg_to_send:
                await self.msg.aiowrite(*msg, loop=self.loop)
        except (EngineException, DbException, MsgException) as e:
            self.logger.error("Error while processing topic={} command={}: {}".format(topic, command, e))
        except Exception as e:
            self.logger.exception("Exception while processing topic={} command={}: {}".format(topic, command, e))

    def _autoremove_terminated(self, topic, command, params, msg_to_send):
        """
        Delete the NS or NSI instance through the engine when the message reports a finished termination and
        carries the autoremove flag
        :param topic: "ns" or "nsi"
        :param command: command received
        :param params: content of the message. Anything that is not a dictionary is ignored
        :param msg_to_send: list provided to the engine as 'not_send_msg', where it stores the messages to write
        :return: None
        """
        if command != "terminated" or not isinstance(params, dict):
            return
        if params.get("operationState") not in ("COMPLETED", "PARTIALLY_COMPLETED"):
            return
        item, id_key, label = self._AUTOREMOVE[topic]
        self.logger.debug("received {} terminated {}".format(topic, params))
        if params.get("autoremove"):
            self.engine.del_item(self.internal_session, item, _id=params[id_key], not_send_msg=msg_to_send)
            self.logger.debug("{}={} deleted from database".format(label, params[id_key]))

    def _notify_nslcm(self, topic, command, params):
        """
        Schedule the NS LCM notifications for the subscribers matching the received message, if any
        :param topic: topic received
        :param command: command received
        :param params: content of the message
        :return: None
        """
        not_usable = "Message can not be used for notification of nslcm"
        if not isinstance(params, dict):
            self.logger.debug(not_usable)
            return
        op_state = params.get("operationState")
        op_params = params.get("operationParams")
        # Check availability of operationState and command
        if not op_state or not command or not op_params:
            self.logger.debug(not_usable)
            return
        nsd_id = op_params.get("nsdId")
        ns_instance_id = op_params.get("nsInstanceId")
        # Any one among nsd_id, ns_instance_id should be present.
        if not (nsd_id or ns_instance_id):
            self.logger.debug(not_usable)
            return
        event_details = {"topic": topic, "command": command.upper(), "params": params}
        subscribers = self.nslcm.get_subscribers(nsd_id, ns_instance_id, command.upper(), op_state, event_details)
        if subscribers:
            # not awaited: notifications are sent in background at our own loop
            asyncio.ensure_future(self.nslcm.send_notifications(subscribers, loop=self.loop), loop=self.loop)

    def _process_admin(self, topic, command, params):
        """
        Process a message of the admin topic. Only "revoke_token" produces an action: it removes one token
        (params with "_id") or all of them (empty params) from the token cache of the engine authenticator
        :param topic: topic received
        :param command: command received
        :param params: content of the message
        :return: None
        """
        self.logger.debug("received {} {} {}".format(topic, command, params))
        if command in ["echo", "ping"]:  # ignored commands
            return
        if command != "revoke_token":
            self.logger.debug("unrecognized command '{} {}'".format(topic, command))
            return
        tokens_cache = self.engine.authenticator.tokens_cache
        if not params:
            tokens_cache.clear()
            self.logger.debug("token_cache cleared")
        elif isinstance(params, dict) and "_id" in params:
            tid = params.get("_id")
            tokens_cache.pop(tid, None)
            self.logger.debug("token '{}' removed from token_cache".format(tid))
        else:
            self.logger.debug("unrecognized params in command '{} {}': {}".format(topic, command, params))

    def _disconnect(self):
        """
        Close all connections.
        It is intentionally not called '_stop': CPython's threading.Thread (3.12 and earlier) has an internal
        method with that name, and overriding it breaks the thread bookkeeping done at join()/is_alive().
        :return: None
        :raises SubscriptionException: when the database or messaging disconnection fails
        """
        try:
            if self.db:
                self.db.db_disconnect()
            if self.msg:
                self.msg.disconnect()
        except (DbException, MsgException) as e:
            raise SubscriptionException(str(e), http_code=e.http_code) from e

    def terminate(self):
        """
        This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards,
        but not immediately.
        :return: None
        """
        self.to_terminate = True
        loop = self.loop
        if loop is None or loop.is_closed():
            # thread not started yet or already finished: there is nothing to cancel
            return
        try:
            for task in (self.aiomain_task, self.aiomain_task_admin):
                if task:
                    loop.call_soon_threadsafe(task.cancel)
        except RuntimeError:
            # the loop has been closed in the meanwhile by the thread, that is already finishing
            pass