blob: 03bb92b65f4f084b001e18389d65af1b5dc26d4b [file] [log] [blame]
tierno932499c2019-01-28 17:28:10 +00001# -*- coding: utf-8 -*-
2
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12# implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16"""
This module implements a thread that reads from the kafka bus, implementing all the subscriptions.
It is based on asyncio.
To avoid race conditions it uses the same engine class as the main module for database changes.
For the moment this module only deletes NS and NSI instances when they are terminated with the autoremove flag.
21"""
22
23import logging
24import threading
25import asyncio
26from http import HTTPStatus
27from osm_common import dbmongo, dbmemory, msglocal, msgkafka
28from osm_common.dbbase import DbException
29from osm_common.msgbase import MsgException
tierno23acf402019-08-28 13:36:34 +000030from osm_nbi.engine import EngineException
tierno932499c2019-01-28 17:28:10 +000031
32__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
33
34
class SubscriptionException(Exception):
    """Error raised by the subscription thread, carrying an HTTP status code."""

    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
        """
        :param message: human readable description of the failure
        :param http_code: HTTP status associated with the error, BAD_REQUEST when not provided
        """
        super().__init__(message)
        self.http_code = http_code
40
41
class SubscriptionThread(threading.Thread):
    """
    Thread that owns its own asyncio loop, listens to the "ns" and "nsi" topics of the message bus and
    deletes, through the engine, the instances that were terminated with the autoremove flag.
    """

    # Seconds to wait between the dummy writes and the subscription (see bug 710 635 at start_kafka)
    KAFKA_STARTUP_DELAY = 10
    # Seconds to wait before trying again after a failure accessing the message bus
    KAFKA_RETRY_DELAY = 10

    # bus topic -> (engine topic, key of 'params' holding the record id, label used at the log)
    _AUTOREMOVE_ITEMS = {
        "ns": ("nsrs", "nsr_id", "ns"),
        "nsi": ("nsis", "nsir_id", "nsis"),
    }

    def __init__(self, config, engine):
        """
        Constructor of class
        :param config: configuration parameters of database and messaging
        :param engine: an instance of Engine class, used for deleting instances
        """
        super().__init__()
        self.to_terminate = False
        self.config = config
        self.db = None
        self.msg = None
        self.engine = engine
        self.loop = None
        self.logger = logging.getLogger("nbi.subscriptions")
        self.aiomain_task = None  # asyncio task for receiving kafka bus
        self.internal_session = {  # used for a session to the engine methods
            "project_id": (),
            "set_project": (),
            "admin": True,
            "force": False,
            "public": None,
            "method": "delete",
        }

    async def start_kafka(self):
        """
        Subscribe to the "ns" and "nsi" topics and keep reading until the thread is asked to terminate.
        On any failure it waits KAFKA_RETRY_DELAY seconds and tries again; only the first failure of a
        row is logged.
        :return: None
        """
        kafka_working = True
        while not self.to_terminate:
            try:
                # bug 710 635. The library aiokafka does not receive anything when the topic at kafka has not
                # been created.
                # Before subscribe, send dummy messages
                for topic in ("admin", "ns", "nsi"):
                    await self.msg.aiowrite(topic, "echo", "dummy message", loop=self.loop)
                if not kafka_working:
                    self.logger.critical("kafka is working again")
                    kafka_working = True
                # The 'loop' argument of asyncio.sleep was removed in python 3.10; this coroutine already
                # runs inside self.loop, so it is not needed
                await asyncio.sleep(self.KAFKA_STARTUP_DELAY)
                if self.to_terminate:
                    # terminate() was called while sleeping; do not start a reader that nobody would cancel
                    return
                self.aiomain_task = asyncio.ensure_future(
                    self.msg.aioread(("ns", "nsi"), loop=self.loop, aiocallback=self._msg_callback),
                    loop=self.loop)
                await self.aiomain_task
            except (asyncio.CancelledError, Exception) as e:
                # CancelledError is listed explicitly because since python 3.8 it derives from
                # BaseException, and it is what terminate() provokes by cancelling aiomain_task
                if self.to_terminate:
                    return
                if kafka_working:
                    # logging only first time
                    self.logger.critical("Error accessing kafka '{}'. Retrying ...".format(e))
                    kafka_working = False
                await asyncio.sleep(self.KAFKA_RETRY_DELAY)

    def run(self):
        """
        Start of the thread. Connects to database and message bus (unless already provided), reads the bus
        until terminate() is called and finally closes connections and the asyncio loop.
        :return: None
        """
        self.loop = asyncio.new_event_loop()
        try:
            self._open_connections()
            self.logger.debug("Starting")
            try:
                while not self.to_terminate:
                    try:
                        self.loop.run_until_complete(self.start_kafka())
                    except (asyncio.CancelledError, Exception) as e:
                        if not self.to_terminate:
                            self.logger.exception("Exception '{}' at messaging read loop".format(e))
                self.logger.debug("Finishing")
            finally:
                self._close_connections()
        finally:
            # the loop is always released, also when connecting or disconnecting fails
            self.loop.close()

    def _open_connections(self):
        """
        Create and connect self.db and self.msg following the configuration, skipping the ones already set
        :return: None. Raises SubscriptionException on a wrong driver name or a connection error
        """
        try:
            if not self.db:
                db_driver = self.config["database"]["driver"]
                if db_driver == "mongo":
                    self.db = dbmongo.DbMongo()
                elif db_driver == "memory":
                    self.db = dbmemory.DbMemory()
                else:
                    raise SubscriptionException("Invalid configuration param '{}' at '[database]':'driver'".format(
                        db_driver))
                self.db.db_connect(self.config["database"])
            if not self.msg:
                config_msg = self.config["message"].copy()
                config_msg["loop"] = self.loop
                if config_msg["driver"] == "local":
                    self.msg = msglocal.MsgLocal()
                elif config_msg["driver"] == "kafka":
                    self.msg = msgkafka.MsgKafka()
                else:
                    raise SubscriptionException("Invalid configuration param '{}' at '[message]':'driver'".format(
                        config_msg["driver"]))
                self.msg.connect(config_msg)
        except (DbException, MsgException) as e:
            raise SubscriptionException(str(e), http_code=e.http_code) from e

    async def _msg_callback(self, topic, command, params):
        """
        Callback to process a received message from kafka
        :param topic: topic received
        :param command: command received
        :param params: rest of parameters
        :return: None
        """
        msg_to_send = []
        try:
            if topic in self._AUTOREMOVE_ITEMS:
                engine_topic, id_key, label = self._AUTOREMOVE_ITEMS[topic]
                if command == "terminated" and params["operationState"] in ("COMPLETED", "PARTIALLY_COMPLETED"):
                    self.logger.debug("received {} terminated {}".format(topic, params))
                    if params.get("autoremove"):
                        self.engine.del_item(self.internal_session, engine_topic, _id=params[id_key],
                                             not_send_msg=msg_to_send)
                        self.logger.debug("{}={} deleted from database".format(label, params[id_key]))

            # writing to kafka must be done with our own loop. For this reason it is not allowed Engine to do that,
            # but content to be written is stored at msg_to_send
            for msg in msg_to_send:
                await self.msg.aiowrite(*msg, loop=self.loop)
        except (EngineException, DbException, MsgException) as e:
            self.logger.error("Error while processing topic={} command={}: {}".format(topic, command, e))
        except Exception as e:
            self.logger.exception("Exception while processing topic={} command={}: {}".format(topic, command, e))

    def _close_connections(self):
        """
        Close all connections.
        It is deliberately not called '_stop': that name is used internally by threading.Thread (join and
        is_alive call it), so overriding it breaks the thread bookkeeping.
        :return: None
        """
        try:
            if self.db:
                self.db.db_disconnect()
            if self.msg:
                self.msg.disconnect()
        except (DbException, MsgException) as e:
            raise SubscriptionException(str(e), http_code=e.http_code) from e

    def _cancel_aiomain_task(self):
        """
        Cancel the task reading from the bus, if any. Must be executed at the loop thread
        :return: None
        """
        if self.aiomain_task is not None:
            self.aiomain_task.cancel()

    def terminate(self):
        """
        This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards,
        but not immediately.
        :return: None
        """
        self.to_terminate = True
        loop = self.loop
        if loop is None or loop.is_closed():
            # thread not started yet or already finished: nothing to cancel
            return
        try:
            # aiomain_task is looked up at the loop thread, so a task created after this call but before the
            # callback runs is cancelled as well
            loop.call_soon_threadsafe(self._cancel_aiomain_task)
        except RuntimeError:
            # the loop was closed in the meantime, that is, the thread has already finished
            pass