# Copyright 2020 Preethika P(Tata Elxsi)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Module author, in "name,e-mail" form.
__author__ = "Preethika P,preethika.p@tataelxsi.co.in"
from http import HTTPStatus

import requests

from osm_nbi.base_topic import BaseTopic, EngineException
from osm_nbi.validation import subscription
class CommonSubscriptions(BaseTopic):
    """
    Shared behaviour of subscription topics stored in the "subscriptions" table.

    On top of BaseTopic it rejects duplicated subscriptions, checks that the
    notification endpoint answers, encrypts basic-auth passwords and keeps the
    derived "mapped_subscriptions" entries in step with each subscription.
    """

    topic = "subscriptions"

    def format_subscription(self, subs_data):
        """
        Brings lexicographical order for list items at any nested level. For subscriptions max level of nesting is 4.
        :param subs_data: Subscription data to be ordered. Modified in place.
        :return: None
        """
        if not isinstance(subs_data, dict):
            return
        for value in subs_data.values():
            if isinstance(value, list):
                # NOTE(review): the list branch was not visible in the reviewed
                # source; sorting in place is what the docstring describes.
                value.sort()
            else:
                # Nested dicts are ordered recursively; scalars are ignored by
                # the isinstance guard at the top of the call.
                self.format_subscription(value)

    def check_conflict_on_new(self, session, content):
        """
        Two subscriptions are equal if Auth username, CallbackUri and filter are same.
        :param session: Session object.
        :param content: Subscription data. Its lists are sorted in place; the
            basic-auth password is removed only for the comparison and is put
            back before returning or raising.
        :return: None if no conflict otherwise, raises an exception.
        :raises EngineException: if an equal subscription is already stored.
        """
        # Get all subscriptions from db table subscriptions and compare.
        self.format_subscription(content)
        authentication = content.get("authentication")
        is_basic = bool(authentication) and authentication.get("authType") == "basic"

        filter_dict = {"CallbackUri": content["CallbackUri"]}
        if authentication:
            if is_basic:
                filter_dict["authentication.authType"] = "basic"
            # elif add other authTypes here
        else:
            filter_dict["authentication"] = None  # For Items without authentication
        existing_subscriptions = self.db.get_list("subscriptions", q_filter=filter_dict)

        # The password is never part of the comparison: take it out of the new
        # subscription here and out of every stored one below.
        new_sub_pwd = None
        if is_basic:
            new_sub_pwd = authentication["paramsBasic"].pop("password", None)
        try:
            for existing_subscription in existing_subscriptions:
                sub_id = existing_subscription.pop("_id", None)
                existing_subscription.pop("_admin", None)
                existing_subscription.pop("schema_version", None)
                existing_auth = existing_subscription.get("authentication")
                if existing_auth and existing_auth.get("authType") == "basic":
                    existing_auth["paramsBasic"].pop("password", None)
                # self.logger.debug(existing_subscription)
                if existing_subscription == content:
                    # NOTE(review): the status-code argument was not visible in
                    # the reviewed source; CONFLICT is assumed - confirm.
                    raise EngineException(
                        "Subscription already exists with id: {}".format(sub_id),
                        HTTPStatus.CONFLICT,
                    )
        finally:
            # Restore the caller's data even when a conflict is raised.
            if new_sub_pwd is not None:
                authentication["paramsBasic"]["password"] = new_sub_pwd

    def format_on_new(self, content, project_id=None, make_public=False):
        """
        Completes a new subscription before it is stored: checks the callback
        endpoint, sets "schema_version" and encrypts the basic-auth password.
        :param content: Subscription data. Modified in place.
        :param project_id: passed through to BaseTopic.format_on_new.
        :param make_public: passed through to BaseTopic.format_on_new.
        :return: None
        :raises EngineException: if the notification endpoint cannot be reached
            or does not answer 204 NO CONTENT.
        """
        super().format_on_new(content, project_id=project_id, make_public=make_public)

        # TODO check how to release Engine.write_lock during the check
        def _check_endpoint(url, auth):
            """
            Checks if the notification endpoint is valid
            :param url: the notification end
            :param auth: contains the authentication details with type basic
            """
            try:
                if auth is None:
                    response = requests.get(url, timeout=5)
                elif auth["authType"] == "basic":
                    username = auth["paramsBasic"].get("userName")
                    password = auth["paramsBasic"].get("password")
                    response = requests.get(url, auth=(username, password), timeout=5)
                else:
                    # No check is implemented for other authentication types.
                    return
            except requests.exceptions.RequestException as e:
                error_text = type(e).__name__ + ": " + str(e)
                raise EngineException(
                    "Cannot access to the notification URL '{}': {}".format(
                        url, error_text
                    )
                ) from e
            if response.status_code != HTTPStatus.NO_CONTENT:
                raise EngineException(
                    "Cannot access to the notification URL '{}',received {}: {}".format(
                        url, response.status_code, response.content
                    )
                )

        url = content["CallbackUri"]
        auth = content.get("authentication")
        _check_endpoint(url, auth)
        content["schema_version"] = schema_version = "1.1"
        if auth is not None and auth["authType"] == "basic":
            params_basic = content["authentication"]["paramsBasic"]
            if params_basic.get("password"):
                # NOTE(review): one argument line of this call was not visible
                # in the reviewed source; salting with the subscription id is
                # assumed - confirm against the decrypting side.
                params_basic["password"] = self.db.encrypt(
                    params_basic["password"],
                    schema_version=schema_version,
                    salt=content["_id"],
                )

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """
        Uses BaseTopic.new to create entry into db
        Once entry is made into subscriptions,mapper function is invoked
        :param rollback: list where the undo action for the mapped entries is registered.
        :param session: passed through to BaseTopic.new.
        :param indata: subscription data; also handed to the mapper.
        :param kwargs: passed through to BaseTopic.new.
        :param headers: passed through to BaseTopic.new.
        :return: the (_id, op_id) pair produced by BaseTopic.new.
        """
        _id, op_id = BaseTopic.new(
            self, rollback, session, indata=indata, kwargs=kwargs, headers=headers
        )
        # NOTE(review): the statement wrapping this dict and the final return
        # were not visible in the reviewed source; registering the dict on
        # `rollback` and returning the unpacked pair is assumed - confirm.
        rollback.append(
            {
                "topic": "mapped_subscriptions",
                "operation": "del_list",
                "filter": {"reference": _id},
            }
        )
        self._subscription_mapper(_id, indata, table="mapped_subscriptions")
        return _id, op_id

    def delete_extra(self, session, _id, db_content, not_send_msg=None):
        """
        Deletes the mapped_subscription entry for this particular subscriber
        :param session: passed through to BaseTopic.delete_extra.
        :param _id: subscription_id deleted
        :param db_content: passed through to BaseTopic.delete_extra.
        :param not_send_msg: passed through to BaseTopic.delete_extra.
        :return: None
        """
        super().delete_extra(session, _id, db_content, not_send_msg)
        filter_q = {"reference": _id}
        self.db.del_list("mapped_subscriptions", filter_q)
class NslcmSubscriptionsTopic(CommonSubscriptions):
    """
    NS LCM subscriptions: validates new entries against the `subscription`
    schema and expands each one into per-notification-type mapped entries.
    """

    schema_new = subscription

    # Notification types whose "operationTypes" value is fixed; their
    # "operationStates" is always "ANY".
    _FIXED_OPERATION_TYPES = {
        "NsIdentifierCreationNotification": "INSTANTIATE",
        "NsIdentifierDeletionNotification": "TERMINATE",
    }

    # Notification types whose attributes are copied from the subscription
    # filter, falling back to "ANY" when the filter does not carry the key.
    _FILTER_KEYS = {
        "NsLcmOperationOccurrenceNotification": (
            "operationTypes",
            "operationStates",
        ),
        "NsChangeNotification": (
            "nsComponentTypes",
            "lcmOpNameImpactingNsComponent",
            "lcmOpOccStatusImpactingNsComponent",
        ),
    }

    def _subscription_mapper(self, _id, data, table):
        """
        Performs data transformation on subscription request
        :param _id: subscription id, used as reference when data carries no "_id"
        :param data: data to be trasformed
        :param table: table in which transformed data are inserted
        :return: None
        """
        reference = data.get("_id")
        if reference is None:
            # Keep the mapped entries reachable through the subscription id.
            reference = _id
        formed_data = {
            "reference": reference,
            "CallbackUri": data.get("CallbackUri"),
        }
        authentication = data.get("authentication")
        if authentication:
            formed_data["authentication"] = authentication

        subs_filter = data.get("filter") or {}
        ns_instance_filter = subs_filter.get("nsInstanceSubscriptionFilter")
        if ns_instance_filter:
            # Only the first entry of the instance filter is used as identifier.
            formed_data["identifier"] = next(iter(ns_instance_filter.values()))

        # One mapped entry per recognised notification type; types not listed
        # in either table produce no entry.
        formatted_data = []
        for elem in subs_filter.get("notificationTypes") or ():
            update_dict = formed_data.copy()
            update_dict["notificationType"] = elem
            if elem in self._FIXED_OPERATION_TYPES:
                update_dict["operationTypes"] = self._FIXED_OPERATION_TYPES[elem]
                update_dict["operationStates"] = "ANY"
            elif elem in self._FILTER_KEYS:
                for filter_key in self._FILTER_KEYS[elem]:
                    update_dict[filter_key] = subs_filter.get(filter_key, "ANY")
            else:
                continue
            formatted_data.append(update_dict)
        self.db.create_list(table, formatted_data)