clusterregistration_new_schema,
attach_dettach_profile_schema,
ksu_schema,
+ app_instance_schema,
oka_schema,
node_create_new_schema,
node_edit_schema,
return _id
-class AppTopic(ProfileTopic):
+class AppProfileTopic(ProfileTopic):
topic = "k8sapp"
topic_msg = "k8s_app"
schema_new = app_profile_create_new_schema
self.infra_contr_topic = InfraContTopic(db, fs, msg, auth)
self.infra_conf_topic = InfraConfTopic(db, fs, msg, auth)
self.resource_topic = ResourceTopic(db, fs, msg, auth)
- self.app_topic = AppTopic(db, fs, msg, auth)
+ self.app_topic = AppProfileTopic(db, fs, msg, auth)
@staticmethod
def format_on_new(content, project_id=None, make_public=False):
self.infra_contr_topic = InfraContTopic(db, fs, msg, auth)
self.infra_conf_topic = InfraConfTopic(db, fs, msg, auth)
self.resource_topic = ResourceTopic(db, fs, msg, auth)
- self.app_topic = AppTopic(db, fs, msg, auth)
+ self.app_topic = AppProfileTopic(db, fs, msg, auth)
@staticmethod
def format_on_new(content, project_id=None, make_public=False):
class KsusTopic(ACMTopic):
topic = "ksus"
okapkg_topic = "okas"
- infra_topic = "k8sinfra"
topic_msg = "ksu"
schema_new = ksu_schema
schema_edit = ksu_schema
return op_id, not_send_msg
+class AppInstanceTopic(ACMTopic):
+ topic = "appinstances"
+ okapkg_topic = "okas"
+ topic_msg = "appinstance"
+ schema_new = app_instance_schema
+ schema_edit = app_instance_schema
+
+ def __init__(self, db, fs, msg, auth):
+ super().__init__(db, fs, msg, auth)
+ self.logger = logging.getLogger("nbi.appinstances")
+
+ @staticmethod
+ def format_on_new(content, project_id=None, make_public=False):
+ BaseTopic.format_on_new(content, project_id=project_id, make_public=make_public)
+ content["current_operation"] = None
+ content["state"] = "IN_CREATION"
+ content["operatingState"] = "PROCESSING"
+ content["resourceState"] = "IN_PROGRESS"
+
+ def new(self, rollback, session, indata=None, kwargs=None, headers=None):
+ if indata.get("oka") and indata.get("sw_catalog_path"):
+ raise EngineException(
+ "Cannot create app instance with both OKA and SW catalog path",
+ HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
+
+ # Override descriptor with query string kwargs
+ content = self._remove_envelop(indata)
+ self._update_input_with_kwargs(content, kwargs)
+ content = self._validate_input_new(input=content, force=session["force"])
+
+ # Check for unique name
+ self.check_unique_name(session, content["name"])
+
+ self.check_conflict_on_new(session, content)
+
+ operation_params = {}
+ for content_key, content_value in content.items():
+ operation_params[content_key] = content_value
+ self.format_on_new(
+ content, project_id=session["project_id"], make_public=session["public"]
+ )
+ op_id = self.format_on_operation(
+ content,
+ operation_type="create",
+ operation_params=operation_params,
+ )
+ content["git_name"] = self.create_gitname(content, session)
+
+ oka_id = content.get("oka")
+ if oka_id:
+ self.update_oka_usage_state(session, oka_id)
+
+ _id = self.db.create(self.topic, content)
+ rollback.append({"topic": self.topic, "_id": _id})
+ self._send_msg("create", content=content)
+ return _id, op_id
+
+ def update_oka_usage_state(self, session, oka_id):
+ filter_db = self._get_project_filter(session)
+ filter_db[BaseTopic.id_field(self.topic, oka_id)] = oka_id
+
+ data = self.db.get_one(self.okapkg_topic, filter_db)
+ if data["_admin"]["usageState"] == "NOT_IN_USE":
+ usage_state_update = {
+ "_admin.usageState": "IN_USE",
+ }
+ self.db.set_one(
+ self.okapkg_topic, {"_id": oka_id}, update_dict=usage_state_update
+ )
+
+ def check_conflict_on_edit(self, session, final_content, edit_content, _id):
+ if final_content["name"] != edit_content["name"]:
+ self.check_unique_name(session, edit_content["name"])
+ return final_content
+
+ @staticmethod
+ def format_on_edit(final_content, edit_content):
+ op_id = ACMTopic.format_on_operation(
+ final_content,
+ "update",
+ edit_content,
+ )
+ final_content["operatingState"] = "PROCESSING"
+ final_content["resourceState"] = "IN_PROGRESS"
+ if final_content.get("_admin"):
+ now = time()
+ final_content["_admin"]["modified"] = now
+ return op_id
+
+ def edit(self, session, _id, indata, kwargs):
+ content = None
+ indata = self._remove_envelop(indata)
+
+ # Override descriptor with query string kwargs
+ if kwargs:
+ self._update_input_with_kwargs(indata, kwargs)
+ try:
+ if indata and session.get("set_project"):
+ raise EngineException(
+ "Cannot edit content and set to project (query string SET_PROJECT) at same time",
+ HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
+ # TODO self._check_edition(session, indata, _id, force)
+ if not content:
+ content = self.show(session, _id)
+
+ indata = self._validate_input_edit(indata, content, force=session["force"])
+
+ # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
+ _id = content.get("_id") or _id
+
+ content = self.check_conflict_on_edit(session, content, indata, _id=_id)
+ op_id = self.format_on_edit(content, indata)
+ self.db.replace(self.topic, _id, content)
+ return op_id
+ except ValidationError as e:
+ raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
+
+ def update_appinstance(self, session, _id, item, indata):
+ if not self.multiproject:
+ filter_db = {}
+ else:
+ filter_db = self._get_project_filter(session)
+ # To allow project&user addressing by name AS WELL AS _id
+ filter_db[BaseTopic.id_field(self.topic, _id)] = _id
+ validate_input(indata, app_instance_schema)
+ data = self.db.get_one(self.topic, filter_db)
+ operation_params = {}
+ data["operatingState"] = "PROCESSING"
+ data["resourceState"] = "IN_PROGRESS"
+ operation_params = indata
+ op_id = self.format_on_operation(
+ data,
+ item,
+ operation_params,
+ )
+ self.db.set_one(self.topic, {"_id": _id}, data)
+ data = {"appinstance": _id, "operation_id": op_id}
+ self._send_msg(item, data)
+ return op_id
+
+ def delete(self, session, _id, not_send_msg=None):
+ if not self.multiproject:
+ filter_q = {}
+ else:
+ filter_q = self._get_project_filter(session)
+ filter_q[self.id_field(self.topic, _id)] = _id
+ item_content = self.db.get_one(self.topic, filter_q)
+ item_content["state"] = "IN_DELETION"
+ item_content["operatingState"] = "PROCESSING"
+ item_content["resourceState"] = "IN_PROGRESS"
+ op_id = self.format_on_operation(
+ item_content,
+ "delete",
+ None,
+ )
+ self.db.set_one(self.topic, {"_id": item_content["_id"]}, item_content)
+
+ # Check if the profile exists. If it doesn't, no message should be sent to Kafka
+ not_send_msg2 = not_send_msg
+ profile_id = item_content["profile"]["_id"]
+ profile_type = item_content["profile"]["profile_type"]
+ profile_collection_map = {
+ "app_profiles": "k8sapp",
+ "resource_profiles": "k8sresource",
+ "infra_controller_profiles": "k8sinfra_controller",
+ "infra_config_profiles": "k8sinfra_config",
+ }
+ profile_collection = profile_collection_map[profile_type]
+ profile_content = self.db.get_one(
+ profile_collection, {"_id": profile_id}, fail_on_empty=False
+ )
+ if not profile_content:
+ self.db.del_one(self.topic, filter_q)
+ not_send_msg2 = True
+ self._send_msg(
+ "delete",
+ {"appinstance": _id, "operation_id": op_id, "force": session["force"]},
+ not_send_msg=not_send_msg2,
+ )
+ return op_id
+
+
class OkaTopic(DescriptorTopic, ACMOperationTopic):
topic = "okas"
topic_msg = "oka"