topic = "subscriptions"
topic_msg = None
+ def _subscription_mapper(self, _id, data, table):
+ """
+ Performs data transformation on subscription request
+ :param data: data to be trasformed
+ :param table: table in which transformed data are inserted
+ """
+ pass
+
def format_subscription(self, subs_data):
"""
Brings lexicographical order for list items at any nested level. For subscriptions max level of nesting is 4.
filter_dict["authentication"] = None # For Items without authentication
existing_subscriptions = self.db.get_list("subscriptions", q_filter=filter_dict)
new_sub_pwd = None
- if content.get("authentication") and content["authentication"].get("authType") == "basic":
+ if (
+ content.get("authentication")
+ and content["authentication"].get("authType") == "basic"
+ ):
new_sub_pwd = content["authentication"]["paramsBasic"]["password"]
content["authentication"]["paramsBasic"].pop("password", None)
for existing_subscription in existing_subscriptions:
sub_id = existing_subscription.pop("_id", None)
existing_subscription.pop("_admin", None)
existing_subscription.pop("schema_version", None)
- if existing_subscription.get("authentication") and \
- existing_subscription["authentication"].get("authType") == "basic":
- existing_subscription["authentication"]["paramsBasic"].pop("password", None)
+ if (
+ existing_subscription.get("authentication")
+ and existing_subscription["authentication"].get("authType") == "basic"
+ ):
+ existing_subscription["authentication"]["paramsBasic"].pop(
+ "password", None
+ )
# self.logger.debug(existing_subscription)
if existing_subscription == content:
- raise EngineException("Subscription already exists with id: {}".format(sub_id),
- HTTPStatus.CONFLICT)
+ raise EngineException(
+ "Subscription already exists with id: {}".format(sub_id),
+ HTTPStatus.CONFLICT,
+ )
if new_sub_pwd:
content["authentication"]["paramsBasic"]["password"] = new_sub_pwd
return
if auth is None:
response = requests.get(url, timeout=5)
if response.status_code != HTTPStatus.NO_CONTENT:
- raise EngineException("Cannot access to the notification URL '{}',received {}: {}"
- .format(url, response.status_code, response.content))
+ raise EngineException(
+ "Cannot access to the notification URL '{}',received {}: {}".format(
+ url, response.status_code, response.content
+ )
+ )
elif auth["authType"] == "basic":
username = auth["paramsBasic"].get("userName")
password = auth["paramsBasic"].get("password")
response = requests.get(url, auth=(username, password), timeout=5)
if response.status_code != HTTPStatus.NO_CONTENT:
- raise EngineException("Cannot access to the notification URL '{}',received {}: {}"
- .format(url, response.status_code, response.content))
+ raise EngineException(
+ "Cannot access to the notification URL '{}',received {}: {}".format(
+ url, response.status_code, response.content
+ )
+ )
except requests.exceptions.RequestException as e:
error_text = type(e).__name__ + ": " + str(e)
- raise EngineException("Cannot access to the notification URL '{}': {}".format(url, error_text))
+ raise EngineException(
+ "Cannot access to the notification URL '{}': {}".format(
+ url, error_text
+ )
+ )
url = content["CallbackUri"]
auth = content.get("authentication")
content["schema_version"] = schema_version = "1.1"
if auth is not None and auth["authType"] == "basic":
if content["authentication"]["paramsBasic"].get("password"):
- content["authentication"]["paramsBasic"]["password"] = \
- self.db.encrypt(content["authentication"]["paramsBasic"]["password"],
- schema_version=schema_version, salt=content["_id"])
+ content["authentication"]["paramsBasic"]["password"] = self.db.encrypt(
+ content["authentication"]["paramsBasic"]["password"],
+ schema_version=schema_version,
+ salt=content["_id"],
+ )
return None
def new(self, rollback, session, indata=None, kwargs=None, headers=None):
Uses BaseTopic.new to create entry into db
Once entry is made into subscriptions,mapper function is invoked
"""
- _id, op_id = BaseTopic.new(self, rollback, session, indata=indata, kwargs=kwargs, headers=headers)
- rollback.append({"topic": "mapped_subscriptions", "operation": "del_list", "filter": {"reference": _id}})
+ _id, op_id = BaseTopic.new(
+ self, rollback, session, indata=indata, kwargs=kwargs, headers=headers
+ )
+ rollback.append(
+ {
+ "topic": "mapped_subscriptions",
+ "operation": "del_list",
+ "filter": {"reference": _id},
+ }
+ )
self._subscription_mapper(_id, indata, table="mapped_subscriptions")
return _id, op_id
:param table: table in which transformed data are inserted
"""
formatted_data = []
- formed_data = {"reference": data.get("_id"),
- "CallbackUri": data.get("CallbackUri")}
+ formed_data = {
+ "reference": data.get("_id"),
+ "CallbackUri": data.get("CallbackUri"),
+ }
if data.get("authentication"):
formed_data.update({"authentication": data.get("authentication")})
if data.get("filter"):
formatted_data.append(update_dict)
elif elem == "NsLcmOperationOccurrenceNotification":
if "operationTypes" in data["filter"].keys():
- update_dict["operationTypes"] = data["filter"]["operationTypes"]
+ update_dict["operationTypes"] = data["filter"][
+ "operationTypes"
+ ]
else:
update_dict["operationTypes"] = "ANY"
if "operationStates" in data["filter"].keys():
- update_dict["operationStates"] = data["filter"]["operationStates"]
+ update_dict["operationStates"] = data["filter"][
+ "operationStates"
+ ]
else:
update_dict["operationStates"] = "ANY"
formatted_data.append(update_dict)
elif elem == "NsChangeNotification":
if "nsComponentTypes" in data["filter"].keys():
- update_dict["nsComponentTypes"] = data["filter"]["nsComponentTypes"]
+ update_dict["nsComponentTypes"] = data["filter"][
+ "nsComponentTypes"
+ ]
else:
update_dict["nsComponentTypes"] = "ANY"
if "lcmOpNameImpactingNsComponent" in data["filter"].keys():
- update_dict["lcmOpNameImpactingNsComponent"] = \
- data["filter"]["lcmOpNameImpactingNsComponent"]
+ update_dict["lcmOpNameImpactingNsComponent"] = data[
+ "filter"
+ ]["lcmOpNameImpactingNsComponent"]
else:
update_dict["lcmOpNameImpactingNsComponent"] = "ANY"
- if "lcmOpOccStatusImpactingNsComponent" in data["filter"].keys():
- update_dict["lcmOpOccStatusImpactingNsComponent"] = \
- data["filter"]["lcmOpOccStatusImpactingNsComponent"]
+ if (
+ "lcmOpOccStatusImpactingNsComponent"
+ in data["filter"].keys()
+ ):
+ update_dict["lcmOpOccStatusImpactingNsComponent"] = data[
+ "filter"
+ ]["lcmOpOccStatusImpactingNsComponent"]
else:
update_dict["lcmOpOccStatusImpactingNsComponent"] = "ANY"
formatted_data.append(update_dict)