Token Cache Management
[osm/NBI.git] / osm_nbi / subscriptions.py
1 # -*- coding: utf-8 -*-
2
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 """
17 This module implements a thread that reads from kafka bus implementing all the subscriptions.
18 It is based on asyncio.
19 To avoid race conditions it uses same engine class as the main module for database changes
20 For the moment this module only deletes NS instances when they are terminated with the autoremove flag
21 """
22
23 import logging
24 import threading
25 import asyncio
26 from http import HTTPStatus
27 from osm_common import dbmongo, dbmemory, msglocal, msgkafka
28 from osm_common.dbbase import DbException
29 from osm_common.msgbase import MsgException
30 from osm_nbi.engine import EngineException
31
32 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
33
34
35 class SubscriptionException(Exception):
36
37 def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
38 self.http_code = http_code
39 Exception.__init__(self, message)
40
41
42 class SubscriptionThread(threading.Thread):
43
44 def __init__(self, config, engine):
45 """
46 Constructor of class
47 :param config: configuration parameters of database and messaging
48 :param engine: an instance of Engine class, used for deleting instances
49 """
50 threading.Thread.__init__(self)
51 self.to_terminate = False
52 self.config = config
53 self.db = None
54 self.msg = None
55 self.engine = engine
56 self.loop = None
57 self.logger = logging.getLogger("nbi.subscriptions")
58 self.aiomain_task_admin = None # asyncio task for receiving admin actions from kafka bus
59 self.aiomain_task = None # asyncio task for receiving normal actions from kafka bus
60 self.internal_session = { # used for a session to the engine methods
61 "project_id": (),
62 "set_project": (),
63 "admin": True,
64 "force": False,
65 "public": None,
66 "method": "delete",
67 }
68
69 async def start_kafka(self):
70 # timeout_wait_for_kafka = 3*60
71 kafka_working = True
72 while not self.to_terminate:
73 try:
74 # bug 710 635. The library aiokafka does not recieve anything when the topci at kafka has not been
75 # created.
76 # Before subscribe, send dummy messages
77 await self.msg.aiowrite("admin", "echo", "dummy message", loop=self.loop)
78 await self.msg.aiowrite("ns", "echo", "dummy message", loop=self.loop)
79 await self.msg.aiowrite("nsi", "echo", "dummy message", loop=self.loop)
80 if not kafka_working:
81 self.logger.critical("kafka is working again")
82 kafka_working = True
83 if not self.aiomain_task_admin or self.aiomain_task_admin._state == "FINISHED":
84 await asyncio.sleep(10, loop=self.loop)
85 self.logger.debug("Starting admin subscription task")
86 self.aiomain_task_admin = asyncio.ensure_future(self.msg.aioread(("admin",), loop=self.loop,
87 group_id=False,
88 aiocallback=self._msg_callback),
89 loop=self.loop)
90 if not self.aiomain_task or self.aiomain_task._state == "FINISHED":
91 await asyncio.sleep(10, loop=self.loop)
92 self.logger.debug("Starting non-admin subscription task")
93 self.aiomain_task = asyncio.ensure_future(self.msg.aioread(("ns", "nsi"), loop=self.loop,
94 aiocallback=self._msg_callback),
95 loop=self.loop)
96 await asyncio.wait([self.aiomain_task, self.aiomain_task_admin],
97 timeout=None, loop=self.loop, return_when=asyncio.FIRST_COMPLETED)
98 except Exception as e:
99 if self.to_terminate:
100 return
101 if kafka_working:
102 # logging only first time
103 self.logger.critical("Error accessing kafka '{}'. Retrying ...".format(e))
104 kafka_working = False
105 await asyncio.sleep(10, loop=self.loop)
106
107 def run(self):
108 """
109 Start of the thread
110 :return: None
111 """
112 self.loop = asyncio.new_event_loop()
113 try:
114 if not self.db:
115 if self.config["database"]["driver"] == "mongo":
116 self.db = dbmongo.DbMongo()
117 self.db.db_connect(self.config["database"])
118 elif self.config["database"]["driver"] == "memory":
119 self.db = dbmemory.DbMemory()
120 self.db.db_connect(self.config["database"])
121 else:
122 raise SubscriptionException("Invalid configuration param '{}' at '[database]':'driver'".format(
123 self.config["database"]["driver"]))
124 if not self.msg:
125 config_msg = self.config["message"].copy()
126 config_msg["loop"] = self.loop
127 if config_msg["driver"] == "local":
128 self.msg = msglocal.MsgLocal()
129 self.msg.connect(config_msg)
130 elif config_msg["driver"] == "kafka":
131 self.msg = msgkafka.MsgKafka()
132 self.msg.connect(config_msg)
133 else:
134 raise SubscriptionException("Invalid configuration param '{}' at '[message]':'driver'".format(
135 config_msg["driver"]))
136
137 except (DbException, MsgException) as e:
138 raise SubscriptionException(str(e), http_code=e.http_code)
139
140 self.logger.debug("Starting")
141 while not self.to_terminate:
142 try:
143
144 self.loop.run_until_complete(asyncio.ensure_future(self.start_kafka(), loop=self.loop))
145 # except asyncio.CancelledError:
146 # break # if cancelled it should end, breaking loop
147 except Exception as e:
148 if not self.to_terminate:
149 self.logger.exception("Exception '{}' at messaging read loop".format(e), exc_info=True)
150
151 self.logger.debug("Finishing")
152 self._stop()
153 self.loop.close()
154
155 async def _msg_callback(self, topic, command, params):
156 """
157 Callback to process a received message from kafka
158 :param topic: topic received
159 :param command: command received
160 :param params: rest of parameters
161 :return: None
162 """
163 msg_to_send = []
164 try:
165 if topic == "ns":
166 if command == "terminated" and params["operationState"] in ("COMPLETED", "PARTIALLY_COMPLETED"):
167 self.logger.debug("received ns terminated {}".format(params))
168 if params.get("autoremove"):
169 self.engine.del_item(self.internal_session, "nsrs", _id=params["nsr_id"],
170 not_send_msg=msg_to_send)
171 self.logger.debug("ns={} deleted from database".format(params["nsr_id"]))
172 elif topic == "nsi":
173 if command == "terminated" and params["operationState"] in ("COMPLETED", "PARTIALLY_COMPLETED"):
174 self.logger.debug("received nsi terminated {}".format(params))
175 if params.get("autoremove"):
176 self.engine.del_item(self.internal_session, "nsis", _id=params["nsir_id"],
177 not_send_msg=msg_to_send)
178 self.logger.debug("nsis={} deleted from database".format(params["nsir_id"]))
179 elif topic == "admin":
180 self.logger.debug("received {} {} {}".format(topic, command, params))
181 if command in ["echo", "ping"]: # ignored commands
182 pass
183 elif command == "revoke_token":
184 if params:
185 if isinstance(params, dict) and "_id" in params:
186 tid = params.get("_id")
187 self.engine.authenticator.tokens_cache.pop(tid, None)
188 self.logger.debug("token '{}' removed from token_cache".format(tid))
189 else:
190 self.logger.debug("unrecognized params in command '{} {}': {}"
191 .format(topic, command, params))
192 else:
193 self.engine.authenticator.tokens_cache.clear()
194 self.logger.debug("token_cache cleared")
195 else:
196 self.logger.debug("unrecognized command '{} {}'".format(topic, command))
197 # writing to kafka must be done with our own loop. For this reason it is not allowed Engine to do that,
198 # but content to be written is stored at msg_to_send
199 for msg in msg_to_send:
200 await self.msg.aiowrite(*msg, loop=self.loop)
201 except (EngineException, DbException, MsgException) as e:
202 self.logger.error("Error while processing topic={} command={}: {}".format(topic, command, e))
203 except Exception as e:
204 self.logger.exception("Exception while processing topic={} command={}: {}".format(topic, command, e),
205 exc_info=True)
206
207 def _stop(self):
208 """
209 Close all connections
210 :return: None
211 """
212 try:
213 if self.db:
214 self.db.db_disconnect()
215 if self.msg:
216 self.msg.disconnect()
217 except (DbException, MsgException) as e:
218 raise SubscriptionException(str(e), http_code=e.http_code)
219
220 def terminate(self):
221 """
222 This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards,
223 but not immediately.
224 :return: None
225 """
226 self.to_terminate = True
227 if self.aiomain_task:
228 self.loop.call_soon_threadsafe(self.aiomain_task.cancel)