1 # -*- coding: utf-8 -*-
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 This module implements a thread that reads from the kafka bus, implementing all the subscriptions.
18 It is based on asyncio.
19 To avoid race conditions it uses the same engine class as the main module for database changes.
20 For the moment this module only deletes NS instances when they are terminated with the autoremove flag.
26 from http
import HTTPStatus
28 from osm_common
import dbmongo
, dbmemory
, msglocal
, msgkafka
29 from osm_common
.dbbase
import DbException
30 from osm_common
.msgbase
import MsgException
31 from osm_nbi
.engine
import EngineException
32 from osm_nbi
.notifications
import NsLcmNotification
34 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
class SubscriptionException(Exception):
    """Error raised by the subscription thread, carrying an HTTP status code.

    :param message: human-readable description of the error
    :param http_code: HTTP status associated with the error; defaults to
        HTTPStatus.BAD_REQUEST
    """

    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
        # Keep the status on the instance so callers can map the error to a response.
        self.http_code = http_code
        # Cooperative initialisation instead of naming the base class explicitly.
        super().__init__(message)
43 class SubscriptionThread(threading
.Thread
):
44 def __init__(self
, config
, engine
):
47 :param config: configuration parameters of database and messaging
48 :param engine: an instance of Engine class, used for deleting instances
50 threading
.Thread
.__init
__(self
)
51 self
.to_terminate
= False
57 self
.logger
= logging
.getLogger("nbi.subscriptions")
58 self
.aiomain_task_admin
= (
59 None # asyncio task for receiving admin actions from kafka bus
62 None # asyncio task for receiving normal actions from kafka bus
64 self
.internal_session
= { # used for a session to the engine methods
74 async def start_kafka(self
):
75 # timeout_wait_for_kafka = 3*60
77 while not self
.to_terminate
:
79 # bug 710 635. The library aiokafka does not receive anything when the topic at kafka has not been
81 # Before subscribe, send dummy messages
82 await self
.msg
.aiowrite(
83 "admin", "echo", "dummy message", loop
=self
.loop
85 await self
.msg
.aiowrite("ns", "echo", "dummy message", loop
=self
.loop
)
86 await self
.msg
.aiowrite("nsi", "echo", "dummy message", loop
=self
.loop
)
88 self
.logger
.critical("kafka is working again")
90 if not self
.aiomain_task_admin
:
91 await asyncio
.sleep(10, loop
=self
.loop
)
92 self
.logger
.debug("Starting admin subscription task")
93 self
.aiomain_task_admin
= asyncio
.ensure_future(
98 aiocallback
=self
._msg
_callback
,
102 if not self
.aiomain_task
:
103 await asyncio
.sleep(10, loop
=self
.loop
)
104 self
.logger
.debug("Starting non-admin subscription task")
105 self
.aiomain_task
= asyncio
.ensure_future(
109 aiocallback
=self
._msg
_callback
,
113 done
, _
= await asyncio
.wait(
114 [self
.aiomain_task
, self
.aiomain_task_admin
],
117 return_when
=asyncio
.FIRST_COMPLETED
,
120 if self
.aiomain_task_admin
in done
:
121 exc
= self
.aiomain_task_admin
.exception()
123 "admin subscription task exception: {}".format(exc
)
125 self
.aiomain_task_admin
= None
126 if self
.aiomain_task
in done
:
127 exc
= self
.aiomain_task
.exception()
129 "non-admin subscription task exception: {}".format(exc
)
131 self
.aiomain_task
= None
132 except asyncio
.CancelledError
:
134 except Exception as e
:
135 if self
.to_terminate
:
138 # logging only first time
139 self
.logger
.critical(
140 "Error accessing kafka '{}'. Retrying ...".format(e
)
142 kafka_working
= False
143 await asyncio
.sleep(10, loop
=self
.loop
)
150 self
.loop
= asyncio
.new_event_loop()
153 if self
.config
["database"]["driver"] == "mongo":
154 self
.db
= dbmongo
.DbMongo()
155 self
.db
.db_connect(self
.config
["database"])
156 elif self
.config
["database"]["driver"] == "memory":
157 self
.db
= dbmemory
.DbMemory()
158 self
.db
.db_connect(self
.config
["database"])
160 raise SubscriptionException(
161 "Invalid configuration param '{}' at '[database]':'driver'".format(
162 self
.config
["database"]["driver"]
166 config_msg
= self
.config
["message"].copy()
167 config_msg
["loop"] = self
.loop
168 if config_msg
["driver"] == "local":
169 self
.msg
= msglocal
.MsgLocal()
170 self
.msg
.connect(config_msg
)
171 elif config_msg
["driver"] == "kafka":
172 self
.msg
= msgkafka
.MsgKafka()
173 self
.msg
.connect(config_msg
)
175 raise SubscriptionException(
176 "Invalid configuration param '{}' at '[message]':'driver'".format(
180 self
.nslcm
= NsLcmNotification(self
.db
)
181 except (DbException
, MsgException
) as e
:
182 raise SubscriptionException(str(e
), http_code
=e
.http_code
)
184 self
.logger
.debug("Starting")
185 while not self
.to_terminate
:
188 self
.loop
.run_until_complete(
189 asyncio
.ensure_future(self
.start_kafka(), loop
=self
.loop
)
191 # except asyncio.CancelledError:
192 # break # if cancelled it should end, breaking loop
193 except Exception as e
:
194 if not self
.to_terminate
:
195 self
.logger
.exception(
196 "Exception '{}' at messaging read loop".format(e
), exc_info
=True
199 self
.logger
.debug("Finishing")
203 async def _msg_callback(self
, topic
, command
, params
):
205 Callback to process a received message from kafka
206 :param topic: topic received
207 :param command: command received
208 :param params: rest of parameters
214 if command
== "terminated" and params
["operationState"] in (
216 "PARTIALLY_COMPLETED",
218 self
.logger
.debug("received ns terminated {}".format(params
))
219 if params
.get("autoremove"):
220 self
.engine
.del_item(
221 self
.internal_session
,
223 _id
=params
["nsr_id"],
224 not_send_msg
=msg_to_send
,
227 "ns={} deleted from database".format(params
["nsr_id"])
229 # Check for nslcm notification
230 if isinstance(params
, dict):
231 # Check availability of operationState and command
233 (not params
.get("operationState"))
235 or (not params
.get("operationParams"))
238 "Message can not be used for notification of nslcm"
241 nsd_id
= params
["operationParams"].get("nsdId")
242 ns_instance_id
= params
["operationParams"].get("nsInstanceId")
243 # Any one among nsd_id, ns_instance_id should be present.
244 if not (nsd_id
or ns_instance_id
):
246 "Message can not be used for notification of nslcm"
249 op_state
= params
["operationState"]
252 "command": command
.upper(),
255 subscribers
= self
.nslcm
.get_subscribers(
262 # self.logger.debug("subscribers list: ")
263 # self.logger.debug(subscribers)
265 asyncio
.ensure_future(
266 self
.nslcm
.send_notifications(
267 subscribers
, loop
=self
.loop
273 "Message can not be used for notification of nslcm"
276 if command
== "terminated" and params
["operationState"] in (
278 "PARTIALLY_COMPLETED",
280 self
.logger
.debug("received nsi terminated {}".format(params
))
281 if params
.get("autoremove"):
282 self
.engine
.del_item(
283 self
.internal_session
,
285 _id
=params
["nsir_id"],
286 not_send_msg
=msg_to_send
,
289 "nsis={} deleted from database".format(params
["nsir_id"])
291 elif topic
== "admin":
292 self
.logger
.debug("received {} {} {}".format(topic
, command
, params
))
293 if command
in ["echo", "ping"]: # ignored commands
295 elif command
== "revoke_token":
297 if isinstance(params
, dict) and "_id" in params
:
298 tid
= params
.get("_id")
299 self
.engine
.authenticator
.tokens_cache
.pop(tid
, None)
301 "token '{}' removed from token_cache".format(tid
)
305 "unrecognized params in command '{} {}': {}".format(
306 topic
, command
, params
310 self
.engine
.authenticator
.tokens_cache
.clear()
311 self
.logger
.debug("token_cache cleared")
314 "unrecognized command '{} {}'".format(topic
, command
)
316 # writing to kafka must be done with our own loop. For this reason Engine is not allowed to do that,
317 # but content to be written is stored at msg_to_send
318 for msg
in msg_to_send
:
319 await self
.msg
.aiowrite(*msg
, loop
=self
.loop
)
320 except (EngineException
, DbException
, MsgException
) as e
:
322 "Error while processing topic={} command={}: {}".format(
326 except Exception as e
:
327 self
.logger
.exception(
328 "Exception while processing topic={} command={}: {}".format(
336 Close all connections
341 self
.db
.db_disconnect()
343 self
.msg
.disconnect()
344 except (DbException
, MsgException
) as e
:
345 raise SubscriptionException(str(e
), http_code
=e
.http_code
)
349 This is a thread-safe method to terminate this thread. Termination is done asynchronously afterwards,
353 self
.to_terminate
= True
354 if self
.aiomain_task
:
355 self
.loop
.call_soon_threadsafe(self
.aiomain_task
.cancel
)
356 if self
.aiomain_task_admin
:
357 self
.loop
.call_soon_threadsafe(self
.aiomain_task_admin
.cancel
)