blob: 557bf800f0915d4fa512c39e92396750d7bf7ae9 [file] [log] [blame]
tierno932499c2019-01-28 17:28:10 +00001# -*- coding: utf-8 -*-
2
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12# implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16"""
17This module implements a thread that reads from kafka bus implementing all the subscriptions.
18It is based on asyncio.
19To avoid race conditions it uses same engine class as the main module for database changes
20For the moment this module only deletes NS instances when they are terminated with the autoremove flag
21"""
22
23import logging
24import threading
25import asyncio
26from http import HTTPStatus
27from osm_common import dbmongo, dbmemory, msglocal, msgkafka
28from osm_common.dbbase import DbException
29from osm_common.msgbase import MsgException
30from engine import EngineException
31
32__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
33
34
35class SubscriptionException(Exception):
36
37 def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
38 self.http_code = http_code
39 Exception.__init__(self, message)
40
41
42class SubscriptionThread(threading.Thread):
43
44 def __init__(self, config, engine):
45 """
46 Constructor of class
47 :param config: configuration parameters of database and messaging
48 :param engine: an instance of Engine class, used for deleting instances
49 """
50 threading.Thread.__init__(self)
tierno65ca36d2019-02-12 19:27:52 +010051 self.to_terminate = False
tierno932499c2019-01-28 17:28:10 +000052 self.config = config
53 self.db = None
54 self.msg = None
55 self.engine = engine
56 self.loop = None
57 self.logger = logging.getLogger("nbi.subscriptions")
58 self.aiomain_task = None # asyncio task for receiving kafka bus
59 self.internal_session = { # used for a session to the engine methods
tierno86e916a2019-05-29 21:39:37 +000060 "project_id": (),
61 "set_project": (),
62 "admin": True,
63 "force": False,
64 "public": None,
65 "method": "delete",
tierno932499c2019-01-28 17:28:10 +000066 }
67
68 def run(self):
69 """
70 Start of the thread
71 :return: None
72 """
73 self.loop = asyncio.new_event_loop()
74 try:
75 if not self.db:
76 if self.config["database"]["driver"] == "mongo":
77 self.db = dbmongo.DbMongo()
78 self.db.db_connect(self.config["database"])
79 elif self.config["database"]["driver"] == "memory":
80 self.db = dbmemory.DbMemory()
81 self.db.db_connect(self.config["database"])
82 else:
83 raise SubscriptionException("Invalid configuration param '{}' at '[database]':'driver'".format(
84 self.config["database"]["driver"]))
85 if not self.msg:
86 config_msg = self.config["message"].copy()
87 config_msg["loop"] = self.loop
88 if config_msg["driver"] == "local":
89 self.msg = msglocal.MsgLocal()
90 self.msg.connect(config_msg)
91 elif config_msg["driver"] == "kafka":
92 self.msg = msgkafka.MsgKafka()
93 self.msg.connect(config_msg)
94 else:
95 raise SubscriptionException("Invalid configuration param '{}' at '[message]':'driver'".format(
96 config_msg["driver"]))
97
98 except (DbException, MsgException) as e:
99 raise SubscriptionException(str(e), http_code=e.http_code)
100
101 self.logger.debug("Starting")
tierno65ca36d2019-02-12 19:27:52 +0100102 while not self.to_terminate:
tierno932499c2019-01-28 17:28:10 +0000103 try:
Felipe Vicens09e65422019-01-22 15:06:46 +0100104 self.aiomain_task = asyncio.ensure_future(self.msg.aioread(("ns", "nsi"), loop=self.loop,
tierno932499c2019-01-28 17:28:10 +0000105 callback=self._msg_callback),
106 loop=self.loop)
107 self.loop.run_until_complete(self.aiomain_task)
tierno65ca36d2019-02-12 19:27:52 +0100108 # except asyncio.CancelledError:
109 # break # if cancelled it should end, breaking loop
tierno932499c2019-01-28 17:28:10 +0000110 except Exception as e:
tierno65ca36d2019-02-12 19:27:52 +0100111 if not self.to_terminate:
112 self.logger.exception("Exception '{}' at messaging read loop".format(e), exc_info=True)
tierno932499c2019-01-28 17:28:10 +0000113
114 self.logger.debug("Finishing")
115 self._stop()
116 self.loop.close()
117
118 def _msg_callback(self, topic, command, params):
119 """
120 Callback to process a received message from kafka
121 :param topic: topic received
122 :param command: command received
123 :param params: rest of parameters
124 :return: None
125 """
126 try:
127 if topic == "ns":
Felipe Vicens09e65422019-01-22 15:06:46 +0100128 if command == "terminated" and params["operationState"] in ("COMPLETED", "PARTIALLY_COMPLETED"):
tierno932499c2019-01-28 17:28:10 +0000129 self.logger.debug("received ns terminated {}".format(params))
130 if params.get("autoremove"):
131 self.engine.del_item(self.internal_session, "nsrs", _id=params["nsr_id"])
132 self.logger.debug("ns={} deleted from database".format(params["nsr_id"]))
133 return
Felipe Vicens09e65422019-01-22 15:06:46 +0100134 if topic == "nsi":
135 if command == "terminated" and params["operationState"] in ("COMPLETED", "PARTIALLY_COMPLETED"):
136 self.logger.debug("received nsi terminated {}".format(params))
137 if params.get("autoremove"):
138 self.engine.del_item(self.internal_session, "nsis", _id=params["nsir_id"])
139 self.logger.debug("nsis={} deleted from database".format(params["nsir_id"]))
140 return
tierno932499c2019-01-28 17:28:10 +0000141 except (EngineException, DbException, MsgException) as e:
142 self.logger.error("Error while processing topic={} command={}: {}".format(topic, command, e))
143 except Exception as e:
144 self.logger.exception("Exception while processing topic={} command={}: {}".format(topic, command, e),
145 exc_info=True)
146
147 def _stop(self):
148 """
149 Close all connections
150 :return: None
151 """
152 try:
153 if self.db:
154 self.db.db_disconnect()
155 if self.msg:
156 self.msg.disconnect()
157 except (DbException, MsgException) as e:
158 raise SubscriptionException(str(e), http_code=e.http_code)
159
160 def terminate(self):
161 """
162 This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards,
163 but not immediately.
164 :return: None
165 """
tierno65ca36d2019-02-12 19:27:52 +0100166 self.to_terminate = True
tierno932499c2019-01-28 17:28:10 +0000167 self.loop.call_soon_threadsafe(self.aiomain_task.cancel)