blob: 393918c3e210a7c575295cfb4fa94de739d732b5 [file] [log] [blame]
tierno932499c2019-01-28 17:28:10 +00001# -*- coding: utf-8 -*-
2
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12# implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
"""
This module implements a thread that reads from the kafka bus, implementing all the subscriptions.
It is based on asyncio.
To avoid race conditions it uses the same engine class as the main module for database changes.
For the moment this module deletes NS and NSI instances when they are terminated with the autoremove flag,
and removes entries from the token cache when a 'revoke_token' command is received on the 'admin' topic.
"""
22
23import logging
24import threading
25import asyncio
26from http import HTTPStatus
27from osm_common import dbmongo, dbmemory, msglocal, msgkafka
28from osm_common.dbbase import DbException
29from osm_common.msgbase import MsgException
tierno23acf402019-08-28 13:36:34 +000030from osm_nbi.engine import EngineException
tierno932499c2019-01-28 17:28:10 +000031
32__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
33
34
class SubscriptionException(Exception):
    """
    Error raised by the subscription thread. Besides the message it carries an HTTP status code.
    """

    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
        """
        :param message: text describing the error
        :param http_code: HTTP status associated to the error, BAD_REQUEST when not provided
        """
        super().__init__(message)
        self.http_code = http_code
40
41
class SubscriptionThread(threading.Thread):
    """
    Thread that owns a private asyncio event loop and uses it to read the 'admin', 'ns' and 'nsi' topics
    from the message bus, reacting to the received commands (see _msg_callback).
    """

    def __init__(self, config, engine):
        """
        Constructor of class
        :param config: configuration parameters of database and messaging
        :param engine: an instance of Engine class, used for deleting instances
        """
        super().__init__()
        self.to_terminate = False
        self.config = config
        self.db = None
        self.msg = None
        self.engine = engine
        self.loop = None  # created at run(), so that it belongs to this thread
        self.logger = logging.getLogger("nbi.subscriptions")
        self.aiomain_task_admin = None  # asyncio task for receiving admin actions from kafka bus
        self.aiomain_task = None  # asyncio task for receiving normal actions from kafka bus
        self.internal_session = {  # used for a session to the engine methods
            "project_id": (),
            "set_project": (),
            "admin": True,
            "force": False,
            "public": None,
            "method": "delete",
        }

    @staticmethod
    def _needs_start(task):
        """
        Tells if a reader task must be (re)created
        :param task: asyncio task, or None when it has never been created
        :return: True if task is None or it is done (finished, failed or cancelled)
        """
        return task is None or task.done()

    async def start_kafka(self):
        """
        Keep the two reader tasks (admin topic; ns and nsi topics) alive until termination is requested.
        Any error is logged (only the first time of a row) and retried after 10 seconds.
        :return: None
        """
        # timeout_wait_for_kafka = 3*60
        kafka_working = True
        while not self.to_terminate:
            try:
                # bug 710 635. The library aiokafka does not receive anything when the topic at kafka has not
                # been created.
                # Before subscribe, send dummy messages
                await self.msg.aiowrite("admin", "echo", "dummy message", loop=self.loop)
                await self.msg.aiowrite("ns", "echo", "dummy message", loop=self.loop)
                await self.msg.aiowrite("nsi", "echo", "dummy message", loop=self.loop)
                if not kafka_working:
                    self.logger.critical("kafka is working again")
                    kafka_working = True

                # task.done() is also True for a cancelled task. Testing only for a finished one would leave
                # a cancelled reader in place forever, and asyncio.wait below would return immediately on
                # every iteration
                if self._needs_start(self.aiomain_task_admin):
                    await asyncio.sleep(10)
                    if self.to_terminate:  # terminate() was called during the delay: do not start a reader
                        return
                    self.logger.debug("Starting admin subscription task")
                    # NOTE(review): group_id=False presumably makes every NBI instance receive the admin
                    # messages; confirm against osm_common msgkafka.aioread
                    self.aiomain_task_admin = asyncio.ensure_future(
                        self.msg.aioread(("admin",), loop=self.loop, group_id=False,
                                         aiocallback=self._msg_callback),
                        loop=self.loop)
                if self._needs_start(self.aiomain_task):
                    await asyncio.sleep(10)
                    if self.to_terminate:
                        return
                    self.logger.debug("Starting non-admin subscription task")
                    self.aiomain_task = asyncio.ensure_future(
                        self.msg.aioread(("ns", "nsi"), loop=self.loop, aiocallback=self._msg_callback),
                        loop=self.loop)
                # 'loop' is not passed to asyncio.sleep/asyncio.wait: that argument was removed in python 3.10
                # and this coroutine already runs inside self.loop
                await asyncio.wait([self.aiomain_task, self.aiomain_task_admin],
                                   timeout=None, return_when=asyncio.FIRST_COMPLETED)
            except Exception as e:
                if self.to_terminate:
                    return
                if kafka_working:
                    # logging only first time
                    self.logger.critical("Error accessing kafka '{}'. Retrying ...".format(e))
                    kafka_working = False
                await asyncio.sleep(10)

    def run(self):
        """
        Start of the thread. Connects to database and message bus (unless already provided), and runs the
        reading loop until termination is requested
        :return: None
        :raises SubscriptionException: on wrong driver at configuration or when a connection fails
        """
        self.loop = asyncio.new_event_loop()
        # bind the loop to this thread, so that asyncio calls without an explicit 'loop' find it on any
        # python version
        asyncio.set_event_loop(self.loop)
        try:
            try:
                if not self.db:
                    db_driver = self.config["database"]["driver"]
                    if db_driver == "mongo":
                        self.db = dbmongo.DbMongo()
                    elif db_driver == "memory":
                        self.db = dbmemory.DbMemory()
                    else:
                        raise SubscriptionException("Invalid configuration param '{}' at '[database]':'driver'".format(
                            db_driver))
                    self.db.db_connect(self.config["database"])
                if not self.msg:
                    config_msg = self.config["message"].copy()
                    config_msg["loop"] = self.loop
                    if config_msg["driver"] == "local":
                        self.msg = msglocal.MsgLocal()
                    elif config_msg["driver"] == "kafka":
                        self.msg = msgkafka.MsgKafka()
                    else:
                        raise SubscriptionException("Invalid configuration param '{}' at '[message]':'driver'".format(
                            config_msg["driver"]))
                    self.msg.connect(config_msg)
            except (DbException, MsgException) as e:
                raise SubscriptionException(str(e), http_code=e.http_code) from e

            self.logger.debug("Starting")
            while not self.to_terminate:
                try:
                    self.loop.run_until_complete(self.start_kafka())
                # except asyncio.CancelledError:
                #     break  # if cancelled it should end, breaking loop
                except Exception as e:
                    if not self.to_terminate:
                        self.logger.exception("Exception '{}' at messaging read loop".format(e))

            self.logger.debug("Finishing")
            self._stop()
        finally:
            # close the loop also when connecting or disconnecting fails
            self.loop.close()

    async def _msg_callback(self, topic, command, params):
        """
        Callback to process a received message from kafka
        :param topic: topic received
        :param command: command received
        :param params: rest of parameters
        :return: None. Exceptions are logged, never propagated
        """
        msg_to_send = []
        try:
            if topic == "ns":
                if command == "terminated" and params["operationState"] in ("COMPLETED", "PARTIALLY_COMPLETED"):
                    self.logger.debug("received ns terminated {}".format(params))
                    if params.get("autoremove"):
                        self.engine.del_item(self.internal_session, "nsrs", _id=params["nsr_id"],
                                             not_send_msg=msg_to_send)
                        self.logger.debug("ns={} deleted from database".format(params["nsr_id"]))
            elif topic == "nsi":
                if command == "terminated" and params["operationState"] in ("COMPLETED", "PARTIALLY_COMPLETED"):
                    self.logger.debug("received nsi terminated {}".format(params))
                    if params.get("autoremove"):
                        self.engine.del_item(self.internal_session, "nsis", _id=params["nsir_id"],
                                             not_send_msg=msg_to_send)
                        self.logger.debug("nsis={} deleted from database".format(params["nsir_id"]))
            elif topic == "admin":
                self.logger.debug("received {} {} {}".format(topic, command, params))
                if command in ["echo", "ping"]:  # ignored commands
                    pass
                elif command == "revoke_token":
                    if params:
                        if isinstance(params, dict) and "_id" in params:
                            tid = params.get("_id")
                            self.engine.authenticator.tokens_cache.pop(tid, None)
                            self.logger.debug("token '{}' removed from token_cache".format(tid))
                        else:
                            self.logger.debug("unrecognized params in command '{} {}': {}"
                                              .format(topic, command, params))
                    else:
                        # without params the whole cache is flushed
                        self.engine.authenticator.tokens_cache.clear()
                        self.logger.debug("token_cache cleared")
                else:
                    self.logger.debug("unrecognized command '{} {}'".format(topic, command))
            # writing to kafka must be done with our own loop. For this reason it is not allowed Engine to do that,
            # but content to be written is stored at msg_to_send
            for msg in msg_to_send:
                await self.msg.aiowrite(*msg, loop=self.loop)
        except (EngineException, DbException, MsgException) as e:
            self.logger.error("Error while processing topic={} command={}: {}".format(topic, command, e))
        except Exception as e:
            # unexpected error (e.g. a missing key at params): log it with traceback
            self.logger.exception("Exception while processing topic={} command={}: {}".format(topic, command, e))

    # NOTE(review): CPython's threading.Thread has a private method also called _stop, so this definition
    # shadows it. Confirm join()/is_alive() still behave as expected, or consider renaming this method.
    def _stop(self):
        """
        Close all connections
        :return: None
        :raises SubscriptionException: if database or message bus disconnection fails
        """
        try:
            if self.db:
                self.db.db_disconnect()
            if self.msg:
                self.msg.disconnect()
        except (DbException, MsgException) as e:
            raise SubscriptionException(str(e), http_code=e.http_code) from e

    def _cancel_tasks(self):
        """
        Cancel both reader tasks if they are running. Must be executed at the thread of self.loop
        :return: None
        """
        for task in (self.aiomain_task, self.aiomain_task_admin):
            if task is not None and not task.done():
                task.cancel()

    def terminate(self):
        """
        This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards,
        but not immediately.
        :return: None
        """
        self.to_terminate = True
        loop = self.loop
        if loop is None or loop.is_closed():
            return  # thread not started or already finished: there is nothing to cancel
        try:
            # Cancellation is executed inside the loop thread, so that both the admin and non-admin readers
            # are cancelled, including the ones created after this call was issued
            loop.call_soon_threadsafe(self._cancel_tasks)
        except RuntimeError:
            # the loop has been closed in the meantime, so the thread is already finishing
            self.logger.debug("event loop already closed at terminate")