Feature 11073: Enhanced OSM declarative modelling for applications. App as first... 16/15316/1
authorgarciadeblas <gerardo.garciadeblas@telefonica.com>
Tue, 5 Aug 2025 16:21:26 +0000 (18:21 +0200)
committergarciadeblas <gerardo.garciadeblas@telefonica.com>
Tue, 5 Aug 2025 16:39:12 +0000 (18:39 +0200)
Change-Id: Id4b4b188e02000f8df033d8851394314c3795762
Signed-off-by: garciadeblas <gerardo.garciadeblas@telefonica.com>
osm_nbi/engine.py
osm_nbi/k8s_topics.py
osm_nbi/nbi.py
osm_nbi/validation.py

index ffbb07d..f9a1502 100644 (file)
@@ -57,10 +57,11 @@ from osm_nbi.k8s_topics import (
     ClusterTopic,
     InfraContTopic,
     InfraConfTopic,
-    AppTopic,
+    AppProfileTopic,
     ResourceTopic,
     ClusterOpsTopic,
     KsusTopic,
+    AppInstanceTopic,
     OkaTopic,
     NodeGroupTopic,
 )
@@ -106,10 +107,11 @@ class Engine(object):
         "cluster": ClusterTopic,
         "infras_cont": InfraContTopic,
         "infras_conf": InfraConfTopic,
-        "apps": AppTopic,
+        "apps": AppProfileTopic,
         "resources": ResourceTopic,
         "clusterops": ClusterOpsTopic,
         "ksus": KsusTopic,
+        "appinstances": AppInstanceTopic,
         "oka_packages": OkaTopic,
         "node_groups": NodeGroupTopic,
         # [NEW_TOPIC]: add an entry here
@@ -377,6 +379,13 @@ class Engine(object):
                 session, _id, indata, not_send_msg=None
             )
 
+    def update_appinstance(self, session, topic, _id, item, indata):
+        if topic not in self.map_topic:
+            raise EngineException(
+                "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR
+            )
+        return self.map_topic[topic].update_appinstance(session, _id, item, indata)
+
     def get_item_list(self, session, topic, filter_q=None, api_req=False):
         """
         Get a list of items
index 89f8031..7d8ffb8 100644 (file)
@@ -41,6 +41,7 @@ from osm_nbi.validation import (
     clusterregistration_new_schema,
     attach_dettach_profile_schema,
     ksu_schema,
+    app_instance_schema,
     oka_schema,
     node_create_new_schema,
     node_edit_schema,
@@ -103,7 +104,7 @@ class InfraConfTopic(ProfileTopic):
         return _id
 
 
-class AppTopic(ProfileTopic):
+class AppProfileTopic(ProfileTopic):
     topic = "k8sapp"
     topic_msg = "k8s_app"
     schema_new = app_profile_create_new_schema
@@ -162,7 +163,7 @@ class ClusterTopic(ACMTopic):
         self.infra_contr_topic = InfraContTopic(db, fs, msg, auth)
         self.infra_conf_topic = InfraConfTopic(db, fs, msg, auth)
         self.resource_topic = ResourceTopic(db, fs, msg, auth)
-        self.app_topic = AppTopic(db, fs, msg, auth)
+        self.app_topic = AppProfileTopic(db, fs, msg, auth)
 
     @staticmethod
     def format_on_new(content, project_id=None, make_public=False):
@@ -829,7 +830,7 @@ class ClusterOpsTopic(ACMTopic):
         self.infra_contr_topic = InfraContTopic(db, fs, msg, auth)
         self.infra_conf_topic = InfraConfTopic(db, fs, msg, auth)
         self.resource_topic = ResourceTopic(db, fs, msg, auth)
-        self.app_topic = AppTopic(db, fs, msg, auth)
+        self.app_topic = AppProfileTopic(db, fs, msg, auth)
 
     @staticmethod
     def format_on_new(content, project_id=None, make_public=False):
@@ -1037,7 +1038,6 @@ class ClusterOpsTopic(ACMTopic):
 class KsusTopic(ACMTopic):
     topic = "ksus"
     okapkg_topic = "okas"
-    infra_topic = "k8sinfra"
     topic_msg = "ksu"
     schema_new = ksu_schema
     schema_edit = ksu_schema
@@ -1418,6 +1418,190 @@ class KsusTopic(ACMTopic):
         return op_id, not_send_msg
 
 
+class AppInstanceTopic(ACMTopic):
+    topic = "appinstances"
+    okapkg_topic = "okas"
+    topic_msg = "appinstance"
+    schema_new = app_instance_schema
+    schema_edit = app_instance_schema
+
+    def __init__(self, db, fs, msg, auth):
+        super().__init__(db, fs, msg, auth)
+        self.logger = logging.getLogger("nbi.appinstances")
+
+    @staticmethod
+    def format_on_new(content, project_id=None, make_public=False):
+        BaseTopic.format_on_new(content, project_id=project_id, make_public=make_public)
+        content["current_operation"] = None
+        content["state"] = "IN_CREATION"
+        content["operatingState"] = "PROCESSING"
+        content["resourceState"] = "IN_PROGRESS"
+
+    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
+        if indata.get("oka") and indata.get("sw_catalog_path"):
+            raise EngineException(
+                "Cannot create app instance with both OKA and SW catalog path",
+                HTTPStatus.UNPROCESSABLE_ENTITY,
+            )
+
+        # Override descriptor with query string kwargs
+        content = self._remove_envelop(indata)
+        self._update_input_with_kwargs(content, kwargs)
+        content = self._validate_input_new(input=content, force=session["force"])
+
+        # Check for unique name
+        self.check_unique_name(session, content["name"])
+
+        self.check_conflict_on_new(session, content)
+
+        operation_params = {}
+        for content_key, content_value in content.items():
+            operation_params[content_key] = content_value
+        self.format_on_new(
+            content, project_id=session["project_id"], make_public=session["public"]
+        )
+        op_id = self.format_on_operation(
+            content,
+            operation_type="create",
+            operation_params=operation_params,
+        )
+        content["git_name"] = self.create_gitname(content, session)
+
+        oka_id = content.get("oka")
+        if oka_id:
+            self.update_oka_usage_state(session, oka_id)
+
+        _id = self.db.create(self.topic, content)
+        rollback.append({"topic": self.topic, "_id": _id})
+        self._send_msg("create", content=content)
+        return _id, op_id
+
+    def update_oka_usage_state(self, session, oka_id):
+        filter_db = self._get_project_filter(session)
+        filter_db[BaseTopic.id_field(self.topic, oka_id)] = oka_id
+
+        data = self.db.get_one(self.okapkg_topic, filter_db)
+        if data["_admin"]["usageState"] == "NOT_IN_USE":
+            usage_state_update = {
+                "_admin.usageState": "IN_USE",
+            }
+            self.db.set_one(
+                self.okapkg_topic, {"_id": oka_id}, update_dict=usage_state_update
+            )
+
+    def check_conflict_on_edit(self, session, final_content, edit_content, _id):
+        if final_content["name"] != edit_content["name"]:
+            self.check_unique_name(session, edit_content["name"])
+        return final_content
+
+    @staticmethod
+    def format_on_edit(final_content, edit_content):
+        op_id = ACMTopic.format_on_operation(
+            final_content,
+            "update",
+            edit_content,
+        )
+        final_content["operatingState"] = "PROCESSING"
+        final_content["resourceState"] = "IN_PROGRESS"
+        if final_content.get("_admin"):
+            now = time()
+            final_content["_admin"]["modified"] = now
+        return op_id
+
+    def edit(self, session, _id, indata, kwargs):
+        content = None
+        indata = self._remove_envelop(indata)
+
+        # Override descriptor with query string kwargs
+        if kwargs:
+            self._update_input_with_kwargs(indata, kwargs)
+        try:
+            if indata and session.get("set_project"):
+                raise EngineException(
+                    "Cannot edit content and set to project (query string SET_PROJECT) at same time",
+                    HTTPStatus.UNPROCESSABLE_ENTITY,
+                )
+            # TODO self._check_edition(session, indata, _id, force)
+            if not content:
+                content = self.show(session, _id)
+
+            indata = self._validate_input_edit(indata, content, force=session["force"])
+
+            # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
+            _id = content.get("_id") or _id
+
+            content = self.check_conflict_on_edit(session, content, indata, _id=_id)
+            op_id = self.format_on_edit(content, indata)
+            self.db.replace(self.topic, _id, content)
+            return op_id
+        except ValidationError as e:
+            raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
+
+    def update_appinstance(self, session, _id, item, indata):
+        if not self.multiproject:
+            filter_db = {}
+        else:
+            filter_db = self._get_project_filter(session)
+        # To allow project&user addressing by name AS WELL AS _id
+        filter_db[BaseTopic.id_field(self.topic, _id)] = _id
+        validate_input(indata, app_instance_schema)
+        data = self.db.get_one(self.topic, filter_db)
+        operation_params = {}
+        data["operatingState"] = "PROCESSING"
+        data["resourceState"] = "IN_PROGRESS"
+        operation_params = indata
+        op_id = self.format_on_operation(
+            data,
+            item,
+            operation_params,
+        )
+        self.db.set_one(self.topic, {"_id": _id}, data)
+        data = {"appinstance": _id, "operation_id": op_id}
+        self._send_msg(item, data)
+        return op_id
+
+    def delete(self, session, _id, not_send_msg=None):
+        if not self.multiproject:
+            filter_q = {}
+        else:
+            filter_q = self._get_project_filter(session)
+        filter_q[self.id_field(self.topic, _id)] = _id
+        item_content = self.db.get_one(self.topic, filter_q)
+        item_content["state"] = "IN_DELETION"
+        item_content["operatingState"] = "PROCESSING"
+        item_content["resourceState"] = "IN_PROGRESS"
+        op_id = self.format_on_operation(
+            item_content,
+            "delete",
+            None,
+        )
+        self.db.set_one(self.topic, {"_id": item_content["_id"]}, item_content)
+
+        # Check if the profile exists. If it doesn't, no message should be sent to Kafka
+        not_send_msg2 = not_send_msg
+        profile_id = item_content["profile"]["_id"]
+        profile_type = item_content["profile"]["profile_type"]
+        profile_collection_map = {
+            "app_profiles": "k8sapp",
+            "resource_profiles": "k8sresource",
+            "infra_controller_profiles": "k8sinfra_controller",
+            "infra_config_profiles": "k8sinfra_config",
+        }
+        profile_collection = profile_collection_map[profile_type]
+        profile_content = self.db.get_one(
+            profile_collection, {"_id": profile_id}, fail_on_empty=False
+        )
+        if not profile_content:
+            self.db.del_one(self.topic, filter_q)
+            not_send_msg2 = True
+        self._send_msg(
+            "delete",
+            {"appinstance": _id, "operation_id": op_id, "force": session["force"]},
+            not_send_msg=not_send_msg2,
+        )
+        return op_id
+
+
 class OkaTopic(DescriptorTopic, ACMOperationTopic):
     topic = "okas"
     topic_msg = "oka"
index 2cddabb..09ae05d 100644 (file)
@@ -815,6 +815,22 @@ valid_url_methods = {
             },
         }
     },
+    "appinstance": {
+        "v1": {
+            "appinstances": {
+                "METHODS": ("GET", "POST"),
+                "ROLE_PERMISSION": "appinstance:",
+                "<ID>": {
+                    "METHODS": ("GET", "PATCH", "DELETE"),
+                    "ROLE_PERMISSION": "appinstance:id:",
+                },
+                "update": {
+                    "METHODS": ("POST",),
+                    "ROLE_PERMISSION": "appinstance:",
+                },
+            },
+        }
+    },
     "oka": {
         "v1": {
             "oka_packages": {
@@ -1662,6 +1678,7 @@ class Server(object):
                 "vnflcm",
                 "k8scluster",
                 "ksu",
+                "appinstance",
                 "oka",
             ):
                 raise NbiException(
@@ -1748,6 +1765,8 @@ class Server(object):
                     engine_topic = "node_groups"
             elif main_topic == "ksu" and engine_topic in ("ksus", "clone", "move"):
                 engine_topic = "ksus"
+            elif main_topic == "appinstance":
+                engine_topic = "appinstances"
             if (
                 engine_topic == "vims"
             ):  # TODO this is for backward compatibility, it will be removed in the future
@@ -2080,6 +2099,11 @@ class Server(object):
                         engine_session, engine_topic, _id, indata, kwargs
                     )
                     outdata = {"op_id": op_id}
+                elif topic == "appinstances" and item == "update":
+                    op_id = self.engine.update_appinstance(
+                        engine_session, engine_topic, _id, indata, kwargs
+                    )
+                    outdata = {"op_id": op_id}
                 elif topic == "clusters" and item in ("upgrade", "scale"):
                     op_id = self.engine.update_item(
                         engine_session, engine_topic, _id, item, indata
index 2272fa0..349ca05 100644 (file)
@@ -1827,6 +1827,26 @@ ksu_schema = {
 }
 
 
+app_instance_schema = {
+    "title": "app instance schema",
+    "$schema": "http://json-schema.org/draft-07/schema#",
+    "type": "object",
+    "properties": {
+        "name": name_schema,
+        "description": description_schema,
+        "profile": id_schema,
+        "profile_type": profile_type_schema,
+        "oka": id_schema,
+        "sw_catalog_path": string_schema,
+        "model": object_schema,
+        "params": object_schema,
+        "secret_params": object_schema,
+    },
+    "additionalProperties": False,
+    "required": ["name", "profile", "profile_type"],
+}
+
+
 class ValidationError(Exception):
     def __init__(self, message, http_code=HTTPStatus.UNPROCESSABLE_ENTITY):
         self.http_code = http_code