# blob: 0389483a832394f76c59a07a77fa93cfe1f9a9be [file] [log] [blame]
# Copyright 2020 Preethika P(Tata Elxsi)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
__author__ = "Preethika P,preethika.p@tataelxsi.co.in"
import requests
from osm_nbi.base_topic import BaseTopic, EngineException
from osm_nbi.validation import subscription
from http import HTTPStatus
class CommonSubscriptions(BaseTopic):
    """
    Behaviour shared by the subscription topics.

    Entries are kept in the "subscriptions" table; per-notification records
    derived from them are kept in "mapped_subscriptions" and are produced by
    the `_subscription_mapper` hook.
    """

    topic = "subscriptions"
    topic_msg = None

    def _subscription_mapper(self, _id, data, table):
        """
        Performs data transformation on subscription request.
        The base implementation does nothing; subclasses override it.

        :param _id: identifier of the subscription being mapped
        :param data: data to be transformed
        :param table: table in which transformed data are inserted
        """
        pass

    def format_subscription(self, subs_data):
        """
        Brings lexicographical order for list items at any nested level. For subscriptions max level of nesting is 4.
        Lists are sorted in place; anything that is neither a dict nor a list is left untouched.

        :param subs_data: Subscription data to be ordered.
        :return: None
        """
        if not isinstance(subs_data, dict):
            return
        for value in subs_data.values():
            if isinstance(value, list):
                # Base case: sort this list and carry on with the sibling keys
                # (returning here would leave the remaining lists unsorted).
                value.sort()
            else:
                # Recursive case
                self.format_subscription(value)

    def check_conflict_on_new(self, session, content):
        """
        Two subscriptions are equal if Auth username, CallbackUri and filter are same.
        The password is left out of the comparison and is put back on `content`
        before this method returns or raises.

        :param session: Session object.
        :param content: Subscription data. Its lists are sorted in place.
        :return: None if no conflict otherwise, raises an exception.
        :raises EngineException: with HTTPStatus.CONFLICT when an equal subscription is already stored.
        """
        self.format_subscription(content)
        auth = content.get("authentication")
        is_basic = bool(auth) and auth.get("authType") == "basic"

        # Get all subscriptions from db table subscriptions and compare.
        filter_dict = {"CallbackUri": content["CallbackUri"]}
        if auth:
            if is_basic:
                filter_dict["authentication.authType"] = "basic"
            # elif add other authTypes here
        else:
            filter_dict["authentication"] = None  # For Items without authentication
        existing_subscriptions = self.db.get_list("subscriptions", q_filter=filter_dict)

        new_sub_pwd = None
        if is_basic:
            # Tolerate a missing password, as format_on_new does.
            new_sub_pwd = auth["paramsBasic"].pop("password", None)
        try:
            for existing_subscription in existing_subscriptions:
                sub_id = existing_subscription.pop("_id", None)
                existing_subscription.pop("_admin", None)
                existing_subscription.pop("schema_version", None)
                existing_auth = existing_subscription.get("authentication")
                if existing_auth and existing_auth.get("authType") == "basic":
                    existing_auth["paramsBasic"].pop("password", None)
                # Normalise the stored entry as well, so that list ordering
                # never decides whether two subscriptions are equal.
                self.format_subscription(existing_subscription)
                if existing_subscription == content:
                    raise EngineException(
                        "Subscription already exists with id: {}".format(sub_id),
                        HTTPStatus.CONFLICT,
                    )
        finally:
            # Restore the password even when a conflict is reported.
            if new_sub_pwd is not None:
                content["authentication"]["paramsBasic"]["password"] = new_sub_pwd
        return

    def format_on_new(self, content, project_id=None, make_public=False):
        """
        Completes a new subscription before it is stored.

        After the parent formatting, the callback endpoint is probed with an
        HTTP GET (5 seconds timeout) that must answer 204 NO CONTENT, the
        schema version is set to "1.1" and, for basic authentication, a
        non-empty password is replaced by `self.db.encrypt` of it, salted with
        the subscription "_id".

        :param content: Subscription data, modified in place.
        :param project_id: forwarded to the parent implementation.
        :param make_public: forwarded to the parent implementation.
        :return: None
        :raises EngineException: when the endpoint cannot be reached or does not answer 204.
        """
        super().format_on_new(content, project_id=project_id, make_public=make_public)

        # TODO check how to release Engine.write_lock during the check
        def _check_endpoint(url, auth):
            """
            Checks if the notification endpoint is valid
            :param url: the notification end
            :param auth: contains the authentication details with type basic
            """
            try:
                if auth is None:
                    response = requests.get(url, timeout=5)
                elif auth["authType"] == "basic":
                    username = auth["paramsBasic"].get("userName")
                    password = auth["paramsBasic"].get("password")
                    response = requests.get(url, auth=(username, password), timeout=5)
                else:
                    # Other authentication types are not probed.
                    return
            except requests.exceptions.RequestException as e:
                error_text = type(e).__name__ + ": " + str(e)
                raise EngineException(
                    "Cannot access to the notification URL '{}': {}".format(
                        url, error_text
                    )
                ) from e
            if response.status_code != HTTPStatus.NO_CONTENT:
                raise EngineException(
                    "Cannot access to the notification URL '{}',received {}: {}".format(
                        url, response.status_code, response.content
                    )
                )

        url = content["CallbackUri"]
        auth = content.get("authentication")
        _check_endpoint(url, auth)
        content["schema_version"] = schema_version = "1.1"
        if auth is not None and auth["authType"] == "basic":
            # `auth` is the very dict stored under content["authentication"].
            params_basic = auth["paramsBasic"]
            if params_basic.get("password"):
                params_basic["password"] = self.db.encrypt(
                    params_basic["password"],
                    schema_version=schema_version,
                    salt=content["_id"],
                )
        return None

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """
        Uses BaseTopic.new to create entry into db
        Once entry is made into subscriptions,mapper function is invoked

        :param rollback: list where the undo operations are appended.
        :param session: Session object.
        :param indata: subscription request, also handed to the mapper.
        :param kwargs: forwarded to the parent implementation.
        :param headers: forwarded to the parent implementation.
        :return: the (_id, op_id) pair returned by the parent implementation.
        """
        _id, op_id = super().new(
            rollback, session, indata=indata, kwargs=kwargs, headers=headers
        )
        # Register the undo step before mapping, so that partially inserted
        # mapped records are also covered.
        rollback.append(
            {
                "topic": "mapped_subscriptions",
                "operation": "del_list",
                "filter": {"reference": _id},
            }
        )
        self._subscription_mapper(_id, indata, table="mapped_subscriptions")
        return _id, op_id

    def delete_extra(self, session, _id, db_content, not_send_msg=None):
        """
        Deletes the mapped_subscription entry for this particular subscriber

        :param session: Session object.
        :param _id: subscription_id deleted
        :param db_content: forwarded to the parent implementation.
        :param not_send_msg: forwarded to the parent implementation.
        """
        super().delete_extra(session, _id, db_content, not_send_msg)
        filter_q = {"reference": _id}
        self.db.del_list("mapped_subscriptions", filter_q)
class NslcmSubscriptionsTopic(CommonSubscriptions):
    """Subscription topic validated against the `subscription` schema."""

    schema_new = subscription

    # Notification types whose operation type is fixed; operationStates is "ANY".
    _FIXED_OPERATION_TYPE = {
        "NsIdentifierCreationNotification": "INSTANTIATE",
        "NsIdentifierDeletionNotification": "TERMINATE",
    }
    # Notification types whose listed filter fields are copied, "ANY" when absent.
    _ANY_DEFAULTED_FIELDS = {
        "NsLcmOperationOccurrenceNotification": (
            "operationTypes",
            "operationStates",
        ),
        "NsChangeNotification": (
            "nsComponentTypes",
            "lcmOpNameImpactingNsComponent",
            "lcmOpOccStatusImpactingNsComponent",
        ),
    }

    def _subscription_mapper(self, _id, data, table):
        """
        Performs data transformation on subscription request.

        One record is built per supported entry of filter.notificationTypes and
        all of them are inserted with a single `self.db.create_list` call.
        Notification types that are not known here produce no record.

        :param _id: subscription identifier, stored as "reference" of every record
        :param data: data to be transformed
        :param table: table in which transformed data are inserted
        :return: None
        """
        base_record = {
            # Use the identifier that rollback and delete_extra filter on;
            # fall back to the one carried by the data when none is given.
            "reference": _id if _id is not None else data.get("_id"),
            "CallbackUri": data.get("CallbackUri"),
        }
        if data.get("authentication"):
            base_record["authentication"] = data.get("authentication")

        sub_filter = data.get("filter") or {}
        ns_instance_filter = sub_filter.get("nsInstanceSubscriptionFilter")
        if ns_instance_filter:
            # Only the first key of the NS instance filter is used as identifier.
            first_key = next(iter(ns_instance_filter))
            base_record["identifier"] = ns_instance_filter[first_key]

        formatted_data = []
        for elem in sub_filter.get("notificationTypes") or ():
            record = base_record.copy()
            record["notificationType"] = elem
            if elem in self._FIXED_OPERATION_TYPE:
                record["operationTypes"] = self._FIXED_OPERATION_TYPE[elem]
                record["operationStates"] = "ANY"
            elif elem in self._ANY_DEFAULTED_FIELDS:
                for field in self._ANY_DEFAULTED_FIELDS[elem]:
                    record[field] = sub_filter.get(field, "ANY")
            else:
                continue
            formatted_data.append(record)

        # NOTE(review): this is called even when no record was produced -
        # confirm that the DB layer accepts an empty list.
        self.db.create_list(table, formatted_data)
        return None