blob: a17c293176d0328d349db27597f16deb074251f4 [file] [log] [blame]
rshri2d386cb2024-07-05 14:35:51 +00001# -*- coding: utf-8 -*-
2
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12# implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
yshah53cc9eb2024-07-05 13:06:31 +000016import logging
17import yaml
yshah53cc9eb2024-07-05 13:06:31 +000018import shutil
19import os
rshri2d386cb2024-07-05 14:35:51 +000020from http import HTTPStatus
21
yshah53cc9eb2024-07-05 13:06:31 +000022from time import time
rshri2d386cb2024-07-05 14:35:51 +000023from osm_nbi.base_topic import BaseTopic, EngineException
shrinithi28d887f2025-01-08 05:27:19 +000024from osm_nbi.acm_topic import ACMTopic, ACMOperationTopic, ProfileTopic
rshri2d386cb2024-07-05 14:35:51 +000025
yshah53cc9eb2024-07-05 13:06:31 +000026from osm_nbi.descriptor_topics import DescriptorTopic
rshri2d386cb2024-07-05 14:35:51 +000027from osm_nbi.validation import (
28 ValidationError,
yshah99122b82024-11-18 07:05:29 +000029 validate_input,
garciadeblasf30e33d2025-09-15 15:42:15 +020030 cluster_creation_new_schema,
yshah99122b82024-11-18 07:05:29 +000031 cluster_edit_schema,
32 cluster_update_schema,
rshri2d386cb2024-07-05 14:35:51 +000033 infra_controller_profile_create_new_schema,
34 infra_config_profile_create_new_schema,
35 app_profile_create_new_schema,
36 resource_profile_create_new_schema,
37 infra_controller_profile_create_edit_schema,
38 infra_config_profile_create_edit_schema,
39 app_profile_create_edit_schema,
40 resource_profile_create_edit_schema,
garciadeblasf30e33d2025-09-15 15:42:15 +020041 cluster_registration_new_schema,
rshri2d386cb2024-07-05 14:35:51 +000042 attach_dettach_profile_schema,
yshah53cc9eb2024-07-05 13:06:31 +000043 ksu_schema,
garciadeblasb798f452025-08-05 18:21:26 +020044 app_instance_schema,
garciadeblased401462026-01-16 02:51:15 +010045 app_instance_edit_schema,
46 app_instance_update_schema,
yshah53cc9eb2024-07-05 13:06:31 +000047 oka_schema,
yshahd23c6a52025-06-13 05:49:31 +000048 node_create_new_schema,
49 node_edit_schema,
rshri2d386cb2024-07-05 14:35:51 +000050)
yshah53cc9eb2024-07-05 13:06:31 +000051from osm_common.dbbase import deep_update_rfc7396, DbException
rshri2d386cb2024-07-05 14:35:51 +000052from osm_common.msgbase import MsgException
53from osm_common.fsbase import FsException
54
# Module authors (used by packaging/attribution tooling).
__author__ = (
    "Shrinithi R <shrinithi.r@tataelxsi.co.in>",
    "Shahithya Y <shahithya.y@tataelxsi.co.in>",
)
rshri2d386cb2024-07-05 14:35:51 +000059
60
class InfraContTopic(ProfileTopic):
    """Topic handler for K8s infra-controller profiles."""

    topic = "k8sinfra_controller"
    topic_msg = "k8s_infra_controller"
    schema_new = infra_controller_profile_create_new_schema
    schema_edit = infra_controller_profile_create_edit_schema

    def __init__(self, db, fs, msg, auth):
        super().__init__(db, fs, msg, auth)

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create a new infra controller profile."""
        return self.new_profile(rollback, session, indata, kwargs, headers)

    def default(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create the default infra controller profile during cluster creation."""
        return self.default_profile(rollback, session, indata, kwargs, headers)

    def delete(self, session, _id, dry_run=False, not_send_msg=None):
        """Delete the profile after verifying nothing depends on it."""
        dependency_filter = {"infra_controller_profiles": _id}
        self.check_dependency(dependency_filter, operation_type="delete")
        self.delete_profile(session, _id, dry_run, not_send_msg)
        return _id
rshri2d386cb2024-07-05 14:35:51 +000083
84
class InfraConfTopic(ProfileTopic):
    """Topic handler for K8s infra-config profiles."""

    topic = "k8sinfra_config"
    topic_msg = "k8s_infra_config"
    schema_new = infra_config_profile_create_new_schema
    schema_edit = infra_config_profile_create_edit_schema

    def __init__(self, db, fs, msg, auth):
        super().__init__(db, fs, msg, auth)

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create a new infra config profile."""
        return self.new_profile(rollback, session, indata, kwargs, headers)

    def default(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create the default infra config profile during cluster creation."""
        return self.default_profile(rollback, session, indata, kwargs, headers)

    def delete(self, session, _id, dry_run=False, not_send_msg=None):
        """Delete the profile after verifying nothing depends on it."""
        dependency_filter = {"infra_config_profiles": _id}
        self.check_dependency(dependency_filter, operation_type="delete")
        self.delete_profile(session, _id, dry_run, not_send_msg)
        return _id
rshri2d386cb2024-07-05 14:35:51 +0000107
108
class AppProfileTopic(ProfileTopic):
    """Topic handler for K8s app profiles."""

    topic = "k8sapp"
    topic_msg = "k8s_app"
    schema_new = app_profile_create_new_schema
    schema_edit = app_profile_create_edit_schema

    def __init__(self, db, fs, msg, auth):
        super().__init__(db, fs, msg, auth)

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create a new app profile."""
        return self.new_profile(rollback, session, indata, kwargs, headers)

    def default(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create the default app profile during cluster creation."""
        return self.default_profile(rollback, session, indata, kwargs, headers)

    def delete(self, session, _id, dry_run=False, not_send_msg=None):
        """Delete the profile after verifying nothing depends on it."""
        dependency_filter = {"app_profiles": _id}
        self.check_dependency(dependency_filter, operation_type="delete")
        self.delete_profile(session, _id, dry_run, not_send_msg)
        return _id
rshri2d386cb2024-07-05 14:35:51 +0000131
132
class ResourceTopic(ProfileTopic):
    """Topic handler for K8s resource profiles."""

    topic = "k8sresource"
    topic_msg = "k8s_resource"
    schema_new = resource_profile_create_new_schema
    schema_edit = resource_profile_create_edit_schema

    def __init__(self, db, fs, msg, auth):
        super().__init__(db, fs, msg, auth)

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create a new resource profile."""
        return self.new_profile(rollback, session, indata, kwargs, headers)

    def default(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create the default resource profile during cluster creation."""
        return self.default_profile(rollback, session, indata, kwargs, headers)

    def delete(self, session, _id, dry_run=False, not_send_msg=None):
        """Delete the profile after verifying nothing depends on it."""
        dependency_filter = {"resource_profiles": _id}
        self.check_dependency(dependency_filter, operation_type="delete")
        self.delete_profile(session, _id, dry_run, not_send_msg)
        return _id
rshri2d386cb2024-07-05 14:35:51 +0000155
156
class ClusterTopic(ACMTopic):
    """NBI topic managing the lifecycle of K8s clusters (DB collection "clusters").

    Cluster creation also creates one default profile of each of the four
    profile types.  Profile attach/detach, credentials retrieval, update
    operations and deletion are handled here as well; every state-changing
    action is recorded with format_on_operation() and notified over Kafka.
    """

    topic = "clusters"
    topic_msg = "cluster"
    schema_new = cluster_creation_new_schema
    schema_edit = attach_dettach_profile_schema

    def __init__(self, db, fs, msg, auth):
        super().__init__(db, fs, msg, auth)
        # Sub-topics used to create/show/list the per-cluster profiles
        self.infra_contr_topic = InfraContTopic(db, fs, msg, auth)
        self.infra_conf_topic = InfraConfTopic(db, fs, msg, auth)
        self.resource_topic = ResourceTopic(db, fs, msg, auth)
        self.app_topic = AppProfileTopic(db, fs, msg, auth)

    @staticmethod
    def format_on_new(content, project_id=None, make_public=False):
        """Add common _admin metadata plus cluster-specific bookkeeping fields."""
        ACMTopic.format_on_new(content, project_id=project_id, make_public=make_public)
        content["current_operation"] = None

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """
        Creates a new k8scluster into database.
        :param rollback: list to append the created items at database in case a rollback must be done
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param indata: params to be used for the k8cluster
        :param kwargs: used to override the indata
        :param headers: http request headers
        :return: the _id of k8scluster created at database. Or an exception of type
        EngineException, ValidationError, DbException, FsException, MsgException.
        Note: Exceptions are not captured on purpose. They should be captured at called
        """

        step = "checking quotas"  # first step must be defined outside try
        try:
            if self.multiproject:
                self.check_quota(session)

            content = self._remove_envelop(indata)

            step = "name unique check"
            self.cluster_unique_name_check(session, indata["name"])

            step = "validating input parameters"
            self._update_input_with_kwargs(content, kwargs)

            content = self._validate_input_new(content, session, force=session["force"])

            # Keep the raw (pre-validation) request as the operation parameters
            operation_params = indata.copy()

            self.check_conflict_on_new(session, content)
            self.format_on_new(
                content, project_id=session["project_id"], make_public=session["public"]
            )

            step = "filling cluster details from input data"
            content = self._create_cluster(
                content, rollback, session, indata, kwargs, headers
            )

            step = "creating cluster at database"
            _id = self.db.create(self.topic, content)

            op_id = self.format_on_operation(
                content,
                "create",
                operation_params,
            )

            # Per-cluster age key pair, stored encrypted with the cluster id as salt
            pubkey, privkey = self._generate_age_key()
            content["age_pubkey"] = self.db.encrypt(
                pubkey, schema_version="1.11", salt=_id
            )
            content["age_privkey"] = self.db.encrypt(
                privkey, schema_version="1.11", salt=_id
            )

            # TODO: set age_pubkey and age_privkey in the default profiles
            rollback.append({"topic": self.topic, "_id": _id})
            # Persist the operation record and the encrypted keys added above
            self.db.set_one("clusters", {"_id": _id}, content)
            self._send_msg("create", {"cluster_id": _id, "operation_id": op_id})

            # To add the content in old collection "k8sclusters"
            self.add_to_old_collection(content, session)

            return _id, None
        except (
            ValidationError,
            EngineException,
            DbException,
            MsgException,
            FsException,
        ) as e:
            raise type(e)("{} while '{}'".format(e, step), http_code=e.http_code)

    def _validate_input_new(self, content, session, force=False):
        """Validate the VIM and its type-specific mandatory/forbidden parameters."""
        # validating vim and checking the mandatory parameters
        vim_type = self.check_vim(session, content["vim_account"])

        # for aws
        if vim_type == "aws":
            self._aws_check(content)

        # for azure and gcp
        elif vim_type in ["azure", "gcp"]:
            self._params_check(content)

        return super()._validate_input_new(content, force=session["force"])

    def _aws_check(self, indata):
        """Reject parameters that are not applicable to AWS clusters.

        Raises EngineException (422) so the error is handled by new()'s
        exception wrapper; a bare ValueError would escape it unwrapped.
        """
        if "node_count" in indata or "node_size" in indata:
            raise EngineException(
                "node_count and node_size are not allowed for AWS",
                HTTPStatus.UNPROCESSABLE_ENTITY,
            )
        return

    def _params_check(self, indata):
        """Check parameters that azure/gcp clusters must provide."""
        if "node_count" not in indata and "node_size" not in indata:
            raise EngineException(
                "node_count and node_size are mandatory parameters",
                HTTPStatus.UNPROCESSABLE_ENTITY,
            )
        return

    def _create_cluster(self, content, rollback, session, indata, kwargs, headers):
        """Fill the cluster document: subnets, default profiles, state and
        VIM-derived optional fields.  Returns the completed ``content``."""
        private_subnet = indata.get("private_subnet")
        public_subnet = indata.get("public_subnet")

        # Enforce: if private_subnet is provided, public_subnet must also be provided
        if (private_subnet and not public_subnet) or (
            public_subnet and not private_subnet
        ):
            raise EngineException(
                "'public_subnet' must be provided if 'private_subnet' is given and viceversa.",
                HTTPStatus.UNPROCESSABLE_ENTITY,
            )

        # private Subnet validation
        if private_subnet:
            count = len(private_subnet)
            if count != 2:
                raise EngineException(
                    f"private_subnet must contain exactly 2 items, got {count}",
                    HTTPStatus.UNPROCESSABLE_ENTITY,
                )

        # public Subnet validation
        if public_subnet:
            count = len(public_subnet)
            if count != 1:
                raise EngineException(
                    f"public_subnet must contain exactly 1 item, got {count}",
                    HTTPStatus.UNPROCESSABLE_ENTITY,
                )

        # One default profile of each type is created together with the cluster
        content["infra_controller_profiles"] = [
            self._create_default_profiles(
                rollback, session, indata, kwargs, headers, self.infra_contr_topic
            )
        ]
        content["infra_config_profiles"] = [
            self._create_default_profiles(
                rollback, session, indata, kwargs, headers, self.infra_conf_topic
            )
        ]
        content["resource_profiles"] = [
            self._create_default_profiles(
                rollback, session, indata, kwargs, headers, self.resource_topic
            )
        ]
        content["app_profiles"] = [
            self._create_default_profiles(
                rollback, session, indata, kwargs, headers, self.app_topic
            )
        ]
        # "created" distinguishes OSM-created clusters from registered ones
        content["created"] = "true"
        content["state"] = "IN_CREATION"
        content["operatingState"] = "PROCESSING"
        content["git_name"] = self.create_gitname(content, session)
        content["resourceState"] = "IN_PROGRESS.REQUEST_RECEIVED"

        # Get the vim_account details
        vim_account_details = self.db.get_one(
            "vim_accounts", {"name": content["vim_account"]}
        )

        # Add optional fields if they don't exist in the request
        if "region_name" not in indata:
            region_name = vim_account_details.get("config", {}).get("region_name")
            if region_name:
                content["region_name"] = region_name

        if "resource_group" not in indata:
            resource_group = vim_account_details.get("config", {}).get("resource_group")
            if resource_group:
                content["resource_group"] = resource_group

        if "k8s_version" not in content:
            content["k8s_version"] = "1.32"
        # Additional cluster information, specific for each cluster type
        content["config"] = indata.get("config", {})
        content["node_count"] = indata.get("node_count", 0)
        content["ksu_count"] = 0
        self.logger.info(f"content is : {content}")
        return content

    def check_vim(self, session, name):
        """Return the vim_type of the VIM account named ``name``.

        NOTE(review): db.get_one raises DbException when the account is
        missing (propagated to the caller); only ValidationError is wrapped
        here — confirm this is the intended error mapping.
        """
        try:
            vim_account_details = self.db.get_one("vim_accounts", {"name": name})
            if vim_account_details is not None:
                return vim_account_details["vim_type"]
        except ValidationError as e:
            raise EngineException(
                e,
                HTTPStatus.UNPROCESSABLE_ENTITY,
            )

    def _create_default_profiles(
        self, rollback, session, indata, kwargs, headers, topic
    ):
        """Create the default profile for ``topic`` and return its id."""
        topic = self.to_select_topic(topic)
        default_profiles = topic.default(rollback, session, indata, kwargs, headers)
        return default_profiles

    def to_select_topic(self, topic):
        """Map a profile-type name to its topic handler; pass through anything else."""
        if topic == "infra_controller_profiles":
            topic = self.infra_contr_topic
        elif topic == "infra_config_profiles":
            topic = self.infra_conf_topic
        elif topic == "resource_profiles":
            topic = self.resource_topic
        elif topic == "app_profiles":
            topic = self.app_topic
        return topic

    def show_one(self, session, _id, profile, filter_q=None, api_req=False):
        """Return the full documents of all ``profile``-type profiles attached
        to cluster ``_id``.

        NOTE(review): the ``filter_q`` argument is overwritten with the
        project filter and never used as passed — confirm with callers.
        """
        try:
            filter_q = self._get_project_filter(session)
            filter_q[self.id_field(self.topic, _id)] = _id
            content = self.db.get_one(self.topic, filter_q)
            existing_profiles = []
            topic = self.to_select_topic(profile)
            for profile_id in content[profile]:
                data = topic.show(session, profile_id, filter_q, api_req)
                existing_profiles.append(data)
            return existing_profiles
        except ValidationError as e:
            raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)

    def state_check(self, profile_id, session, topic):
        """Raise unless the profile is in CREATED state."""
        topic = self.to_select_topic(topic)
        content = topic.show(session, profile_id, filter_q=None, api_req=False)
        state = content["state"]
        if state == "CREATED":
            return
        else:
            raise EngineException(
                f" {profile_id} is not in created state",
                HTTPStatus.UNPROCESSABLE_ENTITY,
            )

    def edit(self, session, _id, item, indata=None, kwargs=None):
        """Edit a cluster.

        For profile-type items the edit is an attach/detach request handled by
        add_profile()/remove_profile(); any other item goes through the generic
        ACMTopic edit with cluster_edit_schema.
        """
        if item not in (
            "infra_controller_profiles",
            "infra_config_profiles",
            "app_profiles",
            "resource_profiles",
        ):
            self.schema_edit = cluster_edit_schema
            # NOTE(review): ``item`` is forwarded as the indata payload on this
            # path — confirm against the ACMTopic.edit signature
            super().edit(session, _id, indata=item, kwargs=kwargs, content=None)
        else:
            indata = self._remove_envelop(indata)
            indata = self._validate_input_edit(
                indata, content=None, force=session["force"]
            )
            if indata.get("add_profile"):
                self.add_profile(session, _id, item, indata)
            elif indata.get("remove_profile"):
                self.remove_profile(session, _id, item, indata)
            else:
                error_msg = "Add / remove operation is only applicable"
                raise EngineException(error_msg, HTTPStatus.UNPROCESSABLE_ENTITY)

    def edit_extra_before(self, session, _id, indata=None, kwargs=None, content=None):
        """Hook run before the generic edit: when the cluster is renamed,
        propagate the new name to the legacy k8scluster and profile
        collections after checking it is unique in all of them."""
        check_dict = self.db.get_one(self.topic, {"_id": _id})
        if "name" in indata and check_dict["name"] != indata["name"]:
            self.check_unique_name(session, indata["name"])
            _filter = {"name": indata["name"]}
            topic_list = [
                "k8sclusters",
                "k8sinfra_controller",
                "k8sinfra_config",
                "k8sapp",
                "k8sresource",
            ]
            # Check unique name for k8scluster and profiles
            for topic in topic_list:
                if self.db.get_one(
                    topic, _filter, fail_on_empty=False, fail_on_more=False
                ):
                    raise EngineException(
                        "name '{}' already exists for {}".format(indata["name"], topic),
                        HTTPStatus.CONFLICT,
                    )
            # Replace name in k8scluster and profiles
            for topic in topic_list:
                data = self.db.get_one(topic, {"name": check_dict["name"]})
                data["name"] = indata["name"]
                self.db.replace(topic, data["_id"], data)
        return True

    def add_profile(self, session, _id, item, indata=None):
        """Attach an existing profile of type ``item`` to cluster ``_id``.

        The profile must be in CREATED state and not already attached; the
        operation is recorded and notified over Kafka ("add").
        """
        check = {"cluster": _id, item: indata["add_profile"][0]["id"]}
        self.check_dependency(check)
        indata = self._remove_envelop(indata)
        operation_params = indata
        profile_id = indata["add_profile"][0]["id"]
        # check state
        self.state_check(profile_id, session, item)
        filter_q = self._get_project_filter(session)
        filter_q[self.id_field(self.topic, _id)] = _id
        content = self.db.get_one(self.topic, filter_q)
        profile_list = content[item]

        if profile_id not in profile_list:
            content["operatingState"] = "PROCESSING"
            op_id = self.format_on_operation(
                content,
                "add",
                operation_params,
            )
            self.db.set_one("clusters", {"_id": content["_id"]}, content)
            self._send_msg(
                "add",
                {
                    "cluster_id": _id,
                    "profile_id": profile_id,
                    "profile_type": item,
                    "operation_id": op_id,
                },
            )
        else:
            raise EngineException(
                f"{item} {profile_id} already exists", HTTPStatus.UNPROCESSABLE_ENTITY
            )

    def _get_default_profiles(self, session, topic):
        """Return the ids of the default profiles of type ``topic``."""
        topic = self.to_select_topic(topic)
        existing_profiles = topic.list(session, filter_q=None, api_req=False)
        default_profiles = [
            profile["_id"]
            for profile in existing_profiles
            if profile.get("default", False)
        ]
        return default_profiles

    def remove_profile(self, session, _id, item, indata):
        """Detach a profile of type ``item`` from cluster ``_id``.

        Default profiles cannot be detached; the operation is recorded and
        notified over Kafka ("remove").
        """
        check = {"cluster": _id, item: indata["remove_profile"][0]["id"]}
        self.check_dependency(check)
        indata = self._remove_envelop(indata)
        operation_params = indata
        profile_id = indata["remove_profile"][0]["id"]
        filter_q = self._get_project_filter(session)
        filter_q[self.id_field(self.topic, _id)] = _id
        content = self.db.get_one(self.topic, filter_q)
        profile_list = content[item]

        default_profiles = self._get_default_profiles(session, item)

        if profile_id in default_profiles:
            raise EngineException(
                "Cannot remove default profile", HTTPStatus.UNPROCESSABLE_ENTITY
            )
        if profile_id in profile_list:
            op_id = self.format_on_operation(
                content,
                "remove",
                operation_params,
            )
            self.db.set_one("clusters", {"_id": content["_id"]}, content)
            self._send_msg(
                "remove",
                {
                    "cluster_id": _id,
                    "profile_id": profile_id,
                    "profile_type": item,
                    "operation_id": op_id,
                },
            )
        else:
            raise EngineException(
                f"{item} {profile_id} doesn't exist", HTTPStatus.UNPROCESSABLE_ENTITY
            )

    def get_cluster_creds(self, session, _id, item):
        """Record a credentials-retrieval operation for the cluster and ask
        the backend for them over Kafka ("get_creds").
        :return: the operation id
        """
        if not self.multiproject:
            filter_db = {}
        else:
            filter_db = self._get_project_filter(session)
        filter_db[BaseTopic.id_field(self.topic, _id)] = _id
        operation_params = None
        data = self.db.get_one(self.topic, filter_db)
        op_id = self.format_on_operation(data, item, operation_params)
        self.db.set_one(self.topic, {"_id": data["_id"]}, data)
        self._send_msg("get_creds", {"cluster_id": _id, "operation_id": op_id})
        return op_id

    def get_cluster_creds_file(self, session, _id, item, op_id):
        """Return the credentials of the cluster as a downloadable YAML file.

        The operation ``op_id`` (started by get_cluster_creds) must have
        finished with result True; the credentials stored in the cluster
        document are dumped to <_id>/credentials.yaml in the fs storage.
        :return: (open file object, content-type) tuple
        """
        if not self.multiproject:
            filter_db = {}
        else:
            filter_db = self._get_project_filter(session)
        filter_db[BaseTopic.id_field(self.topic, _id)] = _id

        data = self.db.get_one(self.topic, filter_db)
        creds_flag = None
        for operations in data["operationHistory"]:
            if operations["op_id"] == op_id:
                creds_flag = operations["result"]
        self.logger.info("Creds Flag: {}".format(creds_flag))

        if creds_flag is True:
            credentials = data["credentials"]

            file_pkg = None
            current_path = _id

            # Recreate the folder so only the fresh credentials file is there
            self.fs.file_delete(current_path, ignore_non_exist=True)
            self.fs.mkdir(current_path)
            filename = "credentials.yaml"
            file_path = (current_path, filename)
            self.logger.info("File path: {}".format(file_path))
            file_pkg = self.fs.file_open(file_path, "a+b")

            credentials_yaml = yaml.safe_dump(
                credentials, indent=4, default_flow_style=False
            )
            file_pkg.write(credentials_yaml.encode(encoding="utf-8"))

            if file_pkg:
                file_pkg.close()
                file_pkg = None
            self.fs.sync(from_path=current_path)

            return (
                self.fs.file_open((current_path, filename), "rb"),
                "text/plain",
            )
        else:
            raise EngineException(
                "Not possible to get the credentials of the cluster",
                HTTPStatus.UNPROCESSABLE_ENTITY,
            )

    def update_item(self, session, _id, item, indata):
        """Trigger an update-type operation ``item`` on the cluster.

        Validates ``indata`` against cluster_update_schema, records the
        operation and notifies Kafka using ``item`` as the message type.
        :return: the operation id
        """
        if not self.multiproject:
            filter_db = {}
        else:
            filter_db = self._get_project_filter(session)
        # To allow project&user addressing by name AS WELL AS _id
        filter_db[BaseTopic.id_field(self.topic, _id)] = _id
        validate_input(indata, cluster_update_schema)
        data = self.db.get_one(self.topic, filter_db)
        data["operatingState"] = "PROCESSING"
        data["resourceState"] = "IN_PROGRESS"
        operation_params = indata
        op_id = self.format_on_operation(
            data,
            item,
            operation_params,
        )
        self.db.set_one(self.topic, {"_id": _id}, data)
        data = {"cluster_id": _id, "operation_id": op_id}
        self._send_msg(item, data)
        return op_id

    def delete_extra_before(self, session, _id, db_content, not_send_msg=None):
        """Hook run before deletion: record the delete operation and build the
        payload for the Kafka delete message."""
        op_id = self.common_delete(_id, db_content)
        return {"cluster_id": _id, "operation_id": op_id, "force": session["force"]}

    def delete(self, session, _id, dry_run=False, not_send_msg=None):
        """Delete an OSM-created cluster.

        Registered (imported) clusters must be deregistered instead.
        :return: the operation id
        """
        check_dict = {"cluster": _id}
        self.check_dependency(check_dict, operation_type="delete")
        filter_q = self._get_project_filter(session)
        filter_q[self.id_field(self.topic, _id)] = _id
        check_dict = self.db.get_one(self.topic, filter_q)
        # NOTE(review): this is the operation id present BEFORE the delete is
        # recorded by delete_extra_before() — confirm it is the intended value
        op_id = check_dict["current_operation"]
        if check_dict["created"] == "false":
            raise EngineException(
                "Cannot delete registered cluster. Please deregister.",
                HTTPStatus.UNPROCESSABLE_ENTITY,
            )
        super().delete(session, _id, dry_run, not_send_msg)
        return op_id
shrinithi28d887f2025-01-08 05:27:19 +0000645
646
yshahd23c6a52025-06-13 05:49:31 +0000647class NodeGroupTopic(ACMTopic):
648 topic = "nodegroups"
649 topic_msg = "nodegroup"
650 schema_new = node_create_new_schema
651 schema_edit = node_edit_schema
652
653 def __init__(self, db, fs, msg, auth):
654 BaseTopic.__init__(self, db, fs, msg, auth)
655
656 @staticmethod
657 def format_on_new(content, project_id=None, make_public=False):
658 BaseTopic.format_on_new(content, project_id=project_id, make_public=make_public)
659 content["current_operation"] = None
660 content["state"] = "IN_CREATION"
661 content["operatingState"] = "PROCESSING"
662 content["resourceState"] = "IN_PROGRESS"
663
664 def new(self, rollback, session, indata=None, kwargs=None, headers=None):
665 self.logger.info(f"Indata: {indata}")
666 self.check_unique_name(session, indata["name"])
667
668 indata = self._remove_envelop(indata)
669 self._update_input_with_kwargs(indata, kwargs)
670 if not indata.get("private_subnet") and not indata.get("public_subnet"):
671 raise EngineException(
672 "Please provide atleast one subnet",
673 HTTPStatus.UNPROCESSABLE_ENTITY,
674 )
675 content = self._validate_input_new(indata, session["force"])
676
677 self.logger.info(f"Indata: {indata}")
678 self.logger.info(f"Content: {content}")
679 cluster_id = content["cluster_id"]
680 db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
681 private_subnet = db_cluster.get("private_subnet")
682 public_subnet = db_cluster.get("public_subnet")
683 if content.get("private_subnet"):
684 for subnet in content["private_subnet"]:
685 if subnet not in private_subnet:
686 raise EngineException(
687 "No External subnet is used to add nodegroup",
688 HTTPStatus.UNPROCESSABLE_ENTITY,
689 )
690 if content.get("public_subnet"):
691 for subnet in content["public_subnet"]:
692 if subnet not in public_subnet:
693 raise EngineException(
694 "No External subnet is used to add nodegroup",
695 HTTPStatus.UNPROCESSABLE_ENTITY,
696 )
697
698 operation_params = {}
699 for content_key, content_value in content.items():
700 operation_params[content_key] = content_value
701 self.format_on_new(
702 content, session["project_id"], make_public=session["public"]
703 )
704 content["git_name"] = self.create_gitname(content, session)
705 self.logger.info(f"Operation Params: {operation_params}")
706 op_id = self.format_on_operation(
707 content,
708 "create",
709 operation_params,
710 )
711 node_count = db_cluster.get("node_count")
712 new_node_count = node_count + 1
713 self.logger.info(f"New Node count: {new_node_count}")
714 db_cluster["node_count"] = new_node_count
715 self.db.set_one("clusters", {"_id": cluster_id}, db_cluster)
716 _id = self.db.create(self.topic, content)
717 self._send_msg("add_nodegroup", {"nodegroup_id": _id, "operation_id": op_id})
718 return _id, op_id
719
720 def list(self, session, filter_q=None, api_req=False):
721 db_filter = {}
722 if filter_q.get("cluster_id"):
723 db_filter["cluster_id"] = filter_q.get("cluster_id")
724 data_list = self.db.get_list(self.topic, db_filter)
725 cluster_data = self.db.get_one("clusters", {"_id": db_filter["cluster_id"]})
726 self.logger.info(f"Cluster Data: {cluster_data}")
727 self.logger.info(f"Data: {data_list}")
728 if filter_q.get("cluster_id"):
729 outdata = {}
730 outdata["count"] = cluster_data["node_count"]
731 outdata["data"] = data_list
732 self.logger.info(f"Outdata: {outdata}")
733 return outdata
734 if api_req:
735 data_list = [self.sol005_projection(inst) for inst in data_list]
736 return data_list
737
738 def delete(self, session, _id, dry_run=False, not_send_msg=None):
739 if not self.multiproject:
740 filter_q = {}
741 else:
742 filter_q = self._get_project_filter(session)
743 filter_q[self.id_field(self.topic, _id)] = _id
744 item_content = self.db.get_one(self.topic, filter_q)
745 item_content["state"] = "IN_DELETION"
746 item_content["operatingState"] = "PROCESSING"
747 item_content["resourceState"] = "IN_PROGRESS"
748 self.check_conflict_on_del(session, _id, item_content)
749 op_id = self.format_on_operation(
750 item_content,
751 "delete",
752 None,
753 )
754 self.db.set_one(self.topic, {"_id": item_content["_id"]}, item_content)
755 self._send_msg(
756 "delete_nodegroup",
757 {"nodegroup_id": _id, "operation_id": op_id},
758 not_send_msg=not_send_msg,
759 )
760 return op_id
761
762 def update_item(self, session, _id, item, indata):
763 content = None
764 try:
765 if not content:
766 content = self.db.get_one(self.topic, {"_id": _id})
767 indata = self._validate_input_edit(indata, content, force=session["force"])
768 _id = content.get("_id") or _id
769
770 content = self.check_conflict_on_edit(session, content, indata, _id=_id)
771 op_id = self.format_on_edit(content, indata)
772 op_id = ACMTopic.format_on_operation(
773 content,
774 "scale",
775 indata,
776 )
777 self.logger.info(f"op_id: {op_id}")
778 content["operatingState"] = "PROCESSING"
779 content["resourceState"] = "IN_PROGRESS"
780 self.db.replace(self.topic, _id, content)
781 self._send_msg(
782 "scale_nodegroup", {"nodegroup_id": _id, "operation_id": op_id}
783 )
784 return op_id
785 except ValidationError as e:
786 raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
787
788 def edit(self, session, _id, indata, kwargs):
789 content = None
790
791 # Override descriptor with query string kwargs
792 if kwargs:
793 self._update_input_with_kwargs(indata, kwargs)
794 try:
795 if indata and session.get("set_project"):
796 raise EngineException(
797 "Cannot edit content and set to project (query string SET_PROJECT) at same time",
798 HTTPStatus.UNPROCESSABLE_ENTITY,
799 )
800 # TODO self._check_edition(session, indata, _id, force)
801 if not content:
802 content = self.db.get_one(self.topic, {"_id": _id})
803
804 indata = self._validate_input_edit(indata, content, force=session["force"])
805 self.logger.info(f"Indata: {indata}")
806
807 # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
808 _id = content.get("_id") or _id
809
810 content = self.check_conflict_on_edit(session, content, indata, _id=_id)
811 if "name" in indata and "description" in indata:
812 content["name"] = indata["name"]
813 content["description"] = indata["description"]
814 elif "name" in indata:
815 content["name"] = indata["name"]
816 elif "description" in indata:
817 content["description"] = indata["description"]
818 op_id = self.format_on_edit(content, indata)
819 self.db.set_one(self.topic, {"_id": _id}, content)
820 return op_id
821 except ValidationError as e:
822 raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
823
824
shrinithi28d887f2025-01-08 05:27:19 +0000825class ClusterOpsTopic(ACMTopic):
rshri2d386cb2024-07-05 14:35:51 +0000826 topic = "clusters"
827 topic_msg = "cluster"
garciadeblasf30e33d2025-09-15 15:42:15 +0200828 schema_new = cluster_registration_new_schema
rshri2d386cb2024-07-05 14:35:51 +0000829
830 def __init__(self, db, fs, msg, auth):
shrinithi28d887f2025-01-08 05:27:19 +0000831 super().__init__(db, fs, msg, auth)
garciadeblas3d5dc322025-04-03 23:57:04 +0200832 self.infra_contr_topic = InfraContTopic(db, fs, msg, auth)
833 self.infra_conf_topic = InfraConfTopic(db, fs, msg, auth)
834 self.resource_topic = ResourceTopic(db, fs, msg, auth)
garciadeblasb798f452025-08-05 18:21:26 +0200835 self.app_topic = AppProfileTopic(db, fs, msg, auth)
rshri2d386cb2024-07-05 14:35:51 +0000836
garciadeblasbecc7052024-11-20 12:04:53 +0100837 @staticmethod
838 def format_on_new(content, project_id=None, make_public=False):
shrinithi28d887f2025-01-08 05:27:19 +0000839 ACMTopic.format_on_new(content, project_id=project_id, make_public=make_public)
garciadeblasbecc7052024-11-20 12:04:53 +0100840 content["current_operation"] = None
841
rshri2d386cb2024-07-05 14:35:51 +0000842 def add(self, rollback, session, indata, kwargs=None, headers=None):
843 step = "checking quotas"
844 try:
845 self.check_quota(session)
846 step = "name unique check"
shrinithi28d887f2025-01-08 05:27:19 +0000847 self.cluster_unique_name_check(session, indata["name"])
848 # self.check_unique_name(session, indata["name"])
rshri2d386cb2024-07-05 14:35:51 +0000849 step = "validating input parameters"
850 cls_add_request = self._remove_envelop(indata)
851 self._update_input_with_kwargs(cls_add_request, kwargs)
852 cls_add_request = self._validate_input_new(
853 cls_add_request, session["force"]
854 )
855 operation_params = cls_add_request
856
857 step = "filling cluster details from input data"
garciadeblas3d5dc322025-04-03 23:57:04 +0200858 cls_add_request = self._add_cluster(
859 cls_add_request, rollback, session, indata, kwargs, headers
860 )
rshri2d386cb2024-07-05 14:35:51 +0000861
rshri17b09ec2024-11-07 05:48:12 +0000862 step = "registering the cluster at database"
rshri2d386cb2024-07-05 14:35:51 +0000863 self.format_on_new(
rshri17b09ec2024-11-07 05:48:12 +0000864 cls_add_request, session["project_id"], make_public=session["public"]
rshri2d386cb2024-07-05 14:35:51 +0000865 )
rshri2d386cb2024-07-05 14:35:51 +0000866 op_id = self.format_on_operation(
rshri17b09ec2024-11-07 05:48:12 +0000867 cls_add_request,
rshri2d386cb2024-07-05 14:35:51 +0000868 "register",
869 operation_params,
870 )
rshri17b09ec2024-11-07 05:48:12 +0000871 _id = self.db.create(self.topic, cls_add_request)
garciadeblas9d9d9262024-09-25 11:25:33 +0200872 pubkey, privkey = self._generate_age_key()
rshri17b09ec2024-11-07 05:48:12 +0000873 cls_add_request["age_pubkey"] = self.db.encrypt(
garciadeblas9d9d9262024-09-25 11:25:33 +0200874 pubkey, schema_version="1.11", salt=_id
875 )
rshri17b09ec2024-11-07 05:48:12 +0000876 cls_add_request["age_privkey"] = self.db.encrypt(
garciadeblas9d9d9262024-09-25 11:25:33 +0200877 privkey, schema_version="1.11", salt=_id
878 )
879 # TODO: set age_pubkey and age_privkey in the default profiles
rshri17b09ec2024-11-07 05:48:12 +0000880 self.db.set_one(self.topic, {"_id": _id}, cls_add_request)
rshri2d386cb2024-07-05 14:35:51 +0000881 rollback.append({"topic": self.topic, "_id": _id})
882 self._send_msg("register", {"cluster_id": _id, "operation_id": op_id})
rshri50e34dc2024-12-02 03:10:39 +0000883
884 # To add the content in old collection "k8sclusters"
885 self.add_to_old_collection(cls_add_request, session)
886
rshri2d386cb2024-07-05 14:35:51 +0000887 return _id, None
888 except (
889 ValidationError,
890 EngineException,
891 DbException,
892 MsgException,
893 FsException,
894 ) as e:
895 raise type(e)("{} while '{}'".format(e, step), http_code=e.http_code)
896
garciadeblas3d5dc322025-04-03 23:57:04 +0200897 def _add_cluster(self, cls_add_request, rollback, session, indata, kwargs, headers):
rshri2d386cb2024-07-05 14:35:51 +0000898 cls_add = {
899 "name": cls_add_request["name"],
rshri2d386cb2024-07-05 14:35:51 +0000900 "credentials": cls_add_request["credentials"],
901 "vim_account": cls_add_request["vim_account"],
rshri17b09ec2024-11-07 05:48:12 +0000902 "bootstrap": cls_add_request["bootstrap"],
garciadeblas616d4c72025-08-05 09:15:33 +0200903 "openshift": cls_add_request.get("openshift", False),
garciadeblas3d5dc322025-04-03 23:57:04 +0200904 "infra_controller_profiles": [
905 self._create_default_profiles(
906 rollback, session, indata, kwargs, headers, self.infra_contr_topic
907 )
908 ],
909 "infra_config_profiles": [
910 self._create_default_profiles(
911 rollback, session, indata, kwargs, headers, self.infra_conf_topic
912 )
913 ],
914 "resource_profiles": [
915 self._create_default_profiles(
916 rollback, session, indata, kwargs, headers, self.resource_topic
917 )
918 ],
919 "app_profiles": [
920 self._create_default_profiles(
921 rollback, session, indata, kwargs, headers, self.app_topic
922 )
923 ],
rshri2d386cb2024-07-05 14:35:51 +0000924 "created": "false",
925 "state": "IN_CREATION",
926 "operatingState": "PROCESSING",
927 "git_name": self.create_gitname(cls_add_request, session),
928 "resourceState": "IN_PROGRESS.REQUEST_RECEIVED",
929 }
rshri17b09ec2024-11-07 05:48:12 +0000930 # Add optional fields if they exist in the request
931 if "description" in cls_add_request:
932 cls_add["description"] = cls_add_request["description"]
rshri2d386cb2024-07-05 14:35:51 +0000933 return cls_add
934
garciadeblas3d5dc322025-04-03 23:57:04 +0200935 def check_vim(self, session, name):
936 try:
937 vim_account_details = self.db.get_one("vim_accounts", {"name": name})
938 if vim_account_details is not None:
939 return name
940 except ValidationError as e:
941 raise EngineException(
942 e,
943 HTTPStatus.UNPROCESSABLE_ENTITY,
944 )
945
946 def _create_default_profiles(
947 self, rollback, session, indata, kwargs, headers, topic
948 ):
949 topic = self.to_select_topic(topic)
950 default_profiles = topic.default(rollback, session, indata, kwargs, headers)
951 return default_profiles
952
953 def to_select_topic(self, topic):
954 if topic == "infra_controller_profiles":
955 topic = self.infra_contr_topic
956 elif topic == "infra_config_profiles":
957 topic = self.infra_conf_topic
958 elif topic == "resource_profiles":
959 topic = self.resource_topic
960 elif topic == "app_profiles":
961 topic = self.app_topic
962 return topic
963
rshri2d386cb2024-07-05 14:35:51 +0000964 def remove(self, session, _id, dry_run=False, not_send_msg=None):
965 """
966 Delete item by its internal _id
967 :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
968 :param _id: server internal id
969 :param dry_run: make checking but do not delete
970 :param not_send_msg: To not send message (False) or store content (list) instead
971 :return: operation id (None if there is not operation), raise exception if error or not found, conflict, ...
972 """
973
974 # To allow addressing projects and users by name AS WELL AS by _id
975 if not self.multiproject:
976 filter_q = {}
977 else:
978 filter_q = self._get_project_filter(session)
979 filter_q[self.id_field(self.topic, _id)] = _id
980 item_content = self.db.get_one(self.topic, filter_q)
981
rshri2d386cb2024-07-05 14:35:51 +0000982 op_id = self.format_on_operation(
983 item_content,
984 "deregister",
985 None,
986 )
987 self.db.set_one(self.topic, {"_id": _id}, item_content)
988
989 self.check_conflict_on_del(session, _id, item_content)
990 if dry_run:
991 return None
992
993 if self.multiproject and session["project_id"]:
994 # remove reference from project_read if there are more projects referencing it. If it last one,
995 # do not remove reference, but delete
996 other_projects_referencing = next(
997 (
998 p
999 for p in item_content["_admin"]["projects_read"]
1000 if p not in session["project_id"] and p != "ANY"
1001 ),
1002 None,
1003 )
1004
1005 # check if there are projects referencing it (apart from ANY, that means, public)....
1006 if other_projects_referencing:
1007 # remove references but not delete
1008 update_dict_pull = {
1009 "_admin.projects_read": session["project_id"],
1010 "_admin.projects_write": session["project_id"],
1011 }
1012 self.db.set_one(
1013 self.topic, filter_q, update_dict=None, pull_list=update_dict_pull
1014 )
1015 return None
1016 else:
1017 can_write = next(
1018 (
1019 p
1020 for p in item_content["_admin"]["projects_write"]
1021 if p == "ANY" or p in session["project_id"]
1022 ),
1023 None,
1024 )
1025 if not can_write:
1026 raise EngineException(
1027 "You have not write permission to delete it",
1028 http_code=HTTPStatus.UNAUTHORIZED,
1029 )
1030
1031 # delete
1032 self._send_msg(
1033 "deregister",
1034 {"cluster_id": _id, "operation_id": op_id},
1035 not_send_msg=not_send_msg,
1036 )
shrinithi75492bd2025-03-21 18:37:44 +00001037 return _id
yshah53cc9eb2024-07-05 13:06:31 +00001038
1039
shrinithi28d887f2025-01-08 05:27:19 +00001040class KsusTopic(ACMTopic):
yshah53cc9eb2024-07-05 13:06:31 +00001041 topic = "ksus"
1042 okapkg_topic = "okas"
yshah53cc9eb2024-07-05 13:06:31 +00001043 topic_msg = "ksu"
1044 schema_new = ksu_schema
1045 schema_edit = ksu_schema
yshahd23c6a52025-06-13 05:49:31 +00001046 MAP_PROFILE = {
1047 "infra_controller_profiles": "infra-controllers",
1048 "infra_config_profiles": "infra-configs",
1049 "resource_profiles": "managed_resources",
1050 "app_profiles": "apps",
1051 }
yshah53cc9eb2024-07-05 13:06:31 +00001052
1053 def __init__(self, db, fs, msg, auth):
shrinithi28d887f2025-01-08 05:27:19 +00001054 super().__init__(db, fs, msg, auth)
yshah53cc9eb2024-07-05 13:06:31 +00001055 self.logger = logging.getLogger("nbi.ksus")
1056
1057 @staticmethod
1058 def format_on_new(content, project_id=None, make_public=False):
1059 BaseTopic.format_on_new(content, project_id=project_id, make_public=make_public)
garciadeblasbecc7052024-11-20 12:04:53 +01001060 content["current_operation"] = None
yshah53cc9eb2024-07-05 13:06:31 +00001061 content["state"] = "IN_CREATION"
1062 content["operatingState"] = "PROCESSING"
1063 content["resourceState"] = "IN_PROGRESS"
1064
1065 def new(self, rollback, session, indata=None, kwargs=None, headers=None):
1066 _id_list = []
yshahf8f07632025-01-17 04:46:03 +00001067 for content in indata["ksus"]:
garciadeblas1a980602025-12-03 08:58:42 +01001068 check_dict = {content["profile"]["profile_type"]: content["profile"]["_id"]}
1069 check_dict["okas"] = []
yshah53cc9eb2024-07-05 13:06:31 +00001070
1071 for okas in content["oka"]:
garciadeblas1a980602025-12-03 08:58:42 +01001072 if "_id" in okas and "sw_catalog_path" in okas:
yshah53cc9eb2024-07-05 13:06:31 +00001073 raise EngineException(
1074 "Cannot create ksu with both OKA and SW catalog path",
1075 HTTPStatus.UNPROCESSABLE_ENTITY,
1076 )
garciadeblas1a980602025-12-03 08:58:42 +01001077 elif "_id" not in okas and "sw_catalog_path" not in okas:
yshah53cc9eb2024-07-05 13:06:31 +00001078 raise EngineException(
garciadeblas1a980602025-12-03 08:58:42 +01001079 "Cannot create ksu. Either oka id or SW catalog path is required for all OKA in a KSU",
yshah53cc9eb2024-07-05 13:06:31 +00001080 HTTPStatus.UNPROCESSABLE_ENTITY,
1081 )
garciadeblas1a980602025-12-03 08:58:42 +01001082 elif "_id" in okas:
1083 check_dict["okas"].append(okas["_id"])
1084 self.check_dependency(check_dict)
yshah53cc9eb2024-07-05 13:06:31 +00001085
1086 # Override descriptor with query string kwargs
1087 content = self._remove_envelop(content)
1088 self._update_input_with_kwargs(content, kwargs)
1089 content = self._validate_input_new(input=content, force=session["force"])
1090
1091 # Check for unique name
1092 self.check_unique_name(session, content["name"])
1093
1094 self.check_conflict_on_new(session, content)
1095
1096 operation_params = {}
1097 for content_key, content_value in content.items():
1098 operation_params[content_key] = content_value
1099 self.format_on_new(
1100 content, project_id=session["project_id"], make_public=session["public"]
1101 )
yshah53cc9eb2024-07-05 13:06:31 +00001102 op_id = self.format_on_operation(
1103 content,
1104 operation_type="create",
1105 operation_params=operation_params,
1106 )
1107 content["git_name"] = self.create_gitname(content, session)
1108
1109 # Update Oka_package usage state
1110 for okas in content["oka"]:
1111 if "_id" in okas.keys():
1112 self.update_usage_state(session, okas)
1113
yshahd23c6a52025-06-13 05:49:31 +00001114 profile_id = content["profile"].get("_id")
1115 profile_type = content["profile"].get("profile_type")
1116 db_cluster_list = self.db.get_list("clusters")
1117 for db_cluster in db_cluster_list:
1118 if db_cluster.get("created") == "true":
1119 profile_list = db_cluster[profile_type]
1120 if profile_id in profile_list:
1121 ksu_count = db_cluster.get("ksu_count")
1122 new_ksu_count = ksu_count + 1
1123 self.logger.info(f"New KSU count: {new_ksu_count}")
1124 db_cluster["ksu_count"] = new_ksu_count
1125 self.db.set_one(
1126 "clusters", {"_id": db_cluster["_id"]}, db_cluster
1127 )
1128
yshah53cc9eb2024-07-05 13:06:31 +00001129 _id = self.db.create(self.topic, content)
1130 rollback.append({"topic": self.topic, "_id": _id})
yshah53cc9eb2024-07-05 13:06:31 +00001131 _id_list.append(_id)
1132 data = {"ksus_list": _id_list, "operation_id": op_id}
1133 self._send_msg("create", data)
1134 return _id_list, op_id
1135
1136 def clone(self, rollback, session, _id, indata, kwargs, headers):
garciadeblas1a980602025-12-03 08:58:42 +01001137 check_dict = {
yshahf8f07632025-01-17 04:46:03 +00001138 "ksu": _id,
1139 indata["profile"]["profile_type"]: indata["profile"]["_id"],
1140 }
garciadeblas1a980602025-12-03 08:58:42 +01001141 self.check_dependency(check_dict)
yshah53cc9eb2024-07-05 13:06:31 +00001142 filter_db = self._get_project_filter(session)
1143 filter_db[BaseTopic.id_field(self.topic, _id)] = _id
1144 data = self.db.get_one(self.topic, filter_db)
1145
yshah53cc9eb2024-07-05 13:06:31 +00001146 op_id = self.format_on_operation(
1147 data,
1148 "clone",
1149 indata,
1150 )
1151 self.db.set_one(self.topic, {"_id": data["_id"]}, data)
1152 self._send_msg("clone", {"ksus_list": [data["_id"]], "operation_id": op_id})
1153 return op_id
1154
1155 def update_usage_state(self, session, oka_content):
1156 _id = oka_content["_id"]
1157 filter_db = self._get_project_filter(session)
1158 filter_db[BaseTopic.id_field(self.topic, _id)] = _id
1159
1160 data = self.db.get_one(self.okapkg_topic, filter_db)
1161 if data["_admin"]["usageState"] == "NOT_IN_USE":
1162 usage_state_update = {
1163 "_admin.usageState": "IN_USE",
1164 }
1165 self.db.set_one(
1166 self.okapkg_topic, {"_id": _id}, update_dict=usage_state_update
1167 )
1168
1169 def move_ksu(self, session, _id, indata=None, kwargs=None, content=None):
garciadeblas1a980602025-12-03 08:58:42 +01001170 check_dict = {
yshahf8f07632025-01-17 04:46:03 +00001171 "ksu": _id,
1172 indata["profile"]["profile_type"]: indata["profile"]["_id"],
1173 }
garciadeblas1a980602025-12-03 08:58:42 +01001174 self.check_dependency(check_dict)
yshah53cc9eb2024-07-05 13:06:31 +00001175 indata = self._remove_envelop(indata)
1176
1177 # Override descriptor with query string kwargs
1178 if kwargs:
1179 self._update_input_with_kwargs(indata, kwargs)
1180 try:
1181 if indata and session.get("set_project"):
1182 raise EngineException(
1183 "Cannot edit content and set to project (query string SET_PROJECT) at same time",
1184 HTTPStatus.UNPROCESSABLE_ENTITY,
1185 )
1186 # TODO self._check_edition(session, indata, _id, force)
1187 if not content:
1188 content = self.show(session, _id)
1189 indata = self._validate_input_edit(
1190 input=indata, content=content, force=session["force"]
1191 )
1192 operation_params = indata
1193 deep_update_rfc7396(content, indata)
1194
1195 # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
1196 _id = content.get("_id") or _id
yshahd0c876f2024-11-11 09:24:48 +00001197 op_id = self.format_on_operation(
yshah53cc9eb2024-07-05 13:06:31 +00001198 content,
1199 "move",
1200 operation_params,
1201 )
1202 if content.get("_admin"):
1203 now = time()
1204 content["_admin"]["modified"] = now
1205 content["operatingState"] = "PROCESSING"
1206 content["resourceState"] = "IN_PROGRESS"
1207
1208 self.db.replace(self.topic, _id, content)
yshah53cc9eb2024-07-05 13:06:31 +00001209 data = {"ksus_list": [content["_id"]], "operation_id": op_id}
1210 self._send_msg("move", data)
1211 return op_id
1212 except ValidationError as e:
1213 raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
1214
1215 def check_conflict_on_edit(self, session, final_content, edit_content, _id):
1216 if final_content["name"] != edit_content["name"]:
1217 self.check_unique_name(session, edit_content["name"])
1218 return final_content
1219
1220 @staticmethod
1221 def format_on_edit(final_content, edit_content):
shrinithi28d887f2025-01-08 05:27:19 +00001222 op_id = ACMTopic.format_on_operation(
yshah53cc9eb2024-07-05 13:06:31 +00001223 final_content,
1224 "update",
1225 edit_content,
1226 )
1227 final_content["operatingState"] = "PROCESSING"
1228 final_content["resourceState"] = "IN_PROGRESS"
1229 if final_content.get("_admin"):
1230 now = time()
1231 final_content["_admin"]["modified"] = now
yshahd0c876f2024-11-11 09:24:48 +00001232 return op_id
yshah53cc9eb2024-07-05 13:06:31 +00001233
1234 def edit(self, session, _id, indata, kwargs):
1235 _id_list = []
yshah53cc9eb2024-07-05 13:06:31 +00001236 if _id == "update":
1237 for ksus in indata["ksus"]:
1238 content = ksus
1239 _id = content["_id"]
1240 _id_list.append(_id)
1241 content.pop("_id")
yshahd0c876f2024-11-11 09:24:48 +00001242 op_id = self.edit_ksu(session, _id, content, kwargs)
yshah53cc9eb2024-07-05 13:06:31 +00001243 else:
1244 content = indata
1245 _id_list.append(_id)
yshahd0c876f2024-11-11 09:24:48 +00001246 op_id = self.edit_ksu(session, _id, content, kwargs)
yshah53cc9eb2024-07-05 13:06:31 +00001247
1248 data = {"ksus_list": _id_list, "operation_id": op_id}
1249 self._send_msg("edit", data)
yshah53cc9eb2024-07-05 13:06:31 +00001250
yshahd23c6a52025-06-13 05:49:31 +00001251 def cluster_list_ksu(self, session, filter_q=None, api_req=None):
1252 db_filter = {}
1253 if filter_q.get("cluster_id"):
1254 db_filter["_id"] = filter_q.get("cluster_id")
1255 ksu_data_list = []
1256
1257 cluster_data = self.db.get_one("clusters", db_filter)
1258 profiles_list = [
1259 "infra_controller_profiles",
1260 "infra_config_profiles",
1261 "app_profiles",
1262 "resource_profiles",
1263 ]
1264 for profile in profiles_list:
1265 data_list = []
1266 for profile_id in cluster_data[profile]:
1267 filter_q = {"profile": {"_id": profile_id, "profile_type": profile}}
1268 data_list = self.db.get_list(self.topic, filter_q)
1269 for ksu_data in data_list:
1270 ksu_data["package_name"] = []
1271 ksu_data["package_path"] = []
1272 for okas in ksu_data["operationHistory"][0]["operationParams"]["oka"]:
1273 sw_catalog_path = okas.get("sw_catalog_path")
1274 if sw_catalog_path:
1275 parts = sw_catalog_path.rsplit("/", 2)
1276 self.logger.info(f"Parts: {parts}")
1277 ksu_data["package_name"].append(parts[-2])
1278 ksu_data["package_path"].append("/".join(parts[:-1]))
1279 else:
1280 oka_id = okas["_id"]
1281 db_oka = self.db.get_one("okas", {"_id": oka_id})
1282 oka_type = self.MAP_PROFILE[
1283 db_oka.get("profile_type", "infra_controller_profiles")
1284 ]
1285 ksu_data["package_name"].append(db_oka["git_name"].lower())
1286 ksu_data["package_path"].append(
1287 f"{oka_type}/{db_oka['git_name'].lower()}"
1288 )
1289 ksu_data_list.append(ksu_data)
1290
1291 outdata = {}
1292 outdata["count"] = cluster_data["ksu_count"]
1293 outdata["data"] = ksu_data_list
1294 self.logger.info(f"Outdata: {outdata}")
1295 return outdata
1296
yshahd0c876f2024-11-11 09:24:48 +00001297 def edit_ksu(self, session, _id, indata, kwargs):
garciadeblas1a980602025-12-03 08:58:42 +01001298 check_dict = {
yshahf8f07632025-01-17 04:46:03 +00001299 "ksu": _id,
1300 }
1301 if indata.get("profile"):
garciadeblas1a980602025-12-03 08:58:42 +01001302 check_dict[indata["profile"]["profile_type"]] = indata["profile"]["_id"]
yshahf8f07632025-01-17 04:46:03 +00001303 if indata.get("oka"):
garciadeblas1a980602025-12-03 08:58:42 +01001304 check_dict["okas"] = []
yshahf8f07632025-01-17 04:46:03 +00001305 for oka in indata["oka"]:
1306 if oka.get("_id") is not None:
garciadeblas1a980602025-12-03 08:58:42 +01001307 check_dict["okas"].append(oka["_id"])
yshahf8f07632025-01-17 04:46:03 +00001308
garciadeblas1a980602025-12-03 08:58:42 +01001309 self.check_dependency(check_dict)
yshah53cc9eb2024-07-05 13:06:31 +00001310 content = None
1311 indata = self._remove_envelop(indata)
1312
1313 # Override descriptor with query string kwargs
1314 if kwargs:
1315 self._update_input_with_kwargs(indata, kwargs)
1316 try:
1317 if indata and session.get("set_project"):
1318 raise EngineException(
1319 "Cannot edit content and set to project (query string SET_PROJECT) at same time",
1320 HTTPStatus.UNPROCESSABLE_ENTITY,
1321 )
1322 # TODO self._check_edition(session, indata, _id, force)
1323 if not content:
1324 content = self.show(session, _id)
1325
1326 for okas in indata["oka"]:
1327 if not okas["_id"]:
1328 okas.pop("_id")
1329 if not okas["sw_catalog_path"]:
1330 okas.pop("sw_catalog_path")
1331
1332 indata = self._validate_input_edit(indata, content, force=session["force"])
1333
1334 # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
1335 _id = content.get("_id") or _id
1336
1337 content = self.check_conflict_on_edit(session, content, indata, _id=_id)
yshah53cc9eb2024-07-05 13:06:31 +00001338 op_id = self.format_on_edit(content, indata)
1339 self.db.replace(self.topic, _id, content)
1340 return op_id
1341 except ValidationError as e:
1342 raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
1343
1344 def delete_ksu(self, session, _id, indata, dry_run=False, not_send_msg=None):
1345 _id_list = []
yshah53cc9eb2024-07-05 13:06:31 +00001346 if _id == "delete":
1347 for ksus in indata["ksus"]:
1348 content = ksus
1349 _id = content["_id"]
yshah53cc9eb2024-07-05 13:06:31 +00001350 content.pop("_id")
garciadeblasac285872024-12-05 12:21:09 +01001351 op_id, not_send_msg_ksu = self.delete(session, _id)
1352 if not not_send_msg_ksu:
1353 _id_list.append(_id)
yshah53cc9eb2024-07-05 13:06:31 +00001354 else:
garciadeblasac285872024-12-05 12:21:09 +01001355 op_id, not_send_msg_ksu = self.delete(session, _id)
1356 if not not_send_msg_ksu:
1357 _id_list.append(_id)
yshah53cc9eb2024-07-05 13:06:31 +00001358
garciadeblasac285872024-12-05 12:21:09 +01001359 if _id_list:
yshah781ce732025-02-28 08:56:13 +00001360 data = {
1361 "ksus_list": _id_list,
1362 "operation_id": op_id,
1363 "force": session["force"],
1364 }
garciadeblasac285872024-12-05 12:21:09 +01001365 self._send_msg("delete", data, not_send_msg)
yshah53cc9eb2024-07-05 13:06:31 +00001366 return op_id
1367
yshahd0c876f2024-11-11 09:24:48 +00001368 def delete(self, session, _id):
yshah53cc9eb2024-07-05 13:06:31 +00001369 if not self.multiproject:
1370 filter_q = {}
1371 else:
1372 filter_q = self._get_project_filter(session)
1373 filter_q[self.id_field(self.topic, _id)] = _id
1374 item_content = self.db.get_one(self.topic, filter_q)
yshahf8f07632025-01-17 04:46:03 +00001375
1376 check = {
1377 "ksu": _id,
1378 item_content["profile"]["profile_type"]: item_content["profile"]["_id"],
1379 }
1380 self.check_dependency(check, operation_type="delete")
1381
yshah53cc9eb2024-07-05 13:06:31 +00001382 item_content["state"] = "IN_DELETION"
1383 item_content["operatingState"] = "PROCESSING"
1384 item_content["resourceState"] = "IN_PROGRESS"
yshahd0c876f2024-11-11 09:24:48 +00001385 op_id = self.format_on_operation(
yshah53cc9eb2024-07-05 13:06:31 +00001386 item_content,
1387 "delete",
1388 None,
1389 )
1390 self.db.set_one(self.topic, {"_id": item_content["_id"]}, item_content)
1391
garciadeblasac285872024-12-05 12:21:09 +01001392 # Check if the profile exists. If it doesn't, no message should be sent to Kafka
1393 not_send_msg = None
1394 profile_id = item_content["profile"]["_id"]
1395 profile_type = item_content["profile"]["profile_type"]
1396 profile_collection_map = {
1397 "app_profiles": "k8sapp",
1398 "resource_profiles": "k8sresource",
1399 "infra_controller_profiles": "k8sinfra_controller",
1400 "infra_config_profiles": "k8sinfra_config",
1401 }
1402 profile_collection = profile_collection_map[profile_type]
1403 profile_content = self.db.get_one(
1404 profile_collection, {"_id": profile_id}, fail_on_empty=False
1405 )
1406 if not profile_content:
1407 self.db.del_one(self.topic, filter_q)
1408 not_send_msg = True
1409 return op_id, not_send_msg
yshah53cc9eb2024-07-05 13:06:31 +00001410
1411
garciadeblasb798f452025-08-05 18:21:26 +02001412class AppInstanceTopic(ACMTopic):
1413 topic = "appinstances"
1414 okapkg_topic = "okas"
1415 topic_msg = "appinstance"
1416 schema_new = app_instance_schema
garciadeblased401462026-01-16 02:51:15 +01001417 schema_edit = app_instance_edit_schema
garciadeblasb798f452025-08-05 18:21:26 +02001418
    def __init__(self, db, fs, msg, auth):
        """Initialize the topic with shared db/fs/msg/auth backends."""
        super().__init__(db, fs, msg, auth)
        # Dedicated logger for app-instance operations
        self.logger = logging.getLogger("nbi.appinstances")
1422
1423 @staticmethod
1424 def format_on_new(content, project_id=None, make_public=False):
1425 BaseTopic.format_on_new(content, project_id=project_id, make_public=make_public)
1426 content["current_operation"] = None
1427 content["state"] = "IN_CREATION"
1428 content["operatingState"] = "PROCESSING"
1429 content["resourceState"] = "IN_PROGRESS"
1430
1431 def new(self, rollback, session, indata=None, kwargs=None, headers=None):
1432 if indata.get("oka") and indata.get("sw_catalog_path"):
1433 raise EngineException(
1434 "Cannot create app instance with both OKA and SW catalog path",
1435 HTTPStatus.UNPROCESSABLE_ENTITY,
1436 )
1437
1438 # Override descriptor with query string kwargs
1439 content = self._remove_envelop(indata)
1440 self._update_input_with_kwargs(content, kwargs)
1441 content = self._validate_input_new(input=content, force=session["force"])
1442
1443 # Check for unique name
1444 self.check_unique_name(session, content["name"])
1445
1446 self.check_conflict_on_new(session, content)
1447
1448 operation_params = {}
1449 for content_key, content_value in content.items():
1450 operation_params[content_key] = content_value
1451 self.format_on_new(
1452 content, project_id=session["project_id"], make_public=session["public"]
1453 )
1454 op_id = self.format_on_operation(
1455 content,
1456 operation_type="create",
1457 operation_params=operation_params,
1458 )
1459 content["git_name"] = self.create_gitname(content, session)
1460
1461 oka_id = content.get("oka")
1462 if oka_id:
1463 self.update_oka_usage_state(session, oka_id)
1464
1465 _id = self.db.create(self.topic, content)
1466 rollback.append({"topic": self.topic, "_id": _id})
garciadeblasbe6498e2025-11-05 11:55:37 +01001467 self._send_msg("create", {"appinstance": _id, "operation_id": op_id})
garciadeblasb798f452025-08-05 18:21:26 +02001468 return _id, op_id
1469
1470 def update_oka_usage_state(self, session, oka_id):
1471 filter_db = self._get_project_filter(session)
1472 filter_db[BaseTopic.id_field(self.topic, oka_id)] = oka_id
1473
1474 data = self.db.get_one(self.okapkg_topic, filter_db)
1475 if data["_admin"]["usageState"] == "NOT_IN_USE":
1476 usage_state_update = {
1477 "_admin.usageState": "IN_USE",
1478 }
1479 self.db.set_one(
1480 self.okapkg_topic, {"_id": oka_id}, update_dict=usage_state_update
1481 )
1482
1483 def check_conflict_on_edit(self, session, final_content, edit_content, _id):
1484 if final_content["name"] != edit_content["name"]:
1485 self.check_unique_name(session, edit_content["name"])
1486 return final_content
1487
1488 @staticmethod
1489 def format_on_edit(final_content, edit_content):
1490 op_id = ACMTopic.format_on_operation(
1491 final_content,
1492 "update",
1493 edit_content,
1494 )
1495 final_content["operatingState"] = "PROCESSING"
1496 final_content["resourceState"] = "IN_PROGRESS"
1497 if final_content.get("_admin"):
1498 now = time()
1499 final_content["_admin"]["modified"] = now
1500 return op_id
1501
1502 def edit(self, session, _id, indata, kwargs):
1503 content = None
1504 indata = self._remove_envelop(indata)
1505
1506 # Override descriptor with query string kwargs
1507 if kwargs:
1508 self._update_input_with_kwargs(indata, kwargs)
1509 try:
1510 if indata and session.get("set_project"):
1511 raise EngineException(
1512 "Cannot edit content and set to project (query string SET_PROJECT) at same time",
1513 HTTPStatus.UNPROCESSABLE_ENTITY,
1514 )
1515 # TODO self._check_edition(session, indata, _id, force)
1516 if not content:
1517 content = self.show(session, _id)
1518
1519 indata = self._validate_input_edit(indata, content, force=session["force"])
1520
1521 # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
1522 _id = content.get("_id") or _id
1523
1524 content = self.check_conflict_on_edit(session, content, indata, _id=_id)
1525 op_id = self.format_on_edit(content, indata)
1526 self.db.replace(self.topic, _id, content)
1527 return op_id
1528 except ValidationError as e:
1529 raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
1530
garciadeblased401462026-01-16 02:51:15 +01001531 def update_item(self, session, _id, item, indata):
garciadeblasb798f452025-08-05 18:21:26 +02001532 if not self.multiproject:
1533 filter_db = {}
1534 else:
1535 filter_db = self._get_project_filter(session)
1536 # To allow project&user addressing by name AS WELL AS _id
1537 filter_db[BaseTopic.id_field(self.topic, _id)] = _id
garciadeblased401462026-01-16 02:51:15 +01001538 self.logger.info(f"Item: {item}")
1539 self.logger.info(f"Indata before validation: {indata}")
1540 validate_input(indata, app_instance_update_schema)
1541 self.logger.info(f"Indata after validation: {indata}")
garciadeblasb798f452025-08-05 18:21:26 +02001542 data = self.db.get_one(self.topic, filter_db)
1543 operation_params = {}
1544 data["operatingState"] = "PROCESSING"
1545 data["resourceState"] = "IN_PROGRESS"
1546 operation_params = indata
garciadeblased401462026-01-16 02:51:15 +01001547 self.logger.info(f"Operation params: {operation_params}")
garciadeblasb798f452025-08-05 18:21:26 +02001548 op_id = self.format_on_operation(
1549 data,
garciadeblased401462026-01-16 02:51:15 +01001550 "update",
garciadeblasb798f452025-08-05 18:21:26 +02001551 operation_params,
1552 )
1553 self.db.set_one(self.topic, {"_id": _id}, data)
garciadeblased401462026-01-16 02:51:15 +01001554 self._send_msg("update", {"appinstance": _id, "operation_id": op_id})
garciadeblasb798f452025-08-05 18:21:26 +02001555 return op_id
1556
    def delete(self, session, _id, not_send_msg=None):
        """Flag the item for deletion, record the operation and notify kafka.

        The record is normally kept (state IN_DELETION) so the asynchronous
        workflow can clean it up; it is removed synchronously only when the
        profile it belongs to no longer exists, in which case no kafka
        message is emitted.

        :param session: session/project data used for authorization
        :param _id: item id or name
        :param not_send_msg: when truthy, suppresses the kafka notification
        :return: id of the recorded "delete" operation
        """
        if not self.multiproject:
            filter_q = {}
        else:
            filter_q = self._get_project_filter(session)
        # Allow addressing by name as well as by _id.
        filter_q[self.id_field(self.topic, _id)] = _id
        item_content = self.db.get_one(self.topic, filter_q)
        # Mark the record busy/in-deletion before persisting the operation.
        item_content["state"] = "IN_DELETION"
        item_content["operatingState"] = "PROCESSING"
        item_content["resourceState"] = "IN_PROGRESS"
        op_id = self.format_on_operation(
            item_content,
            "delete",
            None,
        )
        self.db.set_one(self.topic, {"_id": item_content["_id"]}, item_content)

        # Check if the profile exists. If it doesn't, no message should be sent to Kafka
        not_send_msg2 = not_send_msg
        profile_id = item_content["profile"]
        profile_type = item_content["profile_type"]
        # Maps the profile_type stored on the item to its db collection.
        # NOTE(review): an unknown profile_type raises KeyError here — confirm
        # profile_type is schema-constrained to these four values.
        profile_collection_map = {
            "app_profiles": "k8sapp",
            "resource_profiles": "k8sresource",
            "infra_controller_profiles": "k8sinfra_controller",
            "infra_config_profiles": "k8sinfra_config",
        }
        profile_collection = profile_collection_map[profile_type]
        profile_content = self.db.get_one(
            profile_collection, {"_id": profile_id}, fail_on_empty=False
        )
        if not profile_content:
            # Orphaned item (its profile is gone): remove it right away and
            # skip the kafka notification.
            self.db.del_one(self.topic, filter_q)
            not_send_msg2 = True
        self._send_msg(
            "delete",
            {"appinstance": _id, "operation_id": op_id, "force": session["force"]},
            not_send_msg=not_send_msg2,
        )
        return op_id
1597
1598
shrinithi28d887f2025-01-08 05:27:19 +00001599class OkaTopic(DescriptorTopic, ACMOperationTopic):
yshah53cc9eb2024-07-05 13:06:31 +00001600 topic = "okas"
1601 topic_msg = "oka"
1602 schema_new = oka_schema
1603 schema_edit = oka_schema
1604
1605 def __init__(self, db, fs, msg, auth):
1606 super().__init__(db, fs, msg, auth)
1607 self.logger = logging.getLogger("nbi.oka")
1608
1609 @staticmethod
1610 def format_on_new(content, project_id=None, make_public=False):
1611 DescriptorTopic.format_on_new(
1612 content, project_id=project_id, make_public=make_public
1613 )
garciadeblasbecc7052024-11-20 12:04:53 +01001614 content["current_operation"] = None
yshah53cc9eb2024-07-05 13:06:31 +00001615 content["state"] = "PENDING_CONTENT"
1616 content["operatingState"] = "PROCESSING"
1617 content["resourceState"] = "IN_PROGRESS"
1618
1619 def check_conflict_on_del(self, session, _id, db_content):
1620 usage_state = db_content["_admin"]["usageState"]
1621 if usage_state == "IN_USE":
1622 raise EngineException(
1623 "There is a KSU using this package",
1624 http_code=HTTPStatus.CONFLICT,
1625 )
1626
1627 def check_conflict_on_edit(self, session, final_content, edit_content, _id):
yshah00cfe8b2025-01-17 04:05:45 +00001628 if "name" in edit_content:
1629 if final_content["name"] == edit_content["name"]:
1630 name = edit_content["name"]
1631 raise EngineException(
1632 f"No update, new name for the OKA is the same: {name}",
1633 http_code=HTTPStatus.CONFLICT,
1634 )
1635 else:
1636 self.check_unique_name(session, edit_content["name"])
1637 elif (
1638 "description" in edit_content
yshah53cc9eb2024-07-05 13:06:31 +00001639 and final_content["description"] == edit_content["description"]
1640 ):
yshah00cfe8b2025-01-17 04:05:45 +00001641 description = edit_content["description"]
yshah53cc9eb2024-07-05 13:06:31 +00001642 raise EngineException(
yshah00cfe8b2025-01-17 04:05:45 +00001643 f"No update, new description for the OKA is the same: {description}",
yshah53cc9eb2024-07-05 13:06:31 +00001644 http_code=HTTPStatus.CONFLICT,
1645 )
yshah53cc9eb2024-07-05 13:06:31 +00001646 return final_content
1647
    def edit(self, session, _id, indata=None, kwargs=None, content=None):
        """Apply an edit to an OKA descriptor and record the operation.

        :param session: session/project data used for authorization
        :param _id: OKA id (or name; resolved to the real _id below)
        :param indata: edit payload (envelope removed here)
        :param kwargs: query-string overrides merged on top of indata
        :param content: current descriptor, fetched from db when not given
        :return: id of the recorded "update" operation
        :raises EngineException: on validation errors or conflicting input
        """
        indata = self._remove_envelop(indata)

        # Override descriptor with query string kwargs
        if kwargs:
            self._update_input_with_kwargs(indata, kwargs)
        try:
            if indata and session.get("set_project"):
                raise EngineException(
                    "Cannot edit content and set to project (query string SET_PROJECT) at same time",
                    HTTPStatus.UNPROCESSABLE_ENTITY,
                )
            # TODO self._check_edition(session, indata, _id, force)
            if not content:
                content = self.show(session, _id)

            indata = self._validate_input_edit(indata, content, force=session["force"])

            # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
            _id = content.get("_id") or _id

            content = self.check_conflict_on_edit(session, content, indata, _id=_id)
            op_id = self.format_on_edit(content, indata)
            # Merge the edit payload into the descriptor (RFC 7396 semantics)
            # before persisting it.
            deep_update_rfc7396(content, indata)

            self.db.replace(self.topic, _id, content)
            return op_id
        except ValidationError as e:
            raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
1677
1678 def delete(self, session, _id, dry_run=False, not_send_msg=None):
yshahf8f07632025-01-17 04:46:03 +00001679 check = {"oka": _id}
1680 self.check_dependency(check, operation_type="delete")
yshah53cc9eb2024-07-05 13:06:31 +00001681 if not self.multiproject:
1682 filter_q = {}
1683 else:
1684 filter_q = self._get_project_filter(session)
1685 filter_q[self.id_field(self.topic, _id)] = _id
1686 item_content = self.db.get_one(self.topic, filter_q)
1687 item_content["state"] = "IN_DELETION"
1688 item_content["operatingState"] = "PROCESSING"
1689 self.check_conflict_on_del(session, _id, item_content)
yshahd0c876f2024-11-11 09:24:48 +00001690 op_id = self.format_on_operation(
yshah53cc9eb2024-07-05 13:06:31 +00001691 item_content,
1692 "delete",
1693 None,
1694 )
yshah53cc9eb2024-07-05 13:06:31 +00001695 self.db.set_one(self.topic, {"_id": item_content["_id"]}, item_content)
1696 self._send_msg(
yshah781ce732025-02-28 08:56:13 +00001697 "delete",
1698 {"oka_id": _id, "operation_id": op_id, "force": session["force"]},
1699 not_send_msg=not_send_msg,
yshah53cc9eb2024-07-05 13:06:31 +00001700 )
yshahffcac5f2024-08-19 12:49:07 +00001701 return op_id
yshah53cc9eb2024-07-05 13:06:31 +00001702
    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create a new OKA descriptor record (content is uploaded separately).

        :param rollback: list where the created record is appended so the
            caller can undo on failure
        :param session: session/project data used for authorization
        :param indata: creation payload; a userDefinedData envelope is unwrapped
        :param kwargs: query-string parameters; validated into the descriptor
        :param headers: HTTP headers (unused here)
        :return: tuple (_id of the created record, id of the "create" operation)
        """
        # _remove_envelop
        if indata:
            if "userDefinedData" in indata:
                indata = indata["userDefinedData"]

        content = {"_admin": {"userDefinedData": indata, "revision": 0}}

        self._update_input_with_kwargs(content, kwargs)
        # NOTE(review): this reassignment discards the content dict built
        # above (including _admin.userDefinedData) and validates kwargs, not
        # indata — confirm this is intentional and that OKA creation data is
        # expected to arrive via query string.
        content = BaseTopic._validate_input_new(
            self, input=kwargs, force=session["force"]
        )

        self.check_unique_name(session, content["name"])
        # Snapshot the creation parameters into the operation history.
        operation_params = {}
        for content_key, content_value in content.items():
            operation_params[content_key] = content_value
        self.format_on_new(
            content, session["project_id"], make_public=session["public"]
        )
        op_id = self.format_on_operation(
            content,
            operation_type="create",
            operation_params=operation_params,
        )
        content["git_name"] = self.create_gitname(content, session)
        _id = self.db.create(self.topic, content)
        rollback.append({"topic": self.topic, "_id": _id})
        return _id, op_id
yshah53cc9eb2024-07-05 13:06:31 +00001732
1733 def upload_content(self, session, _id, indata, kwargs, headers):
yshahf8f07632025-01-17 04:46:03 +00001734 if headers["Method"] in ("PUT", "PATCH"):
1735 check = {"oka": _id}
1736 self.check_dependency(check)
yshah53cc9eb2024-07-05 13:06:31 +00001737 current_desc = self.show(session, _id)
1738
1739 compressed = None
1740 content_type = headers.get("Content-Type")
1741 if (
1742 content_type
1743 and "application/gzip" in content_type
1744 or "application/x-gzip" in content_type
1745 ):
1746 compressed = "gzip"
1747 if content_type and "application/zip" in content_type:
1748 compressed = "zip"
1749 filename = headers.get("Content-Filename")
1750 if not filename and compressed:
1751 filename = "package.tar.gz" if compressed == "gzip" else "package.zip"
1752 elif not filename:
1753 filename = "package"
1754
1755 revision = 1
1756 if "revision" in current_desc["_admin"]:
1757 revision = current_desc["_admin"]["revision"] + 1
1758
1759 file_pkg = None
1760 fs_rollback = []
1761
1762 try:
1763 start = 0
1764 # Rather than using a temp folder, we will store the package in a folder based on
1765 # the current revision.
1766 proposed_revision_path = _id + ":" + str(revision)
1767 # all the content is upload here and if ok, it is rename from id_ to is folder
1768
1769 if start:
1770 if not self.fs.file_exists(proposed_revision_path, "dir"):
1771 raise EngineException(
1772 "invalid Transaction-Id header", HTTPStatus.NOT_FOUND
1773 )
1774 else:
1775 self.fs.file_delete(proposed_revision_path, ignore_non_exist=True)
1776 self.fs.mkdir(proposed_revision_path)
1777 fs_rollback.append(proposed_revision_path)
1778
1779 storage = self.fs.get_params()
1780 storage["folder"] = proposed_revision_path
yshah2c932bd2024-09-24 18:16:07 +00001781 storage["zipfile"] = filename
yshah53cc9eb2024-07-05 13:06:31 +00001782
1783 file_path = (proposed_revision_path, filename)
1784 file_pkg = self.fs.file_open(file_path, "a+b")
1785
yshah53cc9eb2024-07-05 13:06:31 +00001786 if isinstance(indata, dict):
1787 indata_text = yaml.safe_dump(indata, indent=4, default_flow_style=False)
1788 file_pkg.write(indata_text.encode(encoding="utf-8"))
1789 else:
1790 indata_len = 0
1791 indata = indata.file
1792 while True:
1793 indata_text = indata.read(4096)
1794 indata_len += len(indata_text)
1795 if not indata_text:
1796 break
1797 file_pkg.write(indata_text)
1798
yshah53cc9eb2024-07-05 13:06:31 +00001799 # Need to close the file package here so it can be copied from the
1800 # revision to the current, unrevisioned record
1801 if file_pkg:
1802 file_pkg.close()
1803 file_pkg = None
1804
1805 # Fetch both the incoming, proposed revision and the original revision so we
1806 # can call a validate method to compare them
1807 current_revision_path = _id + "/"
1808 self.fs.sync(from_path=current_revision_path)
1809 self.fs.sync(from_path=proposed_revision_path)
1810
garciadeblas807b8bf2024-09-23 13:03:00 +02001811 # Is this required?
yshah53cc9eb2024-07-05 13:06:31 +00001812 if revision > 1:
1813 try:
1814 self._validate_descriptor_changes(
1815 _id,
1816 filename,
1817 current_revision_path,
1818 proposed_revision_path,
1819 )
1820 except Exception as e:
1821 shutil.rmtree(
1822 self.fs.path + current_revision_path, ignore_errors=True
1823 )
1824 shutil.rmtree(
1825 self.fs.path + proposed_revision_path, ignore_errors=True
1826 )
1827 # Only delete the new revision. We need to keep the original version in place
1828 # as it has not been changed.
1829 self.fs.file_delete(proposed_revision_path, ignore_non_exist=True)
1830 raise e
1831
1832 indata = self._remove_envelop(indata)
1833
1834 # Override descriptor with query string kwargs
1835 if kwargs:
1836 self._update_input_with_kwargs(indata, kwargs)
1837
1838 current_desc["_admin"]["storage"] = storage
1839 current_desc["_admin"]["onboardingState"] = "ONBOARDED"
1840 current_desc["_admin"]["operationalState"] = "ENABLED"
1841 current_desc["_admin"]["modified"] = time()
1842 current_desc["_admin"]["revision"] = revision
1843
1844 deep_update_rfc7396(current_desc, indata)
1845
1846 # Copy the revision to the active package name by its original id
1847 shutil.rmtree(self.fs.path + current_revision_path, ignore_errors=True)
1848 os.rename(
1849 self.fs.path + proposed_revision_path,
1850 self.fs.path + current_revision_path,
1851 )
1852 self.fs.file_delete(current_revision_path, ignore_non_exist=True)
1853 self.fs.mkdir(current_revision_path)
1854 self.fs.reverse_sync(from_path=current_revision_path)
1855
1856 shutil.rmtree(self.fs.path + _id)
1857 kwargs = {}
1858 kwargs["package"] = filename
1859 if headers["Method"] == "POST":
1860 current_desc["state"] = "IN_CREATION"
garciadeblasbecc7052024-11-20 12:04:53 +01001861 op_id = current_desc.get("operationHistory", [{"op_id": None}])[-1].get(
1862 "op_id"
1863 )
yshah53cc9eb2024-07-05 13:06:31 +00001864 elif headers["Method"] in ("PUT", "PATCH"):
yshahd0c876f2024-11-11 09:24:48 +00001865 op_id = self.format_on_operation(
yshah53cc9eb2024-07-05 13:06:31 +00001866 current_desc,
1867 "update",
1868 kwargs,
1869 )
1870 current_desc["operatingState"] = "PROCESSING"
1871 current_desc["resourceState"] = "IN_PROGRESS"
1872
1873 self.db.replace(self.topic, _id, current_desc)
1874
1875 # Store a copy of the package as a point in time revision
1876 revision_desc = dict(current_desc)
1877 revision_desc["_id"] = _id + ":" + str(revision_desc["_admin"]["revision"])
1878 self.db.create(self.topic + "_revisions", revision_desc)
1879 fs_rollback = []
1880
yshah53cc9eb2024-07-05 13:06:31 +00001881 if headers["Method"] == "POST":
1882 self._send_msg("create", {"oka_id": _id, "operation_id": op_id})
1883 elif headers["Method"] == "PUT" or "PATCH":
1884 self._send_msg("edit", {"oka_id": _id, "operation_id": op_id})
1885
1886 return True
1887
1888 except EngineException:
1889 raise
1890 finally:
1891 if file_pkg:
1892 file_pkg.close()
1893 for file in fs_rollback:
1894 self.fs.file_delete(file, ignore_non_exist=True)