1 # -*- coding: utf-8 -*-
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 This module implements a thread that reads from kafka bus implementing all the subscriptions.
18 It is based on asyncio.
19 To avoid race conditions it uses same engine class as the main module for database changes
20 For the moment this module only deletes NS instances when they are terminated with the autoremove flag
26 from http
import HTTPStatus
27 from osm_common
import dbmongo
, dbmemory
, msglocal
, msgkafka
28 from osm_common
.dbbase
import DbException
29 from osm_common
.msgbase
import MsgException
30 from engine
import EngineException
32 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
class SubscriptionException(Exception):
    """Error raised by the subscription thread, carrying an HTTP status code."""

    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
        """
        :param message: text describing the error
        :param http_code: HTTP status stored in self.http_code; BAD_REQUEST when not given
        """
        super().__init__(message)
        self.http_code = http_code
42 class SubscriptionThread(threading
.Thread
):
44 def __init__(self
, config
, engine
):
47 :param config: configuration parameters of database and messaging
48 :param engine: an instance of Engine class, used for deleting instances
50 threading
.Thread
.__init
__(self
)
51 self
.to_terminate
= False
57 self
.logger
= logging
.getLogger("nbi.subscriptions")
58 self
.aiomain_task
= None # asyncio task for receiving kafka bus
59 self
.internal_session
= { # used for a session to the engine methods
68 async def start_kafka(self
):
69 # timeout_wait_for_kafka = 3*60
71 while not self
.to_terminate
:
73 # bug 710 635. The library aiokafka does not recieve anything when the topci at kafka has not been
75 # Before subscribe, send dummy messages
76 await self
.msg
.aiowrite("admin", "echo", "dummy message", loop
=self
.loop
)
77 await self
.msg
.aiowrite("ns", "echo", "dummy message", loop
=self
.loop
)
78 await self
.msg
.aiowrite("nsi", "echo", "dummy message", loop
=self
.loop
)
80 self
.logger
.critical("kafka is working again")
82 await asyncio
.sleep(10, loop
=self
.loop
)
83 self
.aiomain_task
= asyncio
.ensure_future(self
.msg
.aioread(("ns", "nsi"), loop
=self
.loop
,
84 callback
=self
._msg
_callback
),
86 await asyncio
.wait_for(self
.aiomain_task
, timeout
=None, loop
=self
.loop
)
87 except Exception as e
:
91 # logging only first time
92 self
.logger
.critical("Error accessing kafka '{}'. Retrying ...".format(e
))
94 await asyncio
.sleep(10, loop
=self
.loop
)
101 self
.loop
= asyncio
.new_event_loop()
104 if self
.config
["database"]["driver"] == "mongo":
105 self
.db
= dbmongo
.DbMongo()
106 self
.db
.db_connect(self
.config
["database"])
107 elif self
.config
["database"]["driver"] == "memory":
108 self
.db
= dbmemory
.DbMemory()
109 self
.db
.db_connect(self
.config
["database"])
111 raise SubscriptionException("Invalid configuration param '{}' at '[database]':'driver'".format(
112 self
.config
["database"]["driver"]))
114 config_msg
= self
.config
["message"].copy()
115 config_msg
["loop"] = self
.loop
116 if config_msg
["driver"] == "local":
117 self
.msg
= msglocal
.MsgLocal()
118 self
.msg
.connect(config_msg
)
119 elif config_msg
["driver"] == "kafka":
120 self
.msg
= msgkafka
.MsgKafka()
121 self
.msg
.connect(config_msg
)
123 raise SubscriptionException("Invalid configuration param '{}' at '[message]':'driver'".format(
124 config_msg
["driver"]))
126 except (DbException
, MsgException
) as e
:
127 raise SubscriptionException(str(e
), http_code
=e
.http_code
)
129 self
.logger
.debug("Starting")
130 while not self
.to_terminate
:
133 self
.loop
.run_until_complete(asyncio
.ensure_future(self
.start_kafka(), loop
=self
.loop
))
134 # except asyncio.CancelledError:
135 # break # if cancelled it should end, breaking loop
136 except Exception as e
:
137 if not self
.to_terminate
:
138 self
.logger
.exception("Exception '{}' at messaging read loop".format(e
), exc_info
=True)
140 self
.logger
.debug("Finishing")
144 def _msg_callback(self
, topic
, command
, params
):
146 Callback to process a received message from kafka
147 :param topic: topic received
148 :param command: command received
149 :param params: rest of parameters
154 if command
== "terminated" and params
["operationState"] in ("COMPLETED", "PARTIALLY_COMPLETED"):
155 self
.logger
.debug("received ns terminated {}".format(params
))
156 if params
.get("autoremove"):
157 self
.engine
.del_item(self
.internal_session
, "nsrs", _id
=params
["nsr_id"])
158 self
.logger
.debug("ns={} deleted from database".format(params
["nsr_id"]))
161 if command
== "terminated" and params
["operationState"] in ("COMPLETED", "PARTIALLY_COMPLETED"):
162 self
.logger
.debug("received nsi terminated {}".format(params
))
163 if params
.get("autoremove"):
164 self
.engine
.del_item(self
.internal_session
, "nsis", _id
=params
["nsir_id"])
165 self
.logger
.debug("nsis={} deleted from database".format(params
["nsir_id"]))
167 except (EngineException
, DbException
, MsgException
) as e
:
168 self
.logger
.error("Error while processing topic={} command={}: {}".format(topic
, command
, e
))
169 except Exception as e
:
170 self
.logger
.exception("Exception while processing topic={} command={}: {}".format(topic
, command
, e
),
175 Close all connections
180 self
.db
.db_disconnect()
182 self
.msg
.disconnect()
183 except (DbException
, MsgException
) as e
:
184 raise SubscriptionException(str(e
), http_code
=e
.http_code
)
188 This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards,
192 self
.to_terminate
= True
193 if self
.aiomain_task
:
194 self
.loop
.call_soon_threadsafe(self
.aiomain_task
.cancel
)