NBI ACM code refactor
Change-Id: I1957f32b3120903ac978e353c05408fc00b3fe66
Signed-off-by: shrinithi <shrinithi.r@tataelxsi.co.in>
Signed-off-by: garciadeblas <gerardo.garciadeblas@telefonica.com>
diff --git a/osm_nbi/acm_topic.py b/osm_nbi/acm_topic.py
new file mode 100644
index 0000000..87087b7
--- /dev/null
+++ b/osm_nbi/acm_topic.py
@@ -0,0 +1,342 @@
+# -*- coding: utf-8 -*-
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from pyrage import x25519
+from uuid import uuid4
+
+from http import HTTPStatus
+from time import time
+
+# from osm_common.dbbase import deep_update_rfc7396, DbException
+from osm_common.msgbase import MsgException
+from osm_common.dbbase import DbException
+from osm_common.fsbase import FsException
+from osm_nbi.base_topic import BaseTopic, EngineException
+from osm_nbi.validation import ValidationError
+
+# import logging
+# import random
+# import string
+# from yaml import safe_load, YAMLError
+
+
+class ACMOperationTopic:
+ def __init__(self, db, fs, msg, auth):
+ self.multiproject = None # Declare the attribute here
+
+ @staticmethod
+ def format_on_operation(content, operation_type, operation_params=None):
+ op_id = str(uuid4())
+ now = time()
+ if "operationHistory" not in content:
+ content["operationHistory"] = []
+
+ operation = {}
+ operation["operationType"] = operation_type
+ operation["op_id"] = op_id
+ operation["result"] = None
+ operation["creationDate"] = now
+ operation["endDate"] = None
+ operation["workflowState"] = operation["resourceState"] = operation[
+ "operationState"
+ ] = operation["gitOperationInfo"] = None
+ operation["operationParams"] = operation_params
+
+ content["operationHistory"].append(operation)
+ return op_id
+
+
+class ACMTopic(BaseTopic, ACMOperationTopic):
+ def __init__(self, db, fs, msg, auth):
+ super().__init__(db, fs, msg, auth)
+ # ACMOperationTopic.__init__(db, fs, msg, auth)
+
+ def new_profile(self, rollback, session, indata=None, kwargs=None, headers=None):
+ step = "name unique check"
+ try:
+ self.check_unique_name(session, indata["name"])
+
+ step = "validating input parameters"
+ profile_request = self._remove_envelop(indata)
+ self._update_input_with_kwargs(profile_request, kwargs)
+ profile_request = self._validate_input_new(
+ profile_request, session["force"]
+ )
+ operation_params = profile_request
+
+ step = "filling profile details from input data"
+ profile_create = self._create_profile(profile_request, session)
+
+ step = "creating profile at database"
+ self.format_on_new(
+ profile_create, session["project_id"], make_public=session["public"]
+ )
+ profile_create["current_operation"] = None
+ op_id = ACMOperationTopic.format_on_operation(
+ profile_create,
+ "create",
+ operation_params,
+ )
+
+ _id = self.db.create(self.topic, profile_create)
+ pubkey, privkey = self._generate_age_key()
+ profile_create["age_pubkey"] = self.db.encrypt(
+ pubkey, schema_version="1.11", salt=_id
+ )
+ profile_create["age_privkey"] = self.db.encrypt(
+ privkey, schema_version="1.11", salt=_id
+ )
+ rollback.append({"topic": self.topic, "_id": _id})
+ self.db.set_one(self.topic, {"_id": _id}, profile_create)
+ if op_id:
+ profile_create["op_id"] = op_id
+ self._send_msg("profile_create", {"profile_id": _id, "operation_id": op_id})
+
+ return _id, None
+ except (
+ ValidationError,
+ EngineException,
+ DbException,
+ MsgException,
+ FsException,
+ ) as e:
+ raise type(e)("{} while '{}'".format(e, step), http_code=e.http_code)
+
+ def _create_profile(self, profile_request, session):
+ profile_desc = {
+ "name": profile_request["name"],
+ "description": profile_request["description"],
+ "default": False,
+ "git_name": self.create_gitname(profile_request, session),
+ "state": "IN_CREATION",
+ "operatingState": "IN_PROGRESS",
+ "resourceState": "IN_PROGRESS.REQUEST_RECEIVED",
+ }
+ return profile_desc
+
+ def default_profile(
+ self, rollback, session, indata=None, kwargs=None, headers=None
+ ):
+ step = "validating input parameters"
+ try:
+ profile_request = self._remove_envelop(indata)
+ self._update_input_with_kwargs(profile_request, kwargs)
+ operation_params = profile_request
+
+ step = "filling profile details from input data"
+ profile_create = self._create_default_profile(profile_request, session)
+
+ step = "creating profile at database"
+ self.format_on_new(
+ profile_create, session["project_id"], make_public=session["public"]
+ )
+ profile_create["current_operation"] = None
+ ACMOperationTopic.format_on_operation(
+ profile_create,
+ "create",
+ operation_params,
+ )
+ _id = self.db.create(self.topic, profile_create)
+ rollback.append({"topic": self.topic, "_id": _id})
+ return _id
+ except (
+ ValidationError,
+ EngineException,
+ DbException,
+ MsgException,
+ FsException,
+ ) as e:
+ raise type(e)("{} while '{}'".format(e, step), http_code=e.http_code)
+
+ def _create_default_profile(self, profile_request, session):
+ profile_desc = {
+ "name": profile_request["name"],
+ "description": f"{self.topic} profile for cluster {profile_request['name']}",
+ "default": True,
+ "git_name": self.create_gitname(profile_request, session),
+ "state": "IN_CREATION",
+ "operatingState": "IN_PROGRESS",
+ "resourceState": "IN_PROGRESS.REQUEST_RECEIVED",
+ }
+ return profile_desc
+
+ def detach(self, session, _id, profile_type):
+ # To detach the profiles from every cluster
+ filter_q = {}
+ existing_clusters = self.db.get_list("clusters", filter_q)
+ existing_clusters_profiles = [
+ profile["_id"]
+ for profile in existing_clusters
+ if profile.get("profile_type", _id)
+ ]
+ update_dict = None
+ for profile in existing_clusters_profiles:
+ filter_q = {"_id": profile}
+ data = self.db.get_one("clusters", filter_q)
+ if profile_type in data:
+ profile_ids = data[profile_type]
+ if _id in profile_ids:
+ profile_ids.remove(_id)
+ update_dict = {profile_type: profile_ids}
+ self.db.set_one("clusters", filter_q, update_dict)
+
+ def _generate_age_key(self):
+ ident = x25519.Identity.generate()
+ # gets the public key
+ pubkey = str(ident.to_public())
+ # gets the private key
+ privkey = str(ident)
+ # return both public and private key
+ return pubkey, privkey
+
+ def common_delete(self, _id, db_content):
+ if "state" in db_content:
+ db_content["state"] = "IN_DELETION"
+ db_content["operatingState"] = "PROCESSING"
+ # self.db.set_one(self.topic, {"_id": _id}, db_content)
+
+ db_content["current_operation"] = None
+ op_id = ACMOperationTopic.format_on_operation(
+ db_content,
+ "delete",
+ None,
+ )
+ self.db.set_one(self.topic, {"_id": _id}, db_content)
+ return op_id
+
+ def add_to_old_collection(self, content, session):
+ item = {}
+ item["name"] = content["name"]
+ item["credentials"] = {}
+ # item["k8s_version"] = content["k8s_version"]
+ if "k8s_version" in content:
+ item["k8s_version"] = content["k8s_version"]
+ else:
+ item["k8s_version"] = None
+ vim_account_details = self.db.get_one(
+ "vim_accounts", {"name": content["vim_account"]}
+ )
+ item["vim_account"] = vim_account_details["_id"]
+ item["nets"] = {"k8s_net1": None}
+ item["deployment_methods"] = {"juju-bundle": False, "helm-chart-v3": True}
+ # item["description"] = content["description"]
+ if "description" in content:
+ item["description"] = content["description"]
+ else:
+ item["description"] = None
+ item["namespace"] = "kube-system"
+ item["osm_acm"] = True
+ item["schema_version"] = "1.11"
+ self.format_on_new(item, session["project_id"], make_public=session["public"])
+ _id = self.db.create("k8sclusters", item)
+ self.logger.info(f"_id is : {_id}")
+ item_1 = self.db.get_one("k8sclusters", {"name": item["name"]})
+ item_1["_admin"]["operationalState"] = "PROCESSING"
+
+ # Create operation data
+ now = time()
+ operation_data = {
+ "lcmOperationType": "create", # Assuming 'create' operation here
+ "operationState": "PROCESSING",
+ "startTime": now,
+ "statusEnteredTime": now,
+ "detailed-status": "",
+ "operationParams": None, # Add parameters as needed
+ }
+ # create operation
+ item_1["_admin"]["operations"] = [operation_data]
+ item_1["_admin"]["current_operation"] = None
+ self.logger.info(f"content is : {item_1}")
+ self.db.set_one("k8sclusters", {"_id": item_1["_id"]}, item_1)
+ return
+
+ def edit_extra_before(self, session, _id, indata=None, kwargs=None, content=None):
+ check = self.db.get_one(self.topic, {"_id": _id})
+ if self.topic != "clusters":
+ if check["default"] is True:
+ raise EngineException(
+ "Cannot edit default profiles",
+ HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
+ if "name" in indata:
+ if check["name"] == indata["name"]:
+ pass
+ else:
+ self.check_unique_name(session, indata["name"])
+
+ return True
+
+ def cluster_unique_name_check(self, session, name):
+ self.check_unique_name(session, name)
+ _filter = {"name": name}
+ topic = "k8sclusters"
+ if self.db.get_one(topic, _filter, fail_on_empty=False, fail_on_more=False):
+ raise EngineException(
+ "name '{}' already exists".format(name),
+ HTTPStatus.CONFLICT,
+ )
+
+ def list_both(self, session, filter_q=None, api_req=False):
+ """List all clusters from both new and old APIs"""
+ if not filter_q:
+ filter_q = {}
+ if self.multiproject:
+ filter_q.update(self._get_project_filter(session))
+ cluster_list1 = self.db.get_list(self.topic, filter_q)
+ cluster_list2 = self.db.get_list("k8sclusters", filter_q)
+ list1_names = {item["name"] for item in cluster_list1}
+ for item in cluster_list2:
+ if item["name"] not in list1_names:
+ # Complete the information for clusters from old API
+ item["state"] = "N/A"
+ old_state = item.get("_admin", {}).get("operationalState", "Unknown")
+ item["bootstrap"] = "NO"
+ item["operatingState"] = "N/A"
+ item["resourceState"] = old_state
+ item["created"] = "NO"
+ cluster_list1.append(item)
+ if api_req:
+ cluster_list1 = [self.sol005_projection(inst) for inst in cluster_list1]
+ return cluster_list1
+
+
+class ProfileTopic(ACMTopic):
+ profile_topic_map = {
+ "k8sapp": "app_profiles",
+ "k8sresource": "resource_profiles",
+ "k8sinfra_controller": "infra_controller_profiles",
+ "k8sinfra_config": "infra_config_profiles",
+ }
+
+ def __init__(self, db, fs, msg, auth):
+ super().__init__(db, fs, msg, auth)
+
+ def delete_extra_before(self, session, _id, db_content, not_send_msg=None):
+ op_id = self.common_delete(_id, db_content)
+ return {"profile_id": _id, "operation_id": op_id}
+
+ def delete_profile(self, session, _id, dry_run=False, not_send_msg=None):
+ item_content = self.db.get_one(self.topic, {"_id": _id})
+ if item_content.get("default", False):
+ raise EngineException(
+ "Cannot delete item because it is marked as default",
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
+ # Before deleting, detach the profile from the associated clusters.
+ profile_type = self.profile_topic_map[self.topic]
+ self.detach(session, _id, profile_type)
+ # To delete the infra controller profile
+ super().delete(session, _id, not_send_msg=not_send_msg)
+ return _id