1 # -*- coding: utf-8 -*-
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 This module implements a thread that reads from kafka bus implementing all the subscriptions.
18 It is based on asyncio.
19 To avoid race conditions it uses same engine class as the main module for database changes
20 For the moment this module only deletes NS instances when they are terminated with the autoremove flag
26 from http
import HTTPStatus
27 from osm_common
import dbmongo
, dbmemory
, msglocal
, msgkafka
28 from osm_common
.dbbase
import DbException
29 from osm_common
.msgbase
import MsgException
30 from engine
import EngineException
32 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
35 class SubscriptionException(Exception):
37 def __init__(self
, message
, http_code
=HTTPStatus
.BAD_REQUEST
):
38 self
.http_code
= http_code
39 Exception.__init
__(self
, message
)
42 class SubscriptionThread(threading
.Thread
):
44 def __init__(self
, config
, engine
):
47 :param config: configuration parameters of database and messaging
48 :param engine: an instance of Engine class, used for deleting instances
50 threading
.Thread
.__init
__(self
)
51 self
.to_terminate
= False
57 self
.logger
= logging
.getLogger("nbi.subscriptions")
58 self
.aiomain_task
= None # asyncio task for receiving kafka bus
59 self
.internal_session
= { # used for a session to the engine methods
60 "_id": "subscription",
62 "project_id": "admin",
71 self
.loop
= asyncio
.new_event_loop()
74 if self
.config
["database"]["driver"] == "mongo":
75 self
.db
= dbmongo
.DbMongo()
76 self
.db
.db_connect(self
.config
["database"])
77 elif self
.config
["database"]["driver"] == "memory":
78 self
.db
= dbmemory
.DbMemory()
79 self
.db
.db_connect(self
.config
["database"])
81 raise SubscriptionException("Invalid configuration param '{}' at '[database]':'driver'".format(
82 self
.config
["database"]["driver"]))
84 config_msg
= self
.config
["message"].copy()
85 config_msg
["loop"] = self
.loop
86 if config_msg
["driver"] == "local":
87 self
.msg
= msglocal
.MsgLocal()
88 self
.msg
.connect(config_msg
)
89 elif config_msg
["driver"] == "kafka":
90 self
.msg
= msgkafka
.MsgKafka()
91 self
.msg
.connect(config_msg
)
93 raise SubscriptionException("Invalid configuration param '{}' at '[message]':'driver'".format(
94 config_msg
["driver"]))
96 except (DbException
, MsgException
) as e
:
97 raise SubscriptionException(str(e
), http_code
=e
.http_code
)
99 self
.logger
.debug("Starting")
100 while not self
.to_terminate
:
102 self
.aiomain_task
= asyncio
.ensure_future(self
.msg
.aioread(("ns", "nsi"), loop
=self
.loop
,
103 callback
=self
._msg
_callback
),
105 self
.loop
.run_until_complete(self
.aiomain_task
)
106 # except asyncio.CancelledError:
107 # break # if cancelled it should end, breaking loop
108 except Exception as e
:
109 if not self
.to_terminate
:
110 self
.logger
.exception("Exception '{}' at messaging read loop".format(e
), exc_info
=True)
112 self
.logger
.debug("Finishing")
116 def _msg_callback(self
, topic
, command
, params
):
118 Callback to process a received message from kafka
119 :param topic: topic received
120 :param command: command received
121 :param params: rest of parameters
126 if command
== "terminated" and params
["operationState"] in ("COMPLETED", "PARTIALLY_COMPLETED"):
127 self
.logger
.debug("received ns terminated {}".format(params
))
128 if params
.get("autoremove"):
129 self
.engine
.del_item(self
.internal_session
, "nsrs", _id
=params
["nsr_id"])
130 self
.logger
.debug("ns={} deleted from database".format(params
["nsr_id"]))
133 if command
== "terminated" and params
["operationState"] in ("COMPLETED", "PARTIALLY_COMPLETED"):
134 self
.logger
.debug("received nsi terminated {}".format(params
))
135 if params
.get("autoremove"):
136 self
.engine
.del_item(self
.internal_session
, "nsis", _id
=params
["nsir_id"])
137 self
.logger
.debug("nsis={} deleted from database".format(params
["nsir_id"]))
139 except (EngineException
, DbException
, MsgException
) as e
:
140 self
.logger
.error("Error while processing topic={} command={}: {}".format(topic
, command
, e
))
141 except Exception as e
:
142 self
.logger
.exception("Exception while processing topic={} command={}: {}".format(topic
, command
, e
),
147 Close all connections
152 self
.db
.db_disconnect()
154 self
.msg
.disconnect()
155 except (DbException
, MsgException
) as e
:
156 raise SubscriptionException(str(e
), http_code
=e
.http_code
)
160 This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards,
164 self
.to_terminate
= True
165 self
.loop
.call_soon_threadsafe(self
.aiomain_task
.cancel
)