Feature 8178 VNF Repositories
[osm/NBI.git] / osm_nbi / subscriptions.py
1 # -*- coding: utf-8 -*-
2
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 """
17 This module implements a thread that reads from kafka bus implementing all the subscriptions.
18 It is based on asyncio.
19 To avoid race conditions it uses same engine class as the main module for database changes
20 For the moment this module only deletes NS instances when they are terminated with the autoremove flag
21 """
22
23 import logging
24 import threading
25 import asyncio
26 from http import HTTPStatus
27 from osm_common import dbmongo, dbmemory, msglocal, msgkafka
28 from osm_common.dbbase import DbException
29 from osm_common.msgbase import MsgException
30 from osm_nbi.engine import EngineException
31
32 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
33
34
35 class SubscriptionException(Exception):
36
37 def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
38 self.http_code = http_code
39 Exception.__init__(self, message)
40
41
42 class SubscriptionThread(threading.Thread):
43
44 def __init__(self, config, engine):
45 """
46 Constructor of class
47 :param config: configuration parameters of database and messaging
48 :param engine: an instance of Engine class, used for deleting instances
49 """
50 threading.Thread.__init__(self)
51 self.to_terminate = False
52 self.config = config
53 self.db = None
54 self.msg = None
55 self.engine = engine
56 self.loop = None
57 self.logger = logging.getLogger("nbi.subscriptions")
58 self.aiomain_task_admin = None # asyncio task for receiving admin actions from kafka bus
59 self.aiomain_task = None # asyncio task for receiving normal actions from kafka bus
60 self.internal_session = { # used for a session to the engine methods
61 "project_id": (),
62 "set_project": (),
63 "admin": True,
64 "force": False,
65 "public": None,
66 "method": "delete",
67 }
68
69 async def start_kafka(self):
70 # timeout_wait_for_kafka = 3*60
71 kafka_working = True
72 while not self.to_terminate:
73 try:
74 # bug 710 635. The library aiokafka does not recieve anything when the topci at kafka has not been
75 # created.
76 # Before subscribe, send dummy messages
77 await self.msg.aiowrite("admin", "echo", "dummy message", loop=self.loop)
78 await self.msg.aiowrite("ns", "echo", "dummy message", loop=self.loop)
79 await self.msg.aiowrite("nsi", "echo", "dummy message", loop=self.loop)
80 if not kafka_working:
81 self.logger.critical("kafka is working again")
82 kafka_working = True
83 if not self.aiomain_task_admin:
84 await asyncio.sleep(10, loop=self.loop)
85 self.logger.debug("Starting admin subscription task")
86 self.aiomain_task_admin = asyncio.ensure_future(self.msg.aioread(("admin",), loop=self.loop,
87 group_id=False,
88 aiocallback=self._msg_callback),
89 loop=self.loop)
90 if not self.aiomain_task:
91 await asyncio.sleep(10, loop=self.loop)
92 self.logger.debug("Starting non-admin subscription task")
93 self.aiomain_task = asyncio.ensure_future(self.msg.aioread(("ns", "nsi"), loop=self.loop,
94 aiocallback=self._msg_callback),
95 loop=self.loop)
96 done, _ = await asyncio.wait([self.aiomain_task, self.aiomain_task_admin],
97 timeout=None, loop=self.loop, return_when=asyncio.FIRST_COMPLETED)
98 try:
99 if self.aiomain_task_admin in done:
100 exc = self.aiomain_task_admin.exception()
101 self.logger.error("admin subscription task exception: {}".format(exc))
102 self.aiomain_task_admin = None
103 if self.aiomain_task in done:
104 exc = self.aiomain_task.exception()
105 self.logger.error("non-admin subscription task exception: {}".format(exc))
106 self.aiomain_task = None
107 except asyncio.CancelledError:
108 pass
109 except Exception as e:
110 if self.to_terminate:
111 return
112 if kafka_working:
113 # logging only first time
114 self.logger.critical("Error accessing kafka '{}'. Retrying ...".format(e))
115 kafka_working = False
116 await asyncio.sleep(10, loop=self.loop)
117
118 def run(self):
119 """
120 Start of the thread
121 :return: None
122 """
123 self.loop = asyncio.new_event_loop()
124 try:
125 if not self.db:
126 if self.config["database"]["driver"] == "mongo":
127 self.db = dbmongo.DbMongo()
128 self.db.db_connect(self.config["database"])
129 elif self.config["database"]["driver"] == "memory":
130 self.db = dbmemory.DbMemory()
131 self.db.db_connect(self.config["database"])
132 else:
133 raise SubscriptionException("Invalid configuration param '{}' at '[database]':'driver'".format(
134 self.config["database"]["driver"]))
135 if not self.msg:
136 config_msg = self.config["message"].copy()
137 config_msg["loop"] = self.loop
138 if config_msg["driver"] == "local":
139 self.msg = msglocal.MsgLocal()
140 self.msg.connect(config_msg)
141 elif config_msg["driver"] == "kafka":
142 self.msg = msgkafka.MsgKafka()
143 self.msg.connect(config_msg)
144 else:
145 raise SubscriptionException("Invalid configuration param '{}' at '[message]':'driver'".format(
146 config_msg["driver"]))
147
148 except (DbException, MsgException) as e:
149 raise SubscriptionException(str(e), http_code=e.http_code)
150
151 self.logger.debug("Starting")
152 while not self.to_terminate:
153 try:
154
155 self.loop.run_until_complete(asyncio.ensure_future(self.start_kafka(), loop=self.loop))
156 # except asyncio.CancelledError:
157 # break # if cancelled it should end, breaking loop
158 except Exception as e:
159 if not self.to_terminate:
160 self.logger.exception("Exception '{}' at messaging read loop".format(e), exc_info=True)
161
162 self.logger.debug("Finishing")
163 self._stop()
164 self.loop.close()
165
166 async def _msg_callback(self, topic, command, params):
167 """
168 Callback to process a received message from kafka
169 :param topic: topic received
170 :param command: command received
171 :param params: rest of parameters
172 :return: None
173 """
174 msg_to_send = []
175 try:
176 if topic == "ns":
177 if command == "terminated" and params["operationState"] in ("COMPLETED", "PARTIALLY_COMPLETED"):
178 self.logger.debug("received ns terminated {}".format(params))
179 if params.get("autoremove"):
180 self.engine.del_item(self.internal_session, "nsrs", _id=params["nsr_id"],
181 not_send_msg=msg_to_send)
182 self.logger.debug("ns={} deleted from database".format(params["nsr_id"]))
183 elif topic == "nsi":
184 if command == "terminated" and params["operationState"] in ("COMPLETED", "PARTIALLY_COMPLETED"):
185 self.logger.debug("received nsi terminated {}".format(params))
186 if params.get("autoremove"):
187 self.engine.del_item(self.internal_session, "nsis", _id=params["nsir_id"],
188 not_send_msg=msg_to_send)
189 self.logger.debug("nsis={} deleted from database".format(params["nsir_id"]))
190 elif topic == "admin":
191 self.logger.debug("received {} {} {}".format(topic, command, params))
192 if command in ["echo", "ping"]: # ignored commands
193 pass
194 elif command == "revoke_token":
195 if params:
196 if isinstance(params, dict) and "_id" in params:
197 tid = params.get("_id")
198 self.engine.authenticator.tokens_cache.pop(tid, None)
199 self.logger.debug("token '{}' removed from token_cache".format(tid))
200 else:
201 self.logger.debug("unrecognized params in command '{} {}': {}"
202 .format(topic, command, params))
203 else:
204 self.engine.authenticator.tokens_cache.clear()
205 self.logger.debug("token_cache cleared")
206 else:
207 self.logger.debug("unrecognized command '{} {}'".format(topic, command))
208 # writing to kafka must be done with our own loop. For this reason it is not allowed Engine to do that,
209 # but content to be written is stored at msg_to_send
210 for msg in msg_to_send:
211 await self.msg.aiowrite(*msg, loop=self.loop)
212 except (EngineException, DbException, MsgException) as e:
213 self.logger.error("Error while processing topic={} command={}: {}".format(topic, command, e))
214 except Exception as e:
215 self.logger.exception("Exception while processing topic={} command={}: {}".format(topic, command, e),
216 exc_info=True)
217
218 def _stop(self):
219 """
220 Close all connections
221 :return: None
222 """
223 try:
224 if self.db:
225 self.db.db_disconnect()
226 if self.msg:
227 self.msg.disconnect()
228 except (DbException, MsgException) as e:
229 raise SubscriptionException(str(e), http_code=e.http_code)
230
231 def terminate(self):
232 """
233 This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards,
234 but not immediately.
235 :return: None
236 """
237 self.to_terminate = True
238 if self.aiomain_task:
239 self.loop.call_soon_threadsafe(self.aiomain_task.cancel)
240 if self.aiomain_task_admin:
241 self.loop.call_soon_threadsafe(self.aiomain_task_admin.cancel)