Code Coverage

Cobertura Coverage Report > osm_nbi >

subscription_topics.py

Trend

File Coverage summary

Name | Classes | Lines | Conditionals
subscription_topics.py
100%
1/1
14%
17/120
100%
0/0

Coverage Breakdown by Class

Name | Lines | Conditionals
subscription_topics.py
14%
17/120
N/A

Source

osm_nbi/subscription_topics.py
1 # Copyright 2020 Preethika P(Tata Elxsi)
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #    http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 1 __author__ = "Preethika P,preethika.p@tataelxsi.co.in"
17
18 1 import requests
19 1 from osm_nbi.base_topic import BaseTopic, EngineException
20 1 from osm_nbi.validation import subscription
21 1 from http import HTTPStatus
22
23
class CommonSubscriptions(BaseTopic):
    """
    Topic handling notification subscriptions kept in the "subscriptions" table.

    On creation the callback endpoint is probed, duplicates are rejected and a
    per-subclass mapper is invoked to fill the "mapped_subscriptions" table.
    """

    topic = "subscriptions"
    topic_msg = None

    def _subscription_mapper(self, _id, data, table):
        """
        Performs data transformation on subscription request.
        This base implementation does nothing; subclasses override it.
        :param _id: identifier of the subscription being mapped
        :param data: data to be transformed
        :param table: table in which transformed data are inserted
        """
        pass

    def format_subscription(self, subs_data):
        """
        Brings lexicographical order for list items at any nested level of dictionaries.
        Lists are sorted in place; items inside a list are not descended into.
        :param subs_data: Subscription data to be ordered.
        :return: None
        """
        if not isinstance(subs_data, dict):
            return
        for value in subs_data.values():
            if isinstance(value, list):
                # Keep iterating after sorting: sibling keys that follow this one
                # may hold lists (or dicts with lists) that need ordering as well.
                value.sort()
            else:
                self.format_subscription(value)

    def check_conflict_on_new(self, session, content):
        """
        Two subscriptions are equal if Auth username, CallbackUri and filter are same.
        Passwords are left out of the comparison.
        :param session: Session object.
        :param content: Subscription data. Its lists are sorted in place.
        :return: None if no conflict otherwise, raises an exception.
        """
        # Get all subscriptions from db table subscriptions and compare.
        self.format_subscription(content)
        auth = content.get("authentication")
        is_basic = bool(auth) and auth.get("authType") == "basic"

        filter_dict = {"CallbackUri": content["CallbackUri"]}
        if auth:
            if is_basic:
                filter_dict["authentication.authType"] = "basic"
            # elif add other authTypes here
        else:
            filter_dict["authentication"] = None  # For Items without authentication
        existing_subscriptions = self.db.get_list("subscriptions", q_filter=filter_dict)

        # Temporarily take the password out of the new content so that it does not
        # take part in the equality test. It is kept in a dict so that "no password
        # key" and "empty password" can both be restored faithfully.
        removed_password = {}
        if is_basic:
            params_basic = auth["paramsBasic"]
            if "password" in params_basic:
                removed_password["password"] = params_basic.pop("password")
        try:
            for existing_subscription in existing_subscriptions:
                sub_id = existing_subscription.pop("_id", None)
                existing_subscription.pop("_admin", None)
                existing_subscription.pop("schema_version", None)
                existing_auth = existing_subscription.get("authentication")
                if existing_auth and existing_auth.get("authType") == "basic":
                    existing_auth["paramsBasic"].pop("password", None)
                if existing_subscription == content:
                    raise EngineException(
                        "Subscription already exists with id: {}".format(sub_id),
                        HTTPStatus.CONFLICT,
                    )
        finally:
            # Put the password back whether or not a conflict was found.
            if removed_password:
                params_basic.update(removed_password)
        return

    def format_on_new(self, content, project_id=None, make_public=False):
        """
        Completes a new subscription before it is stored: verifies that the
        notification endpoint answers, sets the schema version and encrypts the
        basic-auth password, if any.
        :param content: Subscription data, modified in place.
        :param project_id: forwarded to the parent implementation.
        :param make_public: forwarded to the parent implementation.
        :return: None. Raises EngineException if the endpoint check fails.
        """
        super().format_on_new(content, project_id=project_id, make_public=make_public)

        # TODO check how to release Engine.write_lock during the check
        def _check_endpoint(url, auth):
            """
            Checks if the notification endpoint is valid, i.e. a GET on it
            answers with 204 NO CONTENT.
            :param url: the notification end
            :param auth: contains the authentication details with type basic
            """
            request_kwargs = {"timeout": 5}
            if auth is not None:
                if auth.get("authType") != "basic":
                    # Only unauthenticated and basic-auth endpoints are probed.
                    return
                params_basic = auth["paramsBasic"]
                request_kwargs["auth"] = (
                    params_basic.get("userName"),
                    params_basic.get("password"),
                )
            try:
                response = requests.get(url, **request_kwargs)
            except requests.exceptions.RequestException as e:
                error_text = type(e).__name__ + ": " + str(e)
                raise EngineException(
                    "Cannot access to the notification URL '{}': {}".format(
                        url, error_text
                    )
                ) from e
            if response.status_code != HTTPStatus.NO_CONTENT:
                raise EngineException(
                    "Cannot access to the notification URL '{}',received {}: {}".format(
                        url, response.status_code, response.content
                    )
                )

        url = content["CallbackUri"]
        auth = content.get("authentication")
        # The probe needs the clear-text password, so it runs before encryption.
        _check_endpoint(url, auth)
        content["schema_version"] = schema_version = "1.1"
        if auth is not None and auth.get("authType") == "basic":
            params_basic = auth["paramsBasic"]
            if params_basic.get("password"):
                params_basic["password"] = self.db.encrypt(
                    params_basic["password"],
                    schema_version=schema_version,
                    salt=content["_id"],
                )
        return None

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """
        Uses BaseTopic.new to create entry into db
        Once entry is made into subscriptions,mapper function is invoked
        :return: the (_id, op_id) pair returned by the parent implementation.
        """
        _id, op_id = super().new(
            rollback, session, indata=indata, kwargs=kwargs, headers=headers
        )
        # Register the clean-up before mapping, so that rows written by the mapper
        # are covered by the rollback list.
        rollback.append(
            {
                "topic": "mapped_subscriptions",
                "operation": "del_list",
                "filter": {"reference": _id},
            }
        )
        self._subscription_mapper(_id, indata, table="mapped_subscriptions")
        return _id, op_id

    def delete_extra(self, session, _id, db_content, not_send_msg=None):
        """
        Deletes the mapped_subscription entry for this particular subscriber
        :param session: Session object, forwarded to the parent implementation.
        :param _id: subscription_id deleted
        :param db_content: forwarded to the parent implementation.
        :param not_send_msg: forwarded to the parent implementation.
        """
        super().delete_extra(session, _id, db_content, not_send_msg)
        filter_q = {"reference": _id}
        self.db.del_list("mapped_subscriptions", filter_q)
173
174
class NslcmSubscriptionsTopic(CommonSubscriptions):
    """
    Subscriptions to NS LCM notifications, validated against the
    `subscription` schema.
    """

    schema_new = subscription

    def _subscription_mapper(self, _id, data, table):
        """
        Performs data transformation on subscription request: builds one row per
        recognised notification type found in the subscription filter and inserts
        the rows in `table`.
        :param _id: identifier of the subscription; stored as "reference" in each row
        :param data: data to be transformed
        :param table: table in which transformed data are inserted
        :return: None
        """
        # Prefer the explicit identifier: it is the value used as "reference" when
        # mapped rows are later looked up for deletion. Fall back to the
        # content only when no identifier was given.
        reference = _id if _id is not None else data.get("_id")
        formed_data = {
            "reference": reference,
            "CallbackUri": data.get("CallbackUri"),
        }
        if data.get("authentication"):
            formed_data["authentication"] = data["authentication"]

        sub_filter = data.get("filter") or {}
        instance_filter = sub_filter.get("nsInstanceSubscriptionFilter")
        if instance_filter:
            # Only the first entry of the instance filter is used as identifier.
            first_key = next(iter(instance_filter))
            formed_data["identifier"] = instance_filter[first_key]

        formatted_data = []
        for elem in sub_filter.get("notificationTypes") or []:
            update_dict = formed_data.copy()
            update_dict["notificationType"] = elem
            if elem == "NsIdentifierCreationNotification":
                update_dict["operationTypes"] = "INSTANTIATE"
                update_dict["operationStates"] = "ANY"
            elif elem == "NsIdentifierDeletionNotification":
                update_dict["operationTypes"] = "TERMINATE"
                update_dict["operationStates"] = "ANY"
            elif elem == "NsLcmOperationOccurrenceNotification":
                # Attributes absent from the filter match anything.
                for field in ("operationTypes", "operationStates"):
                    update_dict[field] = sub_filter.get(field, "ANY")
            elif elem == "NsChangeNotification":
                for field in (
                    "nsComponentTypes",
                    "lcmOpNameImpactingNsComponent",
                    "lcmOpOccStatusImpactingNsComponent",
                ):
                    update_dict[field] = sub_filter.get(field, "ANY")
            else:
                # Unrecognised notification types produce no row.
                continue
            formatted_data.append(update_dict)

        # Nothing to store when the subscription has no recognised notification
        # type. NOTE(review): some db back-ends are presumed to reject an empty
        # bulk insert - confirm against the db driver in use.
        if formatted_data:
            self.db.create_list(table, formatted_data)
        return None