Feature 11022,11025: Advanced Cluster Management 71/14471/13
authoryshah <shahithya.y@tataelxsi.co.in>
Fri, 5 Jul 2024 13:06:31 +0000 (13:06 +0000)
committeryshah <shahithya.y@tataelxsi.co.in>
Fri, 16 Aug 2024 14:23:46 +0000 (14:23 +0000)
Change-Id: I4168366f79b11de15f6808977fb15a3ff270f519
Signed-off-by: yshah <shahithya.y@tataelxsi.co.in>
osm_nbi/admin_topics.py
osm_nbi/base_topic.py
osm_nbi/descriptor_topics.py
osm_nbi/engine.py
osm_nbi/instance_topics.py
osm_nbi/k8s_topics.py
osm_nbi/nbi.py
osm_nbi/validation.py

index 788ae3e..f43ea18 100644 (file)
@@ -387,6 +387,7 @@ class CommonVimWimSdn(BaseTopic):
         """
         super().format_on_new(content, project_id=project_id, make_public=make_public)
         content["schema_version"] = schema_version = "1.11"
+        content["key"] = "registered"
 
         # encrypt passwords
         if content.get(self.password_to_encrypt):
@@ -1020,6 +1021,7 @@ class UserTopicAuth(UserTopic):
         try:
             if not content:
                 content = self.show(session, _id)
+
             indata = self._validate_input_edit(indata, content, force=session["force"])
             content = self.check_conflict_on_edit(session, content, indata, _id=_id)
             # self.format_on_edit(content, indata)
index ab60dc1..ce3400d 100644 (file)
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import logging
+import logging
 import random
 import string
 from uuid import uuid4
@@ -202,7 +202,7 @@ class BaseTopic:
         self.db = db
         self.fs = fs
         self.msg = msg
-        self.logger = logging.getLogger("nbi.base")
+        self.logger = logging.getLogger("nbi.base")
         self.auth = auth
 
     @staticmethod
@@ -267,7 +267,6 @@ class BaseTopic:
         :return: The same input content, or a changed version of it.
         """
         if self.schema_edit:
-            # self.logger.info("the schema edit is : {}".format(self.schema_edit))
             validate_input(input, self.schema_edit)
         return input
 
@@ -555,7 +554,6 @@ class BaseTopic:
         # Only perform SOL005 projection if we are serving an external request
         if api_req:
             self.sol005_projection(data)
-
         return data
 
         # TODO transform data for SOL005 URL requests
index e8c609a..287dbda 100644 (file)
@@ -326,6 +326,7 @@ class DescriptorTopic(BaseTopic):
                     HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE,
                 )
             file_pkg = self.fs.file_open(file_path, "a+b")
+
             if isinstance(indata, dict):
                 indata_text = yaml.safe_dump(indata, indent=4, default_flow_style=False)
                 file_pkg.write(indata_text.encode(encoding="utf-8"))
@@ -817,9 +818,9 @@ class VnfdTopic(DescriptorTopic):
             return
 
         _filter = self._get_project_filter(session)
-
         # check vnfrs using this vnfd
         _filter["vnfd-id"] = _id
+
         if self.db.get_list("vnfrs", _filter):
             raise EngineException(
                 "There is at least one VNF instance using this descriptor",
@@ -829,6 +830,7 @@ class VnfdTopic(DescriptorTopic):
         # check NSD referencing this VNFD
         del _filter["vnfd-id"]
         _filter["vnfd-id"] = descriptor_id
+
         if self.db.get_list("nsds", _filter):
             raise EngineException(
                 "There is at least one NS package referencing this descriptor",
@@ -2253,7 +2255,7 @@ class NsConfigTemplateTopic(DescriptorTopic):
 
         # check NS CONFIG TEMPLATE used by NS
         ns_config_template_id = _id
-        # self.logger.info("The id is : {}".format(_id))
+
         if self.db.get_list(
             "nsrs", {"instantiate_params.nsConfigTemplateId": ns_config_template_id}
         ):
index e419e8a..bac0802 100644 (file)
@@ -60,6 +60,8 @@ from osm_nbi.k8s_topics import (
     AppTopic,
     ResourceTopic,
     K8saddTopic,
+    KsusTopic,
+    OkaTopic,
 )
 from osm_nbi.vnf_instance_topics import VnfInstances, VnfLcmOpTopic
 from osm_nbi.pmjobs_topics import PmJobsTopic
@@ -106,6 +108,8 @@ class Engine(object):
         "apps": AppTopic,
         "resources": ResourceTopic,
         "k8sops": K8saddTopic,
+        "ksus": KsusTopic,
+        "oka_packages": OkaTopic,
         # [NEW_TOPIC]: add an entry here
         # "pm_jobs": PmJobsTopic will be added manually because it needs other parameters
     }
@@ -319,6 +323,51 @@ class Engine(object):
                 session, _id, indata, kwargs, headers
             )
 
+    def clone(
+        self, rollback, session, topic, _id, indata=None, kwargs=None, headers=None
+    ):
+        if topic not in self.map_topic:
+            raise EngineException(
+                "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR
+            )
+        with self.write_lock:
+            return self.map_topic[topic].clone(
+                rollback, session, _id, indata, kwargs, headers
+            )
+
+    def move_ksu(self, session, topic, _id, indata=None, kwargs=None):
+        if topic not in self.map_topic:
+            raise EngineException(
+                "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR
+            )
+
+        with self.write_lock:
+            return self.map_topic[topic].move_ksu(session, _id, indata, kwargs)
+
+    def get_cluster_info(self, session, topic, _id, item):
+        if topic not in self.map_topic:
+            raise EngineException(
+                "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR
+            )
+        return self.map_topic[topic].get_cluster_info(session, _id, item)
+
+    def update_cluster(self, session, topic, _id, item, indata):
+        if topic not in self.map_topic:
+            raise EngineException(
+                "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR
+            )
+        return self.map_topic[topic].update_cluster(session, _id, item, indata)
+
+    def delete_ksu(self, session, topic, _id, indata):
+        if topic not in self.map_topic:
+            raise EngineException(
+                "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR
+            )
+        with self.write_lock:
+            return self.map_topic[topic].delete_ksu(
+                session, _id, indata, not_send_msg=None
+            )
+
     def get_item_list(self, session, topic, filter_q=None, api_req=False):
         """
         Get a list of items
index 96dc007..6308e56 100644 (file)
@@ -2427,7 +2427,6 @@ class NsLcmOpTopic(BaseTopic):
                     new_sw_version = vnfd.get("software-version", "1.0")
                     if new_sw_version != old_sw_version:
                         vnf_index = vnfr["member-vnf-index-ref"]
-                        # self.logger.info("nsr {}".format(nsr))
                         for vdu in vnfd["vdu"]:
                             self.nsrtopic._add_shared_volumes_to_nsr(
                                 vdu, vnfd, nsr, vnf_index, latest_vnfd_revision
@@ -3087,10 +3086,6 @@ class NsiLcmOpTopic(BaseTopic):
                         self.db.set_one("nsis", {"_id": nsir["_id"]}, _update)
                 except (DbException, EngineException) as e:
                     if e.http_code == HTTPStatus.NOT_FOUND:
-                        # self.logger.info(
-                        #    logging_prefix
-                        #    + "skipping NS={} because not found".format(nsr_id)
-                        # )
                         pass
                     else:
                         raise
index 9a6eb06..2ce01e0 100644 (file)
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-# import logging
+import logging
+import yaml
+import tarfile
+import shutil
+import os
 from http import HTTPStatus
+from uuid import uuid4
 
+from time import time
 from osm_nbi.base_topic import BaseTopic, EngineException
 
+from osm_nbi.descriptor_topics import DescriptorTopic
 from osm_nbi.validation import (
     ValidationError,
     clustercreation_new_schema,
@@ -31,12 +38,17 @@ from osm_nbi.validation import (
     resource_profile_create_edit_schema,
     k8scluster_new_schema,
     attach_dettach_profile_schema,
+    ksu_schema,
+    oka_schema,
 )
-from osm_common.dbbase import DbException
+from osm_common.dbbase import deep_update_rfc7396, DbException
 from osm_common.msgbase import MsgException
 from osm_common.fsbase import FsException
 
-__author__ = "Shrinithi R <shrinithi.r@tataelxsi.co.in>"
+__author__ = (
+    "Shrinithi R <shrinithi.r@tataelxsi.co.in>",
+    "Shahithya Y <shahithya.y@tataelxsi.co.in>",
+)
 
 
 class InfraContTopic(BaseTopic):
@@ -47,7 +59,6 @@ class InfraContTopic(BaseTopic):
 
     def __init__(self, db, fs, msg, auth):
         BaseTopic.__init__(self, db, fs, msg, auth)
-        # self.logger = logging.getLogger("nbi.k8s_topics")
 
     def new(self, rollback, session, indata=None, kwargs=None, headers=None):
         # To create the new infra controller profile
@@ -79,7 +90,6 @@ class InfraConfTopic(BaseTopic):
 
     def __init__(self, db, fs, msg, auth):
         BaseTopic.__init__(self, db, fs, msg, auth)
-        # self.logger = logging.getLogger("nbi.k8s_topics")
 
     def new(self, rollback, session, indata=None, kwargs=None, headers=None):
         # To create the new infra config profile
@@ -111,7 +121,6 @@ class AppTopic(BaseTopic):
 
     def __init__(self, db, fs, msg, auth):
         BaseTopic.__init__(self, db, fs, msg, auth)
-        # self.logger = logging.getLogger("nbi.k8s_topics")
 
     def new(self, rollback, session, indata=None, kwargs=None, headers=None):
         # To create the new app profile
@@ -179,7 +188,6 @@ class K8sTopic(BaseTopic):
         self.infra_conf_topic = InfraConfTopic(db, fs, msg, auth)
         self.resource_topic = ResourceTopic(db, fs, msg, auth)
         self.app_topic = AppTopic(db, fs, msg, auth)
-        # self.logger = logging.getLogger("nbi.k8s_topics")
 
     def new(self, rollback, session, indata=None, kwargs=None, headers=None):
         """
@@ -448,6 +456,39 @@ class K8sTopic(BaseTopic):
                 f"{item} {profile_id} does'nt exists", HTTPStatus.UNPROCESSABLE_ENTITY
             )
 
+    def get_cluster_info(self, session, _id, item):
+        if not self.multiproject:
+            filter_db = {}
+        else:
+            filter_db = self._get_project_filter(session)
+        # To allow project&user addressing by name AS WELL AS _id
+        filter_db[BaseTopic.id_field(self.topic, _id)] = _id
+        data = self.db.get_one(self.topic, filter_db)
+        self._send_msg(item, {"_id": _id})
+        return data
+
+    def update_cluster(self, session, _id, item, indata):
+        if not self.multiproject:
+            filter_db = {}
+        else:
+            filter_db = self._get_project_filter(session)
+        # To allow project&user addressing by name AS WELL AS _id
+        filter_db[BaseTopic.id_field(self.topic, _id)] = _id
+        data = self.db.get_one(self.topic, filter_db)
+        data["operatingState"] = "PROCESSING"
+        data["resourceState"] = "IN_PROGRESS"
+        operation_params = indata
+        self.format_on_operation(
+            data,
+            item,
+            operation_params,
+        )
+        self.db.set_one(self.topic, {"_id": _id}, data)
+        op_id = data["current_operation"]
+        data = {"cluster_id": _id, "operation_id": op_id}
+        self._send_msg(item, data)
+        return op_id
+
 
 class K8saddTopic(BaseTopic):
     topic = "clusters"
@@ -590,3 +631,598 @@ class K8saddTopic(BaseTopic):
             not_send_msg=not_send_msg,
         )
         return None
+
+
+class KsusTopic(BaseTopic):
+    topic = "ksus"
+    okapkg_topic = "okas"
+    infra_topic = "k8sinfra"
+    topic_msg = "ksu"
+    schema_new = ksu_schema
+    schema_edit = ksu_schema
+
+    def __init__(self, db, fs, msg, auth):
+        BaseTopic.__init__(self, db, fs, msg, auth)
+        self.logger = logging.getLogger("nbi.ksus")
+
+    @staticmethod
+    def format_on_new(content, project_id=None, make_public=False):
+        BaseTopic.format_on_new(content, project_id=project_id, make_public=make_public)
+        content["state"] = "IN_CREATION"
+        content["operatingState"] = "PROCESSING"
+        content["resourceState"] = "IN_PROGRESS"
+
+    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
+        _id_list = []
+        op_id = str(uuid4())
+        for ksus in indata["ksus"]:
+            content = ksus
+            oka = content["oka"][0]
+            oka_flag = ""
+            if oka["_id"]:
+                oka_flag = "_id"
+            elif oka["sw_catalog_path"]:
+                oka_flag = "sw_catalog_path"
+
+            for okas in content["oka"]:
+                if okas["_id"] and okas["sw_catalog_path"]:
+                    raise EngineException(
+                        "Cannot create ksu with both OKA and SW catalog path",
+                        HTTPStatus.UNPROCESSABLE_ENTITY,
+                    )
+                if not okas["sw_catalog_path"]:
+                    okas.pop("sw_catalog_path")
+                elif not okas["_id"]:
+                    okas.pop("_id")
+                if oka_flag not in okas.keys():
+                    raise EngineException(
+                        "Cannot create ksu. Give either OKA or SW catalog path for all oka in a KSU",
+                        HTTPStatus.UNPROCESSABLE_ENTITY,
+                    )
+
+            # Override descriptor with query string kwargs
+            content = self._remove_envelop(content)
+            self._update_input_with_kwargs(content, kwargs)
+            content = self._validate_input_new(input=content, force=session["force"])
+
+            # Check for unique name
+            self.check_unique_name(session, content["name"])
+
+            self.check_conflict_on_new(session, content)
+
+            operation_params = {}
+            for content_key, content_value in content.items():
+                operation_params[content_key] = content_value
+            self.format_on_new(
+                content, project_id=session["project_id"], make_public=session["public"]
+            )
+            content["current_operation"] = op_id
+            op_id = self.format_on_operation(
+                content,
+                operation_type="create",
+                operation_params=operation_params,
+            )
+            content["git_name"] = self.create_gitname(content, session)
+
+            # Update Oka_package usage state
+            for okas in content["oka"]:
+                if "_id" in okas.keys():
+                    self.update_usage_state(session, okas)
+
+            _id = self.db.create(self.topic, content)
+            rollback.append({"topic": self.topic, "_id": _id})
+
+            if not op_id:
+                op_id = content["current_operation"]
+            _id_list.append(_id)
+        data = {"ksus_list": _id_list, "operation_id": op_id}
+        self._send_msg("create", data)
+        return _id_list, op_id
+
+    def clone(self, rollback, session, _id, indata, kwargs, headers):
+        filter_db = self._get_project_filter(session)
+        filter_db[BaseTopic.id_field(self.topic, _id)] = _id
+        data = self.db.get_one(self.topic, filter_db)
+
+        data["current_operation"] = None
+        op_id = self.format_on_operation(
+            data,
+            "clone",
+            indata,
+        )
+        self.db.set_one(self.topic, {"_id": data["_id"]}, data)
+        self._send_msg("clone", {"ksus_list": [data["_id"]], "operation_id": op_id})
+        return op_id
+
+    def update_usage_state(self, session, oka_content):
+        _id = oka_content["_id"]
+        filter_db = self._get_project_filter(session)
+        filter_db[BaseTopic.id_field(self.topic, _id)] = _id
+
+        data = self.db.get_one(self.okapkg_topic, filter_db)
+        if data["_admin"]["usageState"] == "NOT_IN_USE":
+            usage_state_update = {
+                "_admin.usageState": "IN_USE",
+            }
+            self.db.set_one(
+                self.okapkg_topic, {"_id": _id}, update_dict=usage_state_update
+            )
+
+    def move_ksu(self, session, _id, indata=None, kwargs=None, content=None):
+        indata = self._remove_envelop(indata)
+
+        # Override descriptor with query string kwargs
+        if kwargs:
+            self._update_input_with_kwargs(indata, kwargs)
+        try:
+            if indata and session.get("set_project"):
+                raise EngineException(
+                    "Cannot edit content and set to project (query string SET_PROJECT) at same time",
+                    HTTPStatus.UNPROCESSABLE_ENTITY,
+                )
+            # TODO self._check_edition(session, indata, _id, force)
+            if not content:
+                content = self.show(session, _id)
+            indata = self._validate_input_edit(
+                input=indata, content=content, force=session["force"]
+            )
+            operation_params = indata
+            deep_update_rfc7396(content, indata)
+
+            # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
+            _id = content.get("_id") or _id
+            content["current_operation"] = None
+            self.format_on_operation(
+                content,
+                "move",
+                operation_params,
+            )
+            if content.get("_admin"):
+                now = time()
+                content["_admin"]["modified"] = now
+            content["operatingState"] = "PROCESSING"
+            content["resourceState"] = "IN_PROGRESS"
+
+            self.db.replace(self.topic, _id, content)
+
+            op_id = content["current_operation"]
+            data = {"ksus_list": [content["_id"]], "operation_id": op_id}
+            self._send_msg("move", data)
+            return op_id
+        except ValidationError as e:
+            raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
+
+    def check_conflict_on_edit(self, session, final_content, edit_content, _id):
+        if final_content["name"] != edit_content["name"]:
+            self.check_unique_name(session, edit_content["name"])
+        return final_content
+
+    @staticmethod
+    def format_on_edit(final_content, edit_content):
+        BaseTopic.format_on_operation(
+            final_content,
+            "update",
+            edit_content,
+        )
+        final_content["operatingState"] = "PROCESSING"
+        final_content["resourceState"] = "IN_PROGRESS"
+        if final_content.get("_admin"):
+            now = time()
+            final_content["_admin"]["modified"] = now
+        return final_content["current_operation"]
+
+    def edit(self, session, _id, indata, kwargs):
+        _id_list = []
+        op_id = str(uuid4())
+        if _id == "update":
+            for ksus in indata["ksus"]:
+                content = ksus
+                _id = content["_id"]
+                _id_list.append(_id)
+                content.pop("_id")
+                op_id = self.edit_ksu(session, _id, op_id, content, kwargs)
+        else:
+            content = indata
+            _id_list.append(_id)
+            op_id = self.edit_ksu(session, _id, op_id, content, kwargs)
+
+        data = {"ksus_list": _id_list, "operation_id": op_id}
+        self._send_msg("edit", data)
+        return op_id
+
+    def edit_ksu(self, session, _id, op_id, indata, kwargs):
+        content = None
+        indata = self._remove_envelop(indata)
+
+        # Override descriptor with query string kwargs
+        if kwargs:
+            self._update_input_with_kwargs(indata, kwargs)
+        try:
+            if indata and session.get("set_project"):
+                raise EngineException(
+                    "Cannot edit content and set to project (query string SET_PROJECT) at same time",
+                    HTTPStatus.UNPROCESSABLE_ENTITY,
+                )
+            # TODO self._check_edition(session, indata, _id, force)
+            if not content:
+                content = self.show(session, _id)
+
+            for okas in indata["oka"]:
+                if not okas["_id"]:
+                    okas.pop("_id")
+                if not okas["sw_catalog_path"]:
+                    okas.pop("sw_catalog_path")
+
+            indata = self._validate_input_edit(indata, content, force=session["force"])
+
+            # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
+            _id = content.get("_id") or _id
+
+            content = self.check_conflict_on_edit(session, content, indata, _id=_id)
+            content["current_operation"] = op_id
+            op_id = self.format_on_edit(content, indata)
+            self.db.replace(self.topic, _id, content)
+            return op_id
+        except ValidationError as e:
+            raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
+
+    def delete_ksu(self, session, _id, indata, dry_run=False, not_send_msg=None):
+        _id_list = []
+        op_id = str(uuid4())
+        if _id == "delete":
+            for ksus in indata["ksus"]:
+                content = ksus
+                _id = content["_id"]
+                _id_list.append(_id)
+                content.pop("_id")
+                op_id = self.delete(session, _id, op_id)
+        else:
+            _id_list.append(_id)
+            op_id = self.delete(session, _id, op_id)
+
+        data = {"ksus_list": _id_list, "operation_id": op_id}
+        self._send_msg("delete", data)
+        return op_id
+
+    def delete(self, session, _id, op_id):
+        if not self.multiproject:
+            filter_q = {}
+        else:
+            filter_q = self._get_project_filter(session)
+        filter_q[self.id_field(self.topic, _id)] = _id
+        item_content = self.db.get_one(self.topic, filter_q)
+        item_content["state"] = "IN_DELETION"
+        item_content["operatingState"] = "PROCESSING"
+        item_content["resourceState"] = "IN_PROGRESS"
+        item_content["current_operation"] = op_id
+        self.format_on_operation(
+            item_content,
+            "delete",
+            None,
+        )
+        self.db.set_one(self.topic, {"_id": item_content["_id"]}, item_content)
+
+        if item_content["oka"][0].get("_id"):
+            used_oka = {}
+            existing_oka = []
+            for okas in item_content["oka"]:
+                used_oka["_id"] = okas["_id"]
+
+            filter = self._get_project_filter(session)
+            data = self.db.get_list(self.topic, filter)
+
+            if data:
+                for ksus in data:
+                    if ksus["_id"] != _id:
+                        for okas in ksus["oka"]:
+                            if okas["_id"] not in existing_oka:
+                                existing_oka.append(okas["_id"])
+
+            if used_oka:
+                for oka, oka_id in used_oka.items():
+                    if oka_id not in existing_oka:
+                        self.db.set_one(
+                            self.okapkg_topic,
+                            {"_id": oka_id},
+                            {"_admin.usageState": "NOT_IN_USE"},
+                        )
+        return op_id
+
+
+class OkaTopic(DescriptorTopic):
+    topic = "okas"
+    topic_msg = "oka"
+    schema_new = oka_schema
+    schema_edit = oka_schema
+
+    def __init__(self, db, fs, msg, auth):
+        super().__init__(db, fs, msg, auth)
+        self.logger = logging.getLogger("nbi.oka")
+
+    @staticmethod
+    def format_on_new(content, project_id=None, make_public=False):
+        DescriptorTopic.format_on_new(
+            content, project_id=project_id, make_public=make_public
+        )
+        content["state"] = "PENDING_CONTENT"
+        content["operatingState"] = "PROCESSING"
+        content["resourceState"] = "IN_PROGRESS"
+
+    def check_conflict_on_del(self, session, _id, db_content):
+        usage_state = db_content["_admin"]["usageState"]
+        if usage_state == "IN_USE":
+            raise EngineException(
+                "There is a KSU using this package",
+                http_code=HTTPStatus.CONFLICT,
+            )
+
+    def check_conflict_on_edit(self, session, final_content, edit_content, _id):
+        if (
+            final_content["name"] == edit_content["name"]
+            and final_content["description"] == edit_content["description"]
+        ):
+            raise EngineException(
+                "No update",
+                http_code=HTTPStatus.CONFLICT,
+            )
+        if final_content["name"] != edit_content["name"]:
+            self.check_unique_name(session, edit_content["name"])
+        return final_content
+
+    def edit(self, session, _id, indata=None, kwargs=None, content=None):
+        indata = self._remove_envelop(indata)
+
+        # Override descriptor with query string kwargs
+        if kwargs:
+            self._update_input_with_kwargs(indata, kwargs)
+        try:
+            if indata and session.get("set_project"):
+                raise EngineException(
+                    "Cannot edit content and set to project (query string SET_PROJECT) at same time",
+                    HTTPStatus.UNPROCESSABLE_ENTITY,
+                )
+            # TODO self._check_edition(session, indata, _id, force)
+            if not content:
+                content = self.show(session, _id)
+
+            indata = self._validate_input_edit(indata, content, force=session["force"])
+
+            # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
+            _id = content.get("_id") or _id
+
+            content = self.check_conflict_on_edit(session, content, indata, _id=_id)
+            op_id = self.format_on_edit(content, indata)
+            deep_update_rfc7396(content, indata)
+
+            self.db.replace(self.topic, _id, content)
+            return op_id
+        except ValidationError as e:
+            raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
+
+    def delete(self, session, _id, dry_run=False, not_send_msg=None):
+        if not self.multiproject:
+            filter_q = {}
+        else:
+            filter_q = self._get_project_filter(session)
+        filter_q[self.id_field(self.topic, _id)] = _id
+        item_content = self.db.get_one(self.topic, filter_q)
+        item_content["state"] = "IN_DELETION"
+        item_content["operatingState"] = "PROCESSING"
+        self.check_conflict_on_del(session, _id, item_content)
+        item_content["current_operation"] = None
+        self.format_on_operation(
+            item_content,
+            "delete",
+            None,
+        )
+        op_id = item_content["current_operation"]
+        self.db.set_one(self.topic, {"_id": item_content["_id"]}, item_content)
+        self._send_msg(
+            "delete", {"oka_id": _id, "operation_id": op_id}, not_send_msg=not_send_msg
+        )
+
+    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
+        # _remove_envelop
+        if indata:
+            if "userDefinedData" in indata:
+                indata = indata["userDefinedData"]
+
+        content = {"_admin": {"userDefinedData": indata, "revision": 0}}
+
+        self._update_input_with_kwargs(content, kwargs)
+        content = BaseTopic._validate_input_new(
+            self, input=kwargs, force=session["force"]
+        )
+
+        self.check_unique_name(session, content["name"])
+        operation_params = {}
+        for content_key, content_value in content.items():
+            operation_params[content_key] = content_value
+        self.format_on_new(
+            content, session["project_id"], make_public=session["public"]
+        )
+        content["current_operation"] = None
+        self.format_on_operation(
+            content,
+            operation_type="create",
+            operation_params=operation_params,
+        )
+        content["git_name"] = self.create_gitname(content, session)
+        _id = self.db.create(self.topic, content)
+        rollback.append({"topic": self.topic, "_id": _id})
+        return _id, None
+
+    def upload_content(self, session, _id, indata, kwargs, headers):
+        current_desc = self.show(session, _id)
+
+        compressed = None
+        content_type = headers.get("Content-Type")
+        if (
+            content_type
+            and "application/gzip" in content_type
+            or "application/x-gzip" in content_type
+        ):
+            compressed = "gzip"
+        if content_type and "application/zip" in content_type:
+            compressed = "zip"
+        filename = headers.get("Content-Filename")
+        if not filename and compressed:
+            filename = "package.tar.gz" if compressed == "gzip" else "package.zip"
+        elif not filename:
+            filename = "package"
+
+        revision = 1
+        if "revision" in current_desc["_admin"]:
+            revision = current_desc["_admin"]["revision"] + 1
+
+        file_pkg = None
+        fs_rollback = []
+
+        try:
+            start = 0
+            # Rather than using a temp folder, we will store the package in a folder based on
+            # the current revision.
+            proposed_revision_path = _id + ":" + str(revision)
+            # all the content is upload here and if ok, it is rename from id_ to is folder
+
+            if start:
+                if not self.fs.file_exists(proposed_revision_path, "dir"):
+                    raise EngineException(
+                        "invalid Transaction-Id header", HTTPStatus.NOT_FOUND
+                    )
+            else:
+                self.fs.file_delete(proposed_revision_path, ignore_non_exist=True)
+                self.fs.mkdir(proposed_revision_path)
+                fs_rollback.append(proposed_revision_path)
+
+            storage = self.fs.get_params()
+            storage["folder"] = proposed_revision_path
+
+            file_path = (proposed_revision_path, filename)
+            file_pkg = self.fs.file_open(file_path, "a+b")
+
+            filename = indata.filename
+
+            if isinstance(indata, dict):
+                indata_text = yaml.safe_dump(indata, indent=4, default_flow_style=False)
+                file_pkg.write(indata_text.encode(encoding="utf-8"))
+            else:
+                indata_len = 0
+                indata = indata.file
+                while True:
+                    indata_text = indata.read(4096)
+                    indata_len += len(indata_text)
+                    if not indata_text:
+                        break
+                    file_pkg.write(indata_text)
+
+            # PACKAGE UPLOADED
+            file_pkg.seek(0, 0)
+            if compressed == "gzip":
+                tar = tarfile.open(mode="r", fileobj=file_pkg)
+                for tarinfo in tar:
+                    tarname = tarinfo.name
+                    tarname_path = tarname.split("/")
+                    self.logger.debug(
+                        "Tarname: {} Tarname Path: {}".format(tarname, tarname_path)
+                    )
+                storage["zipfile"] = filename
+                self.fs.file_extract(tar, proposed_revision_path)
+            else:
+                content = file_pkg.read()
+                self.logger.debug("Content: {}".format(content))
+
+            # Need to close the file package here so it can be copied from the
+            # revision to the current, unrevisioned record
+            if file_pkg:
+                file_pkg.close()
+            file_pkg = None
+
+            # Fetch both the incoming, proposed revision and the original revision so we
+            # can call a validate method to compare them
+            current_revision_path = _id + "/"
+            self.fs.sync(from_path=current_revision_path)
+            self.fs.sync(from_path=proposed_revision_path)
+
+            if revision > 1:
+                try:
+                    self._validate_descriptor_changes(
+                        _id,
+                        filename,
+                        current_revision_path,
+                        proposed_revision_path,
+                    )
+                except Exception as e:
+                    shutil.rmtree(
+                        self.fs.path + current_revision_path, ignore_errors=True
+                    )
+                    shutil.rmtree(
+                        self.fs.path + proposed_revision_path, ignore_errors=True
+                    )
+                    # Only delete the new revision.  We need to keep the original version in place
+                    # as it has not been changed.
+                    self.fs.file_delete(proposed_revision_path, ignore_non_exist=True)
+                    raise e
+
+            indata = self._remove_envelop(indata)
+
+            # Override descriptor with query string kwargs
+            if kwargs:
+                self._update_input_with_kwargs(indata, kwargs)
+
+            current_desc["_admin"]["storage"] = storage
+            current_desc["_admin"]["onboardingState"] = "ONBOARDED"
+            current_desc["_admin"]["operationalState"] = "ENABLED"
+            current_desc["_admin"]["modified"] = time()
+            current_desc["_admin"]["revision"] = revision
+
+            deep_update_rfc7396(current_desc, indata)
+
+            # Copy the revision to the active package name by its original id
+            shutil.rmtree(self.fs.path + current_revision_path, ignore_errors=True)
+            os.rename(
+                self.fs.path + proposed_revision_path,
+                self.fs.path + current_revision_path,
+            )
+            self.fs.file_delete(current_revision_path, ignore_non_exist=True)
+            self.fs.mkdir(current_revision_path)
+            self.fs.reverse_sync(from_path=current_revision_path)
+
+            shutil.rmtree(self.fs.path + _id)
+            kwargs = {}
+            kwargs["package"] = filename
+            if headers["Method"] == "POST":
+                current_desc["state"] = "IN_CREATION"
+            elif headers["Method"] in ("PUT", "PATCH"):
+                current_desc["current_operation"] = None
+                self.format_on_operation(
+                    current_desc,
+                    "update",
+                    kwargs,
+                )
+                current_desc["operatingState"] = "PROCESSING"
+                current_desc["resourceState"] = "IN_PROGRESS"
+
+            self.db.replace(self.topic, _id, current_desc)
+
+            #  Store a copy of the package as a point in time revision
+            revision_desc = dict(current_desc)
+            revision_desc["_id"] = _id + ":" + str(revision_desc["_admin"]["revision"])
+            self.db.create(self.topic + "_revisions", revision_desc)
+            fs_rollback = []
+
+            op_id = current_desc["current_operation"]
+            if headers["Method"] == "POST":
+                self._send_msg("create", {"oka_id": _id, "operation_id": op_id})
+            elif headers["Method"] == "PUT" or "PATCH":
+                self._send_msg("edit", {"oka_id": _id, "operation_id": op_id})
+
+            return True
+
+        except EngineException:
+            raise
+        finally:
+            if file_pkg:
+                file_pkg.close()
+            for file in fs_rollback:
+                self.fs.file_delete(file, ignore_non_exist=True)
index f031665..ea7a236 100644 (file)
@@ -48,6 +48,7 @@ auth_database_version = "1.0"
 nbi_server = None  # instance of Server class
 subscription_thread = None  # instance of SubscriptionThread class
 cef_logger = None
+logger = logging.getLogger("nbi.nbi")
 
 """
 North Bound Interface  (O: OSM specific; 5,X: SOL005 not implemented yet; O5: SOL005 implemented)
@@ -706,6 +707,18 @@ valid_url_methods = {
                         "METHODS": ("DELETE",),
                         "ROLE_PERMISSION": "k8scluster:id:deregister:",
                     },
+                    "get_creds": {
+                        "METHODS": ("GET",),
+                        "ROLE_PERMISSION": "k8scluster:id:get_creds:",
+                    },
+                    "scale": {
+                        "METHODS": ("POST",),
+                        "ROLE_PERMISSION": "k8scluster:id:scale:",
+                    },
+                    "upgrade": {
+                        "METHODS": ("POST",),
+                        "ROLE_PERMISSION": "k8scluster:id:upgrade:",
+                    },
                 },
                 "register": {
                     "METHODS": ("POST",),
@@ -746,6 +759,46 @@ valid_url_methods = {
             },
         }
     },
+    "ksu": {
+        "v1": {
+            "ksus": {
+                "METHODS": ("GET", "POST"),
+                "ROLE_PERMISSION": "ksu:",
+                "<ID>": {
+                    "METHODS": ("GET", "PATCH", "DELETE"),
+                    "ROLE_PERMISSION": "ksu:id:",
+                    "clone": {
+                        "METHODS": ("POST",),
+                        "ROLE_PERMISSION": "ksu:id:clone:",
+                    },
+                    "move": {
+                        "METHODS": ("POST",),
+                        "ROLE_PERMISSION": "ksu:id:move:",
+                    },
+                },
+                "update": {
+                    "METHODS": ("POST",),
+                    "ROLE_PERMISSION": "ksu:",
+                },
+                "delete": {
+                    "METHODS": ("POST",),
+                    "ROLE_PERMISSION": "ksu:",
+                },
+            },
+        }
+    },
+    "oka": {
+        "v1": {
+            "oka_packages": {
+                "METHODS": ("GET", "POST"),
+                "ROLE_PERMISSION": "oka_pkg:",
+                "<ID>": {
+                    "METHODS": ("GET", "PATCH", "DELETE", "PUT"),
+                    "ROLE_PERMISSION": "oka_pkg:id:",
+                },
+            }
+        }
+    },
 }
 
 
@@ -764,6 +817,7 @@ class Server(object):
         self.instance += 1
         self.authenticator = Authenticator(valid_url_methods, valid_query_string)
         self.engine = Engine(self.authenticator)
+        self.logger = logging.getLogger("nbi.server")
 
     def _format_in(self, kwargs):
         error_text = ""  # error_text must be initialized outside try
@@ -793,13 +847,32 @@ class Server(object):
                         "multipart/form-data"
                         in cherrypy.request.headers["Content-Type"]
                     ):
-                        if "descriptor_file" in kwargs:
-                            filecontent = kwargs.pop("descriptor_file")
+                        if (
+                            "descriptor_file" in kwargs
+                            or "package" in kwargs
+                            and "name" in kwargs
+                        ):
+                            filecontent = ""
+                            if "descriptor_file" in kwargs:
+                                filecontent = kwargs.pop("descriptor_file")
+                            if "package" in kwargs:
+                                filecontent = kwargs.pop("package")
+                            if not filecontent.file:
+                                raise NbiException(
+                                    "empty file or content", HTTPStatus.BAD_REQUEST
+                                )
+                            indata = filecontent
+                            if filecontent.content_type.value:
+                                cherrypy.request.headers[
+                                    "Content-Type"
+                                ] = filecontent.content_type.value
+                        elif "package" in kwargs:
+                            filecontent = kwargs.pop("package")
                             if not filecontent.file:
                                 raise NbiException(
                                     "empty file or content", HTTPStatus.BAD_REQUEST
                                 )
-                            indata = filecontent.file  # .read()
+                            indata = filecontent
                             if filecontent.content_type.value:
                                 cherrypy.request.headers[
                                     "Content-Type"
@@ -1550,6 +1623,8 @@ class Server(object):
                 "nspm",
                 "vnflcm",
                 "k8scluster",
+                "ksu",
+                "oka",
             ):
                 raise NbiException(
                     "URL main_topic '{}' not supported".format(main_topic),
@@ -1631,6 +1706,14 @@ class Server(object):
                     engine_topic = "resources"
                 elif topic == "app_profiles":
                     engine_topic = "apps"
+            elif main_topic == "k8scluster" and item in (
+                "upgrade",
+                "get_creds",
+                "scale",
+            ):
+                engine_topic = "k8s"
+            elif main_topic == "ksu" and engine_topic in ("ksus", "clone", "move"):
+                engine_topic = "ksus"
             if (
                 engine_topic == "vims"
             ):  # TODO this is for backward compatibility, it will be removed in the future
@@ -1686,6 +1769,10 @@ class Server(object):
                         filter_q,
                         api_req=True,
                     )
+                elif topic == "clusters" and item == "get_creds":
+                    outdata = self.engine.get_cluster_info(
+                        engine_session, engine_topic, _id, item
+                    )
                 else:
                     if item == "reports":
                         # TODO check that project_id (_id in this context) has permissions
@@ -1706,6 +1793,7 @@ class Server(object):
                     "ns_config_template",
                 ):
                     _id = cherrypy.request.headers.get("Transaction-Id")
+
                     if not _id:
                         _id, _ = self.engine.new_item(
                             rollback,
@@ -1728,6 +1816,33 @@ class Server(object):
                     else:
                         cherrypy.response.headers["Transaction-Id"] = _id
                     outdata = {"id": _id}
+                elif topic == "oka_packages":
+                    _id = cherrypy.request.headers.get("Transaction-Id")
+
+                    if not _id:
+                        _id, _ = self.engine.new_item(
+                            rollback,
+                            engine_session,
+                            engine_topic,
+                            {},
+                            kwargs,
+                            cherrypy.request.headers,
+                        )
+                    cherrypy.request.headers["method"] = cherrypy.request.method
+                    if indata:
+                        completed = self.engine.upload_content(
+                            engine_session,
+                            engine_topic,
+                            _id,
+                            indata,
+                            None,
+                            cherrypy.request.headers,
+                        )
+                    if completed:
+                        self._set_location_header(main_topic, version, topic, _id)
+                    else:
+                        cherrypy.response.headers["Transaction-Id"] = _id
+                    outdata = {"_id": _id}
                 elif topic == "ns_instances_content":
                     # creates NSR
                     _id, _ = self.engine.new_item(
@@ -1865,6 +1980,39 @@ class Server(object):
                     )
                     self._set_location_header(main_topic, version, topic, _id)
                     outdata = {"_id": _id}
+                elif topic == "ksus" and item:
+                    if item == "clone":
+                        _id = self.engine.clone(
+                            rollback,
+                            engine_session,
+                            engine_topic,
+                            _id,
+                            indata,
+                            kwargs,
+                            cherrypy.request.headers,
+                        )
+                        self._set_location_header(main_topic, version, topic, _id)
+                        outdata = {"id": _id}
+                    if item == "move":
+                        op_id = self.engine.move_ksu(
+                            engine_session, engine_topic, _id, indata, kwargs
+                        )
+                        outdata = {"op_id": op_id}
+                elif topic == "ksus" and _id == "delete":
+                    op_id = self.engine.delete_ksu(
+                        engine_session, engine_topic, _id, indata
+                    )
+                    outdata = {"op_id": op_id}
+                elif topic == "ksus" and _id == "update":
+                    op_id = self.engine.edit_item(
+                        engine_session, engine_topic, _id, indata, kwargs
+                    )
+                    outdata = {"op_id": op_id}
+                elif topic == "clusters" and item in ("upgrade", "scale"):
+                    op_id = self.engine.update_cluster(
+                        engine_session, engine_topic, _id, item, indata
+                    )
+                    outdata = {"op_id": op_id}
                 else:
                     _id, op_id = self.engine.new_item(
                         rollback,
@@ -1927,6 +2075,11 @@ class Server(object):
                             if op_id
                             else HTTPStatus.NO_CONTENT.value
                         )
+                    elif topic == "ksus":
+                        op_id = self.engine.delete_ksu(
+                            engine_session, engine_topic, _id, indata
+                        )
+                        outdata = {"op_id": op_id}
                     # if there is not any deletion in process, delete
                     elif not op_id:
                         op_id = self.engine.del_item(engine_session, engine_topic, _id)
@@ -1974,6 +2127,41 @@ class Server(object):
                     op_id = self.engine.edit(
                         engine_session, engine_topic, _id, item, indata, kwargs
                     )
+                elif topic == "oka_packages" and method == "PATCH":
+                    if kwargs:
+                        op_id = self.engine.edit_item(
+                            engine_session, engine_topic, _id, None, kwargs
+                        )
+                    if indata:
+                        if indata.get("name") or indata.get("description"):
+                            op_id = self.engine.edit_item(
+                                engine_session, engine_topic, _id, indata, kwargs
+                            )
+                        else:
+                            cherrypy.request.headers["method"] = cherrypy.request.method
+                            completed = self.engine.upload_content(
+                                engine_session,
+                                engine_topic,
+                                _id,
+                                indata,
+                                {},
+                                cherrypy.request.headers,
+                            )
+                            if not completed:
+                                cherrypy.response.headers["Transaction-Id"] = id
+                elif topic == "oka_packages" and method == "PUT":
+                    if indata:
+                        cherrypy.request.headers["method"] = cherrypy.request.method
+                        completed = self.engine.upload_content(
+                            engine_session,
+                            engine_topic,
+                            _id,
+                            indata,
+                            {},
+                            cherrypy.request.headers,
+                        )
+                        if not completed:
+                            cherrypy.response.headers["Transaction-Id"] = id
                 else:
                     op_id = self.engine.edit_item(
                         engine_session, engine_topic, _id, indata, kwargs
index 2373ee9..a1911dd 100644 (file)
@@ -1654,6 +1654,48 @@ vnf_subscription = {
     "required": ["filter", "CallbackUri"],
 }
 
+oka_schema = {
+    "title": "Create OKA package input schema",
+    "$schema": "http://json-schema.org/draft-07/schema#",
+    "type": "object",
+    "properties": {
+        "name": name_schema,
+        "description": description_schema,
+    },
+    "additionalProperties": False,
+}
+
+ksu_schema = {
+    "title": "ksu schema",
+    "$schema": "http://json-schema.org/draft-07/schema#",
+    "type": "object",
+    "properties": {
+        "name": name_schema,
+        "description": description_schema,
+        "profile": {
+            "type": "object",
+            "properties": {
+                "profile_type": string_schema,
+                "_id": id_schema,
+            },
+            "additionalProperties": False,
+        },
+        "oka": {
+            "type": "array",
+            "items": {
+                "type": "object",
+                "properties": {
+                    "_id": id_schema,
+                    "sw_catalog_path": string_schema,
+                    "transformation": object_schema,
+                },
+                "additionalProperties": False,
+            },
+        },
+    },
+    "additionalProperties": False,
+}
+
 
 class ValidationError(Exception):
     def __init__(self, message, http_code=HTTPStatus.UNPROCESSABLE_ENTITY):