Bug 1926: Update version of Six
[osm/NBI.git] / osm_nbi / subscription_topics.py
1 # Copyright 2020 Preethika P(Tata Elxsi)
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 __author__ = "Preethika P,preethika.p@tataelxsi.co.in"
17
18 import requests
19 from osm_nbi.base_topic import BaseTopic, EngineException
20 from osm_nbi.validation import subscription
21 from http import HTTPStatus
22
23
24 class CommonSubscriptions(BaseTopic):
25 topic = "subscriptions"
26 topic_msg = None
27
28 def format_subscription(self, subs_data):
29 """
30 Brings lexicographical order for list items at any nested level. For subscriptions max level of nesting is 4.
31 :param subs_data: Subscription data to be ordered.
32 :return: None
33 """
34 if isinstance(subs_data, dict):
35 for key in subs_data.keys():
36 # Base case
37 if isinstance(subs_data[key], list):
38 subs_data[key].sort()
39 return
40 # Recursive case
41 self.format_subscription(subs_data[key])
42 return
43
44 def check_conflict_on_new(self, session, content):
45 """
46 Two subscriptions are equal if Auth username, CallbackUri and filter are same.
47 :param session: Session object.
48 :param content: Subscription data.
49 :return: None if no conflict otherwise, raises an exception.
50 """
51 # Get all subscriptions from db table subscriptions and compare.
52 self.format_subscription(content)
53 filter_dict = {"CallbackUri": content["CallbackUri"]}
54 if content.get("authentication"):
55 if content["authentication"].get("authType") == "basic":
56 filter_dict["authentication.authType"] = "basic"
57 # elif add other authTypes here
58 else:
59 filter_dict["authentication"] = None # For Items without authentication
60 existing_subscriptions = self.db.get_list("subscriptions", q_filter=filter_dict)
61 new_sub_pwd = None
62 if (
63 content.get("authentication")
64 and content["authentication"].get("authType") == "basic"
65 ):
66 new_sub_pwd = content["authentication"]["paramsBasic"]["password"]
67 content["authentication"]["paramsBasic"].pop("password", None)
68 for existing_subscription in existing_subscriptions:
69 sub_id = existing_subscription.pop("_id", None)
70 existing_subscription.pop("_admin", None)
71 existing_subscription.pop("schema_version", None)
72 if (
73 existing_subscription.get("authentication")
74 and existing_subscription["authentication"].get("authType") == "basic"
75 ):
76 existing_subscription["authentication"]["paramsBasic"].pop(
77 "password", None
78 )
79 # self.logger.debug(existing_subscription)
80 if existing_subscription == content:
81 raise EngineException(
82 "Subscription already exists with id: {}".format(sub_id),
83 HTTPStatus.CONFLICT,
84 )
85 if new_sub_pwd:
86 content["authentication"]["paramsBasic"]["password"] = new_sub_pwd
87 return
88
89 def format_on_new(self, content, project_id=None, make_public=False):
90 super().format_on_new(content, project_id=project_id, make_public=make_public)
91
92 # TODO check how to release Engine.write_lock during the check
93 def _check_endpoint(url, auth):
94 """
95 Checks if the notification endpoint is valid
96 :param url: the notification end
97 :param auth: contains the authentication details with type basic
98 """
99 try:
100 if auth is None:
101 response = requests.get(url, timeout=5)
102 if response.status_code != HTTPStatus.NO_CONTENT:
103 raise EngineException(
104 "Cannot access to the notification URL '{}',received {}: {}".format(
105 url, response.status_code, response.content
106 )
107 )
108 elif auth["authType"] == "basic":
109 username = auth["paramsBasic"].get("userName")
110 password = auth["paramsBasic"].get("password")
111 response = requests.get(url, auth=(username, password), timeout=5)
112 if response.status_code != HTTPStatus.NO_CONTENT:
113 raise EngineException(
114 "Cannot access to the notification URL '{}',received {}: {}".format(
115 url, response.status_code, response.content
116 )
117 )
118 except requests.exceptions.RequestException as e:
119 error_text = type(e).__name__ + ": " + str(e)
120 raise EngineException(
121 "Cannot access to the notification URL '{}': {}".format(
122 url, error_text
123 )
124 )
125
126 url = content["CallbackUri"]
127 auth = content.get("authentication")
128 _check_endpoint(url, auth)
129 content["schema_version"] = schema_version = "1.1"
130 if auth is not None and auth["authType"] == "basic":
131 if content["authentication"]["paramsBasic"].get("password"):
132 content["authentication"]["paramsBasic"]["password"] = self.db.encrypt(
133 content["authentication"]["paramsBasic"]["password"],
134 schema_version=schema_version,
135 salt=content["_id"],
136 )
137 return None
138
139 def new(self, rollback, session, indata=None, kwargs=None, headers=None):
140 """
141 Uses BaseTopic.new to create entry into db
142 Once entry is made into subscriptions,mapper function is invoked
143 """
144 _id, op_id = BaseTopic.new(
145 self, rollback, session, indata=indata, kwargs=kwargs, headers=headers
146 )
147 rollback.append(
148 {
149 "topic": "mapped_subscriptions",
150 "operation": "del_list",
151 "filter": {"reference": _id},
152 }
153 )
154 self._subscription_mapper(_id, indata, table="mapped_subscriptions")
155 return _id, op_id
156
157 def delete_extra(self, session, _id, db_content, not_send_msg=None):
158 """
159 Deletes the mapped_subscription entry for this particular subscriber
160 :param _id: subscription_id deleted
161 """
162 super().delete_extra(session, _id, db_content, not_send_msg)
163 filter_q = {"reference": _id}
164 self.db.del_list("mapped_subscriptions", filter_q)
165
166
167 class NslcmSubscriptionsTopic(CommonSubscriptions):
168 schema_new = subscription
169
170 def _subscription_mapper(self, _id, data, table):
171 """
172 Performs data transformation on subscription request
173 :param data: data to be trasformed
174 :param table: table in which transformed data are inserted
175 """
176 formatted_data = []
177 formed_data = {
178 "reference": data.get("_id"),
179 "CallbackUri": data.get("CallbackUri"),
180 }
181 if data.get("authentication"):
182 formed_data.update({"authentication": data.get("authentication")})
183 if data.get("filter"):
184 if data["filter"].get("nsInstanceSubscriptionFilter"):
185 key = list(data["filter"]["nsInstanceSubscriptionFilter"].keys())[0]
186 identifier = data["filter"]["nsInstanceSubscriptionFilter"][key]
187 formed_data.update({"identifier": identifier})
188 if data["filter"].get("notificationTypes"):
189 for elem in data["filter"].get("notificationTypes"):
190 update_dict = formed_data.copy()
191 update_dict["notificationType"] = elem
192 if elem == "NsIdentifierCreationNotification":
193 update_dict["operationTypes"] = "INSTANTIATE"
194 update_dict["operationStates"] = "ANY"
195 formatted_data.append(update_dict)
196 elif elem == "NsIdentifierDeletionNotification":
197 update_dict["operationTypes"] = "TERMINATE"
198 update_dict["operationStates"] = "ANY"
199 formatted_data.append(update_dict)
200 elif elem == "NsLcmOperationOccurrenceNotification":
201 if "operationTypes" in data["filter"].keys():
202 update_dict["operationTypes"] = data["filter"][
203 "operationTypes"
204 ]
205 else:
206 update_dict["operationTypes"] = "ANY"
207 if "operationStates" in data["filter"].keys():
208 update_dict["operationStates"] = data["filter"][
209 "operationStates"
210 ]
211 else:
212 update_dict["operationStates"] = "ANY"
213 formatted_data.append(update_dict)
214 elif elem == "NsChangeNotification":
215 if "nsComponentTypes" in data["filter"].keys():
216 update_dict["nsComponentTypes"] = data["filter"][
217 "nsComponentTypes"
218 ]
219 else:
220 update_dict["nsComponentTypes"] = "ANY"
221 if "lcmOpNameImpactingNsComponent" in data["filter"].keys():
222 update_dict["lcmOpNameImpactingNsComponent"] = data[
223 "filter"
224 ]["lcmOpNameImpactingNsComponent"]
225 else:
226 update_dict["lcmOpNameImpactingNsComponent"] = "ANY"
227 if (
228 "lcmOpOccStatusImpactingNsComponent"
229 in data["filter"].keys()
230 ):
231 update_dict["lcmOpOccStatusImpactingNsComponent"] = data[
232 "filter"
233 ]["lcmOpOccStatusImpactingNsComponent"]
234 else:
235 update_dict["lcmOpOccStatusImpactingNsComponent"] = "ANY"
236 formatted_data.append(update_dict)
237 self.db.create_list(table, formatted_data)
238 return None