blob: 64fc2bfbd3f0fd7f63279485b4bbdf3b4ad5e429 [file] [log] [blame]
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module implements a thread that reads from kafka bus implementing all the subscriptions.
It is based on asyncio.
To avoid race conditions it uses same engine class as the main module for database changes
For the moment this module only deletes NS instances when they are terminated with the autoremove flag
"""
import logging
import threading
import asyncio
from http import HTTPStatus
from osm_common import dbmongo, dbmemory, msglocal, msgkafka
from osm_common.dbbase import DbException
from osm_common.msgbase import MsgException
from engine import EngineException
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
class SubscriptionException(Exception):
def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
self.http_code = http_code
Exception.__init__(self, message)
class SubscriptionThread(threading.Thread):
def __init__(self, config, engine):
"""
Constructor of class
:param config: configuration parameters of database and messaging
:param engine: an instance of Engine class, used for deleting instances
"""
threading.Thread.__init__(self)
self.config = config
self.db = None
self.msg = None
self.engine = engine
self.loop = None
self.logger = logging.getLogger("nbi.subscriptions")
self.aiomain_task = None # asyncio task for receiving kafka bus
self.internal_session = { # used for a session to the engine methods
"_id": "subscription",
"id": "subscription",
"project_id": "admin",
"admin": True
}
def run(self):
"""
Start of the thread
:return: None
"""
self.loop = asyncio.new_event_loop()
try:
if not self.db:
if self.config["database"]["driver"] == "mongo":
self.db = dbmongo.DbMongo()
self.db.db_connect(self.config["database"])
elif self.config["database"]["driver"] == "memory":
self.db = dbmemory.DbMemory()
self.db.db_connect(self.config["database"])
else:
raise SubscriptionException("Invalid configuration param '{}' at '[database]':'driver'".format(
self.config["database"]["driver"]))
if not self.msg:
config_msg = self.config["message"].copy()
config_msg["loop"] = self.loop
if config_msg["driver"] == "local":
self.msg = msglocal.MsgLocal()
self.msg.connect(config_msg)
elif config_msg["driver"] == "kafka":
self.msg = msgkafka.MsgKafka()
self.msg.connect(config_msg)
else:
raise SubscriptionException("Invalid configuration param '{}' at '[message]':'driver'".format(
config_msg["driver"]))
except (DbException, MsgException) as e:
raise SubscriptionException(str(e), http_code=e.http_code)
self.logger.debug("Starting")
while True:
try:
self.aiomain_task = asyncio.ensure_future(self.msg.aioread("ns", loop=self.loop,
callback=self._msg_callback),
loop=self.loop)
self.loop.run_until_complete(self.aiomain_task)
except asyncio.CancelledError:
break # if cancelled it should end, breaking loop
except Exception as e:
self.logger.exception("Exception '{}' at messaging read loop".format(e), exc_info=True)
self.logger.debug("Finishing")
self._stop()
self.loop.close()
def _msg_callback(self, topic, command, params):
"""
Callback to process a received message from kafka
:param topic: topic received
:param command: command received
:param params: rest of parameters
:return: None
"""
try:
if topic == "ns":
if command == "terminated":
self.logger.debug("received ns terminated {}".format(params))
if params.get("autoremove"):
self.engine.del_item(self.internal_session, "nsrs", _id=params["nsr_id"])
self.logger.debug("ns={} deleted from database".format(params["nsr_id"]))
return
except (EngineException, DbException, MsgException) as e:
self.logger.error("Error while processing topic={} command={}: {}".format(topic, command, e))
except Exception as e:
self.logger.exception("Exception while processing topic={} command={}: {}".format(topic, command, e),
exc_info=True)
def _stop(self):
"""
Close all connections
:return: None
"""
try:
if self.db:
self.db.db_disconnect()
if self.msg:
self.msg.disconnect()
except (DbException, MsgException) as e:
raise SubscriptionException(str(e), http_code=e.http_code)
def terminate(self):
"""
This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards,
but not immediately.
:return: None
"""
self.loop.call_soon_threadsafe(self.aiomain_task.cancel)