1 # -*- coding: utf-8 -*-
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 This module implements a thread that reads from kafka bus implementing all the subscriptions.
18 It is based on asyncio.
19 To avoid race conditions it uses same engine class as the main module for database changes
20 For the moment this module only deletes NS instances when they are terminated with the autoremove flag
26 from http
import HTTPStatus
28 from osm_common
import dbmongo
, dbmemory
, msglocal
, msgkafka
29 from osm_common
.dbbase
import DbException
30 from osm_common
.msgbase
import MsgException
31 from osm_nbi
.engine
import EngineException
32 from osm_nbi
.notifications
import NsLcmNotification
, VnfLcmNotification
34 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
class SubscriptionException(Exception):
    """Error raised by the subscription thread.

    Besides the usual exception message it carries the HTTP status code that
    best describes the failure, stored in the ``http_code`` attribute.
    """

    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
        """
        :param message: human readable description of the error
        :param http_code: HTTP status associated to the error, BAD_REQUEST by default
        """
        super().__init__(message)
        self.http_code = http_code
class SubscriptionThread(threading.Thread):
    """Thread that reads the kafka bus and serves the subscriptions.

    The thread owns a private asyncio event loop. On that loop it keeps two
    reader tasks alive (one for the "admin" topic, one for "ns", "nsi" and
    "vnf") and dispatches every received message to ``_msg_callback``.
    """

    # Seconds waited before (re)creating a subscription task
    SUBSCRIBE_DELAY = 10
    # Seconds waited between two iterations of the kafka supervision loop
    RETRY_DELAY = 10
    # Operation states for which a "terminated" message may trigger autoremove
    _FINISHED_STATES = ("COMPLETED", "PARTIALLY_COMPLETED")
    # topic -> (engine item type, key of params holding the id, label used at log)
    _AUTOREMOVE = {
        "ns": ("nsrs", "nsr_id", "ns"),
        "nsi": ("nsis", "nsir_id", "nsis"),
    }

    def __init__(self, config, engine):
        """
        Constructor of class
        :param config: configuration parameters of database and messaging
        :param engine: an instance of Engine class, used for deleting instances
        """
        super().__init__()
        self.to_terminate = False
        self.config = config
        self.db = None
        self.msg = None
        self.engine = engine
        self.loop = None
        self.logger = logging.getLogger("nbi.subscriptions")
        # asyncio task for receiving admin actions from kafka bus
        self.aiomain_task_admin = None
        # asyncio task for receiving normal actions from kafka bus
        self.aiomain_task = None
        # used for a session to the engine methods
        # NOTE(review): confirm these keys against the session expected by Engine
        self.internal_session = {
            "project_id": (),
            "set_project": (),
            "admin": True,
            "force": False,
            "public": None,
            "method": "delete",
        }
        self.nslcm = None
        self.vnflcm = None

    async def start_kafka(self):
        """
        Keep the kafka subscription tasks running until termination is requested.
        Must be run on the loop of this thread (self.loop).
        :return: None
        """
        kafka_working = True
        while not self.to_terminate:
            try:
                # bug 710 635. The library aiokafka does not receive anything when the topic at kafka has not been
                # created.
                # Before subscribe, send dummy messages
                for topic in ("admin", "ns", "nsi", "vnf"):
                    await self.msg.aiowrite(
                        topic, "echo", "dummy message", loop=self.loop
                    )
                if not kafka_working:
                    self.logger.critical("kafka is working again")
                    kafka_working = True
                if not self.aiomain_task_admin:
                    # 'loop' argument of asyncio.sleep/wait was removed in python 3.10;
                    # this coroutine already runs on self.loop
                    await asyncio.sleep(self.SUBSCRIBE_DELAY)
                    self.logger.debug("Starting admin subscription task")
                    self.aiomain_task_admin = asyncio.ensure_future(
                        self.msg.aioread(
                            ("admin",),
                            loop=self.loop,
                            group_id=False,
                            aiocallback=self._msg_callback,
                        ),
                        loop=self.loop,
                    )
                if not self.aiomain_task:
                    await asyncio.sleep(self.SUBSCRIBE_DELAY)
                    self.logger.debug("Starting non-admin subscription task")
                    self.aiomain_task = asyncio.ensure_future(
                        self.msg.aioread(
                            ("ns", "nsi", "vnf"),
                            loop=self.loop,
                            aiocallback=self._msg_callback,
                        ),
                        loop=self.loop,
                    )
                done, _ = await asyncio.wait(
                    [self.aiomain_task, self.aiomain_task_admin],
                    timeout=None,
                    return_when=asyncio.FIRST_COMPLETED,
                )
                # Each task is checked on its own, so that a cancelled task does not
                # prevent the other one from being inspected, and a finished task is
                # always forgotten in order to be created again at next iteration
                if self.aiomain_task_admin in done:
                    self._log_finished_task(self.aiomain_task_admin, "admin")
                    self.aiomain_task_admin = None
                if self.aiomain_task in done:
                    self._log_finished_task(self.aiomain_task, "non-admin")
                    self.aiomain_task = None
            except Exception as e:
                if self.to_terminate:
                    return
                if kafka_working:
                    # logging only first time
                    self.logger.critical(
                        "Error accessing kafka '{}'. Retrying ...".format(e)
                    )
                    kafka_working = False
            await asyncio.sleep(self.RETRY_DELAY)

    def _log_finished_task(self, task, label):
        """
        Log the exception, if any, of a finished subscription task
        :param task: asyncio task already done
        :param label: text identifying the task at log: "admin" or "non-admin"
        :return: None
        """
        try:
            exc = task.exception()
        except asyncio.CancelledError:
            # task was cancelled, e.g. by terminate(). Nothing to report
            return
        self.logger.error("{} subscription task exception: {}".format(label, exc))

    def _connect(self):
        """
        Connect to database and messaging, if not already done, and create the
        notification helpers.
        :return: None
        :raises SubscriptionException: on invalid driver or on connection error
        """
        try:
            if not self.db:
                db_driver = self.config["database"]["driver"]
                if db_driver == "mongo":
                    self.db = dbmongo.DbMongo()
                elif db_driver == "memory":
                    self.db = dbmemory.DbMemory()
                else:
                    raise SubscriptionException(
                        "Invalid configuration param '{}' at '[database]':'driver'".format(
                            db_driver
                        )
                    )
                self.db.db_connect(self.config["database"])
            if not self.msg:
                config_msg = self.config["message"].copy()
                config_msg["loop"] = self.loop
                if config_msg["driver"] == "local":
                    self.msg = msglocal.MsgLocal()
                elif config_msg["driver"] == "kafka":
                    self.msg = msgkafka.MsgKafka()
                else:
                    raise SubscriptionException(
                        "Invalid configuration param '{}' at '[message]':'driver'".format(
                            config_msg["driver"]
                        )
                    )
                self.msg.connect(config_msg)
            self.nslcm = NsLcmNotification(self.db)
            self.vnflcm = VnfLcmNotification(self.db)
        except (DbException, MsgException) as e:
            raise SubscriptionException(str(e), http_code=e.http_code) from e

    def run(self):
        """
        Start of the thread
        :return: None
        """
        self.loop = asyncio.new_event_loop()
        try:
            self._connect()
        except BaseException:
            # do not leak the loop when the thread cannot even start
            self.loop.close()
            raise

        self.logger.debug("Starting")
        try:
            while not self.to_terminate:
                try:
                    self.loop.run_until_complete(
                        asyncio.ensure_future(self.start_kafka(), loop=self.loop)
                    )
                except Exception as e:
                    if not self.to_terminate:
                        self.logger.exception(
                            "Exception '{}' at messaging read loop".format(e)
                        )

            self.logger.debug("Finishing")
            self._stop()
        finally:
            # the loop is closed even if disconnection fails
            self.loop.close()

    async def _msg_callback(self, topic, command, params):
        """
        Callback to process a received message from kafka
        :param topic: topic received
        :param command: command received
        :param params: rest of parameters
        :return: None
        """
        msg_to_send = []
        try:
            if topic == "ns":
                self._autoremove_if_terminated(topic, command, params, msg_to_send)
                # Check for nslcm notification
                self._notify_nslcm(topic, command, params)
            elif topic == "vnf":
                self._notify_vnflcm(topic, command, params)
            elif topic == "nsi":
                self._autoremove_if_terminated(topic, command, params, msg_to_send)
            elif topic == "admin":
                self._process_admin(topic, command, params)
            # writing to kafka must be done with our own loop. For this reason it is not allowed Engine to do that,
            # but content to be written is stored at msg_to_send
            for msg in msg_to_send:
                await self.msg.aiowrite(*msg, loop=self.loop)
        except (EngineException, DbException, MsgException) as e:
            self.logger.error(
                "Error while processing topic={} command={}: {}".format(
                    topic, command, e
                )
            )
        except Exception as e:
            self.logger.exception(
                "Exception while processing topic={} command={}: {}".format(
                    topic, command, e
                )
            )

    def _autoremove_if_terminated(self, topic, command, params, msg_to_send):
        """
        Delete from database a terminated instance that carries the autoremove flag
        :param topic: "ns" or "nsi"
        :param command: command received. Only "terminated" is processed
        :param params: content of the message
        :param msg_to_send: list where the engine stores the messages to be written at kafka
        :return: None
        """
        if (
            command != "terminated"
            or params["operationState"] not in self._FINISHED_STATES
        ):
            return
        item, id_key, label = self._AUTOREMOVE[topic]
        self.logger.debug("received {} terminated {}".format(topic, params))
        if params.get("autoremove"):
            self.engine.del_item(
                self.internal_session,
                item,
                _id=params[id_key],
                not_send_msg=msg_to_send,
            )
            self.logger.debug(
                "{}={} deleted from database".format(label, params[id_key])
            )

    def _notify_nslcm(self, topic, command, params):
        """
        Schedule the nslcm notifications, if any subscriber, for a message of topic "ns"
        :param topic: topic received
        :param command: command received
        :param params: content of the message
        :return: None
        """
        not_usable = "Message can not be used for notification of nslcm"
        if not isinstance(params, dict):
            self.logger.debug(not_usable)
            return
        # Check availability of operationState and command
        if not (
            params.get("operationState")
            and command
            and params.get("operationParams")
        ):
            self.logger.debug(not_usable)
            return
        nsd_id = params["operationParams"].get("nsdId")
        ns_instance_id = params["operationParams"].get("nsInstanceId")
        # Any one among nsd_id, ns_instance_id should be present.
        if not (nsd_id or ns_instance_id):
            self.logger.debug(not_usable)
            return
        event_details = {
            "topic": topic,
            "command": command.upper(),
            "params": params,
        }
        subscribers = self.nslcm.get_subscribers(
            nsd_id,
            ns_instance_id,
            command.upper(),
            params["operationState"],
            event_details,
        )
        if subscribers:
            asyncio.ensure_future(
                self.nslcm.send_notifications(subscribers, loop=self.loop),
                loop=self.loop,
            )

    def _notify_vnflcm(self, topic, command, params):
        """
        Schedule the vnflcm notifications, if any subscriber, for a message of topic "vnf"
        :param topic: topic received
        :param command: command received
        :param params: content of the message
        :return: None
        """
        if not isinstance(params, dict):
            return
        vnfd_id = params["vnfdId"]
        vnf_instance_id = params["vnfInstanceId"]
        # "create" and "delete" messages use the command itself as operation state
        if command == "create" or command == "delete":
            op_state = command
        else:
            op_state = params["operationState"]
        event_details = {
            "topic": topic,
            "command": command.upper(),
            "params": params,
        }
        subscribers = self.vnflcm.get_subscribers(
            vnfd_id,
            vnf_instance_id,
            command.upper(),
            op_state,
            event_details,
        )
        if subscribers:
            asyncio.ensure_future(
                self.vnflcm.send_notifications(subscribers, loop=self.loop),
                loop=self.loop,
            )

    def _process_admin(self, topic, command, params):
        """
        Process a message of topic "admin": token revocation
        :param topic: topic received
        :param command: command received
        :param params: content of the message
        :return: None
        """
        self.logger.debug("received {} {} {}".format(topic, command, params))
        if command in ["echo", "ping"]:  # ignored commands
            return
        if command != "revoke_token":
            self.logger.debug("unrecognized command '{} {}'".format(topic, command))
            return
        tokens_cache = self.engine.authenticator.tokens_cache
        if not params:
            # without params all the tokens are revoked
            tokens_cache.clear()
            self.logger.debug("token_cache cleared")
        elif isinstance(params, dict) and "_id" in params:
            tid = params.get("_id")
            tokens_cache.pop(tid, None)
            self.logger.debug("token '{}' removed from token_cache".format(tid))
        else:
            self.logger.debug(
                "unrecognized params in command '{} {}': {}".format(
                    topic, command, params
                )
            )

    def _stop(self):
        """
        Close all connections
        :return: None
        """
        try:
            if self.db:
                self.db.db_disconnect()
            if self.msg:
                self.msg.disconnect()
        except (DbException, MsgException) as e:
            raise SubscriptionException(str(e), http_code=e.http_code) from e

    def terminate(self):
        """
        This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards,
        but not immediately.
        :return: None
        """
        self.to_terminate = True
        loop = self.loop
        if loop is None or loop.is_closed():
            # thread not started or already finished: there is nothing to cancel
            return
        try:
            if self.aiomain_task:
                loop.call_soon_threadsafe(self.aiomain_task.cancel)
            if self.aiomain_task_admin:
                loop.call_soon_threadsafe(self.aiomain_task_admin.cancel)
        except RuntimeError:
            # the loop has been closed by the thread in the meanwhile
            pass