1 # -*- coding: utf-8 -*-
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
This module implements a thread that reads from the kafka bus, implementing all the subscriptions.
It is based on asyncio.
To avoid race conditions it uses the same engine class as the main module for database changes.
It deletes NS and NSI instances when they are terminated with the autoremove flag, sends NS/VNF LCM
notifications to subscribers, and removes revoked tokens from the token cache on admin commands.
26 from http
import HTTPStatus
28 from osm_common
import dbmongo
, dbmemory
, msglocal
, msgkafka
29 from osm_common
.dbbase
import DbException
30 from osm_common
.msgbase
import MsgException
31 from osm_nbi
.engine
import EngineException
32 from osm_nbi
.notifications
import NsLcmNotification
, VnfLcmNotification
34 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
class SubscriptionException(Exception):
    """
    Error raised by the subscription thread.

    Carries an HTTP status code in addition to the error message.
    """

    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
        """
        :param message: text describing the failure; becomes the exception message
        :param http_code: HTTP status associated with the error, BAD_REQUEST by default
        """
        # Stored on the instance next to the message that Exception keeps in args.
        self.http_code = http_code
        super().__init__(message)
43 class SubscriptionThread(threading
.Thread
):
44 def __init__(self
, config
, engine
):
47 :param config: configuration parameters of database and messaging
48 :param engine: an instance of Engine class, used for deleting instances
50 threading
.Thread
.__init
__(self
)
51 self
.to_terminate
= False
56 self
.logger
= logging
.getLogger("nbi.subscriptions")
57 self
.aiomain_task_admin
= (
58 None # asyncio task for receiving admin actions from kafka bus
61 None # asyncio task for receiving normal actions from kafka bus
63 self
.internal_session
= { # used for a session to the engine methods
74 async def start_kafka(self
):
75 # timeout_wait_for_kafka = 3*60
77 while not self
.to_terminate
:
# bug 710 635. The library aiokafka does not receive anything when the topic at kafka has not been
81 # Before subscribe, send dummy messages
82 await self
.msg
.aiowrite(
87 await self
.msg
.aiowrite("ns", "echo", "dummy message")
88 await self
.msg
.aiowrite("nsi", "echo", "dummy message")
89 await self
.msg
.aiowrite("vnf", "echo", "dummy message")
91 self
.logger
.critical("kafka is working again")
93 if not self
.aiomain_task_admin
:
94 await asyncio
.sleep(10)
95 self
.logger
.debug("Starting admin subscription task")
96 self
.aiomain_task_admin
= asyncio
.ensure_future(
100 aiocallback
=self
._msg
_callback
,
103 if not self
.aiomain_task
:
104 await asyncio
.sleep(10)
105 self
.logger
.debug("Starting non-admin subscription task")
106 self
.aiomain_task
= asyncio
.ensure_future(
108 ("ns", "nsi", "vnf"),
109 aiocallback
=self
._msg
_callback
,
112 done
, _
= await asyncio
.wait(
113 [self
.aiomain_task
, self
.aiomain_task_admin
],
115 return_when
=asyncio
.FIRST_COMPLETED
,
118 if self
.aiomain_task_admin
in done
:
119 exc
= self
.aiomain_task_admin
.exception()
121 "admin subscription task exception: {}".format(exc
)
123 self
.aiomain_task_admin
= None
124 if self
.aiomain_task
in done
:
125 exc
= self
.aiomain_task
.exception()
127 "non-admin subscription task exception: {}".format(exc
)
129 self
.aiomain_task
= None
130 except asyncio
.CancelledError
:
132 except Exception as e
:
133 if self
.to_terminate
:
136 # logging only first time
137 self
.logger
.critical(
138 "Error accessing kafka '{}'. Retrying ...".format(e
)
140 kafka_working
= False
141 await asyncio
.sleep(10)
150 if self
.config
["database"]["driver"] == "mongo":
151 self
.db
= dbmongo
.DbMongo()
152 self
.db
.db_connect(self
.config
["database"])
153 elif self
.config
["database"]["driver"] == "memory":
154 self
.db
= dbmemory
.DbMemory()
155 self
.db
.db_connect(self
.config
["database"])
157 raise SubscriptionException(
158 "Invalid configuration param '{}' at '[database]':'driver'".format(
159 self
.config
["database"]["driver"]
163 config_msg
= self
.config
["message"].copy()
164 if config_msg
["driver"] == "local":
165 self
.msg
= msglocal
.MsgLocal()
166 self
.msg
.connect(config_msg
)
167 elif config_msg
["driver"] == "kafka":
168 self
.msg
= msgkafka
.MsgKafka()
169 self
.msg
.connect(config_msg
)
171 raise SubscriptionException(
172 "Invalid configuration param '{}' at '[message]':'driver'".format(
176 self
.nslcm
= NsLcmNotification(self
.db
)
177 self
.vnflcm
= VnfLcmNotification(self
.db
)
178 except (DbException
, MsgException
) as e
:
179 raise SubscriptionException(str(e
), http_code
=e
.http_code
)
181 self
.logger
.debug("Starting")
182 while not self
.to_terminate
:
184 asyncio
.run(self
.start_kafka())
185 except Exception as e
:
186 if not self
.to_terminate
:
187 self
.logger
.exception(
188 "Exception '{}' at messaging read loop".format(e
), exc_info
=True
191 self
.logger
.debug("Finishing")
194 async def _msg_callback(self
, topic
, command
, params
):
196 Callback to process a received message from kafka
197 :param topic: topic received
198 :param command: command received
199 :param params: rest of parameters
205 if command
== "terminated" and params
["operationState"] in (
207 "PARTIALLY_COMPLETED",
209 self
.logger
.debug("received ns terminated {}".format(params
))
210 if params
.get("autoremove"):
211 self
.engine
.del_item(
212 self
.internal_session
,
214 _id
=params
["nsr_id"],
215 not_send_msg
=msg_to_send
,
218 "ns={} deleted from database".format(params
["nsr_id"])
220 # Check for nslcm notification
221 if isinstance(params
, dict):
222 # Check availability of operationState and command
224 (not params
.get("operationState"))
226 or (not params
.get("operationParams"))
229 "Message can not be used for notification of nslcm"
232 nsd_id
= params
["operationParams"].get("nsdId")
233 ns_instance_id
= params
["operationParams"].get("nsInstanceId")
234 # Any one among nsd_id, ns_instance_id should be present.
235 if not (nsd_id
or ns_instance_id
):
237 "Message can not be used for notification of nslcm"
240 op_state
= params
["operationState"]
243 "command": command
.upper(),
246 subscribers
= self
.nslcm
.get_subscribers(
253 # self.logger.debug("subscribers list: ")
254 # self.logger.debug(subscribers)
256 asyncio
.ensure_future(
257 self
.nslcm
.send_notifications(subscribers
),
261 "Message can not be used for notification of nslcm"
264 if isinstance(params
, dict):
265 vnfd_id
= params
["vnfdId"]
266 vnf_instance_id
= params
["vnfInstanceId"]
267 if command
== "create" or command
== "delete":
270 op_state
= params
["operationState"]
273 "command": command
.upper(),
276 subscribers
= self
.vnflcm
.get_subscribers(
284 asyncio
.ensure_future(
285 self
.vnflcm
.send_notifications(subscribers
),
288 if command
== "terminated" and params
["operationState"] in (
290 "PARTIALLY_COMPLETED",
292 self
.logger
.debug("received nsi terminated {}".format(params
))
293 if params
.get("autoremove"):
294 self
.engine
.del_item(
295 self
.internal_session
,
297 _id
=params
["nsir_id"],
298 not_send_msg
=msg_to_send
,
301 "nsis={} deleted from database".format(params
["nsir_id"])
303 elif topic
== "admin":
304 self
.logger
.debug("received {} {} {}".format(topic
, command
, params
))
305 if command
in ["echo", "ping"]: # ignored commands
307 elif command
== "revoke_token":
309 if isinstance(params
, dict) and "_id" in params
:
310 tid
= params
.get("_id")
311 self
.engine
.authenticator
.tokens_cache
.pop(tid
, None)
313 "token '{}' removed from token_cache".format(tid
)
317 "unrecognized params in command '{} {}': {}".format(
318 topic
, command
, params
322 self
.engine
.authenticator
.tokens_cache
.clear()
323 self
.logger
.debug("token_cache cleared")
326 "unrecognized command '{} {}'".format(topic
, command
)
# Writing to kafka must be done with our own loop. For this reason Engine is not allowed to do it;
# instead, the content to be written is stored at msg_to_send
330 for msg
in msg_to_send
:
331 await self
.msg
.aiowrite(*msg
)
332 except (EngineException
, DbException
, MsgException
) as e
:
334 "Error while processing topic={} command={}: {}".format(
338 except Exception as e
:
339 self
.logger
.exception(
340 "Exception while processing topic={} command={}: {}".format(
348 Close all connections
353 self
.db
.db_disconnect()
355 self
.msg
.disconnect()
356 except (DbException
, MsgException
) as e
:
357 raise SubscriptionException(str(e
), http_code
=e
.http_code
)
This is a thread-safe method to terminate this thread. Termination is done asynchronously afterwards,
365 self
.to_terminate
= True
366 if self
.aiomain_task
:
367 asyncio
.get_event_loop().call_soon_threadsafe(self
.aiomain_task
.cancel
)
368 if self
.aiomain_task_admin
:
369 asyncio
.get_event_loop().call_soon_threadsafe(
370 self
.aiomain_task_admin
.cancel