1 # -*- coding: utf-8 -*-
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 This module implements a thread that reads from kafka bus implementing all the subscriptions.
18 It is based on asyncio.
19 To avoid race conditions it uses the same engine class as the main module for database changes.
20 For the moment this module only deletes NS instances when they are terminated with the autoremove flag
26 from http
import HTTPStatus
28 from osm_common
import dbmongo
, dbmemory
, msglocal
, msgkafka
29 from osm_common
.dbbase
import DbException
30 from osm_common
.msgbase
import MsgException
31 from osm_nbi
.engine
import EngineException
32 from osm_nbi
.notifications
import NsLcmNotification
34 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
class SubscriptionException(Exception):
    """Error raised by the subscription thread; it carries an HTTP status code."""

    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
        """
        :param message: text describing the error, forwarded to the Exception base class
        :param http_code: HTTP status attached to the error (HTTPStatus.BAD_REQUEST by default)
        """
        super().__init__(message)
        self.http_code = http_code
44 class SubscriptionThread(threading
.Thread
):
# Constructor: initialises the threading.Thread base class and the per-thread state that is visible
# here (the to_terminate flag, the "nbi.subscriptions" logger and the two asyncio task handles,
# both starting as None).
# NOTE(review): the statements storing `config` and `engine`, and the contents of the
# internal_session dict, are not visible in this view - confirm against the full file.
46 def __init__(self
, config
, engine
):
49 :param config: configuration parameters of database and messaging
50 :param engine: an instance of Engine class, used for deleting instances
52 threading
.Thread
.__init
__(self
)
# Flag polled by the loops of this class; False until termination is requested.
53 self
.to_terminate
= False
59 self
.logger
= logging
.getLogger("nbi.subscriptions")
60 self
.aiomain_task_admin
= None # asyncio task for receiving admin actions from kafka bus
61 self
.aiomain_task
= None # asyncio task for receiving normal actions from kafka bus
62 self
.internal_session
= { # used for a session to the engine methods
# Coroutine that drives the kafka subscriptions while self.to_terminate is not set.
# From the visible code: it writes a dummy "echo" message to the "admin", "ns" and "nsi" topics,
# creates the admin and the non-admin aioread tasks when their handles are unset (both use
# self._msg_callback as callback), waits until the first of them completes, logs the exception of
# any finished task and resets its handle to None. Errors are logged as critical, followed by a
# 10 second sleep.
# NOTE(review): the `try:` openers, the initialisation/check of kafka_working and some call
# arguments are not visible in this view, so the exact nesting is assumed - confirm in the full file.
# NOTE(review): the `loop` argument of asyncio.sleep() and asyncio.wait() was deprecated in
# Python 3.8 and removed in 3.10 - verify the supported interpreter versions.
72 async def start_kafka(self
):
73 # timeout_wait_for_kafka = 3*60
75 while not self
.to_terminate
:
77 # bug 710 635. The library aiokafka does not receive anything when the topic at kafka has not been
79 # Before subscribe, send dummy messages
80 await self
.msg
.aiowrite("admin", "echo", "dummy message", loop
=self
.loop
)
81 await self
.msg
.aiowrite("ns", "echo", "dummy message", loop
=self
.loop
)
82 await self
.msg
.aiowrite("nsi", "echo", "dummy message", loop
=self
.loop
)
84 self
.logger
.critical("kafka is working again")
# (Re)create the admin reader only when there is no task handle stored.
86 if not self
.aiomain_task_admin
:
87 await asyncio
.sleep(10, loop
=self
.loop
)
88 self
.logger
.debug("Starting admin subscription task")
89 self
.aiomain_task_admin
= asyncio
.ensure_future(self
.msg
.aioread(("admin",), loop
=self
.loop
,
91 aiocallback
=self
._msg
_callback
),
# Same for the reader of the "ns" and "nsi" topics.
93 if not self
.aiomain_task
:
94 await asyncio
.sleep(10, loop
=self
.loop
)
95 self
.logger
.debug("Starting non-admin subscription task")
96 self
.aiomain_task
= asyncio
.ensure_future(self
.msg
.aioread(("ns", "nsi"), loop
=self
.loop
,
97 aiocallback
=self
._msg
_callback
),
# Wait without timeout until the first of the two reader tasks completes.
99 done
, _
= await asyncio
.wait([self
.aiomain_task
, self
.aiomain_task_admin
],
100 timeout
=None, loop
=self
.loop
, return_when
=asyncio
.FIRST_COMPLETED
)
# A completed task is logged and its handle cleared; Task.exception() raises CancelledError
# for a cancelled task, which is what the handler below catches.
102 if self
.aiomain_task_admin
in done
:
103 exc
= self
.aiomain_task_admin
.exception()
104 self
.logger
.error("admin subscription task exception: {}".format(exc
))
105 self
.aiomain_task_admin
= None
106 if self
.aiomain_task
in done
:
107 exc
= self
.aiomain_task
.exception()
108 self
.logger
.error("non-admin subscription task exception: {}".format(exc
))
109 self
.aiomain_task
= None
110 except asyncio
.CancelledError
:
112 except Exception as e
:
113 if self
.to_terminate
:
116 # logging only first time
117 self
.logger
.critical("Error accessing kafka '{}'. Retrying ...".format(e
))
118 kafka_working
= False
119 await asyncio
.sleep(10, loop
=self
.loop
)
126 self
.loop
= asyncio
.new_event_loop()
129 if self
.config
["database"]["driver"] == "mongo":
130 self
.db
= dbmongo
.DbMongo()
131 self
.db
.db_connect(self
.config
["database"])
132 elif self
.config
["database"]["driver"] == "memory":
133 self
.db
= dbmemory
.DbMemory()
134 self
.db
.db_connect(self
.config
["database"])
136 raise SubscriptionException("Invalid configuration param '{}' at '[database]':'driver'".format(
137 self
.config
["database"]["driver"]))
139 config_msg
= self
.config
["message"].copy()
140 config_msg
["loop"] = self
.loop
141 if config_msg
["driver"] == "local":
142 self
.msg
= msglocal
.MsgLocal()
143 self
.msg
.connect(config_msg
)
144 elif config_msg
["driver"] == "kafka":
145 self
.msg
= msgkafka
.MsgKafka()
146 self
.msg
.connect(config_msg
)
148 raise SubscriptionException("Invalid configuration param '{}' at '[message]':'driver'".format(
149 config_msg
["driver"]))
150 self
.nslcm
= NsLcmNotification(self
.db
)
151 except (DbException
, MsgException
) as e
:
152 raise SubscriptionException(str(e
), http_code
=e
.http_code
)
154 self
.logger
.debug("Starting")
155 while not self
.to_terminate
:
158 self
.loop
.run_until_complete(asyncio
.ensure_future(self
.start_kafka(), loop
=self
.loop
))
159 # except asyncio.CancelledError:
160 # break # if cancelled it should end, breaking loop
161 except Exception as e
:
162 if not self
.to_terminate
:
163 self
.logger
.exception("Exception '{}' at messaging read loop".format(e
), exc_info
=True)
165 self
.logger
.debug("Finishing")
# Handles one message read from the bus. From the visible code:
#  - a "terminated" command whose operationState is COMPLETED or PARTIALLY_COMPLETED and whose params
#    carry "autoremove" triggers engine.del_item on "nsrs" (by nsr_id) or on "nsis" (by nsir_id);
#  - dict params carrying operationState and operationParams are used to look up the nslcm subscribers
#    and to schedule send_notifications;
#  - on topic "admin", "echo"/"ping" are ignored and "revoke_token" either pops one token from, or
#    clears, the authenticator tokens_cache;
#  - whatever was collected in msg_to_send is finally written with self.msg.aiowrite.
# EngineException, DbException and MsgException are logged as errors; any other exception is logged
# with logger.exception.
# NOTE(review): the branch headers selecting the "ns"/"nsi" topics, the initialisation of msg_to_send
# and the `try:`/`else:` lines are not visible in this view, so the exact nesting is assumed - confirm
# against the full file.
169 async def _msg_callback(self
, topic
, command
, params
):
171 Callback to process a received message from kafka
172 :param topic: topic received
173 :param command: command received
174 :param params: rest of parameters
# NOTE(review): params is indexed with ["operationState"] here, before the isinstance(params, dict)
# check further down - confirm that "terminated" messages always carry a dict with that key.
180 if command
== "terminated" and params
["operationState"] in ("COMPLETED", "PARTIALLY_COMPLETED"):
181 self
.logger
.debug("received ns terminated {}".format(params
))
182 if params
.get("autoremove"):
183 self
.engine
.del_item(self
.internal_session
, "nsrs", _id
=params
["nsr_id"],
184 not_send_msg
=msg_to_send
)
185 self
.logger
.debug("ns={} deleted from database".format(params
["nsr_id"]))
186 # Check for nslcm notification
187 if isinstance(params
, dict):
188 # Check availability of operationState and command
189 if (not params
.get("operationState")) or (not command
) or (not params
.get("operationParams")):
190 self
.logger
.debug("Message can not be used for notification of nslcm")
192 nsd_id
= params
["operationParams"].get("nsdId")
193 ns_instance_id
= params
["operationParams"].get("nsInstanceId")
194 # Any one among nsd_id, ns_instance_id should be present.
195 if not (nsd_id
or ns_instance_id
):
196 self
.logger
.debug("Message can not be used for notification of nslcm")
198 op_state
= params
["operationState"]
199 event_details
= {"topic": topic
, "command": command
.upper(), "params": params
}
200 subscribers
= self
.nslcm
.get_subscribers(nsd_id
, ns_instance_id
, command
.upper(), op_state
,
202 # self.logger.debug("subscribers list: ")
203 # self.logger.debug(subscribers)
# NOTE(review): the future returned by ensure_future is not stored; the asyncio documentation
# recommends keeping a reference to scheduled tasks so they are not garbage collected mid-execution.
204 asyncio
.ensure_future(self
.nslcm
.send_notifications(subscribers
, loop
=self
.loop
))
206 self
.logger
.debug("Message can not be used for notification of nslcm")
208 if command
== "terminated" and params
["operationState"] in ("COMPLETED", "PARTIALLY_COMPLETED"):
209 self
.logger
.debug("received nsi terminated {}".format(params
))
210 if params
.get("autoremove"):
211 self
.engine
.del_item(self
.internal_session
, "nsis", _id
=params
["nsir_id"],
212 not_send_msg
=msg_to_send
)
213 self
.logger
.debug("nsis={} deleted from database".format(params
["nsir_id"]))
214 elif topic
== "admin":
215 self
.logger
.debug("received {} {} {}".format(topic
, command
, params
))
216 if command
in ["echo", "ping"]: # ignored commands
218 elif command
== "revoke_token":
# Dict params with an "_id" key identify the single token to drop from the cache.
220 if isinstance(params
, dict) and "_id" in params
:
221 tid
= params
.get("_id")
222 self
.engine
.authenticator
.tokens_cache
.pop(tid
, None)
223 self
.logger
.debug("token '{}' removed from token_cache".format(tid
))
225 self
.logger
.debug("unrecognized params in command '{} {}': {}"
226 .format(topic
, command
, params
))
228 self
.engine
.authenticator
.tokens_cache
.clear()
229 self
.logger
.debug("token_cache cleared")
231 self
.logger
.debug("unrecognized command '{} {}'".format(topic
, command
))
232 # Writing to kafka must be done with our own loop. For this reason Engine is not allowed to do that,
233 # but the content to be written is stored at msg_to_send
234 for msg
in msg_to_send
:
235 await self
.msg
.aiowrite(*msg
, loop
=self
.loop
)
236 except (EngineException
, DbException
, MsgException
) as e
:
237 self
.logger
.error("Error while processing topic={} command={}: {}".format(topic
, command
, e
))
238 except Exception as e
:
239 self
.logger
.exception("Exception while processing topic={} command={}: {}".format(topic
, command
, e
),
244 Close all connections
249 self
.db
.db_disconnect()
251 self
.msg
.disconnect()
252 except (DbException
, MsgException
) as e
:
253 raise SubscriptionException(str(e
), http_code
=e
.http_code
)
257 This is a thread-safe method to terminate this thread. Termination is done asynchronously afterwards,
261 self
.to_terminate
= True
262 if self
.aiomain_task
:
263 self
.loop
.call_soon_threadsafe(self
.aiomain_task
.cancel
)
264 if self
.aiomain_task_admin
:
265 self
.loop
.call_soon_threadsafe(self
.aiomain_task_admin
.cancel
)