Coverage for osm_nbi/k8s_topics.py: 16%

745 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-12 20:04 +0000

1# -*- coding: utf-8 -*- 

2 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

12# implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15 

16import logging 

17import yaml 

18import shutil 

19import os 

20from http import HTTPStatus 

21 

22from time import time 

23from osm_nbi.base_topic import BaseTopic, EngineException 

24from osm_nbi.acm_topic import ACMTopic, ACMOperationTopic, ProfileTopic 

25 

26from osm_nbi.descriptor_topics import DescriptorTopic 

27from osm_nbi.validation import ( 

28 ValidationError, 

29 validate_input, 

30 clustercreation_new_schema, 

31 cluster_edit_schema, 

32 cluster_update_schema, 

33 infra_controller_profile_create_new_schema, 

34 infra_config_profile_create_new_schema, 

35 app_profile_create_new_schema, 

36 resource_profile_create_new_schema, 

37 infra_controller_profile_create_edit_schema, 

38 infra_config_profile_create_edit_schema, 

39 app_profile_create_edit_schema, 

40 resource_profile_create_edit_schema, 

41 clusterregistration_new_schema, 

42 attach_dettach_profile_schema, 

43 ksu_schema, 

44 oka_schema, 

45) 

46from osm_common.dbbase import deep_update_rfc7396, DbException 

47from osm_common.msgbase import MsgException 

48from osm_common.fsbase import FsException 

49 

50__author__ = ( 

51 "Shrinithi R <shrinithi.r@tataelxsi.co.in>", 

52 "Shahithya Y <shahithya.y@tataelxsi.co.in>", 

53) 

54 

55 

56class InfraContTopic(ProfileTopic): 

57 topic = "k8sinfra_controller" 

58 topic_msg = "k8s_infra_controller" 

59 schema_new = infra_controller_profile_create_new_schema 

60 schema_edit = infra_controller_profile_create_edit_schema 

61 

62 def __init__(self, db, fs, msg, auth): 

63 BaseTopic.__init__(self, db, fs, msg, auth) 

64 

65 def new(self, rollback, session, indata=None, kwargs=None, headers=None): 

66 # To create the new infra controller profile 

67 return self.new_profile(rollback, session, indata, kwargs, headers) 

68 

69 def default(self, rollback, session, indata=None, kwargs=None, headers=None): 

70 # To create the default infra controller profile while creating the cluster 

71 return self.default_profile(rollback, session, indata, kwargs, headers) 

72 

73 def delete(self, session, _id, dry_run=False, not_send_msg=None): 

74 self.delete_profile(session, _id, dry_run=False, not_send_msg=None) 

75 return _id 

76 

77 

78class InfraConfTopic(ProfileTopic): 

79 topic = "k8sinfra_config" 

80 topic_msg = "k8s_infra_config" 

81 schema_new = infra_config_profile_create_new_schema 

82 schema_edit = infra_config_profile_create_edit_schema 

83 

84 def __init__(self, db, fs, msg, auth): 

85 BaseTopic.__init__(self, db, fs, msg, auth) 

86 

87 def new(self, rollback, session, indata=None, kwargs=None, headers=None): 

88 # To create the new infra config profile 

89 return self.new_profile(rollback, session, indata, kwargs, headers) 

90 

91 def default(self, rollback, session, indata=None, kwargs=None, headers=None): 

92 # To create the default infra config profile while creating the cluster 

93 return self.default_profile(rollback, session, indata, kwargs, headers) 

94 

95 def delete(self, session, _id, dry_run=False, not_send_msg=None): 

96 self.delete_profile(session, _id, dry_run=False, not_send_msg=None) 

97 return _id 

98 

99 

100class AppTopic(ProfileTopic): 

101 topic = "k8sapp" 

102 topic_msg = "k8s_app" 

103 schema_new = app_profile_create_new_schema 

104 schema_edit = app_profile_create_edit_schema 

105 

106 def __init__(self, db, fs, msg, auth): 

107 BaseTopic.__init__(self, db, fs, msg, auth) 

108 

109 def new(self, rollback, session, indata=None, kwargs=None, headers=None): 

110 # To create the new app profile 

111 return self.new_profile(rollback, session, indata, kwargs, headers) 

112 

113 def default(self, rollback, session, indata=None, kwargs=None, headers=None): 

114 # To create the default app profile while creating the cluster 

115 return self.default_profile(rollback, session, indata, kwargs, headers) 

116 

117 def delete(self, session, _id, dry_run=False, not_send_msg=None): 

118 self.delete_profile(session, _id, dry_run=False, not_send_msg=None) 

119 return _id 

120 

121 

122class ResourceTopic(ProfileTopic): 

123 topic = "k8sresource" 

124 topic_msg = "k8s_resource" 

125 schema_new = resource_profile_create_new_schema 

126 schema_edit = resource_profile_create_edit_schema 

127 

128 def __init__(self, db, fs, msg, auth): 

129 BaseTopic.__init__(self, db, fs, msg, auth) 

130 

131 def new(self, rollback, session, indata=None, kwargs=None, headers=None): 

132 # To create the new resource profile 

133 return self.new_profile(rollback, session, indata, kwargs, headers) 

134 

135 def default(self, rollback, session, indata=None, kwargs=None, headers=None): 

136 # To create the default resource profile while creating the cluster 

137 return self.default_profile(rollback, session, indata, kwargs, headers) 

138 

139 def delete(self, session, _id, dry_run=False, not_send_msg=None): 

140 self.delete_profile(session, _id, dry_run=False, not_send_msg=None) 

141 return _id 

142 

143 

144class ClusterTopic(ACMTopic): 

145 topic = "clusters" 

146 topic_msg = "cluster" 

147 schema_new = clustercreation_new_schema 

148 schema_edit = attach_dettach_profile_schema 

149 

150 def __init__(self, db, fs, msg, auth): 

151 super().__init__(db, fs, msg, auth) 

152 self.infra_contr_topic = InfraContTopic(db, fs, msg, auth) 

153 self.infra_conf_topic = InfraConfTopic(db, fs, msg, auth) 

154 self.resource_topic = ResourceTopic(db, fs, msg, auth) 

155 self.app_topic = AppTopic(db, fs, msg, auth) 

156 

157 @staticmethod 

158 def format_on_new(content, project_id=None, make_public=False): 

159 ACMTopic.format_on_new(content, project_id=project_id, make_public=make_public) 

160 content["current_operation"] = None 

161 

162 def new(self, rollback, session, indata=None, kwargs=None, headers=None): 

163 """ 

164 Creates a new k8scluster into database. 

165 :param rollback: list to append the created items at database in case a rollback must be done 

166 :param session: contains "username", "admin", "force", "public", "project_id", "set_project" 

167 :param indata: params to be used for the k8cluster 

168 :param kwargs: used to override the indata 

169 :param headers: http request headers 

170 :return: the _id of k8scluster created at database. Or an exception of type 

171 EngineException, ValidationError, DbException, FsException, MsgException. 

172 Note: Exceptions are not captured on purpose. They should be captured at called 

173 """ 

174 step = "checking quotas" # first step must be defined outside try 

175 try: 

176 self.check_quota(session) 

177 step = "name unique check" 

178 # self.check_unique_name(session, indata["name"]) 

179 self.cluster_unique_name_check(session, indata["name"]) 

180 step = "validating input parameters" 

181 cls_request = self._remove_envelop(indata) 

182 self._update_input_with_kwargs(cls_request, kwargs) 

183 cls_request = self._validate_input_new(cls_request, session["force"]) 

184 operation_params = cls_request 

185 

186 step = "filling cluster details from input data" 

187 cls_create = self._create_cluster( 

188 cls_request, rollback, session, indata, kwargs, headers 

189 ) 

190 

191 step = "creating cluster at database" 

192 self.format_on_new( 

193 cls_create, session["project_id"], make_public=session["public"] 

194 ) 

195 op_id = self.format_on_operation( 

196 cls_create, 

197 "create", 

198 operation_params, 

199 ) 

200 _id = self.db.create(self.topic, cls_create) 

201 pubkey, privkey = self._generate_age_key() 

202 cls_create["age_pubkey"] = self.db.encrypt( 

203 pubkey, schema_version="1.11", salt=_id 

204 ) 

205 cls_create["age_privkey"] = self.db.encrypt( 

206 privkey, schema_version="1.11", salt=_id 

207 ) 

208 # TODO: set age_pubkey and age_privkey in the default profiles 

209 rollback.append({"topic": self.topic, "_id": _id}) 

210 self.db.set_one("clusters", {"_id": _id}, cls_create) 

211 self._send_msg("create", {"cluster_id": _id, "operation_id": op_id}) 

212 

213 # To add the content in old collection "k8sclusters" 

214 self.add_to_old_collection(cls_create, session) 

215 

216 return _id, None 

217 except ( 

218 ValidationError, 

219 EngineException, 

220 DbException, 

221 MsgException, 

222 FsException, 

223 ) as e: 

224 raise type(e)("{} while '{}'".format(e, step), http_code=e.http_code) 

225 

226 def _create_cluster(self, cls_request, rollback, session, indata, kwargs, headers): 

227 # Check whether the region name and resource group have been given 

228 region_given = "region_name" in indata 

229 resource_group_given = "resource_group" in indata 

230 

231 # Get the vim_account details 

232 vim_account_details = self.db.get_one( 

233 "vim_accounts", {"name": cls_request["vim_account"]} 

234 ) 

235 

236 # Check whether the region name and resource group have been given 

237 if not region_given and not resource_group_given: 

238 region_name = vim_account_details["config"]["region_name"] 

239 resource_group = vim_account_details["config"]["resource_group"] 

240 elif region_given and not resource_group_given: 

241 region_name = cls_request["region_name"] 

242 resource_group = vim_account_details["config"]["resource_group"] 

243 elif not region_given and resource_group_given: 

244 region_name = vim_account_details["config"]["region_name"] 

245 resource_group = cls_request["resource_group"] 

246 else: 

247 region_name = cls_request["region_name"] 

248 resource_group = cls_request["resource_group"] 

249 

250 cls_desc = { 

251 "name": cls_request["name"], 

252 "vim_account": self.check_vim(session, cls_request["vim_account"]), 

253 "k8s_version": cls_request["k8s_version"], 

254 "node_size": cls_request["node_size"], 

255 "node_count": cls_request["node_count"], 

256 "bootstrap": cls_request["bootstrap"], 

257 "region_name": region_name, 

258 "resource_group": resource_group, 

259 "infra_controller_profiles": [ 

260 self._create_default_profiles( 

261 rollback, session, indata, kwargs, headers, self.infra_contr_topic 

262 ) 

263 ], 

264 "infra_config_profiles": [ 

265 self._create_default_profiles( 

266 rollback, session, indata, kwargs, headers, self.infra_conf_topic 

267 ) 

268 ], 

269 "resource_profiles": [ 

270 self._create_default_profiles( 

271 rollback, session, indata, kwargs, headers, self.resource_topic 

272 ) 

273 ], 

274 "app_profiles": [ 

275 self._create_default_profiles( 

276 rollback, session, indata, kwargs, headers, self.app_topic 

277 ) 

278 ], 

279 "created": "true", 

280 "state": "IN_CREATION", 

281 "operatingState": "PROCESSING", 

282 "git_name": self.create_gitname(cls_request, session), 

283 "resourceState": "IN_PROGRESS.REQUEST_RECEIVED", 

284 } 

285 # Add optional fields if they exist in the request 

286 if "description" in cls_request: 

287 cls_desc["description"] = cls_request["description"] 

288 return cls_desc 

289 

290 def check_vim(self, session, name): 

291 try: 

292 vim_account_details = self.db.get_one("vim_accounts", {"name": name}) 

293 if vim_account_details is not None: 

294 return name 

295 except ValidationError as e: 

296 raise EngineException( 

297 e, 

298 HTTPStatus.UNPROCESSABLE_ENTITY, 

299 ) 

300 

301 def _create_default_profiles( 

302 self, rollback, session, indata, kwargs, headers, topic 

303 ): 

304 topic = self.to_select_topic(topic) 

305 default_profiles = topic.default(rollback, session, indata, kwargs, headers) 

306 return default_profiles 

307 

308 def to_select_topic(self, topic): 

309 if topic == "infra_controller_profiles": 

310 topic = self.infra_contr_topic 

311 elif topic == "infra_config_profiles": 

312 topic = self.infra_conf_topic 

313 elif topic == "resource_profiles": 

314 topic = self.resource_topic 

315 elif topic == "app_profiles": 

316 topic = self.app_topic 

317 return topic 

318 

319 def show_one(self, session, _id, profile, filter_q=None, api_req=False): 

320 try: 

321 filter_q = self._get_project_filter(session) 

322 filter_q[self.id_field(self.topic, _id)] = _id 

323 content = self.db.get_one(self.topic, filter_q) 

324 existing_profiles = [] 

325 topic = None 

326 topic = self.to_select_topic(profile) 

327 for profile_id in content[profile]: 

328 data = topic.show(session, profile_id, filter_q, api_req) 

329 existing_profiles.append(data) 

330 return existing_profiles 

331 except ValidationError as e: 

332 raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY) 

333 

334 def state_check(self, profile_id, session, topic): 

335 topic = self.to_select_topic(topic) 

336 content = topic.show(session, profile_id, filter_q=None, api_req=False) 

337 state = content["state"] 

338 if state == "CREATED": 

339 return 

340 else: 

341 raise EngineException( 

342 f" {profile_id} is not in created state", 

343 HTTPStatus.UNPROCESSABLE_ENTITY, 

344 ) 

345 

346 def edit(self, session, _id, item, indata=None, kwargs=None): 

347 if item not in ( 

348 "infra_controller_profiles", 

349 "infra_config_profiles", 

350 "app_profiles", 

351 "resource_profiles", 

352 ): 

353 self.schema_edit = cluster_edit_schema 

354 super().edit(session, _id, indata=item, kwargs=kwargs, content=None) 

355 else: 

356 indata = self._remove_envelop(indata) 

357 indata = self._validate_input_edit( 

358 indata, content=None, force=session["force"] 

359 ) 

360 if indata.get("add_profile"): 

361 self.add_profile(session, _id, item, indata) 

362 elif indata.get("remove_profile"): 

363 self.remove_profile(session, _id, item, indata) 

364 else: 

365 error_msg = "Add / remove operation is only applicable" 

366 raise EngineException(error_msg, HTTPStatus.UNPROCESSABLE_ENTITY) 

367 

368 def edit_extra_before(self, session, _id, indata=None, kwargs=None, content=None): 

369 check = self.db.get_one(self.topic, {"_id": _id}) 

370 if "name" in indata and check["name"] != indata["name"]: 

371 self.check_unique_name(session, indata["name"]) 

372 _filter = {"name": indata["name"]} 

373 topic_list = [ 

374 "k8sclusters", 

375 "k8sinfra_controller", 

376 "k8sinfra_config", 

377 "k8sapp", 

378 "k8sresource", 

379 ] 

380 # Check unique name for k8scluster and profiles 

381 for topic in topic_list: 

382 if self.db.get_one( 

383 topic, _filter, fail_on_empty=False, fail_on_more=False 

384 ): 

385 raise EngineException( 

386 "name '{}' already exists for {}".format(indata["name"], topic), 

387 HTTPStatus.CONFLICT, 

388 ) 

389 # Replace name in k8scluster and profiles 

390 for topic in topic_list: 

391 data = self.db.get_one(topic, {"name": check["name"]}) 

392 data["name"] = indata["name"] 

393 self.db.replace(topic, data["_id"], data) 

394 return True 

395 

396 def add_profile(self, session, _id, item, indata=None): 

397 indata = self._remove_envelop(indata) 

398 operation_params = indata 

399 profile_id = indata["add_profile"][0]["id"] 

400 # check state 

401 self.state_check(profile_id, session, item) 

402 filter_q = self._get_project_filter(session) 

403 filter_q[self.id_field(self.topic, _id)] = _id 

404 content = self.db.get_one(self.topic, filter_q) 

405 profile_list = content[item] 

406 

407 if profile_id not in profile_list: 

408 content["operatingState"] = "PROCESSING" 

409 op_id = self.format_on_operation( 

410 content, 

411 "add", 

412 operation_params, 

413 ) 

414 self.db.set_one("clusters", {"_id": content["_id"]}, content) 

415 self._send_msg( 

416 "add", 

417 { 

418 "cluster_id": _id, 

419 "profile_id": profile_id, 

420 "profile_type": item, 

421 "operation_id": op_id, 

422 }, 

423 ) 

424 else: 

425 raise EngineException( 

426 f"{item} {profile_id} already exists", HTTPStatus.UNPROCESSABLE_ENTITY 

427 ) 

428 

429 def _get_default_profiles(self, session, topic): 

430 topic = self.to_select_topic(topic) 

431 existing_profiles = topic.list(session, filter_q=None, api_req=False) 

432 default_profiles = [ 

433 profile["_id"] 

434 for profile in existing_profiles 

435 if profile.get("default", False) 

436 ] 

437 return default_profiles 

438 

439 def remove_profile(self, session, _id, item, indata): 

440 indata = self._remove_envelop(indata) 

441 operation_params = indata 

442 profile_id = indata["remove_profile"][0]["id"] 

443 filter_q = self._get_project_filter(session) 

444 filter_q[self.id_field(self.topic, _id)] = _id 

445 content = self.db.get_one(self.topic, filter_q) 

446 profile_list = content[item] 

447 

448 default_profiles = self._get_default_profiles(session, item) 

449 

450 if profile_id in default_profiles: 

451 raise EngineException( 

452 "Cannot remove default profile", HTTPStatus.UNPROCESSABLE_ENTITY 

453 ) 

454 if profile_id in profile_list: 

455 op_id = self.format_on_operation( 

456 content, 

457 "remove", 

458 operation_params, 

459 ) 

460 self.db.set_one("clusters", {"_id": content["_id"]}, content) 

461 self._send_msg( 

462 "remove", 

463 { 

464 "cluster_id": _id, 

465 "profile_id": profile_id, 

466 "profile_type": item, 

467 "operation_id": op_id, 

468 }, 

469 ) 

470 else: 

471 raise EngineException( 

472 f"{item} {profile_id} does'nt exists", HTTPStatus.UNPROCESSABLE_ENTITY 

473 ) 

474 

475 def get_cluster_creds(self, session, _id, item): 

476 if not self.multiproject: 

477 filter_db = {} 

478 else: 

479 filter_db = self._get_project_filter(session) 

480 filter_db[BaseTopic.id_field(self.topic, _id)] = _id 

481 operation_params = None 

482 data = self.db.get_one(self.topic, filter_db) 

483 op_id = self.format_on_operation(data, item, operation_params) 

484 self.db.set_one(self.topic, {"_id": data["_id"]}, data) 

485 self._send_msg("get_creds", {"cluster_id": _id, "operation_id": op_id}) 

486 return op_id 

487 

488 def get_cluster_creds_file(self, session, _id, item, op_id): 

489 if not self.multiproject: 

490 filter_db = {} 

491 else: 

492 filter_db = self._get_project_filter(session) 

493 filter_db[BaseTopic.id_field(self.topic, _id)] = _id 

494 

495 data = self.db.get_one(self.topic, filter_db) 

496 creds_flag = None 

497 for operations in data["operationHistory"]: 

498 if operations["op_id"] == op_id: 

499 creds_flag = operations["result"] 

500 self.logger.info("Creds Flag: {}".format(creds_flag)) 

501 

502 if creds_flag is True: 

503 credentials = data["credentials"] 

504 

505 file_pkg = None 

506 current_path = _id 

507 

508 self.fs.file_delete(current_path, ignore_non_exist=True) 

509 self.fs.mkdir(current_path) 

510 filename = "credentials.yaml" 

511 file_path = (current_path, filename) 

512 self.logger.info("File path: {}".format(file_path)) 

513 file_pkg = self.fs.file_open(file_path, "a+b") 

514 

515 credentials_yaml = yaml.safe_dump( 

516 credentials, indent=4, default_flow_style=False 

517 ) 

518 file_pkg.write(credentials_yaml.encode(encoding="utf-8")) 

519 

520 if file_pkg: 

521 file_pkg.close() 

522 file_pkg = None 

523 self.fs.sync(from_path=current_path) 

524 

525 return ( 

526 self.fs.file_open((current_path, filename), "rb"), 

527 "text/plain", 

528 ) 

529 else: 

530 raise EngineException( 

531 "Not possible to get the credentials of the cluster", 

532 HTTPStatus.UNPROCESSABLE_ENTITY, 

533 ) 

534 

535 def update_cluster(self, session, _id, item, indata): 

536 if not self.multiproject: 

537 filter_db = {} 

538 else: 

539 filter_db = self._get_project_filter(session) 

540 # To allow project&user addressing by name AS WELL AS _id 

541 filter_db[BaseTopic.id_field(self.topic, _id)] = _id 

542 validate_input(indata, cluster_update_schema) 

543 data = self.db.get_one(self.topic, filter_db) 

544 operation_params = {} 

545 data["operatingState"] = "PROCESSING" 

546 data["resourceState"] = "IN_PROGRESS" 

547 operation_params = indata 

548 op_id = self.format_on_operation( 

549 data, 

550 item, 

551 operation_params, 

552 ) 

553 self.db.set_one(self.topic, {"_id": _id}, data) 

554 data = {"cluster_id": _id, "operation_id": op_id} 

555 self._send_msg(item, data) 

556 return op_id 

557 

558 def delete_extra_before(self, session, _id, db_content, not_send_msg=None): 

559 op_id = self.common_delete(_id, db_content) 

560 return {"cluster_id": _id, "operation_id": op_id, "force": session["force"]} 

561 

562 def delete(self, session, _id, dry_run=False, not_send_msg=None): 

563 filter_q = self._get_project_filter(session) 

564 filter_q[self.id_field(self.topic, _id)] = _id 

565 check = self.db.get_one(self.topic, filter_q) 

566 if check["created"] == "false": 

567 raise EngineException( 

568 "Cannot delete registered cluster. Please deregister.", 

569 HTTPStatus.UNPROCESSABLE_ENTITY, 

570 ) 

571 super().delete(session, _id, dry_run=False, not_send_msg=None) 

572 

573 

574class ClusterOpsTopic(ACMTopic): 

575 topic = "clusters" 

576 topic_msg = "cluster" 

577 schema_new = clusterregistration_new_schema 

578 

579 def __init__(self, db, fs, msg, auth): 

580 super().__init__(db, fs, msg, auth) 

581 

582 @staticmethod 

583 def format_on_new(content, project_id=None, make_public=False): 

584 ACMTopic.format_on_new(content, project_id=project_id, make_public=make_public) 

585 content["current_operation"] = None 

586 

587 def add(self, rollback, session, indata, kwargs=None, headers=None): 

588 step = "checking quotas" 

589 try: 

590 self.check_quota(session) 

591 step = "name unique check" 

592 self.cluster_unique_name_check(session, indata["name"]) 

593 # self.check_unique_name(session, indata["name"]) 

594 step = "validating input parameters" 

595 cls_add_request = self._remove_envelop(indata) 

596 self._update_input_with_kwargs(cls_add_request, kwargs) 

597 cls_add_request = self._validate_input_new( 

598 cls_add_request, session["force"] 

599 ) 

600 operation_params = cls_add_request 

601 

602 step = "filling cluster details from input data" 

603 cls_add_request = self._add_cluster(cls_add_request, session) 

604 

605 step = "registering the cluster at database" 

606 self.format_on_new( 

607 cls_add_request, session["project_id"], make_public=session["public"] 

608 ) 

609 op_id = self.format_on_operation( 

610 cls_add_request, 

611 "register", 

612 operation_params, 

613 ) 

614 _id = self.db.create(self.topic, cls_add_request) 

615 pubkey, privkey = self._generate_age_key() 

616 cls_add_request["age_pubkey"] = self.db.encrypt( 

617 pubkey, schema_version="1.11", salt=_id 

618 ) 

619 cls_add_request["age_privkey"] = self.db.encrypt( 

620 privkey, schema_version="1.11", salt=_id 

621 ) 

622 # TODO: set age_pubkey and age_privkey in the default profiles 

623 self.db.set_one(self.topic, {"_id": _id}, cls_add_request) 

624 rollback.append({"topic": self.topic, "_id": _id}) 

625 self._send_msg("register", {"cluster_id": _id, "operation_id": op_id}) 

626 

627 # To add the content in old collection "k8sclusters" 

628 self.add_to_old_collection(cls_add_request, session) 

629 

630 return _id, None 

631 except ( 

632 ValidationError, 

633 EngineException, 

634 DbException, 

635 MsgException, 

636 FsException, 

637 ) as e: 

638 raise type(e)("{} while '{}'".format(e, step), http_code=e.http_code) 

639 

640 def _add_cluster(self, cls_add_request, session): 

641 cls_add = { 

642 "name": cls_add_request["name"], 

643 "credentials": cls_add_request["credentials"], 

644 "vim_account": cls_add_request["vim_account"], 

645 "bootstrap": cls_add_request["bootstrap"], 

646 "created": "false", 

647 "state": "IN_CREATION", 

648 "operatingState": "PROCESSING", 

649 "git_name": self.create_gitname(cls_add_request, session), 

650 "resourceState": "IN_PROGRESS.REQUEST_RECEIVED", 

651 } 

652 # Add optional fields if they exist in the request 

653 if "description" in cls_add_request: 

654 cls_add["description"] = cls_add_request["description"] 

655 return cls_add 

656 

657 def remove(self, session, _id, dry_run=False, not_send_msg=None): 

658 """ 

659 Delete item by its internal _id 

660 :param session: contains "username", "admin", "force", "public", "project_id", "set_project" 

661 :param _id: server internal id 

662 :param dry_run: make checking but do not delete 

663 :param not_send_msg: To not send message (False) or store content (list) instead 

664 :return: operation id (None if there is not operation), raise exception if error or not found, conflict, ... 

665 """ 

666 

667 # To allow addressing projects and users by name AS WELL AS by _id 

668 if not self.multiproject: 

669 filter_q = {} 

670 else: 

671 filter_q = self._get_project_filter(session) 

672 filter_q[self.id_field(self.topic, _id)] = _id 

673 item_content = self.db.get_one(self.topic, filter_q) 

674 

675 op_id = self.format_on_operation( 

676 item_content, 

677 "deregister", 

678 None, 

679 ) 

680 self.db.set_one(self.topic, {"_id": _id}, item_content) 

681 

682 self.check_conflict_on_del(session, _id, item_content) 

683 if dry_run: 

684 return None 

685 

686 if self.multiproject and session["project_id"]: 

687 # remove reference from project_read if there are more projects referencing it. If it last one, 

688 # do not remove reference, but delete 

689 other_projects_referencing = next( 

690 ( 

691 p 

692 for p in item_content["_admin"]["projects_read"] 

693 if p not in session["project_id"] and p != "ANY" 

694 ), 

695 None, 

696 ) 

697 

698 # check if there are projects referencing it (apart from ANY, that means, public).... 

699 if other_projects_referencing: 

700 # remove references but not delete 

701 update_dict_pull = { 

702 "_admin.projects_read": session["project_id"], 

703 "_admin.projects_write": session["project_id"], 

704 } 

705 self.db.set_one( 

706 self.topic, filter_q, update_dict=None, pull_list=update_dict_pull 

707 ) 

708 return None 

709 else: 

710 can_write = next( 

711 ( 

712 p 

713 for p in item_content["_admin"]["projects_write"] 

714 if p == "ANY" or p in session["project_id"] 

715 ), 

716 None, 

717 ) 

718 if not can_write: 

719 raise EngineException( 

720 "You have not write permission to delete it", 

721 http_code=HTTPStatus.UNAUTHORIZED, 

722 ) 

723 

724 # delete 

725 self._send_msg( 

726 "deregister", 

727 {"cluster_id": _id, "operation_id": op_id}, 

728 not_send_msg=not_send_msg, 

729 ) 

730 return None 

731 

732 

733class KsusTopic(ACMTopic): 

734 topic = "ksus" 

735 okapkg_topic = "okas" 

736 infra_topic = "k8sinfra" 

737 topic_msg = "ksu" 

738 schema_new = ksu_schema 

739 schema_edit = ksu_schema 

740 

741 def __init__(self, db, fs, msg, auth): 

742 super().__init__(db, fs, msg, auth) 

743 self.logger = logging.getLogger("nbi.ksus") 

744 

745 @staticmethod 

746 def format_on_new(content, project_id=None, make_public=False): 

747 BaseTopic.format_on_new(content, project_id=project_id, make_public=make_public) 

748 content["current_operation"] = None 

749 content["state"] = "IN_CREATION" 

750 content["operatingState"] = "PROCESSING" 

751 content["resourceState"] = "IN_PROGRESS" 

752 

753 def new(self, rollback, session, indata=None, kwargs=None, headers=None): 

754 _id_list = [] 

755 for ksus in indata["ksus"]: 

756 content = ksus 

757 oka = content["oka"][0] 

758 oka_flag = "" 

759 if oka["_id"]: 

760 oka_flag = "_id" 

761 oka["sw_catalog_path"] = "" 

762 elif oka["sw_catalog_path"]: 

763 oka_flag = "sw_catalog_path" 

764 

765 for okas in content["oka"]: 

766 if okas["_id"] and okas["sw_catalog_path"]: 

767 raise EngineException( 

768 "Cannot create ksu with both OKA and SW catalog path", 

769 HTTPStatus.UNPROCESSABLE_ENTITY, 

770 ) 

771 if not okas["sw_catalog_path"]: 

772 okas.pop("sw_catalog_path") 

773 elif not okas["_id"]: 

774 okas.pop("_id") 

775 if oka_flag not in okas.keys(): 

776 raise EngineException( 

777 "Cannot create ksu. Give either OKA or SW catalog path for all oka in a KSU", 

778 HTTPStatus.UNPROCESSABLE_ENTITY, 

779 ) 

780 

781 # Override descriptor with query string kwargs 

782 content = self._remove_envelop(content) 

783 self._update_input_with_kwargs(content, kwargs) 

784 content = self._validate_input_new(input=content, force=session["force"]) 

785 

786 # Check for unique name 

787 self.check_unique_name(session, content["name"]) 

788 

789 self.check_conflict_on_new(session, content) 

790 

791 operation_params = {} 

792 for content_key, content_value in content.items(): 

793 operation_params[content_key] = content_value 

794 self.format_on_new( 

795 content, project_id=session["project_id"], make_public=session["public"] 

796 ) 

797 op_id = self.format_on_operation( 

798 content, 

799 operation_type="create", 

800 operation_params=operation_params, 

801 ) 

802 content["git_name"] = self.create_gitname(content, session) 

803 

804 # Update Oka_package usage state 

805 for okas in content["oka"]: 

806 if "_id" in okas.keys(): 

807 self.update_usage_state(session, okas) 

808 

809 _id = self.db.create(self.topic, content) 

810 rollback.append({"topic": self.topic, "_id": _id}) 

811 _id_list.append(_id) 

812 data = {"ksus_list": _id_list, "operation_id": op_id} 

813 self._send_msg("create", data) 

814 return _id_list, op_id 

815 

816 def clone(self, rollback, session, _id, indata, kwargs, headers): 

817 filter_db = self._get_project_filter(session) 

818 filter_db[BaseTopic.id_field(self.topic, _id)] = _id 

819 data = self.db.get_one(self.topic, filter_db) 

820 

821 op_id = self.format_on_operation( 

822 data, 

823 "clone", 

824 indata, 

825 ) 

826 self.db.set_one(self.topic, {"_id": data["_id"]}, data) 

827 self._send_msg("clone", {"ksus_list": [data["_id"]], "operation_id": op_id}) 

828 return op_id 

829 

830 def update_usage_state(self, session, oka_content): 

831 _id = oka_content["_id"] 

832 filter_db = self._get_project_filter(session) 

833 filter_db[BaseTopic.id_field(self.topic, _id)] = _id 

834 

835 data = self.db.get_one(self.okapkg_topic, filter_db) 

836 if data["_admin"]["usageState"] == "NOT_IN_USE": 

837 usage_state_update = { 

838 "_admin.usageState": "IN_USE", 

839 } 

840 self.db.set_one( 

841 self.okapkg_topic, {"_id": _id}, update_dict=usage_state_update 

842 ) 

843 

844 def move_ksu(self, session, _id, indata=None, kwargs=None, content=None): 

845 indata = self._remove_envelop(indata) 

846 

847 # Override descriptor with query string kwargs 

848 if kwargs: 

849 self._update_input_with_kwargs(indata, kwargs) 

850 try: 

851 if indata and session.get("set_project"): 

852 raise EngineException( 

853 "Cannot edit content and set to project (query string SET_PROJECT) at same time", 

854 HTTPStatus.UNPROCESSABLE_ENTITY, 

855 ) 

856 # TODO self._check_edition(session, indata, _id, force) 

857 if not content: 

858 content = self.show(session, _id) 

859 indata = self._validate_input_edit( 

860 input=indata, content=content, force=session["force"] 

861 ) 

862 operation_params = indata 

863 deep_update_rfc7396(content, indata) 

864 

865 # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name 

866 _id = content.get("_id") or _id 

867 op_id = self.format_on_operation( 

868 content, 

869 "move", 

870 operation_params, 

871 ) 

872 if content.get("_admin"): 

873 now = time() 

874 content["_admin"]["modified"] = now 

875 content["operatingState"] = "PROCESSING" 

876 content["resourceState"] = "IN_PROGRESS" 

877 

878 self.db.replace(self.topic, _id, content) 

879 data = {"ksus_list": [content["_id"]], "operation_id": op_id} 

880 self._send_msg("move", data) 

881 return op_id 

882 except ValidationError as e: 

883 raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY) 

884 

885 def check_conflict_on_edit(self, session, final_content, edit_content, _id): 

886 if final_content["name"] != edit_content["name"]: 

887 self.check_unique_name(session, edit_content["name"]) 

888 return final_content 

889 

890 @staticmethod 

891 def format_on_edit(final_content, edit_content): 

892 op_id = ACMTopic.format_on_operation( 

893 final_content, 

894 "update", 

895 edit_content, 

896 ) 

897 final_content["operatingState"] = "PROCESSING" 

898 final_content["resourceState"] = "IN_PROGRESS" 

899 if final_content.get("_admin"): 

900 now = time() 

901 final_content["_admin"]["modified"] = now 

902 return op_id 

903 

904 def edit(self, session, _id, indata, kwargs): 

905 _id_list = [] 

906 if _id == "update": 

907 for ksus in indata["ksus"]: 

908 content = ksus 

909 _id = content["_id"] 

910 _id_list.append(_id) 

911 content.pop("_id") 

912 op_id = self.edit_ksu(session, _id, content, kwargs) 

913 else: 

914 content = indata 

915 _id_list.append(_id) 

916 op_id = self.edit_ksu(session, _id, content, kwargs) 

917 

918 data = {"ksus_list": _id_list, "operation_id": op_id} 

919 self._send_msg("edit", data) 

920 

921 def edit_ksu(self, session, _id, indata, kwargs): 

922 content = None 

923 indata = self._remove_envelop(indata) 

924 

925 # Override descriptor with query string kwargs 

926 if kwargs: 

927 self._update_input_with_kwargs(indata, kwargs) 

928 try: 

929 if indata and session.get("set_project"): 

930 raise EngineException( 

931 "Cannot edit content and set to project (query string SET_PROJECT) at same time", 

932 HTTPStatus.UNPROCESSABLE_ENTITY, 

933 ) 

934 # TODO self._check_edition(session, indata, _id, force) 

935 if not content: 

936 content = self.show(session, _id) 

937 

938 for okas in indata["oka"]: 

939 if not okas["_id"]: 

940 okas.pop("_id") 

941 if not okas["sw_catalog_path"]: 

942 okas.pop("sw_catalog_path") 

943 

944 indata = self._validate_input_edit(indata, content, force=session["force"]) 

945 

946 # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name 

947 _id = content.get("_id") or _id 

948 

949 content = self.check_conflict_on_edit(session, content, indata, _id=_id) 

950 op_id = self.format_on_edit(content, indata) 

951 self.db.replace(self.topic, _id, content) 

952 return op_id 

953 except ValidationError as e: 

954 raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY) 

955 

956 def delete_ksu(self, session, _id, indata, dry_run=False, not_send_msg=None): 

957 _id_list = [] 

958 if _id == "delete": 

959 for ksus in indata["ksus"]: 

960 content = ksus 

961 _id = content["_id"] 

962 content.pop("_id") 

963 op_id, not_send_msg_ksu = self.delete(session, _id) 

964 if not not_send_msg_ksu: 

965 _id_list.append(_id) 

966 else: 

967 op_id, not_send_msg_ksu = self.delete(session, _id) 

968 if not not_send_msg_ksu: 

969 _id_list.append(_id) 

970 

971 if _id_list: 

972 data = { 

973 "ksus_list": _id_list, 

974 "operation_id": op_id, 

975 "force": session["force"], 

976 } 

977 self._send_msg("delete", data, not_send_msg) 

978 return op_id 

979 

980 def delete(self, session, _id): 

981 if not self.multiproject: 

982 filter_q = {} 

983 else: 

984 filter_q = self._get_project_filter(session) 

985 filter_q[self.id_field(self.topic, _id)] = _id 

986 item_content = self.db.get_one(self.topic, filter_q) 

987 item_content["state"] = "IN_DELETION" 

988 item_content["operatingState"] = "PROCESSING" 

989 item_content["resourceState"] = "IN_PROGRESS" 

990 op_id = self.format_on_operation( 

991 item_content, 

992 "delete", 

993 None, 

994 ) 

995 self.db.set_one(self.topic, {"_id": item_content["_id"]}, item_content) 

996 

997 if item_content["oka"][0].get("_id"): 

998 used_oka = {} 

999 existing_oka = [] 

1000 for okas in item_content["oka"]: 

1001 used_oka["_id"] = okas["_id"] 

1002 

1003 filter = self._get_project_filter(session) 

1004 data = self.db.get_list(self.topic, filter) 

1005 

1006 if data: 

1007 for ksus in data: 

1008 if ksus["_id"] != _id: 

1009 for okas in ksus["oka"]: 

1010 self.logger.info("OKA: {}".format(okas)) 

1011 if okas.get("sw_catalog_path", ""): 

1012 continue 

1013 elif okas["_id"] not in existing_oka: 

1014 existing_oka.append(okas["_id"]) 

1015 

1016 if used_oka: 

1017 for oka, oka_id in used_oka.items(): 

1018 if oka_id not in existing_oka: 

1019 self.db.set_one( 

1020 self.okapkg_topic, 

1021 {"_id": oka_id}, 

1022 {"_admin.usageState": "NOT_IN_USE"}, 

1023 ) 

1024 # Check if the profile exists. If it doesn't, no message should be sent to Kafka 

1025 not_send_msg = None 

1026 profile_id = item_content["profile"]["_id"] 

1027 profile_type = item_content["profile"]["profile_type"] 

1028 profile_collection_map = { 

1029 "app_profiles": "k8sapp", 

1030 "resource_profiles": "k8sresource", 

1031 "infra_controller_profiles": "k8sinfra_controller", 

1032 "infra_config_profiles": "k8sinfra_config", 

1033 } 

1034 profile_collection = profile_collection_map[profile_type] 

1035 profile_content = self.db.get_one( 

1036 profile_collection, {"_id": profile_id}, fail_on_empty=False 

1037 ) 

1038 if not profile_content: 

1039 self.db.del_one(self.topic, filter_q) 

1040 not_send_msg = True 

1041 return op_id, not_send_msg 

1042 

1043 

1044class OkaTopic(DescriptorTopic, ACMOperationTopic): 

1045 topic = "okas" 

1046 topic_msg = "oka" 

1047 schema_new = oka_schema 

1048 schema_edit = oka_schema 

1049 

1050 def __init__(self, db, fs, msg, auth): 

1051 super().__init__(db, fs, msg, auth) 

1052 self.logger = logging.getLogger("nbi.oka") 

1053 

1054 @staticmethod 

1055 def format_on_new(content, project_id=None, make_public=False): 

1056 DescriptorTopic.format_on_new( 

1057 content, project_id=project_id, make_public=make_public 

1058 ) 

1059 content["current_operation"] = None 

1060 content["state"] = "PENDING_CONTENT" 

1061 content["operatingState"] = "PROCESSING" 

1062 content["resourceState"] = "IN_PROGRESS" 

1063 

1064 def check_conflict_on_del(self, session, _id, db_content): 

1065 usage_state = db_content["_admin"]["usageState"] 

1066 if usage_state == "IN_USE": 

1067 raise EngineException( 

1068 "There is a KSU using this package", 

1069 http_code=HTTPStatus.CONFLICT, 

1070 ) 

1071 

1072 def check_conflict_on_edit(self, session, final_content, edit_content, _id): 

1073 if "name" in edit_content: 

1074 if final_content["name"] == edit_content["name"]: 

1075 name = edit_content["name"] 

1076 raise EngineException( 

1077 f"No update, new name for the OKA is the same: {name}", 

1078 http_code=HTTPStatus.CONFLICT, 

1079 ) 

1080 else: 

1081 self.check_unique_name(session, edit_content["name"]) 

1082 elif ( 

1083 "description" in edit_content 

1084 and final_content["description"] == edit_content["description"] 

1085 ): 

1086 description = edit_content["description"] 

1087 raise EngineException( 

1088 f"No update, new description for the OKA is the same: {description}", 

1089 http_code=HTTPStatus.CONFLICT, 

1090 ) 

1091 return final_content 

1092 

1093 def edit(self, session, _id, indata=None, kwargs=None, content=None): 

1094 indata = self._remove_envelop(indata) 

1095 

1096 # Override descriptor with query string kwargs 

1097 if kwargs: 

1098 self._update_input_with_kwargs(indata, kwargs) 

1099 try: 

1100 if indata and session.get("set_project"): 

1101 raise EngineException( 

1102 "Cannot edit content and set to project (query string SET_PROJECT) at same time", 

1103 HTTPStatus.UNPROCESSABLE_ENTITY, 

1104 ) 

1105 # TODO self._check_edition(session, indata, _id, force) 

1106 if not content: 

1107 content = self.show(session, _id) 

1108 

1109 indata = self._validate_input_edit(indata, content, force=session["force"]) 

1110 

1111 # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name 

1112 _id = content.get("_id") or _id 

1113 

1114 content = self.check_conflict_on_edit(session, content, indata, _id=_id) 

1115 op_id = self.format_on_edit(content, indata) 

1116 deep_update_rfc7396(content, indata) 

1117 

1118 self.db.replace(self.topic, _id, content) 

1119 return op_id 

1120 except ValidationError as e: 

1121 raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY) 

1122 

1123 def delete(self, session, _id, dry_run=False, not_send_msg=None): 

1124 if not self.multiproject: 

1125 filter_q = {} 

1126 else: 

1127 filter_q = self._get_project_filter(session) 

1128 filter_q[self.id_field(self.topic, _id)] = _id 

1129 item_content = self.db.get_one(self.topic, filter_q) 

1130 item_content["state"] = "IN_DELETION" 

1131 item_content["operatingState"] = "PROCESSING" 

1132 self.check_conflict_on_del(session, _id, item_content) 

1133 op_id = self.format_on_operation( 

1134 item_content, 

1135 "delete", 

1136 None, 

1137 ) 

1138 self.db.set_one(self.topic, {"_id": item_content["_id"]}, item_content) 

1139 self._send_msg( 

1140 "delete", 

1141 {"oka_id": _id, "operation_id": op_id, "force": session["force"]}, 

1142 not_send_msg=not_send_msg, 

1143 ) 

1144 return op_id 

1145 

1146 def new(self, rollback, session, indata=None, kwargs=None, headers=None): 

1147 # _remove_envelop 

1148 if indata: 

1149 if "userDefinedData" in indata: 

1150 indata = indata["userDefinedData"] 

1151 

1152 content = {"_admin": {"userDefinedData": indata, "revision": 0}} 

1153 

1154 self._update_input_with_kwargs(content, kwargs) 

1155 content = BaseTopic._validate_input_new( 

1156 self, input=kwargs, force=session["force"] 

1157 ) 

1158 

1159 self.check_unique_name(session, content["name"]) 

1160 operation_params = {} 

1161 for content_key, content_value in content.items(): 

1162 operation_params[content_key] = content_value 

1163 self.format_on_new( 

1164 content, session["project_id"], make_public=session["public"] 

1165 ) 

1166 op_id = self.format_on_operation( 

1167 content, 

1168 operation_type="create", 

1169 operation_params=operation_params, 

1170 ) 

1171 content["git_name"] = self.create_gitname(content, session) 

1172 _id = self.db.create(self.topic, content) 

1173 rollback.append({"topic": self.topic, "_id": _id}) 

1174 return _id, op_id 

1175 

1176 def upload_content(self, session, _id, indata, kwargs, headers): 

1177 current_desc = self.show(session, _id) 

1178 

1179 compressed = None 

1180 content_type = headers.get("Content-Type") 

1181 if ( 

1182 content_type 

1183 and "application/gzip" in content_type 

1184 or "application/x-gzip" in content_type 

1185 ): 

1186 compressed = "gzip" 

1187 if content_type and "application/zip" in content_type: 

1188 compressed = "zip" 

1189 filename = headers.get("Content-Filename") 

1190 if not filename and compressed: 

1191 filename = "package.tar.gz" if compressed == "gzip" else "package.zip" 

1192 elif not filename: 

1193 filename = "package" 

1194 

1195 revision = 1 

1196 if "revision" in current_desc["_admin"]: 

1197 revision = current_desc["_admin"]["revision"] + 1 

1198 

1199 file_pkg = None 

1200 fs_rollback = [] 

1201 

1202 try: 

1203 start = 0 

1204 # Rather than using a temp folder, we will store the package in a folder based on 

1205 # the current revision. 

1206 proposed_revision_path = _id + ":" + str(revision) 

1207 # all the content is upload here and if ok, it is rename from id_ to is folder 

1208 

1209 if start: 

1210 if not self.fs.file_exists(proposed_revision_path, "dir"): 

1211 raise EngineException( 

1212 "invalid Transaction-Id header", HTTPStatus.NOT_FOUND 

1213 ) 

1214 else: 

1215 self.fs.file_delete(proposed_revision_path, ignore_non_exist=True) 

1216 self.fs.mkdir(proposed_revision_path) 

1217 fs_rollback.append(proposed_revision_path) 

1218 

1219 storage = self.fs.get_params() 

1220 storage["folder"] = proposed_revision_path 

1221 storage["zipfile"] = filename 

1222 

1223 file_path = (proposed_revision_path, filename) 

1224 file_pkg = self.fs.file_open(file_path, "a+b") 

1225 

1226 if isinstance(indata, dict): 

1227 indata_text = yaml.safe_dump(indata, indent=4, default_flow_style=False) 

1228 file_pkg.write(indata_text.encode(encoding="utf-8")) 

1229 else: 

1230 indata_len = 0 

1231 indata = indata.file 

1232 while True: 

1233 indata_text = indata.read(4096) 

1234 indata_len += len(indata_text) 

1235 if not indata_text: 

1236 break 

1237 file_pkg.write(indata_text) 

1238 

1239 # Need to close the file package here so it can be copied from the 

1240 # revision to the current, unrevisioned record 

1241 if file_pkg: 

1242 file_pkg.close() 

1243 file_pkg = None 

1244 

1245 # Fetch both the incoming, proposed revision and the original revision so we 

1246 # can call a validate method to compare them 

1247 current_revision_path = _id + "/" 

1248 self.fs.sync(from_path=current_revision_path) 

1249 self.fs.sync(from_path=proposed_revision_path) 

1250 

1251 # Is this required? 

1252 if revision > 1: 

1253 try: 

1254 self._validate_descriptor_changes( 

1255 _id, 

1256 filename, 

1257 current_revision_path, 

1258 proposed_revision_path, 

1259 ) 

1260 except Exception as e: 

1261 shutil.rmtree( 

1262 self.fs.path + current_revision_path, ignore_errors=True 

1263 ) 

1264 shutil.rmtree( 

1265 self.fs.path + proposed_revision_path, ignore_errors=True 

1266 ) 

1267 # Only delete the new revision. We need to keep the original version in place 

1268 # as it has not been changed. 

1269 self.fs.file_delete(proposed_revision_path, ignore_non_exist=True) 

1270 raise e 

1271 

1272 indata = self._remove_envelop(indata) 

1273 

1274 # Override descriptor with query string kwargs 

1275 if kwargs: 

1276 self._update_input_with_kwargs(indata, kwargs) 

1277 

1278 current_desc["_admin"]["storage"] = storage 

1279 current_desc["_admin"]["onboardingState"] = "ONBOARDED" 

1280 current_desc["_admin"]["operationalState"] = "ENABLED" 

1281 current_desc["_admin"]["modified"] = time() 

1282 current_desc["_admin"]["revision"] = revision 

1283 

1284 deep_update_rfc7396(current_desc, indata) 

1285 

1286 # Copy the revision to the active package name by its original id 

1287 shutil.rmtree(self.fs.path + current_revision_path, ignore_errors=True) 

1288 os.rename( 

1289 self.fs.path + proposed_revision_path, 

1290 self.fs.path + current_revision_path, 

1291 ) 

1292 self.fs.file_delete(current_revision_path, ignore_non_exist=True) 

1293 self.fs.mkdir(current_revision_path) 

1294 self.fs.reverse_sync(from_path=current_revision_path) 

1295 

1296 shutil.rmtree(self.fs.path + _id) 

1297 kwargs = {} 

1298 kwargs["package"] = filename 

1299 if headers["Method"] == "POST": 

1300 current_desc["state"] = "IN_CREATION" 

1301 op_id = current_desc.get("operationHistory", [{"op_id": None}])[-1].get( 

1302 "op_id" 

1303 ) 

1304 elif headers["Method"] in ("PUT", "PATCH"): 

1305 op_id = self.format_on_operation( 

1306 current_desc, 

1307 "update", 

1308 kwargs, 

1309 ) 

1310 current_desc["operatingState"] = "PROCESSING" 

1311 current_desc["resourceState"] = "IN_PROGRESS" 

1312 

1313 self.db.replace(self.topic, _id, current_desc) 

1314 

1315 # Store a copy of the package as a point in time revision 

1316 revision_desc = dict(current_desc) 

1317 revision_desc["_id"] = _id + ":" + str(revision_desc["_admin"]["revision"]) 

1318 self.db.create(self.topic + "_revisions", revision_desc) 

1319 fs_rollback = [] 

1320 

1321 if headers["Method"] == "POST": 

1322 self._send_msg("create", {"oka_id": _id, "operation_id": op_id}) 

1323 elif headers["Method"] == "PUT" or "PATCH": 

1324 self._send_msg("edit", {"oka_id": _id, "operation_id": op_id}) 

1325 

1326 return True 

1327 

1328 except EngineException: 

1329 raise 

1330 finally: 

1331 if file_pkg: 

1332 file_pkg.close() 

1333 for file in fs_rollback: 

1334 self.fs.file_delete(file, ignore_non_exist=True)