1 # Copyright 2020 Preethika P(Tata Elxsi)
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 __author__
= "Preethika P,preethika.p@tataelxsi.co.in"
19 from osm_nbi
.base_topic
import BaseTopic
, EngineException
20 from osm_nbi
.validation
import subscription
21 from http
import HTTPStatus
24 class CommonSubscriptions(BaseTopic
):
25 topic
= "subscriptions"
28 def format_subscription(self
, subs_data
):
30 Brings lexicographical order for list items at any nested level. For subscriptions max level of nesting is 4.
31 :param subs_data: Subscription data to be ordered.
34 if isinstance(subs_data
, dict):
35 for key
in subs_data
.keys():
37 if isinstance(subs_data
[key
], list):
41 self
.format_subscription(subs_data
[key
])
44 def check_conflict_on_new(self
, session
, content
):
46 Two subscriptions are equal if Auth username, CallbackUri and filter are same.
47 :param session: Session object.
48 :param content: Subscription data.
49 :return: None if no conflict otherwise, raises an exception.
51 # Get all subscriptions from db table subscriptions and compare.
52 self
.format_subscription(content
)
53 filter_dict
= {"CallbackUri": content
["CallbackUri"]}
54 if content
.get("authentication"):
55 if content
["authentication"].get("authType") == "basic":
56 filter_dict
["authentication.authType"] = "basic"
57 # elif add other authTypes here
59 filter_dict
["authentication"] = None # For Items without authentication
60 existing_subscriptions
= self
.db
.get_list("subscriptions", q_filter
=filter_dict
)
62 if content
.get("authentication") and content
["authentication"].get("authType") == "basic":
63 new_sub_pwd
= content
["authentication"]["paramsBasic"]["password"]
64 content
["authentication"]["paramsBasic"].pop("password", None)
65 for existing_subscription
in existing_subscriptions
:
66 sub_id
= existing_subscription
.pop("_id", None)
67 existing_subscription
.pop("_admin", None)
68 existing_subscription
.pop("schema_version", None)
69 if existing_subscription
.get("authentication") and \
70 existing_subscription
["authentication"].get("authType") == "basic":
71 existing_subscription
["authentication"]["paramsBasic"].pop("password", None)
72 # self.logger.debug(existing_subscription)
73 if existing_subscription
== content
:
74 raise EngineException("Subscription already exists with id: {}".format(sub_id
),
77 content
["authentication"]["paramsBasic"]["password"] = new_sub_pwd
80 def format_on_new(self
, content
, project_id
=None, make_public
=False):
81 super().format_on_new(content
, project_id
=project_id
, make_public
=make_public
)
83 # TODO check how to release Engine.write_lock during the check
84 def _check_endpoint(url
, auth
):
86 Checks if the notification endpoint is valid
87 :param url: the notification end
88 :param auth: contains the authentication details with type basic
92 response
= requests
.get(url
, timeout
=5)
93 if response
.status_code
!= HTTPStatus
.NO_CONTENT
:
94 raise EngineException("Cannot access to the notification URL '{}',received {}: {}"
95 .format(url
, response
.status_code
, response
.content
))
96 elif auth
["authType"] == "basic":
97 username
= auth
["paramsBasic"].get("userName")
98 password
= auth
["paramsBasic"].get("password")
99 response
= requests
.get(url
, auth
=(username
, password
), timeout
=5)
100 if response
.status_code
!= HTTPStatus
.NO_CONTENT
:
101 raise EngineException("Cannot access to the notification URL '{}',received {}: {}"
102 .format(url
, response
.status_code
, response
.content
))
103 except requests
.exceptions
.RequestException
as e
:
104 error_text
= type(e
).__name
__ + ": " + str(e
)
105 raise EngineException("Cannot access to the notification URL '{}': {}".format(url
, error_text
))
107 url
= content
["CallbackUri"]
108 auth
= content
.get("authentication")
109 _check_endpoint(url
, auth
)
110 content
["schema_version"] = schema_version
= "1.1"
111 if auth
is not None and auth
["authType"] == "basic":
112 if content
["authentication"]["paramsBasic"].get("password"):
113 content
["authentication"]["paramsBasic"]["password"] = \
114 self
.db
.encrypt(content
["authentication"]["paramsBasic"]["password"],
115 schema_version
=schema_version
, salt
=content
["_id"])
118 def new(self
, rollback
, session
, indata
=None, kwargs
=None, headers
=None):
120 Uses BaseTopic.new to create entry into db
121 Once entry is made into subscriptions,mapper function is invoked
123 _id
, op_id
= BaseTopic
.new(self
, rollback
, session
, indata
=indata
, kwargs
=kwargs
, headers
=headers
)
124 rollback
.append({"topic": "mapped_subscriptions", "operation": "del_list", "filter": {"reference": _id
}})
125 self
._subscription
_mapper
(_id
, indata
, table
="mapped_subscriptions")
128 def delete_extra(self
, session
, _id
, db_content
, not_send_msg
=None):
130 Deletes the mapped_subscription entry for this particular subscriber
131 :param _id: subscription_id deleted
133 super().delete_extra(session
, _id
, db_content
, not_send_msg
)
134 filter_q
= {"reference": _id
}
135 self
.db
.del_list("mapped_subscriptions", filter_q
)
138 class NslcmSubscriptionsTopic(CommonSubscriptions
):
139 schema_new
= subscription
141 def _subscription_mapper(self
, _id
, data
, table
):
143 Performs data transformation on subscription request
144 :param data: data to be trasformed
145 :param table: table in which transformed data are inserted
148 formed_data
= {"reference": data
.get("_id"),
149 "CallbackUri": data
.get("CallbackUri")}
150 if data
.get("authentication"):
151 formed_data
.update({"authentication": data
.get("authentication")})
152 if data
.get("filter"):
153 if data
["filter"].get("nsInstanceSubscriptionFilter"):
154 key
= list(data
["filter"]["nsInstanceSubscriptionFilter"].keys())[0]
155 identifier
= data
["filter"]["nsInstanceSubscriptionFilter"][key
]
156 formed_data
.update({"identifier": identifier
})
157 if data
["filter"].get("notificationTypes"):
158 for elem
in data
["filter"].get("notificationTypes"):
159 update_dict
= formed_data
.copy()
160 update_dict
["notificationType"] = elem
161 if elem
== "NsIdentifierCreationNotification":
162 update_dict
["operationTypes"] = "INSTANTIATE"
163 update_dict
["operationStates"] = "ANY"
164 formatted_data
.append(update_dict
)
165 elif elem
== "NsIdentifierDeletionNotification":
166 update_dict
["operationTypes"] = "TERMINATE"
167 update_dict
["operationStates"] = "ANY"
168 formatted_data
.append(update_dict
)
169 elif elem
== "NsLcmOperationOccurrenceNotification":
170 if "operationTypes" in data
["filter"].keys():
171 update_dict
["operationTypes"] = data
["filter"]["operationTypes"]
173 update_dict
["operationTypes"] = "ANY"
174 if "operationStates" in data
["filter"].keys():
175 update_dict
["operationStates"] = data
["filter"]["operationStates"]
177 update_dict
["operationStates"] = "ANY"
178 formatted_data
.append(update_dict
)
179 elif elem
== "NsChangeNotification":
180 if "nsComponentTypes" in data
["filter"].keys():
181 update_dict
["nsComponentTypes"] = data
["filter"]["nsComponentTypes"]
183 update_dict
["nsComponentTypes"] = "ANY"
184 if "lcmOpNameImpactingNsComponent" in data
["filter"].keys():
185 update_dict
["lcmOpNameImpactingNsComponent"] = \
186 data
["filter"]["lcmOpNameImpactingNsComponent"]
188 update_dict
["lcmOpNameImpactingNsComponent"] = "ANY"
189 if "lcmOpOccStatusImpactingNsComponent" in data
["filter"].keys():
190 update_dict
["lcmOpOccStatusImpactingNsComponent"] = \
191 data
["filter"]["lcmOpOccStatusImpactingNsComponent"]
193 update_dict
["lcmOpOccStatusImpactingNsComponent"] = "ANY"
194 formatted_data
.append(update_dict
)
195 self
.db
.create_list(table
, formatted_data
)