From: yshah Date: Fri, 5 Jul 2024 13:06:31 +0000 (+0000) Subject: Feature 11022,11025: Advanced Cluster Management X-Git-Tag: release-v16.0-start~5 X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=refs%2Fchanges%2F71%2F14471%2F13;p=osm%2FNBI.git Feature 11022,11025: Advanced Cluster Management Change-Id: I4168366f79b11de15f6808977fb15a3ff270f519 Signed-off-by: yshah --- diff --git a/osm_nbi/admin_topics.py b/osm_nbi/admin_topics.py index 788ae3e..f43ea18 100644 --- a/osm_nbi/admin_topics.py +++ b/osm_nbi/admin_topics.py @@ -387,6 +387,7 @@ class CommonVimWimSdn(BaseTopic): """ super().format_on_new(content, project_id=project_id, make_public=make_public) content["schema_version"] = schema_version = "1.11" + content["key"] = "registered" # encrypt passwords if content.get(self.password_to_encrypt): @@ -1020,6 +1021,7 @@ class UserTopicAuth(UserTopic): try: if not content: content = self.show(session, _id) + indata = self._validate_input_edit(indata, content, force=session["force"]) content = self.check_conflict_on_edit(session, content, indata, _id=_id) # self.format_on_edit(content, indata) diff --git a/osm_nbi/base_topic.py b/osm_nbi/base_topic.py index ab60dc1..ce3400d 100644 --- a/osm_nbi/base_topic.py +++ b/osm_nbi/base_topic.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -# import logging +import logging import random import string from uuid import uuid4 @@ -202,7 +202,7 @@ class BaseTopic: self.db = db self.fs = fs self.msg = msg - # self.logger = logging.getLogger("nbi.base") + self.logger = logging.getLogger("nbi.base") self.auth = auth @staticmethod @@ -267,7 +267,6 @@ class BaseTopic: :return: The same input content, or a changed version of it. """ if self.schema_edit: - # self.logger.info("the schema edit is : {}".format(self.schema_edit)) validate_input(input, self.schema_edit) return input @@ -555,7 +554,6 @@ class BaseTopic: # Only perform SOL005 projection if we are serving an external request if api_req: self.sol005_projection(data) - return data # TODO transform data for SOL005 URL requests diff --git a/osm_nbi/descriptor_topics.py b/osm_nbi/descriptor_topics.py index e8c609a..287dbda 100644 --- a/osm_nbi/descriptor_topics.py +++ b/osm_nbi/descriptor_topics.py @@ -326,6 +326,7 @@ class DescriptorTopic(BaseTopic): HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE, ) file_pkg = self.fs.file_open(file_path, "a+b") + if isinstance(indata, dict): indata_text = yaml.safe_dump(indata, indent=4, default_flow_style=False) file_pkg.write(indata_text.encode(encoding="utf-8")) @@ -817,9 +818,9 @@ class VnfdTopic(DescriptorTopic): return _filter = self._get_project_filter(session) - # check vnfrs using this vnfd _filter["vnfd-id"] = _id + if self.db.get_list("vnfrs", _filter): raise EngineException( "There is at least one VNF instance using this descriptor", @@ -829,6 +830,7 @@ class VnfdTopic(DescriptorTopic): # check NSD referencing this VNFD del _filter["vnfd-id"] _filter["vnfd-id"] = descriptor_id + if self.db.get_list("nsds", _filter): raise EngineException( "There is at least one NS package referencing this descriptor", @@ -2253,7 +2255,7 @@ class NsConfigTemplateTopic(DescriptorTopic): # check NS CONFIG TEMPLATE used by NS ns_config_template_id = _id - # self.logger.info("The id is : {}".format(_id)) + if self.db.get_list( "nsrs", {"instantiate_params.nsConfigTemplateId": ns_config_template_id} ): diff --git a/osm_nbi/engine.py b/osm_nbi/engine.py index e419e8a..bac0802 100644 --- a/osm_nbi/engine.py +++ b/osm_nbi/engine.py @@ -60,6 +60,8 @@ from osm_nbi.k8s_topics import ( AppTopic, ResourceTopic, K8saddTopic, + KsusTopic, + OkaTopic, ) from osm_nbi.vnf_instance_topics import VnfInstances, VnfLcmOpTopic from osm_nbi.pmjobs_topics import PmJobsTopic @@ -106,6 +108,8 @@ class Engine(object): "apps": AppTopic, "resources": ResourceTopic, "k8sops": K8saddTopic, + "ksus": KsusTopic, + "oka_packages": OkaTopic, # [NEW_TOPIC]: add an entry here # "pm_jobs": PmJobsTopic will be added manually because it needs other parameters } @@ -319,6 +323,51 @@ class Engine(object): session, _id, indata, kwargs, headers ) + def clone( + self, rollback, session, topic, _id, indata=None, kwargs=None, headers=None + ): + if topic not in self.map_topic: + raise EngineException( + "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR + ) + with self.write_lock: + return self.map_topic[topic].clone( + rollback, session, _id, indata, kwargs, headers + ) + + def move_ksu(self, session, topic, _id, indata=None, kwargs=None): + if topic not in self.map_topic: + raise EngineException( + "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR + ) + + with self.write_lock: + return self.map_topic[topic].move_ksu(session, _id, indata, kwargs) + + def get_cluster_info(self, session, topic, _id, item): + if topic not in self.map_topic: + raise EngineException( + "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR + ) + return self.map_topic[topic].get_cluster_info(session, _id, item) + + def update_cluster(self, session, topic, _id, item, indata): + if topic not in self.map_topic: + raise EngineException( + "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR + ) + return self.map_topic[topic].update_cluster(session, _id, item, indata) + + def delete_ksu(self, session, topic, _id, indata): + if topic not in self.map_topic: + raise EngineException( + "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR + ) + with self.write_lock: + return self.map_topic[topic].delete_ksu( + session, _id, indata, not_send_msg=None + ) + def get_item_list(self, session, topic, filter_q=None, api_req=False): """ Get a list of items diff --git a/osm_nbi/instance_topics.py b/osm_nbi/instance_topics.py index 96dc007..6308e56 100644 --- a/osm_nbi/instance_topics.py +++ b/osm_nbi/instance_topics.py @@ -2427,7 +2427,6 @@ class NsLcmOpTopic(BaseTopic): new_sw_version = vnfd.get("software-version", "1.0") if new_sw_version != old_sw_version: vnf_index = vnfr["member-vnf-index-ref"] - # self.logger.info("nsr {}".format(nsr)) for vdu in vnfd["vdu"]: self.nsrtopic._add_shared_volumes_to_nsr( vdu, vnfd, nsr, vnf_index, latest_vnfd_revision @@ -3087,10 +3086,6 @@ class NsiLcmOpTopic(BaseTopic): self.db.set_one("nsis", {"_id": nsir["_id"]}, _update) except (DbException, EngineException) as e: if e.http_code == HTTPStatus.NOT_FOUND: - # self.logger.info( - # logging_prefix - # + "skipping NS={} because not found".format(nsr_id) - # ) pass else: raise diff --git a/osm_nbi/k8s_topics.py b/osm_nbi/k8s_topics.py index 9a6eb06..2ce01e0 100644 --- a/osm_nbi/k8s_topics.py +++ b/osm_nbi/k8s_topics.py @@ -13,11 +13,18 @@ # See the License for the specific language governing permissions and # limitations under the License. -# import logging +import logging +import yaml +import tarfile +import shutil +import os from http import HTTPStatus +from uuid import uuid4 +from time import time from osm_nbi.base_topic import BaseTopic, EngineException +from osm_nbi.descriptor_topics import DescriptorTopic from osm_nbi.validation import ( ValidationError, clustercreation_new_schema, @@ -31,12 +38,17 @@ from osm_nbi.validation import ( resource_profile_create_edit_schema, k8scluster_new_schema, attach_dettach_profile_schema, + ksu_schema, + oka_schema, ) -from osm_common.dbbase import DbException +from osm_common.dbbase import deep_update_rfc7396, DbException from osm_common.msgbase import MsgException from osm_common.fsbase import FsException -__author__ = "Shrinithi R " +__author__ = ( + "Shrinithi R ", + "Shahithya Y ", +) class InfraContTopic(BaseTopic): @@ -47,7 +59,6 @@ class InfraContTopic(BaseTopic): def __init__(self, db, fs, msg, auth): BaseTopic.__init__(self, db, fs, msg, auth) - # self.logger = logging.getLogger("nbi.k8s_topics") def new(self, rollback, session, indata=None, kwargs=None, headers=None): # To create the new infra controller profile @@ -79,7 +90,6 @@ class InfraConfTopic(BaseTopic): def __init__(self, db, fs, msg, auth): BaseTopic.__init__(self, db, fs, msg, auth) - # self.logger = logging.getLogger("nbi.k8s_topics") def new(self, rollback, session, indata=None, kwargs=None, headers=None): # To create the new infra config profile @@ -111,7 +121,6 @@ class AppTopic(BaseTopic): def __init__(self, db, fs, msg, auth): BaseTopic.__init__(self, db, fs, msg, auth) - # self.logger = logging.getLogger("nbi.k8s_topics") def new(self, rollback, session, indata=None, kwargs=None, headers=None): # To create the new app profile @@ -179,7 +188,6 @@ class K8sTopic(BaseTopic): self.infra_conf_topic = InfraConfTopic(db, fs, msg, auth) self.resource_topic = ResourceTopic(db, fs, msg, auth) self.app_topic = AppTopic(db, fs, msg, auth) - # self.logger = logging.getLogger("nbi.k8s_topics") def new(self, rollback, session, indata=None, kwargs=None, headers=None): """ @@ -448,6 +456,39 @@ class K8sTopic(BaseTopic): f"{item} {profile_id} does'nt exists", HTTPStatus.UNPROCESSABLE_ENTITY ) + def get_cluster_info(self, session, _id, item): + if not self.multiproject: + filter_db = {} + else: + filter_db = self._get_project_filter(session) + # To allow project&user addressing by name AS WELL AS _id + filter_db[BaseTopic.id_field(self.topic, _id)] = _id + data = self.db.get_one(self.topic, filter_db) + self._send_msg(item, {"_id": _id}) + return data + + def update_cluster(self, session, _id, item, indata): + if not self.multiproject: + filter_db = {} + else: + filter_db = self._get_project_filter(session) + # To allow project&user addressing by name AS WELL AS _id + filter_db[BaseTopic.id_field(self.topic, _id)] = _id + data = self.db.get_one(self.topic, filter_db) + data["operatingState"] = "PROCESSING" + data["resourceState"] = "IN_PROGRESS" + operation_params = indata + self.format_on_operation( + data, + item, + operation_params, + ) + self.db.set_one(self.topic, {"_id": _id}, data) + op_id = data["current_operation"] + data = {"cluster_id": _id, "operation_id": op_id} + self._send_msg(item, data) + return op_id + class K8saddTopic(BaseTopic): topic = "clusters" @@ -590,3 +631,598 @@ class K8saddTopic(BaseTopic): not_send_msg=not_send_msg, ) return None + + +class KsusTopic(BaseTopic): + topic = "ksus" + okapkg_topic = "okas" + infra_topic = "k8sinfra" + topic_msg = "ksu" + schema_new = ksu_schema + schema_edit = ksu_schema + + def __init__(self, db, fs, msg, auth): + BaseTopic.__init__(self, db, fs, msg, auth) + self.logger = logging.getLogger("nbi.ksus") + + @staticmethod + def format_on_new(content, project_id=None, make_public=False): + BaseTopic.format_on_new(content, project_id=project_id, make_public=make_public) + content["state"] = "IN_CREATION" + content["operatingState"] = "PROCESSING" + content["resourceState"] = "IN_PROGRESS" + + def new(self, rollback, session, indata=None, kwargs=None, headers=None): + _id_list = [] + op_id = str(uuid4()) + for ksus in indata["ksus"]: + content = ksus + oka = content["oka"][0] + oka_flag = "" + if oka["_id"]: + oka_flag = "_id" + elif oka["sw_catalog_path"]: + oka_flag = "sw_catalog_path" + + for okas in content["oka"]: + if okas["_id"] and okas["sw_catalog_path"]: + raise EngineException( + "Cannot create ksu with both OKA and SW catalog path", + HTTPStatus.UNPROCESSABLE_ENTITY, + ) + if not okas["sw_catalog_path"]: + okas.pop("sw_catalog_path") + elif not okas["_id"]: + okas.pop("_id") + if oka_flag not in okas.keys(): + raise EngineException( + "Cannot create ksu. Give either OKA or SW catalog path for all oka in a KSU", + HTTPStatus.UNPROCESSABLE_ENTITY, + ) + + # Override descriptor with query string kwargs + content = self._remove_envelop(content) + self._update_input_with_kwargs(content, kwargs) + content = self._validate_input_new(input=content, force=session["force"]) + + # Check for unique name + self.check_unique_name(session, content["name"]) + + self.check_conflict_on_new(session, content) + + operation_params = {} + for content_key, content_value in content.items(): + operation_params[content_key] = content_value + self.format_on_new( + content, project_id=session["project_id"], make_public=session["public"] + ) + content["current_operation"] = op_id + op_id = self.format_on_operation( + content, + operation_type="create", + operation_params=operation_params, + ) + content["git_name"] = self.create_gitname(content, session) + + # Update Oka_package usage state + for okas in content["oka"]: + if "_id" in okas.keys(): + self.update_usage_state(session, okas) + + _id = self.db.create(self.topic, content) + rollback.append({"topic": self.topic, "_id": _id}) + + if not op_id: + op_id = content["current_operation"] + _id_list.append(_id) + data = {"ksus_list": _id_list, "operation_id": op_id} + self._send_msg("create", data) + return _id_list, op_id + + def clone(self, rollback, session, _id, indata, kwargs, headers): + filter_db = self._get_project_filter(session) + filter_db[BaseTopic.id_field(self.topic, _id)] = _id + data = self.db.get_one(self.topic, filter_db) + + data["current_operation"] = None + op_id = self.format_on_operation( + data, + "clone", + indata, + ) + self.db.set_one(self.topic, {"_id": data["_id"]}, data) + self._send_msg("clone", {"ksus_list": [data["_id"]], "operation_id": op_id}) + return op_id + + def update_usage_state(self, session, oka_content): + _id = oka_content["_id"] + filter_db = self._get_project_filter(session) + filter_db[BaseTopic.id_field(self.topic, _id)] = _id + + data = self.db.get_one(self.okapkg_topic, filter_db) + if data["_admin"]["usageState"] == "NOT_IN_USE": + usage_state_update = { + "_admin.usageState": "IN_USE", + } + self.db.set_one( + self.okapkg_topic, {"_id": _id}, update_dict=usage_state_update + ) + + def move_ksu(self, session, _id, indata=None, kwargs=None, content=None): + indata = self._remove_envelop(indata) + + # Override descriptor with query string kwargs + if kwargs: + self._update_input_with_kwargs(indata, kwargs) + try: + if indata and session.get("set_project"): + raise EngineException( + "Cannot edit content and set to project (query string SET_PROJECT) at same time", + HTTPStatus.UNPROCESSABLE_ENTITY, + ) + # TODO self._check_edition(session, indata, _id, force) + if not content: + content = self.show(session, _id) + indata = self._validate_input_edit( + input=indata, content=content, force=session["force"] + ) + operation_params = indata + deep_update_rfc7396(content, indata) + + # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name + _id = content.get("_id") or _id + content["current_operation"] = None + self.format_on_operation( + content, + "move", + operation_params, + ) + if content.get("_admin"): + now = time() + content["_admin"]["modified"] = now + content["operatingState"] = "PROCESSING" + content["resourceState"] = "IN_PROGRESS" + + self.db.replace(self.topic, _id, content) + + op_id = content["current_operation"] + data = {"ksus_list": [content["_id"]], "operation_id": op_id} + self._send_msg("move", data) + return op_id + except ValidationError as e: + raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY) + + def check_conflict_on_edit(self, session, final_content, edit_content, _id): + if final_content["name"] != edit_content["name"]: + self.check_unique_name(session, edit_content["name"]) + return final_content + + @staticmethod + def format_on_edit(final_content, edit_content): + BaseTopic.format_on_operation( + final_content, + "update", + edit_content, + ) + final_content["operatingState"] = "PROCESSING" + final_content["resourceState"] = "IN_PROGRESS" + if final_content.get("_admin"): + now = time() + final_content["_admin"]["modified"] = now + return final_content["current_operation"] + + def edit(self, session, _id, indata, kwargs): + _id_list = [] + op_id = str(uuid4()) + if _id == "update": + for ksus in indata["ksus"]: + content = ksus + _id = content["_id"] + _id_list.append(_id) + content.pop("_id") + op_id = self.edit_ksu(session, _id, op_id, content, kwargs) + else: + content = indata + _id_list.append(_id) + op_id = self.edit_ksu(session, _id, op_id, content, kwargs) + + data = {"ksus_list": _id_list, "operation_id": op_id} + self._send_msg("edit", data) + return op_id + + def edit_ksu(self, session, _id, op_id, indata, kwargs): + content = None + indata = self._remove_envelop(indata) + + # Override descriptor with query string kwargs + if kwargs: + self._update_input_with_kwargs(indata, kwargs) + try: + if indata and session.get("set_project"): + raise EngineException( + "Cannot edit content and set to project (query string SET_PROJECT) at same time", + HTTPStatus.UNPROCESSABLE_ENTITY, + ) + # TODO self._check_edition(session, indata, _id, force) + if not content: + content = self.show(session, _id) + + for okas in indata["oka"]: + if not okas["_id"]: + okas.pop("_id") + if not okas["sw_catalog_path"]: + okas.pop("sw_catalog_path") + + indata = self._validate_input_edit(indata, content, force=session["force"]) + + # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name + _id = content.get("_id") or _id + + content = self.check_conflict_on_edit(session, content, indata, _id=_id) + content["current_operation"] = op_id + op_id = self.format_on_edit(content, indata) + self.db.replace(self.topic, _id, content) + return op_id + except ValidationError as e: + raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY) + + def delete_ksu(self, session, _id, indata, dry_run=False, not_send_msg=None): + _id_list = [] + op_id = str(uuid4()) + if _id == "delete": + for ksus in indata["ksus"]: + content = ksus + _id = content["_id"] + _id_list.append(_id) + content.pop("_id") + op_id = self.delete(session, _id, op_id) + else: + _id_list.append(_id) + op_id = self.delete(session, _id, op_id) + + data = {"ksus_list": _id_list, "operation_id": op_id} + self._send_msg("delete", data) + return op_id + + def delete(self, session, _id, op_id): + if not self.multiproject: + filter_q = {} + else: + filter_q = self._get_project_filter(session) + filter_q[self.id_field(self.topic, _id)] = _id + item_content = self.db.get_one(self.topic, filter_q) + item_content["state"] = "IN_DELETION" + item_content["operatingState"] = "PROCESSING" + item_content["resourceState"] = "IN_PROGRESS" + item_content["current_operation"] = op_id + self.format_on_operation( + item_content, + "delete", + None, + ) + self.db.set_one(self.topic, {"_id": item_content["_id"]}, item_content) + + if item_content["oka"][0].get("_id"): + used_oka = {} + existing_oka = [] + for okas in item_content["oka"]: + used_oka["_id"] = okas["_id"] + + filter = self._get_project_filter(session) + data = self.db.get_list(self.topic, filter) + + if data: + for ksus in data: + if ksus["_id"] != _id: + for okas in ksus["oka"]: + if okas["_id"] not in existing_oka: + existing_oka.append(okas["_id"]) + + if used_oka: + for oka, oka_id in used_oka.items(): + if oka_id not in existing_oka: + self.db.set_one( + self.okapkg_topic, + {"_id": oka_id}, + {"_admin.usageState": "NOT_IN_USE"}, + ) + return op_id + + +class OkaTopic(DescriptorTopic): + topic = "okas" + topic_msg = "oka" + schema_new = oka_schema + schema_edit = oka_schema + + def __init__(self, db, fs, msg, auth): + super().__init__(db, fs, msg, auth) + self.logger = logging.getLogger("nbi.oka") + + @staticmethod + def format_on_new(content, project_id=None, make_public=False): + DescriptorTopic.format_on_new( + content, project_id=project_id, make_public=make_public + ) + content["state"] = "PENDING_CONTENT" + content["operatingState"] = "PROCESSING" + content["resourceState"] = "IN_PROGRESS" + + def check_conflict_on_del(self, session, _id, db_content): + usage_state = db_content["_admin"]["usageState"] + if usage_state == "IN_USE": + raise EngineException( + "There is a KSU using this package", + http_code=HTTPStatus.CONFLICT, + ) + + def check_conflict_on_edit(self, session, final_content, edit_content, _id): + if ( + final_content["name"] == edit_content["name"] + and final_content["description"] == edit_content["description"] + ): + raise EngineException( + "No update", + http_code=HTTPStatus.CONFLICT, + ) + if final_content["name"] != edit_content["name"]: + self.check_unique_name(session, edit_content["name"]) + return final_content + + def edit(self, session, _id, indata=None, kwargs=None, content=None): + indata = self._remove_envelop(indata) + + # Override descriptor with query string kwargs + if kwargs: + self._update_input_with_kwargs(indata, kwargs) + try: + if indata and session.get("set_project"): + raise EngineException( + "Cannot edit content and set to project (query string SET_PROJECT) at same time", + HTTPStatus.UNPROCESSABLE_ENTITY, + ) + # TODO self._check_edition(session, indata, _id, force) + if not content: + content = self.show(session, _id) + + indata = self._validate_input_edit(indata, content, force=session["force"]) + + # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name + _id = content.get("_id") or _id + + content = self.check_conflict_on_edit(session, content, indata, _id=_id) + op_id = self.format_on_edit(content, indata) + deep_update_rfc7396(content, indata) + + self.db.replace(self.topic, _id, content) + return op_id + except ValidationError as e: + raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY) + + def delete(self, session, _id, dry_run=False, not_send_msg=None): + if not self.multiproject: + filter_q = {} + else: + filter_q = self._get_project_filter(session) + filter_q[self.id_field(self.topic, _id)] = _id + item_content = self.db.get_one(self.topic, filter_q) + item_content["state"] = "IN_DELETION" + item_content["operatingState"] = "PROCESSING" + self.check_conflict_on_del(session, _id, item_content) + item_content["current_operation"] = None + self.format_on_operation( + item_content, + "delete", + None, + ) + op_id = item_content["current_operation"] + self.db.set_one(self.topic, {"_id": item_content["_id"]}, item_content) + self._send_msg( + "delete", {"oka_id": _id, "operation_id": op_id}, not_send_msg=not_send_msg + ) + + def new(self, rollback, session, indata=None, kwargs=None, headers=None): + # _remove_envelop + if indata: + if "userDefinedData" in indata: + indata = indata["userDefinedData"] + + content = {"_admin": {"userDefinedData": indata, "revision": 0}} + + self._update_input_with_kwargs(content, kwargs) + content = BaseTopic._validate_input_new( + self, input=kwargs, force=session["force"] + ) + + self.check_unique_name(session, content["name"]) + operation_params = {} + for content_key, content_value in content.items(): + operation_params[content_key] = content_value + self.format_on_new( + content, session["project_id"], make_public=session["public"] + ) + content["current_operation"] = None + self.format_on_operation( + content, + operation_type="create", + operation_params=operation_params, + ) + content["git_name"] = self.create_gitname(content, session) + _id = self.db.create(self.topic, content) + rollback.append({"topic": self.topic, "_id": _id}) + return _id, None + + def upload_content(self, session, _id, indata, kwargs, headers): + current_desc = self.show(session, _id) + + compressed = None + content_type = headers.get("Content-Type") + if ( + content_type + and "application/gzip" in content_type + or "application/x-gzip" in content_type + ): + compressed = "gzip" + if content_type and "application/zip" in content_type: + compressed = "zip" + filename = headers.get("Content-Filename") + if not filename and compressed: + filename = "package.tar.gz" if compressed == "gzip" else "package.zip" + elif not filename: + filename = "package" + + revision = 1 + if "revision" in current_desc["_admin"]: + revision = current_desc["_admin"]["revision"] + 1 + + file_pkg = None + fs_rollback = [] + + try: + start = 0 + # Rather than using a temp folder, we will store the package in a folder based on + # the current revision. + proposed_revision_path = _id + ":" + str(revision) + # all the content is upload here and if ok, it is rename from id_ to is folder + + if start: + if not self.fs.file_exists(proposed_revision_path, "dir"): + raise EngineException( + "invalid Transaction-Id header", HTTPStatus.NOT_FOUND + ) + else: + self.fs.file_delete(proposed_revision_path, ignore_non_exist=True) + self.fs.mkdir(proposed_revision_path) + fs_rollback.append(proposed_revision_path) + + storage = self.fs.get_params() + storage["folder"] = proposed_revision_path + + file_path = (proposed_revision_path, filename) + file_pkg = self.fs.file_open(file_path, "a+b") + + filename = indata.filename + + if isinstance(indata, dict): + indata_text = yaml.safe_dump(indata, indent=4, default_flow_style=False) + file_pkg.write(indata_text.encode(encoding="utf-8")) + else: + indata_len = 0 + indata = indata.file + while True: + indata_text = indata.read(4096) + indata_len += len(indata_text) + if not indata_text: + break + file_pkg.write(indata_text) + + # PACKAGE UPLOADED + file_pkg.seek(0, 0) + if compressed == "gzip": + tar = tarfile.open(mode="r", fileobj=file_pkg) + for tarinfo in tar: + tarname = tarinfo.name + tarname_path = tarname.split("/") + self.logger.debug( + "Tarname: {} Tarname Path: {}".format(tarname, tarname_path) + ) + storage["zipfile"] = filename + self.fs.file_extract(tar, proposed_revision_path) + else: + content = file_pkg.read() + self.logger.debug("Content: {}".format(content)) + + # Need to close the file package here so it can be copied from the + # revision to the current, unrevisioned record + if file_pkg: + file_pkg.close() + file_pkg = None + + # Fetch both the incoming, proposed revision and the original revision so we + # can call a validate method to compare them + current_revision_path = _id + "/" + self.fs.sync(from_path=current_revision_path) + self.fs.sync(from_path=proposed_revision_path) + + if revision > 1: + try: + self._validate_descriptor_changes( + _id, + filename, + current_revision_path, + proposed_revision_path, + ) + except Exception as e: + shutil.rmtree( + self.fs.path + current_revision_path, ignore_errors=True + ) + shutil.rmtree( + self.fs.path + proposed_revision_path, ignore_errors=True + ) + # Only delete the new revision. We need to keep the original version in place + # as it has not been changed. + self.fs.file_delete(proposed_revision_path, ignore_non_exist=True) + raise e + + indata = self._remove_envelop(indata) + + # Override descriptor with query string kwargs + if kwargs: + self._update_input_with_kwargs(indata, kwargs) + + current_desc["_admin"]["storage"] = storage + current_desc["_admin"]["onboardingState"] = "ONBOARDED" + current_desc["_admin"]["operationalState"] = "ENABLED" + current_desc["_admin"]["modified"] = time() + current_desc["_admin"]["revision"] = revision + + deep_update_rfc7396(current_desc, indata) + + # Copy the revision to the active package name by its original id + shutil.rmtree(self.fs.path + current_revision_path, ignore_errors=True) + os.rename( + self.fs.path + proposed_revision_path, + self.fs.path + current_revision_path, + ) + self.fs.file_delete(current_revision_path, ignore_non_exist=True) + self.fs.mkdir(current_revision_path) + self.fs.reverse_sync(from_path=current_revision_path) + + shutil.rmtree(self.fs.path + _id) + kwargs = {} + kwargs["package"] = filename + if headers["Method"] == "POST": + current_desc["state"] = "IN_CREATION" + elif headers["Method"] in ("PUT", "PATCH"): + current_desc["current_operation"] = None + self.format_on_operation( + current_desc, + "update", + kwargs, + ) + current_desc["operatingState"] = "PROCESSING" + current_desc["resourceState"] = "IN_PROGRESS" + + self.db.replace(self.topic, _id, current_desc) + + # Store a copy of the package as a point in time revision + revision_desc = dict(current_desc) + revision_desc["_id"] = _id + ":" + str(revision_desc["_admin"]["revision"]) + self.db.create(self.topic + "_revisions", revision_desc) + fs_rollback = [] + + op_id = current_desc["current_operation"] + if headers["Method"] == "POST": + self._send_msg("create", {"oka_id": _id, "operation_id": op_id}) + elif headers["Method"] == "PUT" or "PATCH": + self._send_msg("edit", {"oka_id": _id, "operation_id": op_id}) + + return True + + except EngineException: + raise + finally: + if file_pkg: + file_pkg.close() + for file in fs_rollback: + self.fs.file_delete(file, ignore_non_exist=True) diff --git a/osm_nbi/nbi.py b/osm_nbi/nbi.py index f031665..ea7a236 100644 --- a/osm_nbi/nbi.py +++ b/osm_nbi/nbi.py @@ -48,6 +48,7 @@ auth_database_version = "1.0" nbi_server = None # instance of Server class subscription_thread = None # instance of SubscriptionThread class cef_logger = None +logger = logging.getLogger("nbi.nbi") """ North Bound Interface (O: OSM specific; 5,X: SOL005 not implemented yet; O5: SOL005 implemented) @@ -706,6 +707,18 @@ valid_url_methods = { "METHODS": ("DELETE",), "ROLE_PERMISSION": "k8scluster:id:deregister:", }, + "get_creds": { + "METHODS": ("GET",), + "ROLE_PERMISSION": "k8scluster:id:get_creds:", + }, + "scale": { + "METHODS": ("POST",), + "ROLE_PERMISSION": "k8scluster:id:scale:", + }, + "upgrade": { + "METHODS": ("POST",), + "ROLE_PERMISSION": "k8scluster:id:upgrade:", + }, }, "register": { "METHODS": ("POST",), @@ -746,6 +759,46 @@ valid_url_methods = { }, } }, + "ksu": { + "v1": { + "ksus": { + "METHODS": ("GET", "POST"), + "ROLE_PERMISSION": "ksu:", + "": { + "METHODS": ("GET", "PATCH", "DELETE"), + "ROLE_PERMISSION": "ksu:id:", + "clone": { + "METHODS": ("POST",), + "ROLE_PERMISSION": "ksu:id:clone:", + }, + "move": { + "METHODS": ("POST",), + "ROLE_PERMISSION": "ksu:id:move:", + }, + }, + "update": { + "METHODS": ("POST",), + "ROLE_PERMISSION": "ksu:", + }, + "delete": { + "METHODS": ("POST",), + "ROLE_PERMISSION": "ksu:", + }, + }, + } + }, + "oka": { + "v1": { + "oka_packages": { + "METHODS": ("GET", "POST"), + "ROLE_PERMISSION": "oka_pkg:", + "": { + "METHODS": ("GET", "PATCH", "DELETE", "PUT"), + "ROLE_PERMISSION": "oka_pkg:id:", + }, + } + } + }, } @@ -764,6 +817,7 @@ class Server(object): self.instance += 1 self.authenticator = Authenticator(valid_url_methods, valid_query_string) self.engine = Engine(self.authenticator) + self.logger = logging.getLogger("nbi.server") def _format_in(self, kwargs): error_text = "" # error_text must be initialized outside try @@ -793,13 +847,32 @@ class Server(object): "multipart/form-data" in cherrypy.request.headers["Content-Type"] ): - if "descriptor_file" in kwargs: - filecontent = kwargs.pop("descriptor_file") + if ( + "descriptor_file" in kwargs + or "package" in kwargs + and "name" in kwargs + ): + filecontent = "" + if "descriptor_file" in kwargs: + filecontent = kwargs.pop("descriptor_file") + if "package" in kwargs: + filecontent = kwargs.pop("package") + if not filecontent.file: + raise NbiException( + "empty file or content", HTTPStatus.BAD_REQUEST + ) + indata = filecontent + if filecontent.content_type.value: + cherrypy.request.headers[ + "Content-Type" + ] = filecontent.content_type.value + elif "package" in kwargs: + filecontent = kwargs.pop("package") if not filecontent.file: raise NbiException( "empty file or content", HTTPStatus.BAD_REQUEST ) - indata = filecontent.file # .read() + indata = filecontent if filecontent.content_type.value: cherrypy.request.headers[ "Content-Type" @@ -1550,6 +1623,8 @@ class Server(object): "nspm", "vnflcm", "k8scluster", + "ksu", + "oka", ): raise NbiException( "URL main_topic '{}' not supported".format(main_topic), @@ -1631,6 +1706,14 @@ class Server(object): engine_topic = "resources" elif topic == "app_profiles": engine_topic = "apps" + elif main_topic == "k8scluster" and item in ( + "upgrade", + "get_creds", + "scale", + ): + engine_topic = "k8s" + elif main_topic == "ksu" and engine_topic in ("ksus", "clone", "move"): + engine_topic = "ksus" if ( engine_topic == "vims" ): # TODO this is for backward compatibility, it will be removed in the future @@ -1686,6 +1769,10 @@ class Server(object): filter_q, api_req=True, ) + elif topic == "clusters" and item == "get_creds": + outdata = self.engine.get_cluster_info( + engine_session, engine_topic, _id, item + ) else: if item == "reports": # TODO check that project_id (_id in this context) has permissions @@ -1706,6 +1793,7 @@ class Server(object): "ns_config_template", ): _id = cherrypy.request.headers.get("Transaction-Id") + if not _id: _id, _ = self.engine.new_item( rollback, @@ -1728,6 +1816,33 @@ class Server(object): else: cherrypy.response.headers["Transaction-Id"] = _id outdata = {"id": _id} + elif topic == "oka_packages": + _id = cherrypy.request.headers.get("Transaction-Id") + + if not _id: + _id, _ = self.engine.new_item( + rollback, + engine_session, + engine_topic, + {}, + kwargs, + cherrypy.request.headers, + ) + cherrypy.request.headers["method"] = cherrypy.request.method + if indata: + completed = self.engine.upload_content( + engine_session, + engine_topic, + _id, + indata, + None, + cherrypy.request.headers, + ) + if completed: + self._set_location_header(main_topic, version, topic, _id) + else: + cherrypy.response.headers["Transaction-Id"] = _id + outdata = {"_id": _id} elif topic == "ns_instances_content": # creates NSR _id, _ = self.engine.new_item( @@ -1865,6 +1980,39 @@ class Server(object): ) self._set_location_header(main_topic, version, topic, _id) outdata = {"_id": _id} + elif topic == "ksus" and item: + if item == "clone": + _id = self.engine.clone( + rollback, + engine_session, + engine_topic, + _id, + indata, + kwargs, + cherrypy.request.headers, + ) + self._set_location_header(main_topic, version, topic, _id) + outdata = {"id": _id} + if item == "move": + op_id = self.engine.move_ksu( + engine_session, engine_topic, _id, indata, kwargs + ) + outdata = {"op_id": op_id} + elif topic == "ksus" and _id == "delete": + op_id = self.engine.delete_ksu( + engine_session, engine_topic, _id, indata + ) + outdata = {"op_id": op_id} + elif topic == "ksus" and _id == "update": + op_id = self.engine.edit_item( + engine_session, engine_topic, _id, indata, kwargs + ) + outdata = {"op_id": op_id} + elif topic == "clusters" and item in ("upgrade", "scale"): + op_id = self.engine.update_cluster( + engine_session, engine_topic, _id, item, indata + ) + outdata = {"op_id": op_id} else: _id, op_id = self.engine.new_item( rollback, @@ -1927,6 +2075,11 @@ class Server(object): if op_id else HTTPStatus.NO_CONTENT.value ) + elif topic == "ksus": + op_id = self.engine.delete_ksu( + engine_session, engine_topic, _id, indata + ) + outdata = {"op_id": op_id} # if there is not any deletion in process, delete elif not op_id: op_id = self.engine.del_item(engine_session, engine_topic, _id) @@ -1974,6 +2127,41 @@ class Server(object): op_id = self.engine.edit( engine_session, engine_topic, _id, item, indata, kwargs ) + elif topic == "oka_packages" and method == "PATCH": + if kwargs: + op_id = self.engine.edit_item( + engine_session, engine_topic, _id, None, kwargs + ) + if indata: + if indata.get("name") or indata.get("description"): + op_id = self.engine.edit_item( + engine_session, engine_topic, _id, indata, kwargs + ) + else: + cherrypy.request.headers["method"] = cherrypy.request.method + completed = self.engine.upload_content( + engine_session, + engine_topic, + _id, + indata, + {}, + cherrypy.request.headers, + ) + if not completed: + cherrypy.response.headers["Transaction-Id"] = id + elif topic == "oka_packages" and method == "PUT": + if indata: + cherrypy.request.headers["method"] = cherrypy.request.method + completed = self.engine.upload_content( + engine_session, + engine_topic, + _id, + indata, + {}, + cherrypy.request.headers, + ) + if not completed: + cherrypy.response.headers["Transaction-Id"] = id else: op_id = self.engine.edit_item( engine_session, engine_topic, _id, indata, kwargs diff --git a/osm_nbi/validation.py b/osm_nbi/validation.py index 2373ee9..a1911dd 100644 --- a/osm_nbi/validation.py +++ b/osm_nbi/validation.py @@ -1654,6 +1654,48 @@ vnf_subscription = { "required": ["filter", "CallbackUri"], } +oka_schema = { + "title": "Create OKA package input schema", + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "name": name_schema, + "description": description_schema, + }, + "additionalProperties": False, +} + +ksu_schema = { + "title": "ksu schema", + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "name": name_schema, + "description": description_schema, + "profile": { + "type": "object", + "properties": { + "profile_type": string_schema, + "_id": id_schema, + }, + "additionalProperties": False, + }, + "oka": { + "type": "array", + "items": { + "type": "object", + "properties": { + "_id": id_schema, + "sw_catalog_path": string_schema, + "transformation": object_schema, + }, + "additionalProperties": False, + }, + }, + }, + "additionalProperties": False, +} + class ValidationError(Exception): def __init__(self, message, http_code=HTTPStatus.UNPROCESSABLE_ENTITY):