1 |
|
# -*- coding: utf-8 -*- |
2 |
|
|
3 |
|
# Licensed under the Apache License, Version 2.0 (the "License"); |
4 |
|
# you may not use this file except in compliance with the License. |
5 |
|
# You may obtain a copy of the License at |
6 |
|
# |
7 |
|
# http://www.apache.org/licenses/LICENSE-2.0 |
8 |
|
# |
9 |
|
# Unless required by applicable law or agreed to in writing, software |
10 |
|
# distributed under the License is distributed on an "AS IS" BASIS, |
11 |
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
12 |
|
# implied. |
13 |
|
# See the License for the specific language governing permissions and |
14 |
|
# limitations under the License. |
15 |
|
|
16 |
0 |
""" |
17 |
|
This module implements a thread that reads from kafka bus implementing all the subscriptions. |
18 |
|
It is based on asyncio. |
19 |
|
To avoid race conditions it uses same engine class as the main module for database changes |
20 |
|
For the moment this module only deletes NS instances when they are terminated with the autoremove flag |
21 |
|
""" |
22 |
|
|
23 |
0 |
import logging |
24 |
0 |
import threading |
25 |
0 |
import asyncio |
26 |
0 |
from http import HTTPStatus |
27 |
|
|
28 |
0 |
from osm_common import dbmongo, dbmemory, msglocal, msgkafka |
29 |
0 |
from osm_common.dbbase import DbException |
30 |
0 |
from osm_common.msgbase import MsgException |
31 |
0 |
from osm_nbi.engine import EngineException |
32 |
0 |
from osm_nbi.notifications import NsLcmNotification |
33 |
|
|
34 |
0 |
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>" |
35 |
|
|
36 |
|
|
37 |
0 |
class SubscriptionException(Exception):
    """
    Error raised by the subscription thread.

    It wraps database/messaging failures and invalid configuration values, carrying the HTTP status
    code that best describes the failure.
    """

    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
        """
        :param message: human readable description of the error
        :param http_code: HTTP status associated to the error; BAD_REQUEST by default
        """
        super().__init__(message)
        self.http_code = http_code
42 |
|
|
43 |
|
|
44 |
0 |
class SubscriptionThread(threading.Thread):
    """
    Thread that owns a private asyncio loop and listens to the message bus topics "admin", "ns" and "nsi".

    Received messages are handled by `_msg_callback`, which removes auto-removable NS/NSI records through
    the engine, triggers NS LCM notifications and keeps the authenticator token cache up to date.
    """

    def __init__(self, config, engine):
        """
        Constructor of class
        :param config: configuration parameters of database and messaging
        :param engine: an instance of Engine class, used for deleting instances
        """
        super().__init__()
        self.to_terminate = False
        self.config = config
        self.db = None
        self.msg = None
        self.engine = engine
        self.loop = None
        self.logger = logging.getLogger("nbi.subscriptions")
        self.aiomain_task_admin = None  # asyncio task for receiving admin actions from kafka bus
        self.aiomain_task = None  # asyncio task for receiving normal actions from kafka bus
        self.internal_session = {  # used for a session to the engine methods
            "project_id": (),
            "set_project": (),
            "admin": True,
            "force": False,
            "public": None,
            "method": "delete",
        }
        self.nslcm = None

    def _reap_task(self, task, done, label):
        """
        Log the outcome of a finished subscription task and forget it, so that it is started again.
        :param task: the subscription task to inspect
        :param done: set of finished tasks, as returned by asyncio.wait
        :param label: "admin" or "non-admin", only used for logging
        :return: None when the task has finished (it must be re-created), the same task otherwise
        """
        if task not in done:
            return task
        if task.cancelled():
            # calling exception() on a cancelled task raises CancelledError; nothing else to report
            self.logger.debug("{} subscription task cancelled".format(label))
        else:
            self.logger.error("{} subscription task exception: {}".format(label, task.exception()))
        return None

    async def start_kafka(self):
        """
        Keep the two subscription tasks (admin and ns/nsi) alive until termination is requested.
        Any failure is logged once as critical and the whole sequence is retried every 10 seconds.
        :return: None
        """
        kafka_working = True
        while not self.to_terminate:
            try:
                # bug 710 635. The library aiokafka does not receive anything when the topic at kafka has not
                # been created.
                # Before subscribe, send dummy messages
                for topic in ("admin", "ns", "nsi"):
                    await self.msg.aiowrite(topic, "echo", "dummy message", loop=self.loop)
                if not kafka_working:
                    self.logger.critical("kafka is working again")
                    kafka_working = True
                # NOTE: asyncio.sleep/asyncio.wait are called without 'loop': that argument was removed in
                # python 3.10 and this coroutine is already running inside self.loop
                if not self.aiomain_task_admin:
                    await asyncio.sleep(10)
                    self.logger.debug("Starting admin subscription task")
                    self.aiomain_task_admin = asyncio.ensure_future(
                        self.msg.aioread(("admin",), loop=self.loop, group_id=False,
                                         aiocallback=self._msg_callback),
                        loop=self.loop)
                if not self.aiomain_task:
                    await asyncio.sleep(10)
                    self.logger.debug("Starting non-admin subscription task")
                    self.aiomain_task = asyncio.ensure_future(
                        self.msg.aioread(("ns", "nsi"), loop=self.loop, aiocallback=self._msg_callback),
                        loop=self.loop)
                done, _ = await asyncio.wait([self.aiomain_task, self.aiomain_task_admin],
                                             timeout=None, return_when=asyncio.FIRST_COMPLETED)
                # each task is checked on its own, so a cancelled one does not hide the state of the other
                self.aiomain_task_admin = self._reap_task(self.aiomain_task_admin, done, "admin")
                self.aiomain_task = self._reap_task(self.aiomain_task, done, "non-admin")
            except Exception as e:
                if self.to_terminate:
                    return
                if kafka_working:
                    # logging only first time
                    self.logger.critical("Error accessing kafka '{}'. Retrying ...".format(e))
                    kafka_working = False
            await asyncio.sleep(10)

    def run(self):
        """
        Start of the thread
        Creates the event loop, connects to database and message bus (unless already set) and runs
        start_kafka until termination is requested. Connections and loop are closed at the end.
        :return: None
        :raises SubscriptionException: on invalid driver configuration or database/messaging connection error
        """
        self.loop = asyncio.new_event_loop()
        try:
            if not self.db:
                db_driver = self.config["database"]["driver"]
                if db_driver == "mongo":
                    self.db = dbmongo.DbMongo()
                elif db_driver == "memory":
                    self.db = dbmemory.DbMemory()
                else:
                    raise SubscriptionException(
                        "Invalid configuration param '{}' at '[database]':'driver'".format(db_driver))
                self.db.db_connect(self.config["database"])
            if not self.msg:
                # work on a copy: the loop is added without touching the shared configuration
                config_msg = self.config["message"].copy()
                config_msg["loop"] = self.loop
                if config_msg["driver"] == "local":
                    self.msg = msglocal.MsgLocal()
                elif config_msg["driver"] == "kafka":
                    self.msg = msgkafka.MsgKafka()
                else:
                    raise SubscriptionException(
                        "Invalid configuration param '{}' at '[message]':'driver'".format(config_msg["driver"]))
                self.msg.connect(config_msg)
            self.nslcm = NsLcmNotification(self.db)
        except (DbException, MsgException) as e:
            raise SubscriptionException(str(e), http_code=e.http_code) from e

        self.logger.debug("Starting")
        while not self.to_terminate:
            try:
                self.loop.run_until_complete(asyncio.ensure_future(self.start_kafka(), loop=self.loop))
            except Exception as e:
                if not self.to_terminate:
                    self.logger.exception("Exception '{}' at messaging read loop".format(e), exc_info=True)

        self.logger.debug("Finishing")
        try:
            self._close_connections()
        finally:
            # the loop is closed even when disconnecting fails
            self.loop.close()

    def _notify_nslcm(self, topic, command, params):
        """
        Schedule NS LCM notifications for a message received at topic "ns", when it carries enough data.
        :param topic: topic received
        :param command: command received
        :param params: rest of parameters
        :return: None
        """
        if not isinstance(params, dict):
            self.logger.debug("Message can not be used for notification of nslcm")
            return
        # Check availability of operationState and command
        if not (params.get("operationState") and command and params.get("operationParams")):
            self.logger.debug("Message can not be used for notification of nslcm")
            return
        nsd_id = params["operationParams"].get("nsdId")
        ns_instance_id = params["operationParams"].get("nsInstanceId")
        # Any one among nsd_id, ns_instance_id should be present.
        if not (nsd_id or ns_instance_id):
            self.logger.debug("Message can not be used for notification of nslcm")
            return
        op_state = params["operationState"]
        event_details = {"topic": topic, "command": command.upper(), "params": params}
        subscribers = self.nslcm.get_subscribers(nsd_id, ns_instance_id, command.upper(), op_state,
                                                 event_details)
        if subscribers:
            # fire and forget: the notifications are sent in background by our own loop
            asyncio.ensure_future(self.nslcm.send_notifications(subscribers, loop=self.loop), loop=self.loop)

    async def _msg_callback(self, topic, command, params):
        """
        Callback to process a received message from kafka
        :param topic: topic received
        :param command: command received
        :param params: rest of parameters
        :return: None. Errors are logged, never propagated
        """
        msg_to_send = []
        finished_states = ("COMPLETED", "PARTIALLY_COMPLETED")
        try:
            if topic == "ns":
                if command == "terminated" and params["operationState"] in finished_states:
                    self.logger.debug("received ns terminated {}".format(params))
                    if params.get("autoremove"):
                        self.engine.del_item(self.internal_session, "nsrs", _id=params["nsr_id"],
                                             not_send_msg=msg_to_send)
                        self.logger.debug("ns={} deleted from database".format(params["nsr_id"]))
                # Check for nslcm notification
                self._notify_nslcm(topic, command, params)
            elif topic == "nsi":
                if command == "terminated" and params["operationState"] in finished_states:
                    self.logger.debug("received nsi terminated {}".format(params))
                    if params.get("autoremove"):
                        self.engine.del_item(self.internal_session, "nsis", _id=params["nsir_id"],
                                             not_send_msg=msg_to_send)
                        self.logger.debug("nsis={} deleted from database".format(params["nsir_id"]))
            elif topic == "admin":
                self.logger.debug("received {} {} {}".format(topic, command, params))
                if command in ("echo", "ping"):  # ignored commands
                    pass
                elif command == "revoke_token":
                    if not params:
                        # no token indicated: the whole cache is dropped
                        self.engine.authenticator.tokens_cache.clear()
                        self.logger.debug("token_cache cleared")
                    elif isinstance(params, dict) and "_id" in params:
                        tid = params.get("_id")
                        self.engine.authenticator.tokens_cache.pop(tid, None)
                        self.logger.debug("token '{}' removed from token_cache".format(tid))
                    else:
                        self.logger.debug("unrecognized params in command '{} {}': {}"
                                          .format(topic, command, params))
                else:
                    self.logger.debug("unrecognized command '{} {}'".format(topic, command))
            # writing to kafka must be done with our own loop. For this reason it is not allowed Engine to do that,
            # but content to be written is stored at msg_to_send
            for msg in msg_to_send:
                await self.msg.aiowrite(*msg, loop=self.loop)
        except (EngineException, DbException, MsgException) as e:
            self.logger.error("Error while processing topic={} command={}: {}".format(topic, command, e))
        except Exception as e:
            self.logger.exception("Exception while processing topic={} command={}: {}".format(topic, command, e),
                                  exc_info=True)

    def _close_connections(self):
        """
        Close all connections
        This method is deliberately NOT called '_stop': that name is a private method of threading.Thread
        that CPython (at least up to 3.12) calls from join()/is_alive(); overriding it breaks is_alive()
        and makes join() disconnect a second time.
        :return: None
        :raises SubscriptionException: if database or messaging fails at disconnecting
        """
        try:
            if self.db:
                self.db.db_disconnect()
            if self.msg:
                self.msg.disconnect()
        except (DbException, MsgException) as e:
            raise SubscriptionException(str(e), http_code=e.http_code) from e

    def terminate(self):
        """
        This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards,
        but not immediately.
        :return: None
        """
        self.to_terminate = True
        for task in (self.aiomain_task, self.aiomain_task_admin):
            if not task:
                continue
            try:
                self.loop.call_soon_threadsafe(task.cancel)
            except RuntimeError:
                # the thread has already finished and closed its loop: nothing left to cancel
                self.logger.debug("event loop already closed at terminate")