| # Copyright 2020 Preethika P(Tata Elxsi) |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
| # implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| __author__ = "Preethika P,preethika.p@tataelxsi.co.in" |
| |
| import requests |
| from osm_nbi.base_topic import BaseTopic, EngineException |
| from osm_nbi.validation import subscription |
| from http import HTTPStatus |
| |
| |
| class CommonSubscriptions(BaseTopic): |
| topic = "subscriptions" |
| topic_msg = None |
| |
| def format_subscription(self, subs_data): |
| """ |
| Brings lexicographical order for list items at any nested level. For subscriptions max level of nesting is 4. |
| :param subs_data: Subscription data to be ordered. |
| :return: None |
| """ |
| if isinstance(subs_data, dict): |
| for key in subs_data.keys(): |
| # Base case |
| if isinstance(subs_data[key], list): |
| subs_data[key].sort() |
| return |
| # Recursive case |
| self.format_subscription(subs_data[key]) |
| return |
| |
| def check_conflict_on_new(self, session, content): |
| """ |
| Two subscriptions are equal if Auth username, CallbackUri and filter are same. |
| :param session: Session object. |
| :param content: Subscription data. |
| :return: None if no conflict otherwise, raises an exception. |
| """ |
| # Get all subscriptions from db table subscriptions and compare. |
| self.format_subscription(content) |
| filter_dict = {"CallbackUri": content["CallbackUri"]} |
| if content.get("authentication"): |
| if content["authentication"].get("authType") == "basic": |
| filter_dict["authentication.authType"] = "basic" |
| # elif add other authTypes here |
| else: |
| filter_dict["authentication"] = None # For Items without authentication |
| existing_subscriptions = self.db.get_list("subscriptions", q_filter=filter_dict) |
| new_sub_pwd = None |
| if ( |
| content.get("authentication") |
| and content["authentication"].get("authType") == "basic" |
| ): |
| new_sub_pwd = content["authentication"]["paramsBasic"]["password"] |
| content["authentication"]["paramsBasic"].pop("password", None) |
| for existing_subscription in existing_subscriptions: |
| sub_id = existing_subscription.pop("_id", None) |
| existing_subscription.pop("_admin", None) |
| existing_subscription.pop("schema_version", None) |
| if ( |
| existing_subscription.get("authentication") |
| and existing_subscription["authentication"].get("authType") == "basic" |
| ): |
| existing_subscription["authentication"]["paramsBasic"].pop( |
| "password", None |
| ) |
| # self.logger.debug(existing_subscription) |
| if existing_subscription == content: |
| raise EngineException( |
| "Subscription already exists with id: {}".format(sub_id), |
| HTTPStatus.CONFLICT, |
| ) |
| if new_sub_pwd: |
| content["authentication"]["paramsBasic"]["password"] = new_sub_pwd |
| return |
| |
| def format_on_new(self, content, project_id=None, make_public=False): |
| super().format_on_new(content, project_id=project_id, make_public=make_public) |
| |
| # TODO check how to release Engine.write_lock during the check |
| def _check_endpoint(url, auth): |
| """ |
| Checks if the notification endpoint is valid |
| :param url: the notification end |
| :param auth: contains the authentication details with type basic |
| """ |
| try: |
| if auth is None: |
| response = requests.get(url, timeout=5) |
| if response.status_code != HTTPStatus.NO_CONTENT: |
| raise EngineException( |
| "Cannot access to the notification URL '{}',received {}: {}".format( |
| url, response.status_code, response.content |
| ) |
| ) |
| elif auth["authType"] == "basic": |
| username = auth["paramsBasic"].get("userName") |
| password = auth["paramsBasic"].get("password") |
| response = requests.get(url, auth=(username, password), timeout=5) |
| if response.status_code != HTTPStatus.NO_CONTENT: |
| raise EngineException( |
| "Cannot access to the notification URL '{}',received {}: {}".format( |
| url, response.status_code, response.content |
| ) |
| ) |
| except requests.exceptions.RequestException as e: |
| error_text = type(e).__name__ + ": " + str(e) |
| raise EngineException( |
| "Cannot access to the notification URL '{}': {}".format( |
| url, error_text |
| ) |
| ) |
| |
| url = content["CallbackUri"] |
| auth = content.get("authentication") |
| _check_endpoint(url, auth) |
| content["schema_version"] = schema_version = "1.1" |
| if auth is not None and auth["authType"] == "basic": |
| if content["authentication"]["paramsBasic"].get("password"): |
| content["authentication"]["paramsBasic"]["password"] = self.db.encrypt( |
| content["authentication"]["paramsBasic"]["password"], |
| schema_version=schema_version, |
| salt=content["_id"], |
| ) |
| return None |
| |
| def new(self, rollback, session, indata=None, kwargs=None, headers=None): |
| """ |
| Uses BaseTopic.new to create entry into db |
| Once entry is made into subscriptions,mapper function is invoked |
| """ |
| _id, op_id = BaseTopic.new( |
| self, rollback, session, indata=indata, kwargs=kwargs, headers=headers |
| ) |
| rollback.append( |
| { |
| "topic": "mapped_subscriptions", |
| "operation": "del_list", |
| "filter": {"reference": _id}, |
| } |
| ) |
| self._subscription_mapper(_id, indata, table="mapped_subscriptions") |
| return _id, op_id |
| |
| def delete_extra(self, session, _id, db_content, not_send_msg=None): |
| """ |
| Deletes the mapped_subscription entry for this particular subscriber |
| :param _id: subscription_id deleted |
| """ |
| super().delete_extra(session, _id, db_content, not_send_msg) |
| filter_q = {"reference": _id} |
| self.db.del_list("mapped_subscriptions", filter_q) |
| |
| |
| class NslcmSubscriptionsTopic(CommonSubscriptions): |
| schema_new = subscription |
| |
| def _subscription_mapper(self, _id, data, table): |
| """ |
| Performs data transformation on subscription request |
| :param data: data to be trasformed |
| :param table: table in which transformed data are inserted |
| """ |
| formatted_data = [] |
| formed_data = { |
| "reference": data.get("_id"), |
| "CallbackUri": data.get("CallbackUri"), |
| } |
| if data.get("authentication"): |
| formed_data.update({"authentication": data.get("authentication")}) |
| if data.get("filter"): |
| if data["filter"].get("nsInstanceSubscriptionFilter"): |
| key = list(data["filter"]["nsInstanceSubscriptionFilter"].keys())[0] |
| identifier = data["filter"]["nsInstanceSubscriptionFilter"][key] |
| formed_data.update({"identifier": identifier}) |
| if data["filter"].get("notificationTypes"): |
| for elem in data["filter"].get("notificationTypes"): |
| update_dict = formed_data.copy() |
| update_dict["notificationType"] = elem |
| if elem == "NsIdentifierCreationNotification": |
| update_dict["operationTypes"] = "INSTANTIATE" |
| update_dict["operationStates"] = "ANY" |
| formatted_data.append(update_dict) |
| elif elem == "NsIdentifierDeletionNotification": |
| update_dict["operationTypes"] = "TERMINATE" |
| update_dict["operationStates"] = "ANY" |
| formatted_data.append(update_dict) |
| elif elem == "NsLcmOperationOccurrenceNotification": |
| if "operationTypes" in data["filter"].keys(): |
| update_dict["operationTypes"] = data["filter"][ |
| "operationTypes" |
| ] |
| else: |
| update_dict["operationTypes"] = "ANY" |
| if "operationStates" in data["filter"].keys(): |
| update_dict["operationStates"] = data["filter"][ |
| "operationStates" |
| ] |
| else: |
| update_dict["operationStates"] = "ANY" |
| formatted_data.append(update_dict) |
| elif elem == "NsChangeNotification": |
| if "nsComponentTypes" in data["filter"].keys(): |
| update_dict["nsComponentTypes"] = data["filter"][ |
| "nsComponentTypes" |
| ] |
| else: |
| update_dict["nsComponentTypes"] = "ANY" |
| if "lcmOpNameImpactingNsComponent" in data["filter"].keys(): |
| update_dict["lcmOpNameImpactingNsComponent"] = data[ |
| "filter" |
| ]["lcmOpNameImpactingNsComponent"] |
| else: |
| update_dict["lcmOpNameImpactingNsComponent"] = "ANY" |
| if ( |
| "lcmOpOccStatusImpactingNsComponent" |
| in data["filter"].keys() |
| ): |
| update_dict["lcmOpOccStatusImpactingNsComponent"] = data[ |
| "filter" |
| ]["lcmOpOccStatusImpactingNsComponent"] |
| else: |
| update_dict["lcmOpOccStatusImpactingNsComponent"] = "ANY" |
| formatted_data.append(update_dict) |
| self.db.create_list(table, formatted_data) |
| return None |