Coverage for osm_nbi/subscription_topics.py: 14%

120 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2024-06-27 02:46 +0000

1# Copyright 2020 Preethika P(Tata Elxsi) 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

12# implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15 

16__author__ = "Preethika P,preethika.p@tataelxsi.co.in" 

17 

18import requests 

19from osm_nbi.base_topic import BaseTopic, EngineException 

20from osm_nbi.validation import subscription 

21from http import HTTPStatus 

22 

23 

class CommonSubscriptions(BaseTopic):
    """
    Topic handling entries of the "subscriptions" table.

    Subclasses override ``_subscription_mapper`` to derive the rows stored in
    the "mapped_subscriptions" table from a newly created subscription.
    """

    topic = "subscriptions"
    topic_msg = None

    def _subscription_mapper(self, _id, data, table):
        """
        Performs data transformation on subscription request.
        This base implementation does nothing; subclasses provide the mapping.
        :param _id: identifier of the subscription that has just been created
        :param data: data to be transformed
        :param table: table in which transformed data are inserted
        """
        pass

    def format_subscription(self, subs_data):
        """
        Brings lexicographical order for list items at any nested level, so
        that two subscriptions differing only in list ordering compare equal.
        Lists are sorted in place; nested dicts are visited recursively; any
        other value is left untouched.
        :param subs_data: Subscription data to be ordered.
        :return: None
        """
        if not isinstance(subs_data, dict):
            return
        for value in subs_data.values():
            if isinstance(value, list):
                # Base case. Keep iterating afterwards: returning here would
                # leave every sibling key that follows the first list unsorted.
                value.sort()
            else:
                # Recursive case (a no-op for anything that is not a dict).
                self.format_subscription(value)

    def check_conflict_on_new(self, session, content):
        """
        Rejects a new subscription that duplicates an existing one.
        Candidates are fetched from the "subscriptions" table by CallbackUri
        and authentication type, then compared with ``content`` as whole
        dicts, ignoring "_id", "_admin", "schema_version" and the basic-auth
        password.
        :param session: Session object.
        :param content: Subscription data. Lists are ordered in place by
            format_subscription(); the password is removed only for the
            duration of the comparison and is always put back.
        :return: None if no conflict otherwise, raises an EngineException
            with HTTPStatus.CONFLICT.
        """
        self.format_subscription(content)

        new_auth = content.get("authentication")
        basic_auth = bool(new_auth) and new_auth.get("authType") == "basic"

        filter_dict = {"CallbackUri": content["CallbackUri"]}
        if new_auth:
            if basic_auth:
                filter_dict["authentication.authType"] = "basic"
            # elif add other authTypes here
        else:
            filter_dict["authentication"] = None  # For Items without authentication
        existing_subscriptions = self.db.get_list("subscriptions", q_filter=filter_dict)

        # The password takes no part in the comparison. pop() with a default
        # tolerates a request that carries no password at all.
        new_sub_pwd = None
        if basic_auth:
            new_sub_pwd = new_auth["paramsBasic"].pop("password", None)
        try:
            for existing_subscription in existing_subscriptions:
                sub_id = existing_subscription.pop("_id", None)
                existing_subscription.pop("_admin", None)
                existing_subscription.pop("schema_version", None)
                existing_auth = existing_subscription.get("authentication")
                if existing_auth and existing_auth.get("authType") == "basic":
                    existing_auth["paramsBasic"].pop("password", None)
                if existing_subscription == content:
                    raise EngineException(
                        "Subscription already exists with id: {}".format(sub_id),
                        HTTPStatus.CONFLICT,
                    )
        finally:
            # Hand the caller's dict back intact, whether or not a conflict
            # was found and even when the password is an empty string.
            if new_sub_pwd is not None:
                new_auth["paramsBasic"]["password"] = new_sub_pwd
        return

    def format_on_new(self, content, project_id=None, make_public=False):
        """
        Completes a new subscription before it is stored: runs the parent
        formatting, probes the notification endpoint, sets "schema_version"
        to "1.1" and encrypts the basic-auth password when one is present.
        :param content: Subscription data, modified in place. "_id" is read
            as encryption salt (presumably set by the parent format_on_new).
        :param project_id: forwarded to the parent format_on_new
        :param make_public: forwarded to the parent format_on_new
        :return: None
        """
        super().format_on_new(content, project_id=project_id, make_public=make_public)

        # TODO check how to release Engine.write_lock during the check
        def _check_endpoint(url, auth):
            """
            Checks if the notification endpoint is valid: a GET on it must
            answer 204 No Content.
            :param url: the notification end
            :param auth: contains the authentication details with type basic,
                or None. Any other authType is not probed.
            """
            request_args = {"timeout": 5}
            if auth is not None:
                if auth["authType"] != "basic":
                    return  # no probe is implemented for other authTypes
                request_args["auth"] = (
                    auth["paramsBasic"].get("userName"),
                    auth["paramsBasic"].get("password"),
                )
            try:
                response = requests.get(url, **request_args)
            except requests.exceptions.RequestException as e:
                error_text = type(e).__name__ + ": " + str(e)
                raise EngineException(
                    "Cannot access to the notification URL '{}': {}".format(
                        url, error_text
                    )
                ) from e
            if response.status_code != HTTPStatus.NO_CONTENT:
                raise EngineException(
                    "Cannot access to the notification URL '{}',received {}: {}".format(
                        url, response.status_code, response.content
                    )
                )

        url = content["CallbackUri"]
        auth = content.get("authentication")
        _check_endpoint(url, auth)
        content["schema_version"] = schema_version = "1.1"
        if auth is not None and auth["authType"] == "basic":
            params_basic = auth["paramsBasic"]
            if params_basic.get("password"):
                params_basic["password"] = self.db.encrypt(
                    params_basic["password"],
                    schema_version=schema_version,
                    salt=content["_id"],
                )
        return None

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """
        Uses BaseTopic.new to create entry into db
        Once entry is made into subscriptions,mapper function is invoked
        :param rollback: list extended with the operation that removes the
            "mapped_subscriptions" rows referencing the new subscription
        :return: the (_id, op_id) pair returned by BaseTopic.new
        """
        _id, op_id = BaseTopic.new(
            self, rollback, session, indata=indata, kwargs=kwargs, headers=headers
        )
        # Register the cleanup before mapping, so that rows written by the
        # mapper are covered by the rollback list.
        rollback.append(
            {
                "topic": "mapped_subscriptions",
                "operation": "del_list",
                "filter": {"reference": _id},
            }
        )
        self._subscription_mapper(_id, indata, table="mapped_subscriptions")
        return _id, op_id

    def delete_extra(self, session, _id, db_content, not_send_msg=None):
        """
        Deletes the mapped_subscription entry for this particular subscriber
        :param _id: subscription_id deleted
        """
        super().delete_extra(session, _id, db_content, not_send_msg)
        filter_q = {"reference": _id}
        self.db.del_list("mapped_subscriptions", filter_q)

173 

174 

class NslcmSubscriptionsTopic(CommonSubscriptions):
    """Subscriptions topic validated against the ``subscription`` schema."""

    schema_new = subscription

    def _subscription_mapper(self, _id, data, table):
        """
        Performs data transformation on subscription request: builds one row
        per recognised entry of filter.notificationTypes and inserts all of
        them with a single create_list call.
        :param _id: not read here; the row "reference" is taken from data["_id"]
        :param data: data to be transformed
        :param table: table in which transformed data are inserted
        :return: None
        """
        # Fields shared by every row generated for this subscription.
        common_fields = {
            "reference": data.get("_id"),
            "CallbackUri": data.get("CallbackUri"),
        }
        if data.get("authentication"):
            common_fields["authentication"] = data.get("authentication")

        sub_filter = data.get("filter") or {}
        ns_instance_filter = sub_filter.get("nsInstanceSubscriptionFilter")
        if ns_instance_filter:
            # Only the first key of the instance filter is used as identifier.
            first_key = next(iter(ns_instance_filter))
            common_fields["identifier"] = ns_instance_filter[first_key]

        rows = []
        for notification_type in sub_filter.get("notificationTypes") or []:
            row = common_fields.copy()
            row["notificationType"] = notification_type
            if notification_type == "NsIdentifierCreationNotification":
                row["operationTypes"] = "INSTANTIATE"
                row["operationStates"] = "ANY"
            elif notification_type == "NsIdentifierDeletionNotification":
                row["operationTypes"] = "TERMINATE"
                row["operationStates"] = "ANY"
            elif notification_type == "NsLcmOperationOccurrenceNotification":
                # Attributes missing from the filter default to "ANY".
                for field in ("operationTypes", "operationStates"):
                    row[field] = sub_filter.get(field, "ANY")
            elif notification_type == "NsChangeNotification":
                for field in (
                    "nsComponentTypes",
                    "lcmOpNameImpactingNsComponent",
                    "lcmOpOccStatusImpactingNsComponent",
                ):
                    row[field] = sub_filter.get(field, "ANY")
            else:
                # Any other notification type yields no row.
                continue
            rows.append(row)

        self.db.create_list(table, rows)
        return None