fixing tests for additionalParams and NSI without osm running
[osm/NBI.git] / osm_nbi / subscriptions.py
1 # -*- coding: utf-8 -*-
2
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 """
17 This module implements a thread that reads from kafka bus implementing all the subscriptions.
18 It is based on asyncio.
19 To avoid race conditions it uses same engine class as the main module for database changes
20 For the moment this module only deletes NS instances when they are terminated with the autoremove flag
21 """
22
23 import logging
24 import threading
25 import asyncio
26 from http import HTTPStatus
27 from osm_common import dbmongo, dbmemory, msglocal, msgkafka
28 from osm_common.dbbase import DbException
29 from osm_common.msgbase import MsgException
30 from engine import EngineException
31
32 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
33
34
35 class SubscriptionException(Exception):
36
37 def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
38 self.http_code = http_code
39 Exception.__init__(self, message)
40
41
42 class SubscriptionThread(threading.Thread):
43
44 def __init__(self, config, engine):
45 """
46 Constructor of class
47 :param config: configuration parameters of database and messaging
48 :param engine: an instance of Engine class, used for deleting instances
49 """
50 threading.Thread.__init__(self)
51
52 self.config = config
53 self.db = None
54 self.msg = None
55 self.engine = engine
56 self.loop = None
57 self.logger = logging.getLogger("nbi.subscriptions")
58 self.aiomain_task = None # asyncio task for receiving kafka bus
59 self.internal_session = { # used for a session to the engine methods
60 "_id": "subscription",
61 "id": "subscription",
62 "project_id": "admin",
63 "admin": True
64 }
65
66 def run(self):
67 """
68 Start of the thread
69 :return: None
70 """
71 self.loop = asyncio.new_event_loop()
72 try:
73 if not self.db:
74 if self.config["database"]["driver"] == "mongo":
75 self.db = dbmongo.DbMongo()
76 self.db.db_connect(self.config["database"])
77 elif self.config["database"]["driver"] == "memory":
78 self.db = dbmemory.DbMemory()
79 self.db.db_connect(self.config["database"])
80 else:
81 raise SubscriptionException("Invalid configuration param '{}' at '[database]':'driver'".format(
82 self.config["database"]["driver"]))
83 if not self.msg:
84 config_msg = self.config["message"].copy()
85 config_msg["loop"] = self.loop
86 if config_msg["driver"] == "local":
87 self.msg = msglocal.MsgLocal()
88 self.msg.connect(config_msg)
89 elif config_msg["driver"] == "kafka":
90 self.msg = msgkafka.MsgKafka()
91 self.msg.connect(config_msg)
92 else:
93 raise SubscriptionException("Invalid configuration param '{}' at '[message]':'driver'".format(
94 config_msg["driver"]))
95
96 except (DbException, MsgException) as e:
97 raise SubscriptionException(str(e), http_code=e.http_code)
98
99 self.logger.debug("Starting")
100 while True:
101 try:
102 self.aiomain_task = asyncio.ensure_future(self.msg.aioread(("ns", "nsi"), loop=self.loop,
103 callback=self._msg_callback),
104 loop=self.loop)
105 self.loop.run_until_complete(self.aiomain_task)
106 except asyncio.CancelledError:
107 break # if cancelled it should end, breaking loop
108 except Exception as e:
109 self.logger.exception("Exception '{}' at messaging read loop".format(e), exc_info=True)
110
111 self.logger.debug("Finishing")
112 self._stop()
113 self.loop.close()
114
115 def _msg_callback(self, topic, command, params):
116 """
117 Callback to process a received message from kafka
118 :param topic: topic received
119 :param command: command received
120 :param params: rest of parameters
121 :return: None
122 """
123 try:
124 if topic == "ns":
125 if command == "terminated" and params["operationState"] in ("COMPLETED", "PARTIALLY_COMPLETED"):
126 self.logger.debug("received ns terminated {}".format(params))
127 if params.get("autoremove"):
128 self.engine.del_item(self.internal_session, "nsrs", _id=params["nsr_id"])
129 self.logger.debug("ns={} deleted from database".format(params["nsr_id"]))
130 return
131 if topic == "nsi":
132 if command == "terminated" and params["operationState"] in ("COMPLETED", "PARTIALLY_COMPLETED"):
133 self.logger.debug("received nsi terminated {}".format(params))
134 if params.get("autoremove"):
135 self.engine.del_item(self.internal_session, "nsis", _id=params["nsir_id"])
136 self.logger.debug("nsis={} deleted from database".format(params["nsir_id"]))
137 return
138 except (EngineException, DbException, MsgException) as e:
139 self.logger.error("Error while processing topic={} command={}: {}".format(topic, command, e))
140 except Exception as e:
141 self.logger.exception("Exception while processing topic={} command={}: {}".format(topic, command, e),
142 exc_info=True)
143
144 def _stop(self):
145 """
146 Close all connections
147 :return: None
148 """
149 try:
150 if self.db:
151 self.db.db_disconnect()
152 if self.msg:
153 self.msg.disconnect()
154 except (DbException, MsgException) as e:
155 raise SubscriptionException(str(e), http_code=e.http_code)
156
157 def terminate(self):
158 """
159 This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards,
160 but not immediately.
161 :return: None
162 """
163 self.loop.call_soon_threadsafe(self.aiomain_task.cancel)