Coverage for osm_nbi/acm_topic.py: 17%

186 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-10 20:04 +0000

1# -*- coding: utf-8 -*- 

2 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

12# implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15 

16from pyrage import x25519 

17from uuid import uuid4 

18 

19from http import HTTPStatus 

20from time import time 

21 

22# from osm_common.dbbase import deep_update_rfc7396, DbException 

23from osm_common.msgbase import MsgException 

24from osm_common.dbbase import DbException 

25from osm_common.fsbase import FsException 

26from osm_nbi.base_topic import BaseTopic, EngineException 

27from osm_nbi.validation import ValidationError 

28 

29# import logging 

30# import random 

31# import string 

32# from yaml import safe_load, YAMLError 

33 

34 

class ACMOperationTopic:
    """Helper class that records operation entries on an item's history."""

    def __init__(self, db, fs, msg, auth):
        # The constructor arguments are accepted but not used here; the only
        # effect is declaring the ``multiproject`` attribute on the instance.
        self.multiproject = None

    @staticmethod
    def format_on_operation(content, operation_type, operation_params=None):
        """Append a fresh operation entry to ``content["operationHistory"]``.

        The history list is created when ``content`` does not have one yet.

        :param content: dict describing the item; modified in place.
        :param operation_type: value stored under ``operationType``.
        :param operation_params: value stored under ``operationParams``.
        :return: the newly generated operation id (a uuid4 string).
        """
        op_id = str(uuid4())
        created_at = time()

        # Result, end date and all state fields start out as None.
        entry = {
            "operationType": operation_type,
            "op_id": op_id,
            "result": None,
            "creationDate": created_at,
            "endDate": None,
            "workflowState": None,
            "resourceState": None,
            "operationState": None,
            "gitOperationInfo": None,
            "operationParams": operation_params,
        }

        content.setdefault("operationHistory", []).append(entry)
        return op_id

59 

60 

class ACMTopic(BaseTopic, ACMOperationTopic):
    """Shared behaviour for ACM topics: profile creation, deletion
    bookkeeping, cluster detachment and listing helpers.

    Several helpers used below (``check_unique_name``, ``_remove_envelop``,
    ``_update_input_with_kwargs``, ``_validate_input_new``, ``format_on_new``,
    ``create_gitname``, ``_send_msg``, ``sol005_projection``,
    ``_get_project_filter``, ``self.db``, ``self.topic``, ``self.logger``) are
    not defined in this class; presumably they come from ``BaseTopic``.
    """

    def __init__(self, db, fs, msg, auth):
        super().__init__(db, fs, msg, auth)
        # NOTE: ACMOperationTopic.__init__ is not invoked explicitly here.

    def new_profile(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create a profile, store an encrypted age key pair and send a message.

        :param rollback: list that receives ``{"topic", "_id"}`` for the
            created entry.
        :param session: dict; ``force``, ``project_id`` and ``public`` are read.
        :param indata: request payload; must contain ``name``.
        :param kwargs: extra values merged into the request.
        :param headers: unused.
        :return: tuple ``(_id, None)``.
        :raises ValidationError, EngineException, DbException, MsgException,
            FsException: re-raised as the same type with the failing step
            appended to the message.
        """
        step = "name unique check"
        try:
            self.check_unique_name(session, indata["name"])

            step = "validating input parameters"
            profile_request = self._remove_envelop(indata)
            self._update_input_with_kwargs(profile_request, kwargs)
            profile_request = self._validate_input_new(
                profile_request, session["force"]
            )
            operation_params = profile_request

            step = "filling profile details from input data"
            profile_create = self._create_profile(profile_request, session)

            step = "creating profile at database"
            self.format_on_new(
                profile_create, session["project_id"], make_public=session["public"]
            )
            profile_create["current_operation"] = None
            op_id = ACMOperationTopic.format_on_operation(
                profile_create,
                "create",
                operation_params,
            )

            _id = self.db.create(self.topic, profile_create)
            # Register the rollback entry as soon as the document exists so a
            # failure during key generation/encryption does not leave it behind.
            rollback.append({"topic": self.topic, "_id": _id})

            # The key pair is encrypted with the entry id as salt, which is
            # why it can only be stored after the entry has been created.
            pubkey, privkey = self._generate_age_key()
            profile_create["age_pubkey"] = self.db.encrypt(
                pubkey, schema_version="1.11", salt=_id
            )
            profile_create["age_privkey"] = self.db.encrypt(
                privkey, schema_version="1.11", salt=_id
            )
            self.db.set_one(self.topic, {"_id": _id}, profile_create)
            self._send_msg("profile_create", {"profile_id": _id, "operation_id": op_id})

            return _id, None
        except (
            ValidationError,
            EngineException,
            DbException,
            MsgException,
            FsException,
        ) as e:
            # Keep the original exception as the cause for easier debugging.
            raise type(e)(
                "{} while '{}'".format(e, step), http_code=e.http_code
            ) from e

    def _create_profile(self, profile_request, session):
        """Build the initial descriptor of a non-default profile.

        :param profile_request: dict with ``name`` and ``description``.
        :param session: passed through to ``create_gitname``.
        :return: dict ready to be completed and stored.
        """
        profile_desc = {
            "name": profile_request["name"],
            "description": profile_request["description"],
            "default": False,
            "git_name": self.create_gitname(profile_request, session),
            "state": "IN_CREATION",
            "operatingState": "IN_PROGRESS",
            "resourceState": "IN_PROGRESS.REQUEST_RECEIVED",
        }
        return profile_desc

    def default_profile(
        self, rollback, session, indata=None, kwargs=None, headers=None
    ):
        """Create a default profile entry in the database.

        Unlike ``new_profile`` this performs no name-uniqueness check, no
        input validation, no key generation and sends no message.

        :param rollback: list that receives ``{"topic", "_id"}`` for the
            created entry.
        :param session: dict; ``project_id`` and ``public`` are read.
        :param indata: request payload; must contain ``name``.
        :param kwargs: extra values merged into the request.
        :param headers: unused.
        :return: the ``_id`` of the created entry.
        :raises ValidationError, EngineException, DbException, MsgException,
            FsException: re-raised as the same type with the failing step
            appended to the message.
        """
        step = "validating input parameters"
        try:
            profile_request = self._remove_envelop(indata)
            self._update_input_with_kwargs(profile_request, kwargs)
            operation_params = profile_request

            step = "filling profile details from input data"
            profile_create = self._create_default_profile(profile_request, session)

            step = "creating profile at database"
            self.format_on_new(
                profile_create, session["project_id"], make_public=session["public"]
            )
            profile_create["current_operation"] = None
            ACMOperationTopic.format_on_operation(
                profile_create,
                "create",
                operation_params,
            )
            _id = self.db.create(self.topic, profile_create)
            rollback.append({"topic": self.topic, "_id": _id})
            return _id
        except (
            ValidationError,
            EngineException,
            DbException,
            MsgException,
            FsException,
        ) as e:
            # Keep the original exception as the cause for easier debugging.
            raise type(e)(
                "{} while '{}'".format(e, step), http_code=e.http_code
            ) from e

    def _create_default_profile(self, profile_request, session):
        """Build the initial descriptor of a default profile.

        The description is generated from the topic and the request name.

        :param profile_request: dict with ``name``.
        :param session: passed through to ``create_gitname``.
        :return: dict ready to be completed and stored.
        """
        profile_desc = {
            "name": profile_request["name"],
            "description": f"{self.topic} profile for cluster {profile_request['name']}",
            "default": True,
            "git_name": self.create_gitname(profile_request, session),
            "state": "IN_CREATION",
            "operatingState": "IN_PROGRESS",
            "resourceState": "IN_PROGRESS.REQUEST_RECEIVED",
        }
        return profile_desc

    def detach(self, session, _id, profile_type):
        """Remove profile ``_id`` from every cluster that references it.

        :param session: unused.
        :param _id: id of the profile being detached.
        :param profile_type: name of the cluster field holding the list of
            attached profile ids (for example ``"app_profiles"``).
        """
        for cluster in self.db.get_list("clusters", {}):
            # A cluster may lack the field or hold None; treat both as empty.
            attached = cluster.get(profile_type) or []
            if _id not in attached:
                continue
            remaining = [profile_id for profile_id in attached if profile_id != _id]
            self.db.set_one(
                "clusters", {"_id": cluster["_id"]}, {profile_type: remaining}
            )

    def _generate_age_key(self):
        """Generate a new x25519 identity.

        :return: tuple ``(pubkey, privkey)``, both as strings.
        """
        ident = x25519.Identity.generate()
        pubkey = str(ident.to_public())
        privkey = str(ident)
        return pubkey, privkey

    def common_delete(self, _id, db_content):
        """Mark an item as being deleted and record a ``delete`` operation.

        The state fields are only touched when the item already has a
        ``state`` key. The updated content is written back to the database.

        :param _id: id of the item in ``self.topic``.
        :param db_content: current item content; modified in place.
        :return: id of the recorded delete operation.
        """
        if "state" in db_content:
            db_content["state"] = "IN_DELETION"
            db_content["operatingState"] = "PROCESSING"

        db_content["current_operation"] = None
        op_id = ACMOperationTopic.format_on_operation(
            db_content,
            "delete",
            None,
        )
        self.db.set_one(self.topic, {"_id": _id}, db_content)
        return op_id

    def add_to_old_collection(self, content, session):
        """Mirror a cluster into the ``k8sclusters`` collection.

        Creates the entry, then stores a ``create`` operation in PROCESSING
        state under its ``_admin`` section.

        :param content: cluster data; ``name`` and ``vim_account`` (a VIM
            account name) are required, ``k8s_version`` and ``description``
            are optional and default to None.
        :param session: dict; ``project_id`` and ``public`` are read.
        """
        item = {}
        item["name"] = content["name"]
        item["credentials"] = {}
        item["k8s_version"] = content.get("k8s_version")
        vim_account_details = self.db.get_one(
            "vim_accounts", {"name": content["vim_account"]}
        )
        item["vim_account"] = vim_account_details["_id"]
        item["nets"] = {"k8s_net1": None}
        item["deployment_methods"] = {"juju-bundle": False, "helm-chart-v3": True}
        item["description"] = content.get("description")
        item["namespace"] = "kube-system"
        item["osm_acm"] = True
        item["schema_version"] = "1.11"
        self.format_on_new(item, session["project_id"], make_public=session["public"])
        _id = self.db.create("k8sclusters", item)
        self.logger.info("_id is : %s", _id)

        # Read back the entry just created by its id; looking it up by name
        # could match a different entry that shares the same name.
        item_1 = self.db.get_one("k8sclusters", {"_id": _id})
        item_1["_admin"]["operationalState"] = "PROCESSING"

        # Create operation data
        now = time()
        operation_data = {
            "lcmOperationType": "create",  # Assuming 'create' operation here
            "operationState": "PROCESSING",
            "startTime": now,
            "statusEnteredTime": now,
            "detailed-status": "",
            "operationParams": None,  # Add parameters as needed
        }
        item_1["_admin"]["operations"] = [operation_data]
        item_1["_admin"]["current_operation"] = None
        self.logger.info("content is : %s", item_1)
        self.db.set_one("k8sclusters", {"_id": item_1["_id"]}, item_1)

    def cluster_unique_name_check(self, session, name):
        """Ensure ``name`` is unused in this topic and in related collections.

        :param session: passed to ``check_unique_name``.
        :param name: candidate name.
        :raises EngineException: with HTTP 409 when the name already exists in
            one of the listed collections.
        """
        # First apply the regular uniqueness check for this topic.
        self.check_unique_name(session, name)
        _filter = {"name": name}
        topics = [
            "k8sclusters",
            "k8sapp",
            "k8sinfra_config",
            "k8sinfra_controller",
            "k8sresource",
        ]

        # Then make sure no entry with that name exists in any other topic.
        for item in topics:
            if self.db.get_one(item, _filter, fail_on_empty=False, fail_on_more=False):
                raise EngineException(
                    f"name '{name}' already exists in topic '{item}'",
                    HTTPStatus.CONFLICT,
                )

    def list_both(self, session, filter_q=None, api_req=False):
        """List all clusters from both new and old APIs.

        Entries of ``k8sclusters`` whose name is not present in this topic's
        list are appended with placeholder state fields.

        :param session: used to build the project filter when
            ``self.multiproject`` is set.
        :param filter_q: optional query filter; it is not modified.
        :param api_req: when True every item goes through
            ``sol005_projection``.
        :return: list of cluster dicts.
        """
        # Work on a copy so the caller's filter is not altered.
        filter_q = dict(filter_q) if filter_q else {}
        if self.multiproject:
            filter_q.update(self._get_project_filter(session))
        cluster_list1 = self.db.get_list(self.topic, filter_q)
        cluster_list2 = self.db.get_list("k8sclusters", filter_q)
        list1_names = {item["name"] for item in cluster_list1}
        for item in cluster_list2:
            if item["name"] not in list1_names:
                # Complete the information for clusters from old API
                item["state"] = "N/A"
                old_state = item.get("_admin", {}).get("operationalState", "Unknown")
                item["bootstrap"] = "NO"
                item["operatingState"] = "N/A"
                item["resourceState"] = old_state
                item["created"] = "NO"
                cluster_list1.append(item)
        if api_req:
            cluster_list1 = [self.sol005_projection(inst) for inst in cluster_list1]
        return cluster_list1

307 

308 

class ProfileTopic(ACMTopic):
    """Common edit and delete rules for profile topics."""

    # Maps a profile topic name to the cluster field that lists the ids of
    # the attached profiles of that kind.
    profile_topic_map = {
        "k8sapp": "app_profiles",
        "k8sresource": "resource_profiles",
        "k8sinfra_controller": "infra_controller_profiles",
        "k8sinfra_config": "infra_config_profiles",
    }

    def __init__(self, db, fs, msg, auth):
        super().__init__(db, fs, msg, auth)

    def edit_extra_before(self, session, _id, indata=None, kwargs=None, content=None):
        """Reject edits of default profiles and enforce unique names.

        :param session: passed to ``check_unique_name``.
        :param _id: id of the profile being edited.
        :param indata: requested changes; may be None.
        :param kwargs: unused.
        :param content: unused.
        :return: True when the edit may proceed.
        :raises EngineException: with HTTP 422 when the profile is a default one.
        """
        check = self.db.get_one(self.topic, {"_id": _id})
        # Same truthiness test as delete_profile, tolerant of a missing field.
        if check.get("default", False):
            raise EngineException(
                "Cannot edit default profiles",
                http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
            )
        # Only re-check uniqueness when the name actually changes.
        if indata and "name" in indata and check.get("name") != indata["name"]:
            self.check_unique_name(session, indata["name"])
        return True

    def delete_extra_before(self, session, _id, db_content, not_send_msg=None):
        """Flag the profile as being deleted and build the message payload.

        :param session: dict; ``force`` is read.
        :param _id: id of the profile.
        :param db_content: current profile content; modified by ``common_delete``.
        :param not_send_msg: unused.
        :return: dict with ``profile_id``, ``operation_id`` and ``force``.
        """
        op_id = self.common_delete(_id, db_content)
        return {"profile_id": _id, "operation_id": op_id, "force": session["force"]}

    def delete_profile(self, session, _id, dry_run=False, not_send_msg=None):
        """Delete a non-default profile after detaching it from all clusters.

        :param session: session data forwarded to ``detach`` and ``delete``.
        :param _id: id of the profile.
        :param dry_run: when True nothing is detached and the flag is
            forwarded to the parent ``delete``.
        :param not_send_msg: forwarded to the parent ``delete``.
        :return: the ``_id`` received.
        :raises EngineException: with HTTP 422 when the profile is a default one.
        """
        item_content = self.db.get_one(self.topic, {"_id": _id})
        if item_content.get("default", False):
            raise EngineException(
                "Cannot delete item because it is marked as default",
                http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
            )
        profile_type = self.profile_topic_map[self.topic]
        # Before deleting, detach the profile from the associated clusters.
        # A dry run must not modify any cluster.
        if not dry_run:
            self.detach(session, _id, profile_type)
        # Delegate the actual removal of the profile to the parent class.
        # NOTE(review): assumes the parent delete() accepts ``dry_run``, as this
        # method's own signature suggests - confirm against BaseTopic.
        super().delete(session, _id, dry_run=dry_run, not_send_msg=not_send_msg)
        return _id