Feature: 11055 Support of several node groups in clusters created by OSM
Change-Id: I3f18708c33bb37cec77a05417b56fb5fba3dfda6
Signed-off-by: yshah <shahithya.y@tataelxsi.co.in>
diff --git a/osm_nbi/k8s_topics.py b/osm_nbi/k8s_topics.py
index 90b85c9..496b737 100644
--- a/osm_nbi/k8s_topics.py
+++ b/osm_nbi/k8s_topics.py
@@ -42,6 +42,8 @@
attach_dettach_profile_schema,
ksu_schema,
oka_schema,
+ node_create_new_schema,
+ node_edit_schema,
)
from osm_common.dbbase import deep_update_rfc7396, DbException
from osm_common.msgbase import MsgException
@@ -334,7 +336,7 @@
if not version:
content["k8s_version"] = "1.28"
content["node_count"] = indata.get("node_count", 0)
- content["ksu_count"] = "0"
+ content["ksu_count"] = 0
self.logger.info(f"cotent is : {content}")
return content
@@ -583,7 +585,7 @@
HTTPStatus.UNPROCESSABLE_ENTITY,
)
- def update_cluster(self, session, _id, item, indata):
+ def update_item(self, session, _id, item, indata):
if not self.multiproject:
filter_db = {}
else:
@@ -624,6 +626,184 @@
return op_id
+class NodeGroupTopic(ACMTopic):
+    """NBI topic handling node groups belonging to a k8s cluster.
+
+    Node groups live in the "nodegroups" collection; lifecycle changes
+    are announced on the "nodegroup" kafka topic.
+    """
+
+    topic = "nodegroups"
+    topic_msg = "nodegroup"
+    schema_new = node_create_new_schema
+    schema_edit = node_edit_schema
+
+    def __init__(self, db, fs, msg, auth):
+        # NOTE(review): calls BaseTopic.__init__ directly, bypassing any
+        # ACMTopic.__init__ logic — confirm this is intentional.
+        BaseTopic.__init__(self, db, fs, msg, auth)
+
+    @staticmethod
+    def format_on_new(content, project_id=None, make_public=False):
+        # Seed common _admin data, then the node-group lifecycle fields.
+        BaseTopic.format_on_new(content, project_id=project_id, make_public=make_public)
+        content["current_operation"] = None
+        content["state"] = "IN_CREATION"
+        content["operatingState"] = "PROCESSING"
+        content["resourceState"] = "IN_PROGRESS"
+ def new(self, rollback, session, indata=None, kwargs=None, headers=None):
+ self.logger.info(f"Indata: {indata}")
+ self.check_unique_name(session, indata["name"])
+
+ indata = self._remove_envelop(indata)
+ self._update_input_with_kwargs(indata, kwargs)
+ if not indata.get("private_subnet") and not indata.get("public_subnet"):
+ raise EngineException(
+ "Please provide atleast one subnet",
+ HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
+ content = self._validate_input_new(indata, session["force"])
+
+ self.logger.info(f"Indata: {indata}")
+ self.logger.info(f"Content: {content}")
+ cluster_id = content["cluster_id"]
+ db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
+ private_subnet = db_cluster.get("private_subnet")
+ public_subnet = db_cluster.get("public_subnet")
+ if content.get("private_subnet"):
+ for subnet in content["private_subnet"]:
+ if subnet not in private_subnet:
+ raise EngineException(
+ "No External subnet is used to add nodegroup",
+ HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
+ if content.get("public_subnet"):
+ for subnet in content["public_subnet"]:
+ if subnet not in public_subnet:
+ raise EngineException(
+ "No External subnet is used to add nodegroup",
+ HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
+
+ operation_params = {}
+ for content_key, content_value in content.items():
+ operation_params[content_key] = content_value
+ self.format_on_new(
+ content, session["project_id"], make_public=session["public"]
+ )
+ content["git_name"] = self.create_gitname(content, session)
+ self.logger.info(f"Operation Params: {operation_params}")
+ op_id = self.format_on_operation(
+ content,
+ "create",
+ operation_params,
+ )
+ node_count = db_cluster.get("node_count")
+ new_node_count = node_count + 1
+ self.logger.info(f"New Node count: {new_node_count}")
+ db_cluster["node_count"] = new_node_count
+ self.db.set_one("clusters", {"_id": cluster_id}, db_cluster)
+ _id = self.db.create(self.topic, content)
+ self._send_msg("add_nodegroup", {"nodegroup_id": _id, "operation_id": op_id})
+ return _id, op_id
+
+ def list(self, session, filter_q=None, api_req=False):
+ db_filter = {}
+ if filter_q.get("cluster_id"):
+ db_filter["cluster_id"] = filter_q.get("cluster_id")
+ data_list = self.db.get_list(self.topic, db_filter)
+ cluster_data = self.db.get_one("clusters", {"_id": db_filter["cluster_id"]})
+ self.logger.info(f"Cluster Data: {cluster_data}")
+ self.logger.info(f"Data: {data_list}")
+ if filter_q.get("cluster_id"):
+ outdata = {}
+ outdata["count"] = cluster_data["node_count"]
+ outdata["data"] = data_list
+ self.logger.info(f"Outdata: {outdata}")
+ return outdata
+ if api_req:
+ data_list = [self.sol005_projection(inst) for inst in data_list]
+ return data_list
+
+    def delete(self, session, _id, dry_run=False, not_send_msg=None):
+        """Mark a node group for deletion and notify kafka.
+
+        The record is not removed here: it is flagged IN_DELETION and the
+        actual teardown is driven by the "delete_nodegroup" message.
+        Returns the operation id.
+        NOTE(review): new() increments the cluster's node_count but this
+        method does not decrement it — confirm the consumer does.
+        """
+        if not self.multiproject:
+            filter_q = {}
+        else:
+            filter_q = self._get_project_filter(session)
+        filter_q[self.id_field(self.topic, _id)] = _id
+        item_content = self.db.get_one(self.topic, filter_q)
+        item_content["state"] = "IN_DELETION"
+        item_content["operatingState"] = "PROCESSING"
+        item_content["resourceState"] = "IN_PROGRESS"
+        self.check_conflict_on_del(session, _id, item_content)
+        op_id = self.format_on_operation(
+            item_content,
+            "delete",
+            None,
+        )
+        self.db.set_one(self.topic, {"_id": item_content["_id"]}, item_content)
+        self._send_msg(
+            "delete_nodegroup",
+            {"nodegroup_id": _id, "operation_id": op_id},
+            not_send_msg=not_send_msg,
+        )
+        return op_id
+
+    def update_item(self, session, _id, item, indata):
+        """Scale a node group: validate indata, record a "scale"
+        operation and notify kafka. Returns the operation id.
+        """
+        content = None
+        try:
+            if not content:
+                content = self.db.get_one(self.topic, {"_id": _id})
+            indata = self._validate_input_edit(indata, content, force=session["force"])
+            # Allow addressing by name as well as _id.
+            _id = content.get("_id") or _id
+
+            content = self.check_conflict_on_edit(session, content, indata, _id=_id)
+            # NOTE(review): format_on_edit appears to be called for its
+            # side effects on content; its op_id is immediately replaced
+            # by the "scale" operation id below — confirm intentional.
+            op_id = self.format_on_edit(content, indata)
+            op_id = ACMTopic.format_on_operation(
+                content,
+                "scale",
+                indata,
+            )
+            self.logger.info(f"op_id: {op_id}")
+            content["operatingState"] = "PROCESSING"
+            content["resourceState"] = "IN_PROGRESS"
+            self.db.replace(self.topic, _id, content)
+            self._send_msg(
+                "scale_nodegroup", {"nodegroup_id": _id, "operation_id": op_id}
+            )
+            return op_id
+        except ValidationError as e:
+            raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
+
+ def edit(self, session, _id, indata, kwargs):
+ content = None
+
+ # Override descriptor with query string kwargs
+ if kwargs:
+ self._update_input_with_kwargs(indata, kwargs)
+ try:
+ if indata and session.get("set_project"):
+ raise EngineException(
+ "Cannot edit content and set to project (query string SET_PROJECT) at same time",
+ HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
+ # TODO self._check_edition(session, indata, _id, force)
+ if not content:
+ content = self.db.get_one(self.topic, {"_id": _id})
+
+ indata = self._validate_input_edit(indata, content, force=session["force"])
+ self.logger.info(f"Indata: {indata}")
+
+ # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
+ _id = content.get("_id") or _id
+
+ content = self.check_conflict_on_edit(session, content, indata, _id=_id)
+ if "name" in indata and "description" in indata:
+ content["name"] = indata["name"]
+ content["description"] = indata["description"]
+ elif "name" in indata:
+ content["name"] = indata["name"]
+ elif "description" in indata:
+ content["description"] = indata["description"]
+ op_id = self.format_on_edit(content, indata)
+ self.db.set_one(self.topic, {"_id": _id}, content)
+ return op_id
+ except ValidationError as e:
+ raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
+
+
class ClusterOpsTopic(ACMTopic):
topic = "clusters"
topic_msg = "cluster"
@@ -845,6 +1025,12 @@
topic_msg = "ksu"
schema_new = ksu_schema
schema_edit = ksu_schema
+ MAP_PROFILE = {
+ "infra_controller_profiles": "infra-controllers",
+ "infra_config_profiles": "infra-configs",
+ "resource_profiles": "managed_resources",
+ "app_profiles": "apps",
+ }
def __init__(self, db, fs, msg, auth):
super().__init__(db, fs, msg, auth)
@@ -914,6 +1100,21 @@
if "_id" in okas.keys():
self.update_usage_state(session, okas)
+ profile_id = content["profile"].get("_id")
+ profile_type = content["profile"].get("profile_type")
+ db_cluster_list = self.db.get_list("clusters")
+ for db_cluster in db_cluster_list:
+ if db_cluster.get("created") == "true":
+ profile_list = db_cluster[profile_type]
+ if profile_id in profile_list:
+ ksu_count = db_cluster.get("ksu_count")
+ new_ksu_count = ksu_count + 1
+ self.logger.info(f"New KSU count: {new_ksu_count}")
+ db_cluster["ksu_count"] = new_ksu_count
+ self.db.set_one(
+ "clusters", {"_id": db_cluster["_id"]}, db_cluster
+ )
+
_id = self.db.create(self.topic, content)
rollback.append({"topic": self.topic, "_id": _id})
_id_list.append(_id)
@@ -1026,6 +1227,52 @@
data = {"ksus_list": _id_list, "operation_id": op_id}
self._send_msg("edit", data)
+ def cluster_list_ksu(self, session, filter_q=None, api_req=None):
+ db_filter = {}
+ if filter_q.get("cluster_id"):
+ db_filter["_id"] = filter_q.get("cluster_id")
+ ksu_data_list = []
+
+ cluster_data = self.db.get_one("clusters", db_filter)
+ profiles_list = [
+ "infra_controller_profiles",
+ "infra_config_profiles",
+ "app_profiles",
+ "resource_profiles",
+ ]
+ for profile in profiles_list:
+ data_list = []
+ for profile_id in cluster_data[profile]:
+ filter_q = {"profile": {"_id": profile_id, "profile_type": profile}}
+ data_list = self.db.get_list(self.topic, filter_q)
+ for ksu_data in data_list:
+ ksu_data["package_name"] = []
+ ksu_data["package_path"] = []
+ for okas in ksu_data["operationHistory"][0]["operationParams"]["oka"]:
+ sw_catalog_path = okas.get("sw_catalog_path")
+ if sw_catalog_path:
+ parts = sw_catalog_path.rsplit("/", 2)
+ self.logger.info(f"Parts: {parts}")
+ ksu_data["package_name"].append(parts[-2])
+ ksu_data["package_path"].append("/".join(parts[:-1]))
+ else:
+ oka_id = okas["_id"]
+ db_oka = self.db.get_one("okas", {"_id": oka_id})
+ oka_type = self.MAP_PROFILE[
+ db_oka.get("profile_type", "infra_controller_profiles")
+ ]
+ ksu_data["package_name"].append(db_oka["git_name"].lower())
+ ksu_data["package_path"].append(
+ f"{oka_type}/{db_oka['git_name'].lower()}"
+ )
+ ksu_data_list.append(ksu_data)
+
+ outdata = {}
+ outdata["count"] = cluster_data["ksu_count"]
+ outdata["data"] = ksu_data_list
+ self.logger.info(f"Outdata: {outdata}")
+ return outdata
+
def edit_ksu(self, session, _id, indata, kwargs):
content = None
indata = self._remove_envelop(indata)