1 # -*- coding: utf-8 -*-
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 This module implements a thread that reads from the kafka bus, implementing all the subscriptions.
18 It is based on asyncio.
19 To avoid race conditions it uses the same engine class as the main module for database changes.
20 For the moment this module deletes NS and NSI instances that are terminated with the autoremove flag, and removes revoked tokens from the engine token cache.
26 from http
import HTTPStatus
27 from osm_common
import dbmongo
, dbmemory
, msglocal
, msgkafka
28 from osm_common
.dbbase
import DbException
29 from osm_common
.msgbase
import MsgException
30 from osm_nbi
.engine
import EngineException
32 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
class SubscriptionException(Exception):
    """Error raised by the subscription thread, tagged with an HTTP status code."""

    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
        """
        :param message: human readable description of the error
        :param http_code: HTTP status associated with the error, BAD_REQUEST by default
        """
        super().__init__(message)
        self.http_code = http_code
42 class SubscriptionThread(threading
.Thread
):
# Constructor: initialises the Thread base class, clears the termination flag, creates the
# "nbi.subscriptions" logger and declares the two (initially empty) asyncio task handles.
# NOTE(review): some original lines are absent from this view and the internal_session literal
# is cut right after its opening brace. Attributes used by other methods of this class
# (config, engine, db, msg, loop) are presumably assigned in the missing lines -- confirm
# against the complete file before editing.
44 def __init__(self
, config
, engine
):
47 :param config: configuration parameters of database and messaging
48 :param engine: an instance of Engine class, used for deleting instances
50 threading
.Thread
.__init
__(self
)
# Shutdown flag: the read loops of this class test it on every iteration.
51 self
.to_terminate
= False
57 self
.logger
= logging
.getLogger("nbi.subscriptions")
# Both task handles stay None until start_kafka() creates the reader tasks.
58 self
.aiomain_task_admin
= None # asyncio task for receiving admin actions from kafka bus
59 self
.aiomain_task
= None # asyncio task for receiving normal actions from kafka bus
# Passed as the session argument of engine.del_item() by the message callback.
60 self
.internal_session
= { # used for a session to the engine methods
# Coroutine that keeps the subscriptions alive until to_terminate is set. On each iteration it
# writes a dummy "echo" message to the "admin", "ns" and "nsi" topics, (re)creates the admin and
# the non-admin reader tasks when they are missing or finished, and then waits until one of the
# two tasks completes. After a failure the error is logged and the coroutine sleeps 10 seconds.
# NOTE(review): the `try:` line and the lines that test/initialise `kafka_working` are not
# visible in this view; the exact nesting of the statements below must be confirmed.
# NOTE(review): the `loop=` argument of asyncio.sleep() and asyncio.wait() was deprecated in
# Python 3.8 and removed in Python 3.10 -- confirm the interpreter version this code targets.
69 async def start_kafka(self
):
70 # timeout_wait_for_kafka = 3*60
72 while not self
.to_terminate
:
74 # bug 710 635. The library aiokafka does not receive anything when the topic at kafka has not been
76 # Before subscribe, send dummy messages
77 await self
.msg
.aiowrite("admin", "echo", "dummy message", loop
=self
.loop
)
78 await self
.msg
.aiowrite("ns", "echo", "dummy message", loop
=self
.loop
)
79 await self
.msg
.aiowrite("nsi", "echo", "dummy message", loop
=self
.loop
)
# Presumably only logged when a previous iteration had set kafka_working to False (guard not visible).
81 self
.logger
.critical("kafka is working again")
# Admin reader: created the first time and again whenever the previous task has finished.
# NOTE(review): `_state` is a private attribute of the asyncio future/task object.
83 if not self
.aiomain_task_admin
or self
.aiomain_task_admin
._state
== "FINISHED":
# Wait 10 seconds before (re)starting the reader task.
84 await asyncio
.sleep(10, loop
=self
.loop
)
85 self
.logger
.debug("Starting admin subscription task")
86 self
.aiomain_task_admin
= asyncio
.ensure_future(self
.msg
.aioread(("admin",), loop
=self
.loop
,
88 aiocallback
=self
._msg
_callback
),
# Non-admin reader for the "ns" and "nsi" topics, handled the same way as the admin one.
90 if not self
.aiomain_task
or self
.aiomain_task
._state
== "FINISHED":
91 await asyncio
.sleep(10, loop
=self
.loop
)
92 self
.logger
.debug("Starting non-admin subscription task")
93 self
.aiomain_task
= asyncio
.ensure_future(self
.msg
.aioread(("ns", "nsi"), loop
=self
.loop
,
94 aiocallback
=self
._msg
_callback
),
# Block (no timeout) until either reader task ends, so the next iteration can restart it.
96 await asyncio
.wait([self
.aiomain_task
, self
.aiomain_task_admin
],
97 timeout
=None, loop
=self
.loop
, return_when
=asyncio
.FIRST_COMPLETED
)
98 except Exception as e
:
102 # logging only first time
103 self
.logger
.critical("Error accessing kafka '{}'. Retrying ...".format(e
))
104 kafka_working
= False
# Pause 10 seconds before the next attempt.
105 await asyncio
.sleep(10, loop
=self
.loop
)
# Thread body. NOTE(review): the `def` line of this method is not visible in this view; since
# the class derives from threading.Thread this is presumably run() -- confirm.
# It creates a dedicated event loop, connects the database and messaging drivers selected by
# self.config, and then runs start_kafka() on that loop repeatedly until to_terminate is set.
112 self
.loop
= asyncio
.new_event_loop()
# Database driver selection: "mongo" or "memory"; any other value raises SubscriptionException
# (the `else:` line is not visible in this view).
115 if self
.config
["database"]["driver"] == "mongo":
116 self
.db
= dbmongo
.DbMongo()
117 self
.db
.db_connect(self
.config
["database"])
118 elif self
.config
["database"]["driver"] == "memory":
119 self
.db
= dbmemory
.DbMemory()
120 self
.db
.db_connect(self
.config
["database"])
122 raise SubscriptionException("Invalid configuration param '{}' at '[database]':'driver'".format(
123 self
.config
["database"]["driver"]))
# Work on a copy so that the "loop" key is not added to the dict held in self.config["message"].
125 config_msg
= self
.config
["message"].copy()
126 config_msg
["loop"] = self
.loop
# Messaging driver selection: "local" or "kafka"; any other value raises SubscriptionException.
127 if config_msg
["driver"] == "local":
128 self
.msg
= msglocal
.MsgLocal()
129 self
.msg
.connect(config_msg
)
130 elif config_msg
["driver"] == "kafka":
131 self
.msg
= msgkafka
.MsgKafka()
132 self
.msg
.connect(config_msg
)
134 raise SubscriptionException("Invalid configuration param '{}' at '[message]':'driver'".format(
135 config_msg
["driver"]))
# Driver errors are re-raised as SubscriptionException keeping the driver's http_code.
137 except (DbException
, MsgException
) as e
:
138 raise SubscriptionException(str(e
), http_code
=e
.http_code
)
140 self
.logger
.debug("Starting")
# Main loop: run start_kafka() to completion on this thread's own loop, again and again,
# until termination is requested.
141 while not self
.to_terminate
:
144 self
.loop
.run_until_complete(asyncio
.ensure_future(self
.start_kafka(), loop
=self
.loop
))
145 # except asyncio.CancelledError:
146 # break # if cancelled it should end, breaking loop
147 except Exception as e
:
# Exceptions raised while terminating are not logged.
148 if not self
.to_terminate
:
149 self
.logger
.exception("Exception '{}' at messaging read loop".format(e
), exc_info
=True)
151 self
.logger
.debug("Finishing")
# Coroutine given as `aiocallback` to msg.aioread() by start_kafka(); it dispatches each
# received message on its topic and command.
# NOTE(review): the lines that initialise `msg_to_send`, open the `try:` block and test the
# topic for the first two branches are not visible in this view, and neither are some
# `else:`/`pass` lines; the nesting described in the comments below is inferred from the log
# messages and must be confirmed against the complete file.
155 async def _msg_callback(self
, topic
, command
, params
):
157 Callback to process a received message from kafka
158 :param topic: topic received
159 :param command: command received
160 :param params: rest of parameters
# NS branch (per the log text): when a termination finished as COMPLETED or PARTIALLY_COMPLETED
# and the message carries a truthy "autoremove", the "nsrs" item is deleted through the engine.
# msg_to_send is handed to the engine as `not_send_msg` and is iterated at the end of this method.
166 if command
== "terminated" and params
["operationState"] in ("COMPLETED", "PARTIALLY_COMPLETED"):
167 self
.logger
.debug("received ns terminated {}".format(params
))
168 if params
.get("autoremove"):
169 self
.engine
.del_item(self
.internal_session
, "nsrs", _id
=params
["nsr_id"],
170 not_send_msg
=msg_to_send
)
171 self
.logger
.debug("ns={} deleted from database".format(params
["nsr_id"]))
# NSI branch (per the log text): same handling for the "nsis" item identified by "nsir_id".
173 if command
== "terminated" and params
["operationState"] in ("COMPLETED", "PARTIALLY_COMPLETED"):
174 self
.logger
.debug("received nsi terminated {}".format(params
))
175 if params
.get("autoremove"):
176 self
.engine
.del_item(self
.internal_session
, "nsis", _id
=params
["nsir_id"],
177 not_send_msg
=msg_to_send
)
178 self
.logger
.debug("nsis={} deleted from database".format(params
["nsir_id"]))
# Admin topic: "echo"/"ping" are ignored; "revoke_token" updates the authenticator token cache.
179 elif topic
== "admin":
180 self
.logger
.debug("received {} {} {}".format(topic
, command
, params
))
181 if command
in ["echo", "ping"]: # ignored commands
183 elif command
== "revoke_token":
# A dict carrying "_id" removes that single token; pop(..., None) tolerates an unknown id.
185 if isinstance(params
, dict) and "_id" in params
:
186 tid
= params
.get("_id")
187 self
.engine
.authenticator
.tokens_cache
.pop(tid
, None)
188 self
.logger
.debug("token '{}' removed from token_cache".format(tid
))
190 self
.logger
.debug("unrecognized params in command '{} {}': {}"
191 .format(topic
, command
, params
))
# Clears the whole cache; the condition selecting this path is not visible in this view.
193 self
.engine
.authenticator
.tokens_cache
.clear()
194 self
.logger
.debug("token_cache cleared")
196 self
.logger
.debug("unrecognized command '{} {}'".format(topic
, command
))
197 # writing to kafka must be done with our own loop. For this reason it is not allowed Engine to do that,
198 # but content to be written is stored at msg_to_send
199 for msg
in msg_to_send
:
200 await self
.msg
.aiowrite(*msg
, loop
=self
.loop
)
# Engine, database and messaging errors are logged as errors; any other exception is logged
# through logger.exception (the end of that call is not visible in this view).
201 except (EngineException
, DbException
, MsgException
) as e
:
202 self
.logger
.error("Error while processing topic={} command={}: {}".format(topic
, command
, e
))
203 except Exception as e
:
204 self
.logger
.exception("Exception while processing topic={} command={}: {}".format(topic
, command
, e
),
209 Close all connections
# Disconnects the database and the messaging drivers; a DbException or MsgException is
# re-raised as SubscriptionException carrying the same http_code.
# NOTE(review): the `def` line, the `try:` line and any guards around the two calls are not
# visible in this view -- confirm against the complete file.
214 self
.db
.db_disconnect()
216 self
.msg
.disconnect()
217 except (DbException
, MsgException
) as e
:
218 raise SubscriptionException(str(e
), http_code
=e
.http_code
)
222 This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards,
# Sets the flag tested by the read loops, then schedules the cancellation of the non-admin
# reader task on this thread's event loop through call_soon_threadsafe().
# NOTE(review): the `def` line is not visible in this view (presumably terminate()), and no
# cancellation of aiomain_task_admin is visible either; if the method really ends here the admin
# reader task is never cancelled -- confirm against the complete file.
226 self
.to_terminate
= True
227 if self
.aiomain_task
:
228 self
.loop
.call_soon_threadsafe(self
.aiomain_task
.cancel
)