Bug 1830 fixed: maps completed operations to original operation types
[osm/NBI.git] / osm_nbi / subscription_topics.py
1 # Copyright 2020 Preethika P(Tata Elxsi)
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 __author__ = "Preethika P,preethika.p@tataelxsi.co.in"
17
18 import requests
19 from osm_nbi.base_topic import BaseTopic, EngineException
20 from osm_nbi.validation import subscription
21 from http import HTTPStatus
22
23
24 class CommonSubscriptions(BaseTopic):
25 topic = "subscriptions"
26 topic_msg = None
27
28 def _subscription_mapper(self, _id, data, table):
29 """
30 Performs data transformation on subscription request
31 :param data: data to be trasformed
32 :param table: table in which transformed data are inserted
33 """
34 pass
35
36 def format_subscription(self, subs_data):
37 """
38 Brings lexicographical order for list items at any nested level. For subscriptions max level of nesting is 4.
39 :param subs_data: Subscription data to be ordered.
40 :return: None
41 """
42 if isinstance(subs_data, dict):
43 for key in subs_data.keys():
44 # Base case
45 if isinstance(subs_data[key], list):
46 subs_data[key].sort()
47 return
48 # Recursive case
49 self.format_subscription(subs_data[key])
50 return
51
52 def check_conflict_on_new(self, session, content):
53 """
54 Two subscriptions are equal if Auth username, CallbackUri and filter are same.
55 :param session: Session object.
56 :param content: Subscription data.
57 :return: None if no conflict otherwise, raises an exception.
58 """
59 # Get all subscriptions from db table subscriptions and compare.
60 self.format_subscription(content)
61 filter_dict = {"CallbackUri": content["CallbackUri"]}
62 if content.get("authentication"):
63 if content["authentication"].get("authType") == "basic":
64 filter_dict["authentication.authType"] = "basic"
65 # elif add other authTypes here
66 else:
67 filter_dict["authentication"] = None # For Items without authentication
68 existing_subscriptions = self.db.get_list("subscriptions", q_filter=filter_dict)
69 new_sub_pwd = None
70 if (
71 content.get("authentication")
72 and content["authentication"].get("authType") == "basic"
73 ):
74 new_sub_pwd = content["authentication"]["paramsBasic"]["password"]
75 content["authentication"]["paramsBasic"].pop("password", None)
76 for existing_subscription in existing_subscriptions:
77 sub_id = existing_subscription.pop("_id", None)
78 existing_subscription.pop("_admin", None)
79 existing_subscription.pop("schema_version", None)
80 if (
81 existing_subscription.get("authentication")
82 and existing_subscription["authentication"].get("authType") == "basic"
83 ):
84 existing_subscription["authentication"]["paramsBasic"].pop(
85 "password", None
86 )
87 # self.logger.debug(existing_subscription)
88 if existing_subscription == content:
89 raise EngineException(
90 "Subscription already exists with id: {}".format(sub_id),
91 HTTPStatus.CONFLICT,
92 )
93 if new_sub_pwd:
94 content["authentication"]["paramsBasic"]["password"] = new_sub_pwd
95 return
96
97 def format_on_new(self, content, project_id=None, make_public=False):
98 super().format_on_new(content, project_id=project_id, make_public=make_public)
99
100 # TODO check how to release Engine.write_lock during the check
101 def _check_endpoint(url, auth):
102 """
103 Checks if the notification endpoint is valid
104 :param url: the notification end
105 :param auth: contains the authentication details with type basic
106 """
107 try:
108 if auth is None:
109 response = requests.get(url, timeout=5)
110 if response.status_code != HTTPStatus.NO_CONTENT:
111 raise EngineException(
112 "Cannot access to the notification URL '{}',received {}: {}".format(
113 url, response.status_code, response.content
114 )
115 )
116 elif auth["authType"] == "basic":
117 username = auth["paramsBasic"].get("userName")
118 password = auth["paramsBasic"].get("password")
119 response = requests.get(url, auth=(username, password), timeout=5)
120 if response.status_code != HTTPStatus.NO_CONTENT:
121 raise EngineException(
122 "Cannot access to the notification URL '{}',received {}: {}".format(
123 url, response.status_code, response.content
124 )
125 )
126 except requests.exceptions.RequestException as e:
127 error_text = type(e).__name__ + ": " + str(e)
128 raise EngineException(
129 "Cannot access to the notification URL '{}': {}".format(
130 url, error_text
131 )
132 )
133
134 url = content["CallbackUri"]
135 auth = content.get("authentication")
136 _check_endpoint(url, auth)
137 content["schema_version"] = schema_version = "1.1"
138 if auth is not None and auth["authType"] == "basic":
139 if content["authentication"]["paramsBasic"].get("password"):
140 content["authentication"]["paramsBasic"]["password"] = self.db.encrypt(
141 content["authentication"]["paramsBasic"]["password"],
142 schema_version=schema_version,
143 salt=content["_id"],
144 )
145 return None
146
147 def new(self, rollback, session, indata=None, kwargs=None, headers=None):
148 """
149 Uses BaseTopic.new to create entry into db
150 Once entry is made into subscriptions,mapper function is invoked
151 """
152 _id, op_id = BaseTopic.new(
153 self, rollback, session, indata=indata, kwargs=kwargs, headers=headers
154 )
155 rollback.append(
156 {
157 "topic": "mapped_subscriptions",
158 "operation": "del_list",
159 "filter": {"reference": _id},
160 }
161 )
162 self._subscription_mapper(_id, indata, table="mapped_subscriptions")
163 return _id, op_id
164
165 def delete_extra(self, session, _id, db_content, not_send_msg=None):
166 """
167 Deletes the mapped_subscription entry for this particular subscriber
168 :param _id: subscription_id deleted
169 """
170 super().delete_extra(session, _id, db_content, not_send_msg)
171 filter_q = {"reference": _id}
172 self.db.del_list("mapped_subscriptions", filter_q)
173
174
175 class NslcmSubscriptionsTopic(CommonSubscriptions):
176 schema_new = subscription
177
178 def _subscription_mapper(self, _id, data, table):
179 """
180 Performs data transformation on subscription request
181 :param data: data to be trasformed
182 :param table: table in which transformed data are inserted
183 """
184 formatted_data = []
185 formed_data = {
186 "reference": data.get("_id"),
187 "CallbackUri": data.get("CallbackUri"),
188 }
189 if data.get("authentication"):
190 formed_data.update({"authentication": data.get("authentication")})
191 if data.get("filter"):
192 if data["filter"].get("nsInstanceSubscriptionFilter"):
193 key = list(data["filter"]["nsInstanceSubscriptionFilter"].keys())[0]
194 identifier = data["filter"]["nsInstanceSubscriptionFilter"][key]
195 formed_data.update({"identifier": identifier})
196 if data["filter"].get("notificationTypes"):
197 for elem in data["filter"].get("notificationTypes"):
198 update_dict = formed_data.copy()
199 update_dict["notificationType"] = elem
200 if elem == "NsIdentifierCreationNotification":
201 update_dict["operationTypes"] = "INSTANTIATE"
202 update_dict["operationStates"] = "ANY"
203 formatted_data.append(update_dict)
204 elif elem == "NsIdentifierDeletionNotification":
205 update_dict["operationTypes"] = "TERMINATE"
206 update_dict["operationStates"] = "ANY"
207 formatted_data.append(update_dict)
208 elif elem == "NsLcmOperationOccurrenceNotification":
209 if "operationTypes" in data["filter"].keys():
210 update_dict["operationTypes"] = data["filter"][
211 "operationTypes"
212 ]
213 else:
214 update_dict["operationTypes"] = "ANY"
215 if "operationStates" in data["filter"].keys():
216 update_dict["operationStates"] = data["filter"][
217 "operationStates"
218 ]
219 else:
220 update_dict["operationStates"] = "ANY"
221 formatted_data.append(update_dict)
222 elif elem == "NsChangeNotification":
223 if "nsComponentTypes" in data["filter"].keys():
224 update_dict["nsComponentTypes"] = data["filter"][
225 "nsComponentTypes"
226 ]
227 else:
228 update_dict["nsComponentTypes"] = "ANY"
229 if "lcmOpNameImpactingNsComponent" in data["filter"].keys():
230 update_dict["lcmOpNameImpactingNsComponent"] = data[
231 "filter"
232 ]["lcmOpNameImpactingNsComponent"]
233 else:
234 update_dict["lcmOpNameImpactingNsComponent"] = "ANY"
235 if (
236 "lcmOpOccStatusImpactingNsComponent"
237 in data["filter"].keys()
238 ):
239 update_dict["lcmOpOccStatusImpactingNsComponent"] = data[
240 "filter"
241 ]["lcmOpOccStatusImpactingNsComponent"]
242 else:
243 update_dict["lcmOpOccStatusImpactingNsComponent"] = "ANY"
244 formatted_data.append(update_dict)
245 self.db.create_list(table, formatted_data)
246 return None