From 28d887f8e818fa0801a1371425435325c0e79c45 Mon Sep 17 00:00:00 2001 From: shrinithi Date: Wed, 8 Jan 2025 05:27:19 +0000 Subject: [PATCH] NBI ACM code refactor Change-Id: I1957f32b3120903ac978e353c05408fc00b3fe66 Signed-off-by: shrinithi --- osm_nbi/acm_topic.py | 342 ++++++++++++++++++++++++++++++++++++++++ osm_nbi/base_topic.py | 352 +++++++----------------------------------- osm_nbi/engine.py | 25 ++- osm_nbi/k8s_topics.py | 96 +++++------- osm_nbi/nbi.py | 22 ++- 5 files changed, 468 insertions(+), 369 deletions(-) create mode 100644 osm_nbi/acm_topic.py diff --git a/osm_nbi/acm_topic.py b/osm_nbi/acm_topic.py new file mode 100644 index 0000000..87087b7 --- /dev/null +++ b/osm_nbi/acm_topic.py @@ -0,0 +1,342 @@ +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pyrage import x25519 +from uuid import uuid4 + +from http import HTTPStatus +from time import time + +# from osm_common.dbbase import deep_update_rfc7396, DbException +from osm_common.msgbase import MsgException +from osm_common.dbbase import DbException +from osm_common.fsbase import FsException +from osm_nbi.base_topic import BaseTopic, EngineException +from osm_nbi.validation import ValidationError + +# import logging +# import random +# import string +# from yaml import safe_load, YAMLError + + +class ACMOperationTopic: + def __init__(self, db, fs, msg, auth): + self.multiproject = None # Declare the attribute here + + @staticmethod + def format_on_operation(content, operation_type, operation_params=None): + op_id = str(uuid4()) + now = time() + if "operationHistory" not in content: + content["operationHistory"] = [] + + operation = {} + operation["operationType"] = operation_type + operation["op_id"] = op_id + operation["result"] = None + operation["creationDate"] = now + operation["endDate"] = None + operation["workflowState"] = operation["resourceState"] = operation[ + "operationState" + ] = operation["gitOperationInfo"] = None + operation["operationParams"] = operation_params + + content["operationHistory"].append(operation) + return op_id + + +class ACMTopic(BaseTopic, ACMOperationTopic): + def __init__(self, db, fs, msg, auth): + super().__init__(db, fs, msg, auth) + # ACMOperationTopic.__init__(db, fs, msg, auth) + + def new_profile(self, rollback, session, indata=None, kwargs=None, headers=None): + step = "name unique check" + try: + self.check_unique_name(session, indata["name"]) + + step = "validating input parameters" + profile_request = self._remove_envelop(indata) + self._update_input_with_kwargs(profile_request, kwargs) + profile_request = self._validate_input_new( + profile_request, session["force"] + ) + operation_params = profile_request + + step = "filling profile details from input data" + profile_create = self._create_profile(profile_request, session) + + step = "creating profile at database" + self.format_on_new( + profile_create, session["project_id"], make_public=session["public"] + ) + profile_create["current_operation"] = None + op_id = ACMOperationTopic.format_on_operation( + profile_create, + "create", + operation_params, + ) + + _id = self.db.create(self.topic, profile_create) + pubkey, privkey = self._generate_age_key() + profile_create["age_pubkey"] = self.db.encrypt( + pubkey, schema_version="1.11", salt=_id + ) + profile_create["age_privkey"] = self.db.encrypt( + privkey, schema_version="1.11", salt=_id + ) + rollback.append({"topic": self.topic, "_id": _id}) + self.db.set_one(self.topic, {"_id": _id}, profile_create) + if op_id: + profile_create["op_id"] = op_id + self._send_msg("profile_create", {"profile_id": _id, "operation_id": op_id}) + + return _id, None + except ( + ValidationError, + EngineException, + DbException, + MsgException, + FsException, + ) as e: + raise type(e)("{} while '{}'".format(e, step), http_code=e.http_code) + + def _create_profile(self, profile_request, session): + profile_desc = { + "name": profile_request["name"], + "description": profile_request["description"], + "default": False, + "git_name": self.create_gitname(profile_request, session), + "state": "IN_CREATION", + "operatingState": "IN_PROGRESS", + "resourceState": "IN_PROGRESS.REQUEST_RECEIVED", + } + return profile_desc + + def default_profile( + self, rollback, session, indata=None, kwargs=None, headers=None + ): + step = "validating input parameters" + try: + profile_request = self._remove_envelop(indata) + self._update_input_with_kwargs(profile_request, kwargs) + operation_params = profile_request + + step = "filling profile details from input data" + profile_create = self._create_default_profile(profile_request, session) + + step = "creating profile at database" + self.format_on_new( + profile_create, session["project_id"], make_public=session["public"] + ) + profile_create["current_operation"] = None + ACMOperationTopic.format_on_operation( + profile_create, + "create", + operation_params, + ) + _id = self.db.create(self.topic, profile_create) + rollback.append({"topic": self.topic, "_id": _id}) + return _id + except ( + ValidationError, + EngineException, + DbException, + MsgException, + FsException, + ) as e: + raise type(e)("{} while '{}'".format(e, step), http_code=e.http_code) + + def _create_default_profile(self, profile_request, session): + profile_desc = { + "name": profile_request["name"], + "description": f"{self.topic} profile for cluster {profile_request['name']}", + "default": True, + "git_name": self.create_gitname(profile_request, session), + "state": "IN_CREATION", + "operatingState": "IN_PROGRESS", + "resourceState": "IN_PROGRESS.REQUEST_RECEIVED", + } + return profile_desc + + def detach(self, session, _id, profile_type): + # To detach the profiles from every cluster + filter_q = {} + existing_clusters = self.db.get_list("clusters", filter_q) + existing_clusters_profiles = [ + profile["_id"] + for profile in existing_clusters + if profile.get("profile_type", _id) + ] + update_dict = None + for profile in existing_clusters_profiles: + filter_q = {"_id": profile} + data = self.db.get_one("clusters", filter_q) + if profile_type in data: + profile_ids = data[profile_type] + if _id in profile_ids: + profile_ids.remove(_id) + update_dict = {profile_type: profile_ids} + self.db.set_one("clusters", filter_q, update_dict) + + def _generate_age_key(self): + ident = x25519.Identity.generate() + # gets the public key + pubkey = str(ident.to_public()) + # gets the private key + privkey = str(ident) + # return both public and private key + return pubkey, privkey + + def common_delete(self, _id, db_content): + if "state" in db_content: + db_content["state"] = "IN_DELETION" + db_content["operatingState"] = "PROCESSING" + # self.db.set_one(self.topic, {"_id": _id}, db_content) + + db_content["current_operation"] = None + op_id = ACMOperationTopic.format_on_operation( + db_content, + "delete", + None, + ) + self.db.set_one(self.topic, {"_id": _id}, db_content) + return op_id + + def add_to_old_collection(self, content, session): + item = {} + item["name"] = content["name"] + item["credentials"] = {} + # item["k8s_version"] = content["k8s_version"] + if "k8s_version" in content: + item["k8s_version"] = content["k8s_version"] + else: + item["k8s_version"] = None + vim_account_details = self.db.get_one( + "vim_accounts", {"name": content["vim_account"]} + ) + item["vim_account"] = vim_account_details["_id"] + item["nets"] = {"k8s_net1": None} + item["deployment_methods"] = {"juju-bundle": False, "helm-chart-v3": True} + # item["description"] = content["description"] + if "description" in content: + item["description"] = content["description"] + else: + item["description"] = None + item["namespace"] = "kube-system" + item["osm_acm"] = True + item["schema_version"] = "1.11" + self.format_on_new(item, session["project_id"], make_public=session["public"]) + _id = self.db.create("k8sclusters", item) + self.logger.info(f"_id is : {_id}") + item_1 = self.db.get_one("k8sclusters", {"name": item["name"]}) + item_1["_admin"]["operationalState"] = "PROCESSING" + + # Create operation data + now = time() + operation_data = { + "lcmOperationType": "create", # Assuming 'create' operation here + "operationState": "PROCESSING", + "startTime": now, + "statusEnteredTime": now, + "detailed-status": "", + "operationParams": None, # Add parameters as needed + } + # create operation + item_1["_admin"]["operations"] = [operation_data] + item_1["_admin"]["current_operation"] = None + self.logger.info(f"content is : {item_1}") + self.db.set_one("k8sclusters", {"_id": item_1["_id"]}, item_1) + return + + def edit_extra_before(self, session, _id, indata=None, kwargs=None, content=None): + check = self.db.get_one(self.topic, {"_id": _id}) + if self.topic != "clusters": + if check["default"] is True: + raise EngineException( + "Cannot edit default profiles", + HTTPStatus.UNPROCESSABLE_ENTITY, + ) + if "name" in indata: + if check["name"] == indata["name"]: + pass + else: + self.check_unique_name(session, indata["name"]) + + return True + + def cluster_unique_name_check(self, session, name): + self.check_unique_name(session, name) + _filter = {"name": name} + topic = "k8sclusters" + if self.db.get_one(topic, _filter, fail_on_empty=False, fail_on_more=False): + raise EngineException( + "name '{}' already exists".format(name), + HTTPStatus.CONFLICT, + ) + + def list_both(self, session, filter_q=None, api_req=False): + """List all clusters from both new and old APIs""" + if not filter_q: + filter_q = {} + if self.multiproject: + filter_q.update(self._get_project_filter(session)) + cluster_list1 = self.db.get_list(self.topic, filter_q) + cluster_list2 = self.db.get_list("k8sclusters", filter_q) + list1_names = {item["name"] for item in cluster_list1} + for item in cluster_list2: + if item["name"] not in list1_names: + # Complete the information for clusters from old API + item["state"] = "N/A" + old_state = item.get("_admin", {}).get("operationalState", "Unknown") + item["bootstrap"] = "NO" + item["operatingState"] = "N/A" + item["resourceState"] = old_state + item["created"] = "NO" + cluster_list1.append(item) + if api_req: + cluster_list1 = [self.sol005_projection(inst) for inst in cluster_list1] + return cluster_list1 + + +class ProfileTopic(ACMTopic): + profile_topic_map = { + "k8sapp": "app_profiles", + "k8sresource": "resource_profiles", + "k8sinfra_controller": "infra_controller_profiles", + "k8sinfra_config": "infra_config_profiles", + } + + def __init__(self, db, fs, msg, auth): + super().__init__(db, fs, msg, auth) + + def delete_extra_before(self, session, _id, db_content, not_send_msg=None): + op_id = self.common_delete(_id, db_content) + return {"profile_id": _id, "operation_id": op_id} + + def delete_profile(self, session, _id, dry_run=False, not_send_msg=None): + item_content = self.db.get_one(self.topic, {"_id": _id}) + if item_content.get("default", False): + raise EngineException( + "Cannot delete item because it is marked as default", + http_code=HTTPStatus.UNPROCESSABLE_ENTITY, + ) + # Before deleting, detach the profile from the associated clusters. + profile_type = self.profile_topic_map[self.topic] + self.detach(session, _id, profile_type) + # To delete the infra controller profile + super().delete(session, _id, not_send_msg=not_send_msg) + return _id diff --git a/osm_nbi/base_topic.py b/osm_nbi/base_topic.py index 3dbaa3f..b66e6bd 100644 --- a/osm_nbi/base_topic.py +++ b/osm_nbi/base_topic.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from pyrage import x25519 + import logging import random import string @@ -21,8 +21,6 @@ from uuid import uuid4 from http import HTTPStatus from time import time from osm_common.dbbase import deep_update_rfc7396, DbException -from osm_common.msgbase import MsgException -from osm_common.fsbase import FsException from osm_nbi.validation import validate_input, ValidationError, is_valid_uuid from yaml import safe_load, YAMLError @@ -410,27 +408,6 @@ class BaseTopic: content["_admin"]["projects_write"] = list(project_id) return None - @staticmethod - def format_on_operation(content, operation_type, operation_params=None): - op_id = str(uuid4()) - now = time() - if "operationHistory" not in content: - content["operationHistory"] = [] - - operation = {} - operation["operationType"] = operation_type - operation["op_id"] = op_id - operation["result"] = None - operation["creationDate"] = now - operation["endDate"] = None - operation["workflowState"] = operation["resourceState"] = operation[ - "operationState" - ] = operation["gitOperationInfo"] = None - operation["operationParams"] = operation_params - - content["operationHistory"].append(operation) - return op_id - @staticmethod def format_on_edit(final_content, edit_content): """ @@ -643,137 +620,6 @@ class BaseTopic: HTTPStatus.INTERNAL_SERVER_ERROR, ) - def create_gitname(self, content, session, _id=None): - if not self.multiproject: - _filter = {} - else: - _filter = self._get_project_filter(session) - _filter["git_name"] = content["name"] - if _id: - _filter["_id.neq"] = _id - if self.db.get_one( - self.topic, _filter, fail_on_empty=False, fail_on_more=False - ): - n = 5 - # using random.choices() - # generating random strings - res = "".join(random.choices(string.ascii_lowercase + string.digits, k=n)) - res1 = content["name"] - new_name1 = res1 + res - new_name = new_name1.lower() - return new_name - else: - return content["name"] - - def new_profile(self, rollback, session, indata=None, kwargs=None, headers=None): - step = "name unique check" - try: - self.check_unique_name(session, indata["name"]) - - step = "validating input parameters" - profile_request = self._remove_envelop(indata) - self._update_input_with_kwargs(profile_request, kwargs) - profile_request = self._validate_input_new( - profile_request, session["force"] - ) - operation_params = profile_request - - step = "filling profile details from input data" - profile_create = self._create_profile(profile_request, session) - - step = "creating profile at database" - self.format_on_new( - profile_create, session["project_id"], make_public=session["public"] - ) - profile_create["current_operation"] = None - op_id = self.format_on_operation( - profile_create, - "create", - operation_params, - ) - - _id = self.db.create(self.topic, profile_create) - pubkey, privkey = self._generate_age_key() - profile_create["age_pubkey"] = self.db.encrypt( - pubkey, schema_version="1.11", salt=_id - ) - profile_create["age_privkey"] = self.db.encrypt( - privkey, schema_version="1.11", salt=_id - ) - rollback.append({"topic": self.topic, "_id": _id}) - self.db.set_one(self.topic, {"_id": _id}, profile_create) - if op_id: - profile_create["op_id"] = op_id - self._send_msg("profile_create", {"profile_id": _id, "operation_id": op_id}) - - return _id, None - except ( - ValidationError, - EngineException, - DbException, - MsgException, - FsException, - ) as e: - raise type(e)("{} while '{}'".format(e, step), http_code=e.http_code) - - def _create_profile(self, profile_request, session): - profile_desc = { - "name": profile_request["name"], - "description": profile_request["description"], - "default": False, - "git_name": self.create_gitname(profile_request, session), - "state": "IN_CREATION", - "operatingState": "IN_PROGRESS", - "resourceState": "IN_PROGRESS.REQUEST_RECEIVED", - } - return profile_desc - - def default_profile( - self, rollback, session, indata=None, kwargs=None, headers=None - ): - step = "validating input parameters" - try: - profile_request = self._remove_envelop(indata) - self._update_input_with_kwargs(profile_request, kwargs) - operation_params = profile_request - - step = "filling profile details from input data" - profile_create = self._create_default_profile(profile_request, session) - - step = "creating profile at database" - self.format_on_new( - profile_create, session["project_id"], make_public=session["public"] - ) - profile_create["current_operation"] = None - self.format_on_operation( - profile_create, - "create", - operation_params, - ) - _id = self.db.create(self.topic, profile_create) - rollback.append({"topic": self.topic, "_id": _id}) - return _id - except ( - ValidationError, - EngineException, - DbException, - MsgException, - FsException, - ) as e: - raise type(e)("{} while '{}'".format(e, step), http_code=e.http_code) - - def _create_default_profile(self, profile_request, session): - profile_desc = { - "name": profile_request["name"], - "description": f"{self.topic} profile for cluster {profile_request['name']}", - "default": True, - "git_name": self.create_gitname(profile_request, session), - "state": "IN_CREATION", - "operatingState": "IN_PROGRESS", - "resourceState": "IN_PROGRESS.REQUEST_RECEIVED", - } - return profile_desc - def delete_list(self, session, filter_q=None): """ Delete a several entries of a topic. This is for internal usage and test only, not exposed to NBI API @@ -801,6 +647,12 @@ class BaseTopic: """ pass + def delete_extra_before(self, session, _id, db_content, not_send_msg=None): + """ + Delete other things apart from database entry of a item _id. + """ + return {} + def delete(self, session, _id, dry_run=False, not_send_msg=None): """ Delete item by its internal _id @@ -820,26 +672,6 @@ class BaseTopic: item_content = self.db.get_one(self.topic, filter_q) nsd_id = item_content.get("_id") - if ( - self.topic == "k8sinfra_controller" - or self.topic == "k8sinfra_config" - or self.topic == "k8sapp" - or self.topic == "k8sresource" - or self.topic == "clusters" - ): - if "state" in item_content: - item_content["state"] = "IN_DELETION" - item_content["operatingState"] = "PROCESSING" - self.db.set_one(self.topic, {"_id": _id}, item_content) - - item_content_1 = self.db.get_one(self.topic, filter_q) - item_content_1["current_operation"] = None - op_id = self.format_on_operation( - item_content_1, - "delete", - None, - ) - self.check_conflict_on_del(session, _id, item_content) # While deteling ns descriptor associated ns config template should also get deleted. @@ -897,30 +729,25 @@ class BaseTopic: http_code=HTTPStatus.UNAUTHORIZED, ) # delete - if ( - self.topic == "k8sinfra_controller" - or self.topic == "k8sinfra_config" - or self.topic == "k8sapp" - or self.topic == "k8sresource" - ): - self.db.set_one(self.topic, {"_id": _id}, item_content_1) - self._send_msg( - "delete", - {"profile_id": _id, "operation_id": op_id}, - not_send_msg=not_send_msg, - ) - elif self.topic == "clusters": - self.db.set_one("clusters", {"_id": _id}, item_content_1) - self._send_msg( - "delete", - {"cluster_id": _id, "operation_id": op_id}, - not_send_msg=not_send_msg, - ) + different_message = self.delete_extra_before( + session, _id, item_content, not_send_msg=not_send_msg + ) + # self.db.del_one(self.topic, filter_q) + # self.delete_extra(session, _id, item_content, not_send_msg=not_send_msg) + if different_message: + self.delete_extra(session, _id, item_content, not_send_msg=not_send_msg) + self._send_msg("delete", different_message, not_send_msg=not_send_msg) else: self.db.del_one(self.topic, filter_q) self.delete_extra(session, _id, item_content, not_send_msg=not_send_msg) self._send_msg("deleted", {"_id": _id}, not_send_msg=not_send_msg) - return _id + return None + + def edit_extra_before(self, session, _id, indata=None, kwargs=None, content=None): + """ + edit other things apart from database entry of a item _id. + """ + return {} def edit(self, session, _id, indata=None, kwargs=None, content=None): """ @@ -938,25 +765,6 @@ class BaseTopic: if kwargs: self._update_input_with_kwargs(indata, kwargs) try: - if ( - self.topic == "k8sinfra_controller" - or self.topic == "k8sinfra_config" - or self.topic == "k8sapp" - or self.topic == "k8sresource" - or self.topic == "clusters" - ): - check = self.db.get_one(self.topic, {"_id": _id}) - if self.topic != "clusters": - if check["default"] is True: - raise EngineException( - "Cannot edit default profiles", - HTTPStatus.UNPROCESSABLE_ENTITY, - ) - if "name" in indata: - if check["name"] == indata["name"]: - pass - else: - self.check_unique_name(session, indata["name"]) if indata and session.get("set_project"): raise EngineException( "Cannot edit content and set to project (query string SET_PROJECT) at same time", @@ -974,104 +782,48 @@ class BaseTopic: content = self.check_conflict_on_edit(session, content, indata, _id=_id) op_id = self.format_on_edit(content, indata) + self.logger.info(f"indata is : {indata}") + + different_message = self.edit_extra_before( + session, _id, indata, kwargs=None, content=None + ) + self.logger.info(f"different msg is : {different_message}") + self.db.replace(self.topic, _id, content) indata.pop("_admin", None) if op_id: indata["op_id"] = op_id indata["_id"] = _id - if ( - self.topic == "k8sinfra_controller" - or self.topic == "k8sinfra_config" - or self.topic == "k8sapp" - or self.topic == "k8sresource" - or self.topic == "clusters" - ): + + if different_message: + self.logger.info("It is getting into if") pass else: + self.logger.info("It is getting into else") self._send_msg("edited", indata) return op_id except ValidationError as e: raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY) - def detach(self, session, _id, profile_type): - # To detach the profiles from every cluster - filter_q = {} - existing_clusters = self.db.get_list("clusters", filter_q) - existing_clusters_profiles = [ - profile["_id"] - for profile in existing_clusters - if profile.get("profile_type", _id) - ] - update_dict = None - for profile in existing_clusters_profiles: - filter_q = {"_id": profile} - data = self.db.get_one("clusters", filter_q) - if profile_type in data: - profile_ids = data[profile_type] - if _id in profile_ids: - profile_ids.remove(_id) - update_dict = {profile_type: profile_ids} - self.db.set_one("clusters", filter_q, update_dict) - - def _generate_age_key(self): - ident = x25519.Identity.generate() - # gets the public key - pubkey = str(ident.to_public()) - # gets the private key - privkey = str(ident) - # return both public and private key - return pubkey, privkey - - def add_to_old_collection(self, content, session): - self.logger.info(f"content is : {content}") - item = {} - item["name"] = content["name"] - item["credentials"] = {} - # item["k8s_version"] = content["k8s_version"] - if "k8s_version" in content: - item["k8s_version"] = content["k8s_version"] + def create_gitname(self, content, session, _id=None): + if not self.multiproject: + _filter = {} else: - item["k8s_version"] = None - vim_account_details = self.db.get_one( - "vim_accounts", {"name": content["vim_account"]} - ) - item["vim_account"] = vim_account_details["_id"] - item["nets"] = {"k8s_net1": None} - item["deployment_methods"] = {"juju-bundle": False, "helm-chart-v3": True} - # item["description"] = content["description"] - if "description" in content: - item["description"] = content["description"] + _filter = self._get_project_filter(session) + _filter["git_name"] = content["name"] + if _id: + _filter["_id.neq"] = _id + if self.db.get_one( + self.topic, _filter, fail_on_empty=False, fail_on_more=False + ): + n = 5 + # using random.choices() + # generating random strings + res = "".join(random.choices(string.ascii_lowercase + string.digits, k=n)) + res1 = content["name"] + new_name1 = res1 + res + new_name = new_name1.lower() + return new_name else: - item["description"] = None - item["namespace"] = "kube-system" - item["osm_acm"] = True - item["schema_version"] = "1.11" - self.logger.info(f"item is : {item}") - self.format_on_new(item, session["project_id"], make_public=session["public"]) - _id = self.db.create("k8sclusters", item) - self.logger.info(f"_id is : {_id}") - - item_1 = self.db.get_one("k8sclusters", {"name": item["name"]}) - - item_1["_admin"]["operationalState"] = "PROCESSING" - self.logger.info(f"content is : {item_1}") - - # Create operation data - now = time() - operation_data = { - "lcmOperationType": "create", # Assuming 'create' operation here - "operationState": "PROCESSING", - "startTime": now, - "statusEnteredTime": now, - "detailed-status": "", - "operationParams": None, # Add parameters as needed - } - - # create operation - item_1["_admin"]["operations"] = [operation_data] - item_1["_admin"]["current_operation"] = None - self.logger.info(f"content is : {item_1}") - self.db.set_one("k8sclusters", {"_id": item_1["_id"]}, item_1) - - return + return content["name"] diff --git a/osm_nbi/engine.py b/osm_nbi/engine.py index 02b3e14..ff65bbb 100644 --- a/osm_nbi/engine.py +++ b/osm_nbi/engine.py @@ -54,12 +54,12 @@ from osm_nbi.instance_topics import ( NsiLcmOpTopic, ) from osm_nbi.k8s_topics import ( - K8sTopic, + ClusterTopic, InfraContTopic, InfraConfTopic, AppTopic, ResourceTopic, - K8saddTopic, + ClusterOpsTopic, KsusTopic, OkaTopic, ) @@ -102,12 +102,12 @@ class Engine(object): "vnflcmops": VnfLcmOpTopic, "vnflcm_subscriptions": VnflcmSubscriptionsTopic, "nsconfigtemps": NsConfigTemplateTopic, - "k8s": K8sTopic, + "cluster": ClusterTopic, "infras_cont": InfraContTopic, "infras_conf": InfraConfTopic, "apps": AppTopic, "resources": ResourceTopic, - "k8sops": K8saddTopic, + "clusterops": ClusterOpsTopic, "ksus": KsusTopic, "oka_packages": OkaTopic, # [NEW_TOPIC]: add an entry here @@ -384,12 +384,29 @@ class Engine(object): :param api_req: True if this call is serving an external API request. False if serving internal request. :return: The list, it can be empty if no one match the filter_q. """ + self.logger.info("it is getting into item list") if topic not in self.map_topic: raise EngineException( "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR ) return self.map_topic[topic].list(session, filter_q, api_req) + def get_item_list_cluster(self, session, topic, filter_q=None, api_req=False): + """ + Get a list of items + :param session: contains the used login username and working project + :param topic: it can be: users, projects, vnfds, nsds, ... + :param filter_q: filter of data to be applied + :param api_req: True if this call is serving an external API request. False if serving internal request. + :return: The list, it can be empty if no one match the filter_q. + """ + self.logger.info("it is getting into item list cluster") + if topic not in self.map_topic: + raise EngineException( + "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR + ) + return self.map_topic[topic].list_both(session, filter_q, api_req) + def get_item(self, session, topic, _id, filter_q=None, api_req=False): """ Get complete information on an item diff --git a/osm_nbi/k8s_topics.py b/osm_nbi/k8s_topics.py index 83922e4..9519f2d 100644 --- a/osm_nbi/k8s_topics.py +++ b/osm_nbi/k8s_topics.py @@ -21,6 +21,7 @@ from http import HTTPStatus from time import time from osm_nbi.base_topic import BaseTopic, EngineException +from osm_nbi.acm_topic import ACMTopic, ACMOperationTopic, ProfileTopic from osm_nbi.descriptor_topics import DescriptorTopic from osm_nbi.validation import ( @@ -37,7 +38,6 @@ from osm_nbi.validation import ( infra_config_profile_create_edit_schema, app_profile_create_edit_schema, resource_profile_create_edit_schema, - # k8scluster_new_schema, clusterregistration_new_schema, attach_dettach_profile_schema, ksu_schema, @@ -53,7 +53,7 @@ __author__ = ( ) -class InfraContTopic(BaseTopic): +class InfraContTopic(ProfileTopic): topic = "k8sinfra_controller" topic_msg = "k8s_infra_controller" schema_new = infra_controller_profile_create_new_schema @@ -71,20 +71,11 @@ class InfraContTopic(BaseTopic): return self.default_profile(rollback, session, indata, kwargs, headers) def delete(self, session, _id, dry_run=False, not_send_msg=None): - item_content = self.db.get_one(self.topic, {"_id": _id}) - if item_content.get("default", False): - raise EngineException( - "Cannot delete item because it is marked as default", - http_code=HTTPStatus.UNPROCESSABLE_ENTITY, - ) - # Before deleting, detach the profile from the associated clusters. - self.detach(session, _id, profile_type="infra_controller_profiles") - # To delete the infra controller profile - super().delete(session, _id, not_send_msg=not_send_msg) + self.delete_profile(session, _id, dry_run=False, not_send_msg=None) return _id -class InfraConfTopic(BaseTopic): +class InfraConfTopic(ProfileTopic): topic = "k8sinfra_config" topic_msg = "k8s_infra_config" schema_new = infra_config_profile_create_new_schema @@ -102,20 +93,11 @@ class InfraConfTopic(BaseTopic): return self.default_profile(rollback, session, indata, kwargs, headers) def delete(self, session, _id, dry_run=False, not_send_msg=None): - item_content = self.db.get_one(self.topic, {"_id": _id}) - if item_content.get("default", False): - raise EngineException( - "Cannot delete item because it is marked as default", - http_code=HTTPStatus.UNPROCESSABLE_ENTITY, - ) - # Before deleting, detach the profile from the associated clusters. - self.detach(session, _id, profile_type="infra_config_profiles") - # To delete the infra config profile - super().delete(session, _id, not_send_msg=not_send_msg) + self.delete_profile(session, _id, dry_run=False, not_send_msg=None) return _id -class AppTopic(BaseTopic): +class AppTopic(ProfileTopic): topic = "k8sapp" topic_msg = "k8s_app" schema_new = app_profile_create_new_schema @@ -133,20 +115,11 @@ class AppTopic(BaseTopic): return self.default_profile(rollback, session, indata, kwargs, headers) def delete(self, session, _id, dry_run=False, not_send_msg=None): - item_content = self.db.get_one(self.topic, {"_id": _id}) - if item_content.get("default", False): - raise EngineException( - "Cannot delete item because it is marked as default", - http_code=HTTPStatus.UNPROCESSABLE_ENTITY, - ) - # Before deleting, detach the profile from the associated clusters. - self.detach(session, _id, profile_type="app_profiles") - # To delete the app profile - super().delete(session, _id, not_send_msg=not_send_msg) + self.delete_profile(session, _id, dry_run=False, not_send_msg=None) return _id -class ResourceTopic(BaseTopic): +class ResourceTopic(ProfileTopic): topic = "k8sresource" topic_msg = "k8s_resource" schema_new = resource_profile_create_new_schema @@ -164,27 +137,18 @@ class ResourceTopic(BaseTopic): return self.default_profile(rollback, session, indata, kwargs, headers) def delete(self, session, _id, dry_run=False, not_send_msg=None): - item_content = self.db.get_one(self.topic, {"_id": _id}) - if item_content.get("default", False): - raise EngineException( - "Cannot delete item because it is marked as default", - http_code=HTTPStatus.UNPROCESSABLE_ENTITY, - ) - # Before deleting, detach the profile from the associated clusters. - self.detach(session, _id, profile_type="resource_profiles") - # To delete the resource profile - super().delete(session, _id, not_send_msg=not_send_msg) + self.delete_profile(session, _id, dry_run=False, not_send_msg=None) return _id -class K8sTopic(BaseTopic): +class ClusterTopic(ACMTopic): topic = "clusters" topic_msg = "cluster" schema_new = clustercreation_new_schema schema_edit = attach_dettach_profile_schema def __init__(self, db, fs, msg, auth): - BaseTopic.__init__(self, db, fs, msg, auth) + super().__init__(db, fs, msg, auth) self.infra_contr_topic = InfraContTopic(db, fs, msg, auth) self.infra_conf_topic = InfraConfTopic(db, fs, msg, auth) self.resource_topic = ResourceTopic(db, fs, msg, auth) @@ -192,8 +156,7 @@ class K8sTopic(BaseTopic): @staticmethod def format_on_new(content, project_id=None, make_public=False): - # logger.info("it is getting into format on new in new fon") - BaseTopic.format_on_new(content, project_id=project_id, make_public=make_public) + ACMTopic.format_on_new(content, project_id=project_id, make_public=make_public) content["current_operation"] = None def new(self, rollback, session, indata=None, kwargs=None, headers=None): @@ -212,7 +175,8 @@ class K8sTopic(BaseTopic): try: self.check_quota(session) step = "name unique check" - self.check_unique_name(session, indata["name"]) + # self.check_unique_name(session, indata["name"]) + self.cluster_unique_name_check(session, indata["name"]) step = "validating input parameters" cls_request = self._remove_envelop(indata) self._update_input_with_kwargs(cls_request, kwargs) @@ -563,18 +527,33 @@ class K8sTopic(BaseTopic): self._send_msg(item, data) return op_id + def delete_extra_before(self, session, _id, db_content, not_send_msg=None): + op_id = self.common_delete(_id, db_content) + return {"cluster_id": _id, "operation_id": op_id} + + def delete(self, session, _id, dry_run=False, not_send_msg=None): + filter_q = self._get_project_filter(session) + filter_q[self.id_field(self.topic, _id)] = _id + check = self.db.get_one(self.topic, filter_q) + if check["created"] == "false": + raise EngineException( + "Cannot delete registered cluster", + HTTPStatus.UNPROCESSABLE_ENTITY, + ) + super().delete(session, _id, dry_run=False, not_send_msg=None) -class K8saddTopic(BaseTopic): + +class ClusterOpsTopic(ACMTopic): topic = "clusters" topic_msg = "cluster" schema_new = clusterregistration_new_schema def __init__(self, db, fs, msg, auth): - BaseTopic.__init__(self, db, fs, msg, auth) + super().__init__(db, fs, msg, auth) @staticmethod def format_on_new(content, project_id=None, make_public=False): - BaseTopic.format_on_new(content, project_id=project_id, make_public=make_public) + ACMTopic.format_on_new(content, project_id=project_id, make_public=make_public) content["current_operation"] = None def add(self, rollback, session, indata, kwargs=None, headers=None): @@ -582,7 +561,8 @@ class K8saddTopic(BaseTopic): try: self.check_quota(session) step = "name unique check" - self.check_unique_name(session, indata["name"]) + self.cluster_unique_name_check(session, indata["name"]) + # self.check_unique_name(session, indata["name"]) step = "validating input parameters" cls_add_request = self._remove_envelop(indata) self._update_input_with_kwargs(cls_add_request, kwargs) @@ -722,7 +702,7 @@ class K8saddTopic(BaseTopic): return None -class KsusTopic(BaseTopic): +class KsusTopic(ACMTopic): topic = "ksus" okapkg_topic = "okas" infra_topic = "k8sinfra" @@ -731,7 +711,7 @@ class KsusTopic(BaseTopic): schema_edit = ksu_schema def __init__(self, db, fs, msg, auth): - BaseTopic.__init__(self, db, fs, msg, auth) + super().__init__(db, fs, msg, auth) self.logger = logging.getLogger("nbi.ksus") @staticmethod @@ -881,7 +861,7 @@ class KsusTopic(BaseTopic): @staticmethod def format_on_edit(final_content, edit_content): - op_id = BaseTopic.format_on_operation( + op_id = ACMTopic.format_on_operation( final_content, "update", edit_content, @@ -1029,7 +1009,7 @@ class KsusTopic(BaseTopic): return op_id, not_send_msg -class OkaTopic(DescriptorTopic): +class OkaTopic(DescriptorTopic, ACMOperationTopic): topic = "okas" topic_msg = "oka" schema_new = oka_schema diff --git a/osm_nbi/nbi.py b/osm_nbi/nbi.py index e15aba7..77c009f 100644 --- a/osm_nbi/nbi.py +++ b/osm_nbi/nbi.py @@ -1717,9 +1717,9 @@ class Server(object): elif main_topic == "pdu": engine_topic = "pdus" elif main_topic == "k8scluster": - engine_topic = "k8s" + engine_topic = "cluster" if topic == "clusters" and _id == "register" or item == "deregister": - engine_topic = "k8sops" + engine_topic = "clusterops" elif topic == "infra_controller_profiles": engine_topic = "infras_cont" elif topic == "infra_config_profiles": @@ -1733,7 +1733,7 @@ class Server(object): "get_creds", "scale", ): - engine_topic = "k8s" + engine_topic = "cluster" elif main_topic == "ksu" and engine_topic in ("ksus", "clone", "move"): engine_topic = "ksus" if ( @@ -1771,10 +1771,10 @@ class Server(object): cherrypy.request.headers.get("Accept"), ) outdata = file - elif not _id: - outdata = self.engine.get_item_list( - engine_session, engine_topic, kwargs, api_req=True - ) + # elif not _id and topic != "clusters": + # outdata = self.engine.get_item_list( + # engine_session, engine_topic, kwargs, api_req=True + # ) elif topic == "clusters" and item in ( "infra_controller_profiles", "infra_config_profiles", @@ -1807,6 +1807,14 @@ class Server(object): engine_session, engine_topic, _id, item ) outdata = {"op_id": op_id} + elif topic == "clusters" and not _id: + outdata = self.engine.get_item_list_cluster( + engine_session, engine_topic, kwargs, api_req=True + ) + elif not _id: + outdata = self.engine.get_item_list( + engine_session, engine_topic, kwargs, api_req=True + ) else: if item == "reports": # TODO check that project_id (_id in this context) has permissions -- 2.25.1