Feature: 11055 Support of several node groups in clusters created by OSM​ 29/15229/4
authoryshah <shahithya.y@tataelxsi.co.in>
Fri, 13 Jun 2025 05:49:31 +0000 (05:49 +0000)
committergarciadeblas <gerardo.garciadeblas@telefonica.com>
Tue, 22 Jul 2025 14:26:37 +0000 (16:26 +0200)
Change-Id: I3f18708c33bb37cec77a05417b56fb5fba3dfda6
Signed-off-by: yshah <shahithya.y@tataelxsi.co.in>
osm_nbi/engine.py
osm_nbi/k8s_topics.py
osm_nbi/nbi.py
osm_nbi/validation.py

index ff65bbb..ffbb07d 100644 (file)
@@ -62,6 +62,7 @@ from osm_nbi.k8s_topics import (
     ClusterOpsTopic,
     KsusTopic,
     OkaTopic,
+    NodeGroupTopic,
 )
 from osm_nbi.vnf_instance_topics import VnfInstances, VnfLcmOpTopic
 from osm_nbi.pmjobs_topics import PmJobsTopic
@@ -110,6 +111,7 @@ class Engine(object):
         "clusterops": ClusterOpsTopic,
         "ksus": KsusTopic,
         "oka_packages": OkaTopic,
+        "node_groups": NodeGroupTopic,
         # [NEW_TOPIC]: add an entry here
         # "pm_jobs": PmJobsTopic will be added manually because it needs other parameters
     }
@@ -358,12 +360,12 @@ class Engine(object):
             )
         return self.map_topic[topic].get_cluster_creds(session, _id, item)
 
-    def update_cluster(self, session, topic, _id, item, indata):
+    def update_item(self, session, topic, _id, item, indata):
         if topic not in self.map_topic:
             raise EngineException(
                 "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR
             )
-        return self.map_topic[topic].update_cluster(session, _id, item, indata)
+        return self.map_topic[topic].update_item(session, _id, item, indata)
 
     def delete_ksu(self, session, topic, _id, indata):
         if topic not in self.map_topic:
@@ -456,6 +458,22 @@ class Engine(object):
             )
         return self.map_topic[topic].get_file(session, _id, path, accept_header)
 
+    def get_cluster_list_ksu(self, session, topic, filter_q=None, api_req=False):
+        """
+        Get a list of items
+        :param session: contains the used login username and working project
+        :param topic: it can be: users, projects, vnfds, nsds, ...
+        :param filter_q: filter of data to be applied
+        :param api_req: True if this call is serving an external API request. False if serving internal request.
+        :return: The list, it can be empty if no one match the filter_q.
+        """
+        self.logger.info("it is getting into item list cluster")
+        if topic not in self.map_topic:
+            raise EngineException(
+                "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR
+            )
+        return self.map_topic[topic].cluster_list_ksu(session, filter_q, api_req)
+
     def del_item_list(self, session, topic, _filter=None):
         """
         Delete a list of items
index 90b85c9..496b737 100644 (file)
@@ -42,6 +42,8 @@ from osm_nbi.validation import (
     attach_dettach_profile_schema,
     ksu_schema,
     oka_schema,
+    node_create_new_schema,
+    node_edit_schema,
 )
 from osm_common.dbbase import deep_update_rfc7396, DbException
 from osm_common.msgbase import MsgException
@@ -334,7 +336,7 @@ class ClusterTopic(ACMTopic):
         if not version:
             content["k8s_version"] = "1.28"
         content["node_count"] = indata.get("node_count", 0)
-        content["ksu_count"] = "0"
+        content["ksu_count"] = 0
         self.logger.info(f"cotent is : {content}")
         return content
 
@@ -583,7 +585,7 @@ class ClusterTopic(ACMTopic):
                 HTTPStatus.UNPROCESSABLE_ENTITY,
             )
 
-    def update_cluster(self, session, _id, item, indata):
+    def update_item(self, session, _id, item, indata):
         if not self.multiproject:
             filter_db = {}
         else:
@@ -624,6 +626,184 @@ class ClusterTopic(ACMTopic):
         return op_id
 
 
+class NodeGroupTopic(ACMTopic):
+    topic = "nodegroups"
+    topic_msg = "nodegroup"
+    schema_new = node_create_new_schema
+    schema_edit = node_edit_schema
+
+    def __init__(self, db, fs, msg, auth):
+        BaseTopic.__init__(self, db, fs, msg, auth)
+
+    @staticmethod
+    def format_on_new(content, project_id=None, make_public=False):
+        BaseTopic.format_on_new(content, project_id=project_id, make_public=make_public)
+        content["current_operation"] = None
+        content["state"] = "IN_CREATION"
+        content["operatingState"] = "PROCESSING"
+        content["resourceState"] = "IN_PROGRESS"
+
+    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
+        self.logger.info(f"Indata: {indata}")
+        self.check_unique_name(session, indata["name"])
+
+        indata = self._remove_envelop(indata)
+        self._update_input_with_kwargs(indata, kwargs)
+        if not indata.get("private_subnet") and not indata.get("public_subnet"):
+            raise EngineException(
+                "Please provide atleast one subnet",
+                HTTPStatus.UNPROCESSABLE_ENTITY,
+            )
+        content = self._validate_input_new(indata, session["force"])
+
+        self.logger.info(f"Indata: {indata}")
+        self.logger.info(f"Content: {content}")
+        cluster_id = content["cluster_id"]
+        db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
+        private_subnet = db_cluster.get("private_subnet")
+        public_subnet = db_cluster.get("public_subnet")
+        if content.get("private_subnet"):
+            for subnet in content["private_subnet"]:
+                if subnet not in private_subnet:
+                    raise EngineException(
+                        "No External subnet is used to add nodegroup",
+                        HTTPStatus.UNPROCESSABLE_ENTITY,
+                    )
+        if content.get("public_subnet"):
+            for subnet in content["public_subnet"]:
+                if subnet not in public_subnet:
+                    raise EngineException(
+                        "No External subnet is used to add nodegroup",
+                        HTTPStatus.UNPROCESSABLE_ENTITY,
+                    )
+
+        operation_params = {}
+        for content_key, content_value in content.items():
+            operation_params[content_key] = content_value
+        self.format_on_new(
+            content, session["project_id"], make_public=session["public"]
+        )
+        content["git_name"] = self.create_gitname(content, session)
+        self.logger.info(f"Operation Params: {operation_params}")
+        op_id = self.format_on_operation(
+            content,
+            "create",
+            operation_params,
+        )
+        node_count = db_cluster.get("node_count")
+        new_node_count = node_count + 1
+        self.logger.info(f"New Node count: {new_node_count}")
+        db_cluster["node_count"] = new_node_count
+        self.db.set_one("clusters", {"_id": cluster_id}, db_cluster)
+        _id = self.db.create(self.topic, content)
+        self._send_msg("add_nodegroup", {"nodegroup_id": _id, "operation_id": op_id})
+        return _id, op_id
+
+    def list(self, session, filter_q=None, api_req=False):
+        db_filter = {}
+        if filter_q.get("cluster_id"):
+            db_filter["cluster_id"] = filter_q.get("cluster_id")
+        data_list = self.db.get_list(self.topic, db_filter)
+        cluster_data = self.db.get_one("clusters", {"_id": db_filter["cluster_id"]})
+        self.logger.info(f"Cluster Data: {cluster_data}")
+        self.logger.info(f"Data: {data_list}")
+        if filter_q.get("cluster_id"):
+            outdata = {}
+            outdata["count"] = cluster_data["node_count"]
+            outdata["data"] = data_list
+            self.logger.info(f"Outdata: {outdata}")
+            return outdata
+        if api_req:
+            data_list = [self.sol005_projection(inst) for inst in data_list]
+        return data_list
+
+    def delete(self, session, _id, dry_run=False, not_send_msg=None):
+        if not self.multiproject:
+            filter_q = {}
+        else:
+            filter_q = self._get_project_filter(session)
+        filter_q[self.id_field(self.topic, _id)] = _id
+        item_content = self.db.get_one(self.topic, filter_q)
+        item_content["state"] = "IN_DELETION"
+        item_content["operatingState"] = "PROCESSING"
+        item_content["resourceState"] = "IN_PROGRESS"
+        self.check_conflict_on_del(session, _id, item_content)
+        op_id = self.format_on_operation(
+            item_content,
+            "delete",
+            None,
+        )
+        self.db.set_one(self.topic, {"_id": item_content["_id"]}, item_content)
+        self._send_msg(
+            "delete_nodegroup",
+            {"nodegroup_id": _id, "operation_id": op_id},
+            not_send_msg=not_send_msg,
+        )
+        return op_id
+
+    def update_item(self, session, _id, item, indata):
+        content = None
+        try:
+            if not content:
+                content = self.db.get_one(self.topic, {"_id": _id})
+            indata = self._validate_input_edit(indata, content, force=session["force"])
+            _id = content.get("_id") or _id
+
+            content = self.check_conflict_on_edit(session, content, indata, _id=_id)
+            op_id = self.format_on_edit(content, indata)
+            op_id = ACMTopic.format_on_operation(
+                content,
+                "scale",
+                indata,
+            )
+            self.logger.info(f"op_id: {op_id}")
+            content["operatingState"] = "PROCESSING"
+            content["resourceState"] = "IN_PROGRESS"
+            self.db.replace(self.topic, _id, content)
+            self._send_msg(
+                "scale_nodegroup", {"nodegroup_id": _id, "operation_id": op_id}
+            )
+            return op_id
+        except ValidationError as e:
+            raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
+
+    def edit(self, session, _id, indata, kwargs):
+        content = None
+
+        # Override descriptor with query string kwargs
+        if kwargs:
+            self._update_input_with_kwargs(indata, kwargs)
+        try:
+            if indata and session.get("set_project"):
+                raise EngineException(
+                    "Cannot edit content and set to project (query string SET_PROJECT) at same time",
+                    HTTPStatus.UNPROCESSABLE_ENTITY,
+                )
+            # TODO self._check_edition(session, indata, _id, force)
+            if not content:
+                content = self.db.get_one(self.topic, {"_id": _id})
+
+            indata = self._validate_input_edit(indata, content, force=session["force"])
+            self.logger.info(f"Indata: {indata}")
+
+            # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
+            _id = content.get("_id") or _id
+
+            content = self.check_conflict_on_edit(session, content, indata, _id=_id)
+            if "name" in indata and "description" in indata:
+                content["name"] = indata["name"]
+                content["description"] = indata["description"]
+            elif "name" in indata:
+                content["name"] = indata["name"]
+            elif "description" in indata:
+                content["description"] = indata["description"]
+            op_id = self.format_on_edit(content, indata)
+            self.db.set_one(self.topic, {"_id": _id}, content)
+            return op_id
+        except ValidationError as e:
+            raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
+
+
 class ClusterOpsTopic(ACMTopic):
     topic = "clusters"
     topic_msg = "cluster"
@@ -845,6 +1025,12 @@ class KsusTopic(ACMTopic):
     topic_msg = "ksu"
     schema_new = ksu_schema
     schema_edit = ksu_schema
+    MAP_PROFILE = {
+        "infra_controller_profiles": "infra-controllers",
+        "infra_config_profiles": "infra-configs",
+        "resource_profiles": "managed_resources",
+        "app_profiles": "apps",
+    }
 
     def __init__(self, db, fs, msg, auth):
         super().__init__(db, fs, msg, auth)
@@ -914,6 +1100,21 @@ class KsusTopic(ACMTopic):
                 if "_id" in okas.keys():
                     self.update_usage_state(session, okas)
 
+            profile_id = content["profile"].get("_id")
+            profile_type = content["profile"].get("profile_type")
+            db_cluster_list = self.db.get_list("clusters")
+            for db_cluster in db_cluster_list:
+                if db_cluster.get("created") == "true":
+                    profile_list = db_cluster[profile_type]
+                    if profile_id in profile_list:
+                        ksu_count = db_cluster.get("ksu_count")
+                        new_ksu_count = ksu_count + 1
+                        self.logger.info(f"New KSU count: {new_ksu_count}")
+                        db_cluster["ksu_count"] = new_ksu_count
+                        self.db.set_one(
+                            "clusters", {"_id": db_cluster["_id"]}, db_cluster
+                        )
+
             _id = self.db.create(self.topic, content)
             rollback.append({"topic": self.topic, "_id": _id})
             _id_list.append(_id)
@@ -1026,6 +1227,52 @@ class KsusTopic(ACMTopic):
         data = {"ksus_list": _id_list, "operation_id": op_id}
         self._send_msg("edit", data)
 
+    def cluster_list_ksu(self, session, filter_q=None, api_req=None):
+        db_filter = {}
+        if filter_q.get("cluster_id"):
+            db_filter["_id"] = filter_q.get("cluster_id")
+        ksu_data_list = []
+
+        cluster_data = self.db.get_one("clusters", db_filter)
+        profiles_list = [
+            "infra_controller_profiles",
+            "infra_config_profiles",
+            "app_profiles",
+            "resource_profiles",
+        ]
+        for profile in profiles_list:
+            data_list = []
+            for profile_id in cluster_data[profile]:
+                filter_q = {"profile": {"_id": profile_id, "profile_type": profile}}
+                data_list = self.db.get_list(self.topic, filter_q)
+            for ksu_data in data_list:
+                ksu_data["package_name"] = []
+                ksu_data["package_path"] = []
+                for okas in ksu_data["operationHistory"][0]["operationParams"]["oka"]:
+                    sw_catalog_path = okas.get("sw_catalog_path")
+                    if sw_catalog_path:
+                        parts = sw_catalog_path.rsplit("/", 2)
+                        self.logger.info(f"Parts: {parts}")
+                        ksu_data["package_name"].append(parts[-2])
+                        ksu_data["package_path"].append("/".join(parts[:-1]))
+                    else:
+                        oka_id = okas["_id"]
+                        db_oka = self.db.get_one("okas", {"_id": oka_id})
+                        oka_type = self.MAP_PROFILE[
+                            db_oka.get("profile_type", "infra_controller_profiles")
+                        ]
+                        ksu_data["package_name"].append(db_oka["git_name"].lower())
+                        ksu_data["package_path"].append(
+                            f"{oka_type}/{db_oka['git_name'].lower()}"
+                        )
+                ksu_data_list.append(ksu_data)
+
+        outdata = {}
+        outdata["count"] = cluster_data["ksu_count"]
+        outdata["data"] = ksu_data_list
+        self.logger.info(f"Outdata: {outdata}")
+        return outdata
+
     def edit_ksu(self, session, _id, indata, kwargs):
         content = None
         indata = self._remove_envelop(indata)
index 134d565..e67588c 100644 (file)
@@ -731,6 +731,22 @@ valid_url_methods = {
                         "METHODS": ("POST",),
                         "ROLE_PERMISSION": "k8scluster:id:upgrade:",
                     },
+                    "nodegroup": {
+                        "METHODS": ("POST", "GET"),
+                        "ROLE_PERMISSION": "k8scluster:id:nodegroup:",
+                        "<ID>": {
+                            "METHODS": ("GET", "PATCH", "DELETE"),
+                            "ROLE_PERMISSION": "k8scluster:id:nodegroup:id",
+                            "scale": {
+                                "METHODS": ("POST",),
+                                "ROLE_PERMISSION": "k8scluster:id:nodegroup:id:scale:",
+                            },
+                        },
+                    },
+                    "ksus": {
+                        "METHODS": ("GET",),
+                        "ROLE_PERMISSION": "k8scluster:id:ksus:",
+                    },
                 },
                 "register": {
                     "METHODS": ("POST",),
@@ -1728,6 +1744,8 @@ class Server(object):
                     engine_topic = "resources"
                 elif topic == "app_profiles":
                     engine_topic = "apps"
+                if topic == "clusters" and item == "nodegroup":
+                    engine_topic = "node_groups"
             elif main_topic == "k8scluster" and item in (
                 "upgrade",
                 "get_creds",
@@ -1811,6 +1829,22 @@ class Server(object):
                     outdata = self.engine.get_item_list_cluster(
                         engine_session, engine_topic, kwargs, api_req=True
                     )
+                elif topic == "clusters" and item == "nodegroup" and args:
+                    _id = args[0]
+                    outdata = outdata = self.engine.get_item(
+                        engine_session, engine_topic, _id, kwargs, True
+                    )
+                elif topic == "clusters" and item == "nodegroup":
+                    kwargs["cluster_id"] = _id
+                    outdata = self.engine.get_item_list(
+                        engine_session, engine_topic, kwargs, api_req=True
+                    )
+                elif topic == "clusters" and item == "ksus":
+                    engine_topic = "ksus"
+                    kwargs["cluster_id"] = _id
+                    outdata = self.engine.get_cluster_list_ksu(
+                        engine_session, engine_topic, kwargs, api_req=True
+                    )
                 elif not _id:
                     outdata = self.engine.get_item_list(
                         engine_session, engine_topic, kwargs, api_req=True
@@ -2053,10 +2087,24 @@ class Server(object):
                     )
                     outdata = {"op_id": op_id}
                 elif topic == "clusters" and item in ("upgrade", "scale"):
-                    op_id = self.engine.update_cluster(
+                    op_id = self.engine.update_item(
                         engine_session, engine_topic, _id, item, indata
                     )
                     outdata = {"op_id": op_id}
+                elif topic == "clusters" and item == "nodegroup":
+                    indata["cluster_id"] = _id
+                    if args:
+                        _id = args[0]
+                        op_id = self.engine.update_item(
+                            engine_session, engine_topic, _id, item, indata
+                        )
+                        outdata = {"op_id": op_id}
+                    else:
+                        _id, _ = self.engine.new_item(
+                            rollback, engine_session, engine_topic, indata, kwargs
+                        )
+                        self._set_location_header(main_topic, version, topic, _id)
+                        outdata = {"_id": _id, "id": _id}
                 else:
                     _id, op_id = self.engine.new_item(
                         rollback,
@@ -2119,6 +2167,11 @@ class Server(object):
                             if op_id
                             else HTTPStatus.NO_CONTENT.value
                         )
+                    elif topic == "clusters" and item == "nodegroup" and args:
+                        _id = args[0]
+                        op_id = self.engine.del_item(engine_session, engine_topic, _id)
+                        if op_id:
+                            outdata = {"_id": op_id}
                     elif topic == "ksus":
                         op_id = self.engine.delete_ksu(
                             engine_session, engine_topic, _id, indata
@@ -2206,6 +2259,11 @@ class Server(object):
                         )
                         if not completed:
                             cherrypy.response.headers["Transaction-Id"] = id
+                elif topic == "clusters" and item == "nodegroup" and args:
+                    _id = args[0]
+                    op_id = self.engine.edit_item(
+                        engine_session, engine_topic, _id, indata, kwargs
+                    )
                 else:
                     op_id = self.engine.edit_item(
                         engine_session, engine_topic, _id, indata, kwargs
index 1df72b4..2272fa0 100644 (file)
@@ -1125,7 +1125,10 @@ clustercreation_new_schema = {
             # "minItems": 2, # Minimum 2 subnets
             "uniqueItems": True,  # Subnet IDs must be unique
         },
-        "iam_role": string_schema,
+        "iam_role": {
+            "type": "string",
+            "pattern": "^arn:aws:iam::\\d{12}:role\\/[A-Za-z0-9]+$",
+        },
     },
     "required": ["vim_account", "name", "k8s_version"],
     "additionalProperties": False,
@@ -1283,6 +1286,61 @@ attach_dettach_profile_schema = {
     },
     "additionalProperties": False,
 }
+
+node_create_new_schema = {
+    "title": "node creation operation input schema",
+    "$schema": "http://json-schema.org/draft-04/schema#",
+    "type": "object",
+    "properties": {
+        "name": name_schema,
+        "cluster_id": id_schema,
+        "description": string_schema,
+        "node_count": integer0_schema,
+        "node_size": string_schema,
+        "private_subnet": {
+            "type": "array",
+            "items": {
+                "type": "string",
+                "pattern": "^subnet-[a-f0-9]+$",
+            },
+            "uniqueItems": True,
+        },
+        "public_subnet": {
+            "type": "array",
+            "items": {
+                "type": "string",
+                "pattern": "^subnet-[a-f0-9]+$",
+            },
+            "uniqueItems": True,
+        },
+        "iam_role": {
+            "type": "string",
+            "pattern": "^arn:aws:iam::\\d{12}:role\\/[A-Za-z0-9]+$",
+        },
+    },
+    "required": [
+        "name",
+        "cluster_id",
+        "node_size",
+        "node_count",
+    ],
+    "anyOf": [{"required": ["private_subnet"]}, {"required": ["public_subnet"]}],
+    "additionalProperties": False,
+}
+
+node_edit_schema = {
+    "title": "node update operation input schema",
+    "$schema": "http://json-schema.org/draft-04/schema#",
+    "type": "object",
+    "properties": {
+        "name": name_schema,
+        "cluster_id": id_schema,
+        "description": string_schema,
+        "node_count": integer0_schema,
+    },
+    "additionalProperties": False,
+}
+
 # USERS
 project_role_mappings = {
     "title": "list pf projects/roles",