X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FNBI.git;a=blobdiff_plain;f=osm_nbi%2Fsubscription_topics.py;h=92c7417a434f4cc555a04945e91ab7eaa3f34f4d;hp=a5c9c44bd741366eff4f5e7e0540b46211668a4d;hb=5758955b7b394517ff5caf5506a4400cdc5aa372;hpb=329b81892dc8d2549209fd73505d1f88cf862a72 diff --git a/osm_nbi/subscription_topics.py b/osm_nbi/subscription_topics.py index a5c9c44..92c7417 100644 --- a/osm_nbi/subscription_topics.py +++ b/osm_nbi/subscription_topics.py @@ -15,51 +15,125 @@ __author__ = "Preethika P,preethika.p@tataelxsi.co.in" - import requests from osm_nbi.base_topic import BaseTopic, EngineException from osm_nbi.validation import subscription +from http import HTTPStatus class CommonSubscriptions(BaseTopic): topic = "subscriptions" topic_msg = None - + + def format_subscription(self, subs_data): + """ + Brings lexicographical order for list items at any nested level. For subscriptions max level of nesting is 4. + :param subs_data: Subscription data to be ordered. + :return: None + """ + if isinstance(subs_data, dict): + for key in subs_data.keys(): + # Base case + if isinstance(subs_data[key], list): + subs_data[key].sort() + return + # Recursive case + self.format_subscription(subs_data[key]) + return + + def check_conflict_on_new(self, session, content): + """ + Two subscriptions are equal if Auth username, CallbackUri and filter are same. + :param session: Session object. + :param content: Subscription data. + :return: None if no conflict otherwise, raises an exception. + """ + # Get all subscriptions from db table subscriptions and compare. + self.format_subscription(content) + filter_dict = {"CallbackUri": content["CallbackUri"]} + if content.get("authentication"): + if content["authentication"].get("authType") == "basic": + filter_dict["authentication.authType"] = "basic" + # elif add other authTypes here + else: + filter_dict["authentication"] = None # For Items without authentication + existing_subscriptions = self.db.get_list("subscriptions", q_filter=filter_dict) + new_sub_pwd = None + if ( + content.get("authentication") + and content["authentication"].get("authType") == "basic" + ): + new_sub_pwd = content["authentication"]["paramsBasic"]["password"] + content["authentication"]["paramsBasic"].pop("password", None) + for existing_subscription in existing_subscriptions: + sub_id = existing_subscription.pop("_id", None) + existing_subscription.pop("_admin", None) + existing_subscription.pop("schema_version", None) + if ( + existing_subscription.get("authentication") + and existing_subscription["authentication"].get("authType") == "basic" + ): + existing_subscription["authentication"]["paramsBasic"].pop( + "password", None + ) + # self.logger.debug(existing_subscription) + if existing_subscription == content: + raise EngineException( + "Subscription already exists with id: {}".format(sub_id), + HTTPStatus.CONFLICT, + ) + if new_sub_pwd: + content["authentication"]["paramsBasic"]["password"] = new_sub_pwd + return + def format_on_new(self, content, project_id=None, make_public=False): super().format_on_new(content, project_id=project_id, make_public=make_public) - + # TODO check how to release Engine.write_lock during the check def _check_endpoint(url, auth): """ Checks if the notification endpoint is valid :param url: the notification end - :param auth: contains the aunthentication details with type basic + :param auth: contains the authentication details with type basic """ try: if auth is None: response = requests.get(url, timeout=5) - if response.status_code != 204: - raise EngineException("Cannot access to the notification URL '{}',received {}: {}" - .format(url, response.status_code, response.content)) + if response.status_code != HTTPStatus.NO_CONTENT: + raise EngineException( + "Cannot access to the notification URL '{}',received {}: {}".format( + url, response.status_code, response.content + ) + ) elif auth["authType"] == "basic": username = auth["paramsBasic"].get("userName") password = auth["paramsBasic"].get("password") response = requests.get(url, auth=(username, password), timeout=5) - if response.status_code != 204: - raise EngineException("Cannot access to the notification URL '{}',received {}: {}" - .format(url, response.status_code, response.content)) + if response.status_code != HTTPStatus.NO_CONTENT: + raise EngineException( + "Cannot access to the notification URL '{}',received {}: {}".format( + url, response.status_code, response.content + ) + ) except requests.exceptions.RequestException as e: error_text = type(e).__name__ + ": " + str(e) - raise EngineException("Cannot access to the notification URL '{}': {}".format(url, error_text)) + raise EngineException( + "Cannot access to the notification URL '{}': {}".format( + url, error_text + ) + ) + url = content["CallbackUri"] auth = content.get("authentication") _check_endpoint(url, auth) content["schema_version"] = schema_version = "1.1" if auth is not None and auth["authType"] == "basic": if content["authentication"]["paramsBasic"].get("password"): - content["authentication"]["paramsBasic"]["password"] = \ - self.db.encrypt(content["authentication"]["paramsBasic"]["password"], - schema_version=schema_version, salt=content["_id"]) + content["authentication"]["paramsBasic"]["password"] = self.db.encrypt( + content["authentication"]["paramsBasic"]["password"], + schema_version=schema_version, + salt=content["_id"], + ) return None def new(self, rollback, session, indata=None, kwargs=None, headers=None): @@ -67,8 +141,16 @@ class CommonSubscriptions(BaseTopic): Uses BaseTopic.new to create entry into db Once entry is made into subscriptions,mapper function is invoked """ - _id, op_id = BaseTopic.new(self, rollback, session, indata=indata, kwargs=kwargs, headers=headers) - rollback.append({"topic": "mapped_subscriptions", "operation": "del_list", "filter": {"reference": _id}}) + _id, op_id = BaseTopic.new( + self, rollback, session, indata=indata, kwargs=kwargs, headers=headers + ) + rollback.append( + { + "topic": "mapped_subscriptions", + "operation": "del_list", + "filter": {"reference": _id}, + } + ) self._subscription_mapper(_id, indata, table="mapped_subscriptions") return _id, op_id @@ -78,8 +160,7 @@ class CommonSubscriptions(BaseTopic): :param _id: subscription_id deleted """ super().delete_extra(session, _id, db_content, not_send_msg) - filter_q = {} - filter_q["reference"] = _id + filter_q = {"reference": _id} self.db.del_list("mapped_subscriptions", filter_q) @@ -93,8 +174,10 @@ class NslcmSubscriptionsTopic(CommonSubscriptions): :param table: table in which transformed data are inserted """ formatted_data = [] - formed_data = {"reference": data.get("_id"), - "CallbackUri": data.get("CallbackUri")} + formed_data = { + "reference": data.get("_id"), + "CallbackUri": data.get("CallbackUri"), + } if data.get("authentication"): formed_data.update({"authentication": data.get("authentication")}) if data.get("filter"): @@ -116,27 +199,38 @@ class NslcmSubscriptionsTopic(CommonSubscriptions): formatted_data.append(update_dict) elif elem == "NsLcmOperationOccurrenceNotification": if "operationTypes" in data["filter"].keys(): - update_dict["operationTypes"] = data["filter"]["operationTypes"] + update_dict["operationTypes"] = data["filter"][ + "operationTypes" + ] else: update_dict["operationTypes"] = "ANY" if "operationStates" in data["filter"].keys(): - update_dict["operationStates"] = data["filter"]["operationStates"] + update_dict["operationStates"] = data["filter"][ + "operationStates" + ] else: update_dict["operationStates"] = "ANY" formatted_data.append(update_dict) elif elem == "NsChangeNotification": if "nsComponentTypes" in data["filter"].keys(): - update_dict["nsComponentTypes"] = data["filter"]["nsComponentTypes"] + update_dict["nsComponentTypes"] = data["filter"][ + "nsComponentTypes" + ] else: update_dict["nsComponentTypes"] = "ANY" if "lcmOpNameImpactingNsComponent" in data["filter"].keys(): - update_dict["lcmOpNameImpactingNsComponent"] = \ - data["filter"]["lcmOpNameImpactingNsComponent"] + update_dict["lcmOpNameImpactingNsComponent"] = data[ + "filter" + ]["lcmOpNameImpactingNsComponent"] else: update_dict["lcmOpNameImpactingNsComponent"] = "ANY" - if "lcmOpOccStatusImpactingNsComponent" in data["filter"].keys(): - update_dict["lcmOpOccStatusImpactingNsComponent"] = \ - data["filter"]["lcmOpOccStatusImpactingNsComponent"] + if ( + "lcmOpOccStatusImpactingNsComponent" + in data["filter"].keys() + ): + update_dict["lcmOpOccStatusImpactingNsComponent"] = data[ + "filter" + ]["lcmOpOccStatusImpactingNsComponent"] else: update_dict["lcmOpOccStatusImpactingNsComponent"] = "ANY" formatted_data.append(update_dict)