1 # -*- coding: utf-8 -*-
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 This module implements a thread that reads from kafka bus implementing all the subscriptions.
18 It is based on asyncio.
19 To avoid race conditions it uses same engine class as the main module for database changes
20 For the moment this module only deletes NS instances when they are terminated with the autoremove flag
26 from http
import HTTPStatus
27 from osm_common
import dbmongo
, dbmemory
, msglocal
, msgkafka
28 from osm_common
.dbbase
import DbException
29 from osm_common
.msgbase
import MsgException
30 from osm_nbi
.engine
import EngineException
32 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
35 class SubscriptionException(Exception):
37 def __init__(self
, message
, http_code
=HTTPStatus
.BAD_REQUEST
):
38 self
.http_code
= http_code
39 Exception.__init
__(self
, message
)
42 class SubscriptionThread(threading
.Thread
):
44 def __init__(self
, config
, engine
):
47 :param config: configuration parameters of database and messaging
48 :param engine: an instance of Engine class, used for deleting instances
50 threading
.Thread
.__init
__(self
)
51 self
.to_terminate
= False
57 self
.logger
= logging
.getLogger("nbi.subscriptions")
58 self
.aiomain_task_admin
= None # asyncio task for receiving admin actions from kafka bus
59 self
.aiomain_task
= None # asyncio task for receiving normal actions from kafka bus
60 self
.internal_session
= { # used for a session to the engine methods
69 async def start_kafka(self
):
70 # timeout_wait_for_kafka = 3*60
72 while not self
.to_terminate
:
74 # bug 710 635. The library aiokafka does not recieve anything when the topci at kafka has not been
76 # Before subscribe, send dummy messages
77 await self
.msg
.aiowrite("admin", "echo", "dummy message", loop
=self
.loop
)
78 await self
.msg
.aiowrite("ns", "echo", "dummy message", loop
=self
.loop
)
79 await self
.msg
.aiowrite("nsi", "echo", "dummy message", loop
=self
.loop
)
81 self
.logger
.critical("kafka is working again")
83 if not self
.aiomain_task_admin
:
84 await asyncio
.sleep(10, loop
=self
.loop
)
85 self
.logger
.debug("Starting admin subscription task")
86 self
.aiomain_task_admin
= asyncio
.ensure_future(self
.msg
.aioread(("admin",), loop
=self
.loop
,
88 aiocallback
=self
._msg
_callback
),
90 if not self
.aiomain_task
:
91 await asyncio
.sleep(10, loop
=self
.loop
)
92 self
.logger
.debug("Starting non-admin subscription task")
93 self
.aiomain_task
= asyncio
.ensure_future(self
.msg
.aioread(("ns", "nsi"), loop
=self
.loop
,
94 aiocallback
=self
._msg
_callback
),
96 done
, _
= await asyncio
.wait([self
.aiomain_task
, self
.aiomain_task_admin
],
97 timeout
=None, loop
=self
.loop
, return_when
=asyncio
.FIRST_COMPLETED
)
99 if self
.aiomain_task_admin
in done
:
100 exc
= self
.aiomain_task_admin
.exception()
101 self
.logger
.error("admin subscription task exception: {}".format(exc
))
102 self
.aiomain_task_admin
= None
103 if self
.aiomain_task
in done
:
104 exc
= self
.aiomain_task
.exception()
105 self
.logger
.error("non-admin subscription task exception: {}".format(exc
))
106 self
.aiomain_task
= None
107 except asyncio
.CancelledError
:
109 except Exception as e
:
110 if self
.to_terminate
:
113 # logging only first time
114 self
.logger
.critical("Error accessing kafka '{}'. Retrying ...".format(e
))
115 kafka_working
= False
116 await asyncio
.sleep(10, loop
=self
.loop
)
123 self
.loop
= asyncio
.new_event_loop()
126 if self
.config
["database"]["driver"] == "mongo":
127 self
.db
= dbmongo
.DbMongo()
128 self
.db
.db_connect(self
.config
["database"])
129 elif self
.config
["database"]["driver"] == "memory":
130 self
.db
= dbmemory
.DbMemory()
131 self
.db
.db_connect(self
.config
["database"])
133 raise SubscriptionException("Invalid configuration param '{}' at '[database]':'driver'".format(
134 self
.config
["database"]["driver"]))
136 config_msg
= self
.config
["message"].copy()
137 config_msg
["loop"] = self
.loop
138 if config_msg
["driver"] == "local":
139 self
.msg
= msglocal
.MsgLocal()
140 self
.msg
.connect(config_msg
)
141 elif config_msg
["driver"] == "kafka":
142 self
.msg
= msgkafka
.MsgKafka()
143 self
.msg
.connect(config_msg
)
145 raise SubscriptionException("Invalid configuration param '{}' at '[message]':'driver'".format(
146 config_msg
["driver"]))
148 except (DbException
, MsgException
) as e
:
149 raise SubscriptionException(str(e
), http_code
=e
.http_code
)
151 self
.logger
.debug("Starting")
152 while not self
.to_terminate
:
155 self
.loop
.run_until_complete(asyncio
.ensure_future(self
.start_kafka(), loop
=self
.loop
))
156 # except asyncio.CancelledError:
157 # break # if cancelled it should end, breaking loop
158 except Exception as e
:
159 if not self
.to_terminate
:
160 self
.logger
.exception("Exception '{}' at messaging read loop".format(e
), exc_info
=True)
162 self
.logger
.debug("Finishing")
166 async def _msg_callback(self
, topic
, command
, params
):
168 Callback to process a received message from kafka
169 :param topic: topic received
170 :param command: command received
171 :param params: rest of parameters
177 if command
== "terminated" and params
["operationState"] in ("COMPLETED", "PARTIALLY_COMPLETED"):
178 self
.logger
.debug("received ns terminated {}".format(params
))
179 if params
.get("autoremove"):
180 self
.engine
.del_item(self
.internal_session
, "nsrs", _id
=params
["nsr_id"],
181 not_send_msg
=msg_to_send
)
182 self
.logger
.debug("ns={} deleted from database".format(params
["nsr_id"]))
184 if command
== "terminated" and params
["operationState"] in ("COMPLETED", "PARTIALLY_COMPLETED"):
185 self
.logger
.debug("received nsi terminated {}".format(params
))
186 if params
.get("autoremove"):
187 self
.engine
.del_item(self
.internal_session
, "nsis", _id
=params
["nsir_id"],
188 not_send_msg
=msg_to_send
)
189 self
.logger
.debug("nsis={} deleted from database".format(params
["nsir_id"]))
190 elif topic
== "admin":
191 self
.logger
.debug("received {} {} {}".format(topic
, command
, params
))
192 if command
in ["echo", "ping"]: # ignored commands
194 elif command
== "revoke_token":
196 if isinstance(params
, dict) and "_id" in params
:
197 tid
= params
.get("_id")
198 self
.engine
.authenticator
.tokens_cache
.pop(tid
, None)
199 self
.logger
.debug("token '{}' removed from token_cache".format(tid
))
201 self
.logger
.debug("unrecognized params in command '{} {}': {}"
202 .format(topic
, command
, params
))
204 self
.engine
.authenticator
.tokens_cache
.clear()
205 self
.logger
.debug("token_cache cleared")
207 self
.logger
.debug("unrecognized command '{} {}'".format(topic
, command
))
208 # writing to kafka must be done with our own loop. For this reason it is not allowed Engine to do that,
209 # but content to be written is stored at msg_to_send
210 for msg
in msg_to_send
:
211 await self
.msg
.aiowrite(*msg
, loop
=self
.loop
)
212 except (EngineException
, DbException
, MsgException
) as e
:
213 self
.logger
.error("Error while processing topic={} command={}: {}".format(topic
, command
, e
))
214 except Exception as e
:
215 self
.logger
.exception("Exception while processing topic={} command={}: {}".format(topic
, command
, e
),
220 Close all connections
225 self
.db
.db_disconnect()
227 self
.msg
.disconnect()
228 except (DbException
, MsgException
) as e
:
229 raise SubscriptionException(str(e
), http_code
=e
.http_code
)
233 This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards,
237 self
.to_terminate
= True
238 if self
.aiomain_task
:
239 self
.loop
.call_soon_threadsafe(self
.aiomain_task
.cancel
)
240 if self
.aiomain_task_admin
:
241 self
.loop
.call_soon_threadsafe(self
.aiomain_task_admin
.cancel
)