__author__ = "Preethika P,preethika.p@tataelxsi.co.in"
-
import requests
from osm_nbi.base_topic import BaseTopic, EngineException
from osm_nbi.validation import subscription
+from http import HTTPStatus
class CommonSubscriptions(BaseTopic):
topic = "subscriptions"
topic_msg = None
-
+
+ def format_subscription(self, subs_data):
+ """
+ Brings lexicographical order for list items at any nested level. For subscriptions max level of nesting is 4.
+ :param subs_data: Subscription data to be ordered.
+ :return: None
+ """
+ if isinstance(subs_data, dict):
+ for key in subs_data.keys():
+ # Base case
+ if isinstance(subs_data[key], list):
+ subs_data[key].sort()
+ return
+ # Recursive case
+ self.format_subscription(subs_data[key])
+ return
+
+ def check_conflict_on_new(self, session, content):
+ """
+ Two subscriptions are equal if Auth username, CallbackUri and filter are same.
+ :param session: Session object.
+ :param content: Subscription data.
+ :return: None if no conflict otherwise, raises an exception.
+ """
+ # Get all subscriptions from db table subscriptions and compare.
+ self.format_subscription(content)
+ filter_dict = {"CallbackUri": content["CallbackUri"]}
+ if content.get("authentication"):
+ if content["authentication"].get("authType") == "basic":
+ filter_dict["authentication.authType"] = "basic"
+ # elif add other authTypes here
+ else:
+ filter_dict["authentication"] = None # For Items without authentication
+ existing_subscriptions = self.db.get_list("subscriptions", q_filter=filter_dict)
+ new_sub_pwd = None
+ if content.get("authentication") and content["authentication"].get("authType") == "basic":
+ new_sub_pwd = content["authentication"]["paramsBasic"]["password"]
+ content["authentication"]["paramsBasic"].pop("password", None)
+ for existing_subscription in existing_subscriptions:
+ sub_id = existing_subscription.pop("_id", None)
+ existing_subscription.pop("_admin", None)
+ existing_subscription.pop("schema_version", None)
+ if existing_subscription.get("authentication") and \
+ existing_subscription["authentication"].get("authType") == "basic":
+ existing_subscription["authentication"]["paramsBasic"].pop("password", None)
+ # self.logger.debug(existing_subscription)
+ if existing_subscription == content:
+ raise EngineException("Subscription already exists with id: {}".format(sub_id),
+ HTTPStatus.CONFLICT)
+ if new_sub_pwd:
+ content["authentication"]["paramsBasic"]["password"] = new_sub_pwd
+ return
+
def format_on_new(self, content, project_id=None, make_public=False):
super().format_on_new(content, project_id=project_id, make_public=make_public)
-
+
# TODO check how to release Engine.write_lock during the check
def _check_endpoint(url, auth):
"""
Checks if the notification endpoint is valid
:param url: the notification end
- :param auth: contains the aunthentication details with type basic
+ :param auth: contains the authentication details with type basic
"""
try:
if auth is None:
response = requests.get(url, timeout=5)
- if response.status_code != 204:
+ if response.status_code != HTTPStatus.NO_CONTENT:
raise EngineException("Cannot access to the notification URL '{}',received {}: {}"
.format(url, response.status_code, response.content))
elif auth["authType"] == "basic":
username = auth["paramsBasic"].get("userName")
password = auth["paramsBasic"].get("password")
response = requests.get(url, auth=(username, password), timeout=5)
- if response.status_code != 204:
+ if response.status_code != HTTPStatus.NO_CONTENT:
raise EngineException("Cannot access to the notification URL '{}',received {}: {}"
.format(url, response.status_code, response.content))
except requests.exceptions.RequestException as e:
error_text = type(e).__name__ + ": " + str(e)
raise EngineException("Cannot access to the notification URL '{}': {}".format(url, error_text))
+
url = content["CallbackUri"]
auth = content.get("authentication")
_check_endpoint(url, auth)
if auth is not None and auth["authType"] == "basic":
if content["authentication"]["paramsBasic"].get("password"):
content["authentication"]["paramsBasic"]["password"] = \
- self.db.encrypt(content["authentication"]["paramsBasic"]["password"],
+ self.db.encrypt(content["authentication"]["paramsBasic"]["password"],
schema_version=schema_version, salt=content["_id"])
return None
:param _id: subscription_id deleted
"""
super().delete_extra(session, _id, db_content, not_send_msg)
- filter_q = {}
- filter_q["reference"] = _id
+ filter_q = {"reference": _id}
self.db.del_list("mapped_subscriptions", filter_q)