From: yshah Date: Fri, 13 Jun 2025 05:49:31 +0000 (+0000) Subject: Feature: 11055 Support of several node groups in clusters created by OSM​ X-Git-Tag: v18.0.0~6 X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=d23c6a59bab2a03bfbf4db963175b1e4ff4c37b3;p=osm%2FNBI.git Feature: 11055 Support of several node groups in clusters created by OSM​ Change-Id: I3f18708c33bb37cec77a05417b56fb5fba3dfda6 Signed-off-by: yshah --- diff --git a/osm_nbi/engine.py b/osm_nbi/engine.py index ff65bbb..ffbb07d 100644 --- a/osm_nbi/engine.py +++ b/osm_nbi/engine.py @@ -62,6 +62,7 @@ from osm_nbi.k8s_topics import ( ClusterOpsTopic, KsusTopic, OkaTopic, + NodeGroupTopic, ) from osm_nbi.vnf_instance_topics import VnfInstances, VnfLcmOpTopic from osm_nbi.pmjobs_topics import PmJobsTopic @@ -110,6 +111,7 @@ class Engine(object): "clusterops": ClusterOpsTopic, "ksus": KsusTopic, "oka_packages": OkaTopic, + "node_groups": NodeGroupTopic, # [NEW_TOPIC]: add an entry here # "pm_jobs": PmJobsTopic will be added manually because it needs other parameters } @@ -358,12 +360,12 @@ class Engine(object): ) return self.map_topic[topic].get_cluster_creds(session, _id, item) - def update_cluster(self, session, topic, _id, item, indata): + def update_item(self, session, topic, _id, item, indata): if topic not in self.map_topic: raise EngineException( "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR ) - return self.map_topic[topic].update_cluster(session, _id, item, indata) + return self.map_topic[topic].update_item(session, _id, item, indata) def delete_ksu(self, session, topic, _id, indata): if topic not in self.map_topic: @@ -456,6 +458,22 @@ class Engine(object): ) return self.map_topic[topic].get_file(session, _id, path, accept_header) + def get_cluster_list_ksu(self, session, topic, filter_q=None, api_req=False): + """ + Get a list of items + :param session: contains the used login username and working project + :param topic: it can be: users, projects, vnfds, nsds, ... + :param filter_q: filter of data to be applied + :param api_req: True if this call is serving an external API request. False if serving internal request. + :return: The list, it can be empty if no one match the filter_q. + """ + self.logger.info("it is getting into item list cluster") + if topic not in self.map_topic: + raise EngineException( + "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR + ) + return self.map_topic[topic].cluster_list_ksu(session, filter_q, api_req) + def del_item_list(self, session, topic, _filter=None): """ Delete a list of items diff --git a/osm_nbi/k8s_topics.py b/osm_nbi/k8s_topics.py index 90b85c9..496b737 100644 --- a/osm_nbi/k8s_topics.py +++ b/osm_nbi/k8s_topics.py @@ -42,6 +42,8 @@ from osm_nbi.validation import ( attach_dettach_profile_schema, ksu_schema, oka_schema, + node_create_new_schema, + node_edit_schema, ) from osm_common.dbbase import deep_update_rfc7396, DbException from osm_common.msgbase import MsgException @@ -334,7 +336,7 @@ class ClusterTopic(ACMTopic): if not version: content["k8s_version"] = "1.28" content["node_count"] = indata.get("node_count", 0) - content["ksu_count"] = "0" + content["ksu_count"] = 0 self.logger.info(f"cotent is : {content}") return content @@ -583,7 +585,7 @@ class ClusterTopic(ACMTopic): HTTPStatus.UNPROCESSABLE_ENTITY, ) - def update_cluster(self, session, _id, item, indata): + def update_item(self, session, _id, item, indata): if not self.multiproject: filter_db = {} else: @@ -624,6 +626,184 @@ class ClusterTopic(ACMTopic): return op_id +class NodeGroupTopic(ACMTopic): + topic = "nodegroups" + topic_msg = "nodegroup" + schema_new = node_create_new_schema + schema_edit = node_edit_schema + + def __init__(self, db, fs, msg, auth): + BaseTopic.__init__(self, db, fs, msg, auth) + + @staticmethod + def format_on_new(content, project_id=None, make_public=False): + BaseTopic.format_on_new(content, project_id=project_id, make_public=make_public) + content["current_operation"] = None + content["state"] = "IN_CREATION" + content["operatingState"] = "PROCESSING" + content["resourceState"] = "IN_PROGRESS" + + def new(self, rollback, session, indata=None, kwargs=None, headers=None): + self.logger.info(f"Indata: {indata}") + self.check_unique_name(session, indata["name"]) + + indata = self._remove_envelop(indata) + self._update_input_with_kwargs(indata, kwargs) + if not indata.get("private_subnet") and not indata.get("public_subnet"): + raise EngineException( + "Please provide atleast one subnet", + HTTPStatus.UNPROCESSABLE_ENTITY, + ) + content = self._validate_input_new(indata, session["force"]) + + self.logger.info(f"Indata: {indata}") + self.logger.info(f"Content: {content}") + cluster_id = content["cluster_id"] + db_cluster = self.db.get_one("clusters", {"_id": cluster_id}) + private_subnet = db_cluster.get("private_subnet") + public_subnet = db_cluster.get("public_subnet") + if content.get("private_subnet"): + for subnet in content["private_subnet"]: + if subnet not in private_subnet: + raise EngineException( + "No External subnet is used to add nodegroup", + HTTPStatus.UNPROCESSABLE_ENTITY, + ) + if content.get("public_subnet"): + for subnet in content["public_subnet"]: + if subnet not in public_subnet: + raise EngineException( + "No External subnet is used to add nodegroup", + HTTPStatus.UNPROCESSABLE_ENTITY, + ) + + operation_params = {} + for content_key, content_value in content.items(): + operation_params[content_key] = content_value + self.format_on_new( + content, session["project_id"], make_public=session["public"] + ) + content["git_name"] = self.create_gitname(content, session) + self.logger.info(f"Operation Params: {operation_params}") + op_id = self.format_on_operation( + content, + "create", + operation_params, + ) + node_count = db_cluster.get("node_count") + new_node_count = node_count + 1 + self.logger.info(f"New Node count: {new_node_count}") + db_cluster["node_count"] = new_node_count + self.db.set_one("clusters", {"_id": cluster_id}, db_cluster) + _id = self.db.create(self.topic, content) + self._send_msg("add_nodegroup", {"nodegroup_id": _id, "operation_id": op_id}) + return _id, op_id + + def list(self, session, filter_q=None, api_req=False): + db_filter = {} + if filter_q.get("cluster_id"): + db_filter["cluster_id"] = filter_q.get("cluster_id") + data_list = self.db.get_list(self.topic, db_filter) + cluster_data = self.db.get_one("clusters", {"_id": db_filter["cluster_id"]}) + self.logger.info(f"Cluster Data: {cluster_data}") + self.logger.info(f"Data: {data_list}") + if filter_q.get("cluster_id"): + outdata = {} + outdata["count"] = cluster_data["node_count"] + outdata["data"] = data_list + self.logger.info(f"Outdata: {outdata}") + return outdata + if api_req: + data_list = [self.sol005_projection(inst) for inst in data_list] + return data_list + + def delete(self, session, _id, dry_run=False, not_send_msg=None): + if not self.multiproject: + filter_q = {} + else: + filter_q = self._get_project_filter(session) + filter_q[self.id_field(self.topic, _id)] = _id + item_content = self.db.get_one(self.topic, filter_q) + item_content["state"] = "IN_DELETION" + item_content["operatingState"] = "PROCESSING" + item_content["resourceState"] = "IN_PROGRESS" + self.check_conflict_on_del(session, _id, item_content) + op_id = self.format_on_operation( + item_content, + "delete", + None, + ) + self.db.set_one(self.topic, {"_id": item_content["_id"]}, item_content) + self._send_msg( + "delete_nodegroup", + {"nodegroup_id": _id, "operation_id": op_id}, + not_send_msg=not_send_msg, + ) + return op_id + + def update_item(self, session, _id, item, indata): + content = None + try: + if not content: + content = self.db.get_one(self.topic, {"_id": _id}) + indata = self._validate_input_edit(indata, content, force=session["force"]) + _id = content.get("_id") or _id + + content = self.check_conflict_on_edit(session, content, indata, _id=_id) + op_id = self.format_on_edit(content, indata) + op_id = ACMTopic.format_on_operation( + content, + "scale", + indata, + ) + self.logger.info(f"op_id: {op_id}") + content["operatingState"] = "PROCESSING" + content["resourceState"] = "IN_PROGRESS" + self.db.replace(self.topic, _id, content) + self._send_msg( + "scale_nodegroup", {"nodegroup_id": _id, "operation_id": op_id} + ) + return op_id + except ValidationError as e: + raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY) + + def edit(self, session, _id, indata, kwargs): + content = None + + # Override descriptor with query string kwargs + if kwargs: + self._update_input_with_kwargs(indata, kwargs) + try: + if indata and session.get("set_project"): + raise EngineException( + "Cannot edit content and set to project (query string SET_PROJECT) at same time", + HTTPStatus.UNPROCESSABLE_ENTITY, + ) + # TODO self._check_edition(session, indata, _id, force) + if not content: + content = self.db.get_one(self.topic, {"_id": _id}) + + indata = self._validate_input_edit(indata, content, force=session["force"]) + self.logger.info(f"Indata: {indata}") + + # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name + _id = content.get("_id") or _id + + content = self.check_conflict_on_edit(session, content, indata, _id=_id) + if "name" in indata and "description" in indata: + content["name"] = indata["name"] + content["description"] = indata["description"] + elif "name" in indata: + content["name"] = indata["name"] + elif "description" in indata: + content["description"] = indata["description"] + op_id = self.format_on_edit(content, indata) + self.db.set_one(self.topic, {"_id": _id}, content) + return op_id + except ValidationError as e: + raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY) + + class ClusterOpsTopic(ACMTopic): topic = "clusters" topic_msg = "cluster" @@ -845,6 +1025,12 @@ class KsusTopic(ACMTopic): topic_msg = "ksu" schema_new = ksu_schema schema_edit = ksu_schema + MAP_PROFILE = { + "infra_controller_profiles": "infra-controllers", + "infra_config_profiles": "infra-configs", + "resource_profiles": "managed_resources", + "app_profiles": "apps", + } def __init__(self, db, fs, msg, auth): super().__init__(db, fs, msg, auth) @@ -914,6 +1100,21 @@ class KsusTopic(ACMTopic): if "_id" in okas.keys(): self.update_usage_state(session, okas) + profile_id = content["profile"].get("_id") + profile_type = content["profile"].get("profile_type") + db_cluster_list = self.db.get_list("clusters") + for db_cluster in db_cluster_list: + if db_cluster.get("created") == "true": + profile_list = db_cluster[profile_type] + if profile_id in profile_list: + ksu_count = db_cluster.get("ksu_count") + new_ksu_count = ksu_count + 1 + self.logger.info(f"New KSU count: {new_ksu_count}") + db_cluster["ksu_count"] = new_ksu_count + self.db.set_one( + "clusters", {"_id": db_cluster["_id"]}, db_cluster + ) + _id = self.db.create(self.topic, content) rollback.append({"topic": self.topic, "_id": _id}) _id_list.append(_id) @@ -1026,6 +1227,52 @@ class KsusTopic(ACMTopic): data = {"ksus_list": _id_list, "operation_id": op_id} self._send_msg("edit", data) + def cluster_list_ksu(self, session, filter_q=None, api_req=None): + db_filter = {} + if filter_q.get("cluster_id"): + db_filter["_id"] = filter_q.get("cluster_id") + ksu_data_list = [] + + cluster_data = self.db.get_one("clusters", db_filter) + profiles_list = [ + "infra_controller_profiles", + "infra_config_profiles", + "app_profiles", + "resource_profiles", + ] + for profile in profiles_list: + data_list = [] + for profile_id in cluster_data[profile]: + filter_q = {"profile": {"_id": profile_id, "profile_type": profile}} + data_list = self.db.get_list(self.topic, filter_q) + for ksu_data in data_list: + ksu_data["package_name"] = [] + ksu_data["package_path"] = [] + for okas in ksu_data["operationHistory"][0]["operationParams"]["oka"]: + sw_catalog_path = okas.get("sw_catalog_path") + if sw_catalog_path: + parts = sw_catalog_path.rsplit("/", 2) + self.logger.info(f"Parts: {parts}") + ksu_data["package_name"].append(parts[-2]) + ksu_data["package_path"].append("/".join(parts[:-1])) + else: + oka_id = okas["_id"] + db_oka = self.db.get_one("okas", {"_id": oka_id}) + oka_type = self.MAP_PROFILE[ + db_oka.get("profile_type", "infra_controller_profiles") + ] + ksu_data["package_name"].append(db_oka["git_name"].lower()) + ksu_data["package_path"].append( + f"{oka_type}/{db_oka['git_name'].lower()}" + ) + ksu_data_list.append(ksu_data) + + outdata = {} + outdata["count"] = cluster_data["ksu_count"] + outdata["data"] = ksu_data_list + self.logger.info(f"Outdata: {outdata}") + return outdata + def edit_ksu(self, session, _id, indata, kwargs): content = None indata = self._remove_envelop(indata) diff --git a/osm_nbi/nbi.py b/osm_nbi/nbi.py index 134d565..e67588c 100644 --- a/osm_nbi/nbi.py +++ b/osm_nbi/nbi.py @@ -731,6 +731,22 @@ valid_url_methods = { "METHODS": ("POST",), "ROLE_PERMISSION": "k8scluster:id:upgrade:", }, + "nodegroup": { + "METHODS": ("POST", "GET"), + "ROLE_PERMISSION": "k8scluster:id:nodegroup:", + "": { + "METHODS": ("GET", "PATCH", "DELETE"), + "ROLE_PERMISSION": "k8scluster:id:nodegroup:id", + "scale": { + "METHODS": ("POST",), + "ROLE_PERMISSION": "k8scluster:id:nodegroup:id:scale:", + }, + }, + }, + "ksus": { + "METHODS": ("GET",), + "ROLE_PERMISSION": "k8scluster:id:ksus:", + }, }, "register": { "METHODS": ("POST",), @@ -1728,6 +1744,8 @@ class Server(object): engine_topic = "resources" elif topic == "app_profiles": engine_topic = "apps" + if topic == "clusters" and item == "nodegroup": + engine_topic = "node_groups" elif main_topic == "k8scluster" and item in ( "upgrade", "get_creds", @@ -1811,6 +1829,22 @@ class Server(object): outdata = self.engine.get_item_list_cluster( engine_session, engine_topic, kwargs, api_req=True ) + elif topic == "clusters" and item == "nodegroup" and args: + _id = args[0] + outdata = outdata = self.engine.get_item( + engine_session, engine_topic, _id, kwargs, True + ) + elif topic == "clusters" and item == "nodegroup": + kwargs["cluster_id"] = _id + outdata = self.engine.get_item_list( + engine_session, engine_topic, kwargs, api_req=True + ) + elif topic == "clusters" and item == "ksus": + engine_topic = "ksus" + kwargs["cluster_id"] = _id + outdata = self.engine.get_cluster_list_ksu( + engine_session, engine_topic, kwargs, api_req=True + ) elif not _id: outdata = self.engine.get_item_list( engine_session, engine_topic, kwargs, api_req=True @@ -2053,10 +2087,24 @@ class Server(object): ) outdata = {"op_id": op_id} elif topic == "clusters" and item in ("upgrade", "scale"): - op_id = self.engine.update_cluster( + op_id = self.engine.update_item( engine_session, engine_topic, _id, item, indata ) outdata = {"op_id": op_id} + elif topic == "clusters" and item == "nodegroup": + indata["cluster_id"] = _id + if args: + _id = args[0] + op_id = self.engine.update_item( + engine_session, engine_topic, _id, item, indata + ) + outdata = {"op_id": op_id} + else: + _id, _ = self.engine.new_item( + rollback, engine_session, engine_topic, indata, kwargs + ) + self._set_location_header(main_topic, version, topic, _id) + outdata = {"_id": _id, "id": _id} else: _id, op_id = self.engine.new_item( rollback, @@ -2119,6 +2167,11 @@ class Server(object): if op_id else HTTPStatus.NO_CONTENT.value ) + elif topic == "clusters" and item == "nodegroup" and args: + _id = args[0] + op_id = self.engine.del_item(engine_session, engine_topic, _id) + if op_id: + outdata = {"_id": op_id} elif topic == "ksus": op_id = self.engine.delete_ksu( engine_session, engine_topic, _id, indata @@ -2206,6 +2259,11 @@ class Server(object): ) if not completed: cherrypy.response.headers["Transaction-Id"] = id + elif topic == "clusters" and item == "nodegroup" and args: + _id = args[0] + op_id = self.engine.edit_item( + engine_session, engine_topic, _id, indata, kwargs + ) else: op_id = self.engine.edit_item( engine_session, engine_topic, _id, indata, kwargs diff --git a/osm_nbi/validation.py b/osm_nbi/validation.py index 1df72b4..2272fa0 100644 --- a/osm_nbi/validation.py +++ b/osm_nbi/validation.py @@ -1125,7 +1125,10 @@ clustercreation_new_schema = { # "minItems": 2, # Minimum 2 subnets "uniqueItems": True, # Subnet IDs must be unique }, - "iam_role": string_schema, + "iam_role": { + "type": "string", + "pattern": "^arn:aws:iam::\\d{12}:role\\/[A-Za-z0-9]+$", + }, }, "required": ["vim_account", "name", "k8s_version"], "additionalProperties": False, @@ -1283,6 +1286,61 @@ attach_dettach_profile_schema = { }, "additionalProperties": False, } + +node_create_new_schema = { + "title": "node creation operation input schema", + "$schema": "http://json-schema.org/draft-04/schema#", + "type": "object", + "properties": { + "name": name_schema, + "cluster_id": id_schema, + "description": string_schema, + "node_count": integer0_schema, + "node_size": string_schema, + "private_subnet": { + "type": "array", + "items": { + "type": "string", + "pattern": "^subnet-[a-f0-9]+$", + }, + "uniqueItems": True, + }, + "public_subnet": { + "type": "array", + "items": { + "type": "string", + "pattern": "^subnet-[a-f0-9]+$", + }, + "uniqueItems": True, + }, + "iam_role": { + "type": "string", + "pattern": "^arn:aws:iam::\\d{12}:role\\/[A-Za-z0-9]+$", + }, + }, + "required": [ + "name", + "cluster_id", + "node_size", + "node_count", + ], + "anyOf": [{"required": ["private_subnet"]}, {"required": ["public_subnet"]}], + "additionalProperties": False, +} + +node_edit_schema = { + "title": "node update operation input schema", + "$schema": "http://json-schema.org/draft-04/schema#", + "type": "object", + "properties": { + "name": name_schema, + "cluster_id": id_schema, + "description": string_schema, + "node_count": integer0_schema, + }, + "additionalProperties": False, +} + # USERS project_role_mappings = { "title": "list pf projects/roles",