blob: a1d21b7b03030925ae1750d4733fb646464114d5 [file] [log] [blame]
rshri2d386cb2024-07-05 14:35:51 +00001# -*- coding: utf-8 -*-
2
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12# implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
yshah53cc9eb2024-07-05 13:06:31 +000016import logging
17import yaml
yshah53cc9eb2024-07-05 13:06:31 +000018import shutil
19import os
rshri2d386cb2024-07-05 14:35:51 +000020from http import HTTPStatus
21
yshah53cc9eb2024-07-05 13:06:31 +000022from time import time
rshri2d386cb2024-07-05 14:35:51 +000023from osm_nbi.base_topic import BaseTopic, EngineException
shrinithi28d887f2025-01-08 05:27:19 +000024from osm_nbi.acm_topic import ACMTopic, ACMOperationTopic, ProfileTopic
rshri2d386cb2024-07-05 14:35:51 +000025
yshah53cc9eb2024-07-05 13:06:31 +000026from osm_nbi.descriptor_topics import DescriptorTopic
rshri2d386cb2024-07-05 14:35:51 +000027from osm_nbi.validation import (
28 ValidationError,
yshah99122b82024-11-18 07:05:29 +000029 validate_input,
garciadeblasf30e33d2025-09-15 15:42:15 +020030 cluster_creation_new_schema,
yshah99122b82024-11-18 07:05:29 +000031 cluster_edit_schema,
32 cluster_update_schema,
rshri2d386cb2024-07-05 14:35:51 +000033 infra_controller_profile_create_new_schema,
34 infra_config_profile_create_new_schema,
35 app_profile_create_new_schema,
36 resource_profile_create_new_schema,
37 infra_controller_profile_create_edit_schema,
38 infra_config_profile_create_edit_schema,
39 app_profile_create_edit_schema,
40 resource_profile_create_edit_schema,
garciadeblasf30e33d2025-09-15 15:42:15 +020041 cluster_registration_new_schema,
rshri2d386cb2024-07-05 14:35:51 +000042 attach_dettach_profile_schema,
yshah53cc9eb2024-07-05 13:06:31 +000043 ksu_schema,
garciadeblasb798f452025-08-05 18:21:26 +020044 app_instance_schema,
yshah53cc9eb2024-07-05 13:06:31 +000045 oka_schema,
yshahd23c6a52025-06-13 05:49:31 +000046 node_create_new_schema,
47 node_edit_schema,
rshri2d386cb2024-07-05 14:35:51 +000048)
yshah53cc9eb2024-07-05 13:06:31 +000049from osm_common.dbbase import deep_update_rfc7396, DbException
rshri2d386cb2024-07-05 14:35:51 +000050from osm_common.msgbase import MsgException
51from osm_common.fsbase import FsException
52
# Module authors (conventional metadata attribute; read by tooling, not by code here).
__author__ = (
    "Shrinithi R <shrinithi.r@tataelxsi.co.in>",
    "Shahithya Y <shahithya.y@tataelxsi.co.in>",
)
rshri2d386cb2024-07-05 14:35:51 +000057
58
class InfraContTopic(ProfileTopic):
    """Topic handler for Kubernetes infra-controller profiles.

    Thin specialization of ProfileTopic: it only fixes the collection
    name, message topic and validation schemas, and delegates the real
    work to the ProfileTopic helpers.
    """

    topic = "k8sinfra_controller"
    topic_msg = "k8s_infra_controller"
    schema_new = infra_controller_profile_create_new_schema
    schema_edit = infra_controller_profile_create_edit_schema

    def __init__(self, db, fs, msg, auth):
        super().__init__(db, fs, msg, auth)

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create a new infra controller profile."""
        return self.new_profile(rollback, session, indata, kwargs, headers)

    def default(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create the default infra controller profile during cluster creation."""
        return self.default_profile(rollback, session, indata, kwargs, headers)

    def delete(self, session, _id, dry_run=False, not_send_msg=None):
        """Delete the profile, first checking that no cluster still uses it."""
        dependency_filter = {"infra_controller_profiles": _id}
        self.check_dependency(dependency_filter, operation_type="delete")
        self.delete_profile(session, _id, dry_run, not_send_msg)
        return _id
rshri2d386cb2024-07-05 14:35:51 +000081
82
class InfraConfTopic(ProfileTopic):
    """Topic handler for Kubernetes infra-config profiles.

    Thin specialization of ProfileTopic: it only fixes the collection
    name, message topic and validation schemas, and delegates the real
    work to the ProfileTopic helpers.
    """

    topic = "k8sinfra_config"
    topic_msg = "k8s_infra_config"
    schema_new = infra_config_profile_create_new_schema
    schema_edit = infra_config_profile_create_edit_schema

    def __init__(self, db, fs, msg, auth):
        super().__init__(db, fs, msg, auth)

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create a new infra config profile."""
        return self.new_profile(rollback, session, indata, kwargs, headers)

    def default(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create the default infra config profile during cluster creation."""
        return self.default_profile(rollback, session, indata, kwargs, headers)

    def delete(self, session, _id, dry_run=False, not_send_msg=None):
        """Delete the profile, first checking that no cluster still uses it."""
        dependency_filter = {"infra_config_profiles": _id}
        self.check_dependency(dependency_filter, operation_type="delete")
        self.delete_profile(session, _id, dry_run, not_send_msg)
        return _id
rshri2d386cb2024-07-05 14:35:51 +0000105
106
class AppProfileTopic(ProfileTopic):
    """Topic handler for Kubernetes app profiles.

    Thin specialization of ProfileTopic: it only fixes the collection
    name, message topic and validation schemas, and delegates the real
    work to the ProfileTopic helpers.
    """

    topic = "k8sapp"
    topic_msg = "k8s_app"
    schema_new = app_profile_create_new_schema
    schema_edit = app_profile_create_edit_schema

    def __init__(self, db, fs, msg, auth):
        super().__init__(db, fs, msg, auth)

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create a new app profile."""
        return self.new_profile(rollback, session, indata, kwargs, headers)

    def default(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create the default app profile during cluster creation."""
        return self.default_profile(rollback, session, indata, kwargs, headers)

    def delete(self, session, _id, dry_run=False, not_send_msg=None):
        """Delete the profile, first checking that no cluster still uses it."""
        dependency_filter = {"app_profiles": _id}
        self.check_dependency(dependency_filter, operation_type="delete")
        self.delete_profile(session, _id, dry_run, not_send_msg)
        return _id
rshri2d386cb2024-07-05 14:35:51 +0000129
130
class ResourceTopic(ProfileTopic):
    """Topic handler for Kubernetes resource profiles.

    Thin specialization of ProfileTopic: it only fixes the collection
    name, message topic and validation schemas, and delegates the real
    work to the ProfileTopic helpers.
    """

    topic = "k8sresource"
    topic_msg = "k8s_resource"
    schema_new = resource_profile_create_new_schema
    schema_edit = resource_profile_create_edit_schema

    def __init__(self, db, fs, msg, auth):
        super().__init__(db, fs, msg, auth)

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create a new resource profile."""
        return self.new_profile(rollback, session, indata, kwargs, headers)

    def default(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create the default resource profile during cluster creation."""
        return self.default_profile(rollback, session, indata, kwargs, headers)

    def delete(self, session, _id, dry_run=False, not_send_msg=None):
        """Delete the profile, first checking that no cluster still uses it."""
        dependency_filter = {"resource_profiles": _id}
        self.check_dependency(dependency_filter, operation_type="delete")
        self.delete_profile(session, _id, dry_run, not_send_msg)
        return _id
rshri2d386cb2024-07-05 14:35:51 +0000153
154
class ClusterTopic(ACMTopic):
    """Topic handler for Kubernetes clusters.

    Owns the "clusters" DB collection and the "cluster" message topic.
    On creation it also instantiates the four default profiles
    (infra controller, infra config, resource, app) through the
    corresponding profile topics, and mirrors content into the legacy
    "k8sclusters" collection.
    """

    topic = "clusters"
    topic_msg = "cluster"
    schema_new = cluster_creation_new_schema
    schema_edit = attach_dettach_profile_schema

    def __init__(self, db, fs, msg, auth):
        super().__init__(db, fs, msg, auth)
        # Sub-topics used to create/select the per-cluster default profiles.
        self.infra_contr_topic = InfraContTopic(db, fs, msg, auth)
        self.infra_conf_topic = InfraConfTopic(db, fs, msg, auth)
        self.resource_topic = ResourceTopic(db, fs, msg, auth)
        self.app_topic = AppProfileTopic(db, fs, msg, auth)

    @staticmethod
    def format_on_new(content, project_id=None, make_public=False):
        # Extend the base formatting with an operation tracker field.
        ACMTopic.format_on_new(content, project_id=project_id, make_public=make_public)
        content["current_operation"] = None

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """
        Creates a new k8scluster into database.
        :param rollback: list to append the created items at database in case a rollback must be done
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param indata: params to be used for the k8cluster
        :param kwargs: used to override the indata
        :param headers: http request headers
        :return: the _id of k8scluster created at database. Or an exception of type
            EngineException, ValidationError, DbException, FsException, MsgException.
        Note: Exceptions are not captured on purpose. They should be captured at called
        """

        step = "checking quotas"  # first step must be defined outside try
        try:
            if self.multiproject:
                self.check_quota(session)

            content = self._remove_envelop(indata)

            step = "name unique check"
            # Cluster names must be unique; raises on duplicates.
            self.cluster_unique_name_check(session, indata["name"])

            step = "validating input parameters"
            self._update_input_with_kwargs(content, kwargs)

            content = self._validate_input_new(content, session, force=session["force"])

            # Keep a copy of the raw request as the operation parameters.
            operation_params = indata.copy()

            self.check_conflict_on_new(session, content)
            self.format_on_new(
                content, project_id=session["project_id"], make_public=session["public"]
            )

            step = "filling cluster details from input data"
            content = self._create_cluster(
                content, rollback, session, indata, kwargs, headers
            )

            step = "creating cluster at database"
            _id = self.db.create(self.topic, content)

            op_id = self.format_on_operation(
                content,
                "create",
                operation_params,
            )

            # Generate and store an encrypted age key pair for this cluster
            # (encrypted with the cluster _id as salt).
            pubkey, privkey = self._generate_age_key()
            content["age_pubkey"] = self.db.encrypt(
                pubkey, schema_version="1.11", salt=_id
            )
            content["age_privkey"] = self.db.encrypt(
                privkey, schema_version="1.11", salt=_id
            )

            # TODO: set age_pubkey and age_privkey in the default profiles
            rollback.append({"topic": self.topic, "_id": _id})
            # Persist the post-create additions (operation, keys) and notify.
            self.db.set_one("clusters", {"_id": _id}, content)
            self._send_msg("create", {"cluster_id": _id, "operation_id": op_id})

            # To add the content in old collection "k8sclusters"
            self.add_to_old_collection(content, session)

            return _id, None
        except (
            ValidationError,
            EngineException,
            DbException,
            MsgException,
            FsException,
        ) as e:
            # Re-raise the same exception type, annotated with the failing step.
            raise type(e)("{} while '{}'".format(e, step), http_code=e.http_code)

    def _validate_input_new(self, content, session, force=False):
        """Validate creation input against the VIM type before schema validation.

        NOTE(review): the `force` parameter is not used; `session["force"]`
        is passed to the parent instead — confirm which one is intended.
        """
        # validating vim and checking the mandatory parameters
        vim_type = self.check_vim(session, content["vim_account"])

        # for aws
        if vim_type == "aws":
            self._aws_check(content)

        # for azure and gcp
        elif vim_type in ["azure", "gcp"]:
            self._params_check(content)

        return super()._validate_input_new(content, force=session["force"])

    def _aws_check(self, indata):
        # AWS clusters must not carry node_count/node_size (managed differently).
        if "node_count" in indata or "node_size" in indata:
            raise ValueError("node_count and node_size are not allowed for AWS")
        return

    def _params_check(self, indata):
        # Azure/GCP clusters require at least one of node_count/node_size.
        if "node_count" not in indata and "node_size" not in indata:
            raise ValueError("node_count and node_size are mandatory parameter")
        return

    def _create_cluster(self, content, rollback, session, indata, kwargs, headers):
        """Fill `content` with default profiles, state fields and VIM-derived data.

        Returns the enriched `content` dict ready for DB insertion.
        """
        private_subnet = indata.get("private_subnet")
        public_subnet = indata.get("public_subnet")

        # Enforce: if private_subnet is provided, public_subnet must also be provided
        if (private_subnet and not public_subnet) or (
            public_subnet and not private_subnet
        ):
            raise ValueError(
                "'public_subnet' must be provided if 'private_subnet' is given and viceversa."
            )

        # private Subnet validation
        if private_subnet:
            count = len(private_subnet)
            if count != 2:
                raise ValueError(
                    f"private_subnet must contain exactly 2 items, got {count}"
                )

        # public Subnet validation
        public_subnet = indata.get("public_subnet")
        if public_subnet:
            count = len(public_subnet)
            if count != 1:
                raise ValueError(
                    f"public_subnet must contain exactly 1 items, got {count}"
                )

        # Each cluster starts with exactly one default profile of each kind.
        content["infra_controller_profiles"] = [
            self._create_default_profiles(
                rollback, session, indata, kwargs, headers, self.infra_contr_topic
            )
        ]
        content["infra_config_profiles"] = [
            self._create_default_profiles(
                rollback, session, indata, kwargs, headers, self.infra_conf_topic
            )
        ]
        content["resource_profiles"] = [
            self._create_default_profiles(
                rollback, session, indata, kwargs, headers, self.resource_topic
            )
        ]
        content["app_profiles"] = [
            self._create_default_profiles(
                rollback, session, indata, kwargs, headers, self.app_topic
            )
        ]
        # "created" is stored as the string "true" (not a boolean) — the
        # delete() path compares against the string "false".
        content["created"] = "true"
        content["state"] = "IN_CREATION"
        content["operatingState"] = "PROCESSING"
        content["git_name"] = self.create_gitname(content, session)
        content["resourceState"] = "IN_PROGRESS.REQUEST_RECEIVED"

        # Get the vim_account details
        vim_account_details = self.db.get_one(
            "vim_accounts", {"name": content["vim_account"]}
        )

        # Add optional fields if they don't exist in the request
        if "region_name" not in indata:
            region_name = vim_account_details.get("config", {}).get("region_name")
            if region_name:
                content["region_name"] = region_name

        if "resource_group" not in indata:
            resource_group = vim_account_details.get("config", {}).get("resource_group")
            if resource_group:
                content["resource_group"] = resource_group

        # Default Kubernetes version when the request does not set one.
        version = "k8s_version" in content
        if not version:
            content["k8s_version"] = "1.32"
        # Additional cluster information, specific for each cluster type
        content["config"] = indata.get("config", {})
        content["node_count"] = indata.get("node_count", 0)
        content["ksu_count"] = 0
        self.logger.info(f"content is : {content}")
        return content

    def check_vim(self, session, name):
        """Return the vim_type of the VIM account addressed by `name`.

        NOTE(review): if db.get_one raises something other than
        ValidationError, or returns None, this method falls through and
        implicitly returns None — confirm callers tolerate that.
        """
        try:
            vim_account_details = self.db.get_one("vim_accounts", {"name": name})
            if vim_account_details is not None:
                return vim_account_details["vim_type"]
        except ValidationError as e:
            raise EngineException(
                e,
                HTTPStatus.UNPROCESSABLE_ENTITY,
            )

    def _create_default_profiles(
        self, rollback, session, indata, kwargs, headers, topic
    ):
        # `topic` may be a profile-type name or an already-resolved topic object;
        # to_select_topic passes objects through unchanged.
        topic = self.to_select_topic(topic)
        default_profiles = topic.default(rollback, session, indata, kwargs, headers)
        return default_profiles

    def to_select_topic(self, topic):
        """Map a profile-type name to its topic handler; pass through otherwise."""
        if topic == "infra_controller_profiles":
            topic = self.infra_contr_topic
        elif topic == "infra_config_profiles":
            topic = self.infra_conf_topic
        elif topic == "resource_profiles":
            topic = self.resource_topic
        elif topic == "app_profiles":
            topic = self.app_topic
        return topic

    def show_one(self, session, _id, profile, filter_q=None, api_req=False):
        """Return the full records of every `profile`-type profile attached
        to cluster `_id`.

        Note: the incoming filter_q is ignored and rebuilt from the session's
        project filter.
        """
        try:
            filter_q = self._get_project_filter(session)
            filter_q[self.id_field(self.topic, _id)] = _id
            content = self.db.get_one(self.topic, filter_q)
            existing_profiles = []
            topic = None
            topic = self.to_select_topic(profile)
            for profile_id in content[profile]:
                data = topic.show(session, profile_id, filter_q, api_req)
                existing_profiles.append(data)
            return existing_profiles
        except ValidationError as e:
            raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)

    def state_check(self, profile_id, session, topic):
        """Raise unless the given profile is in CREATED state."""
        topic = self.to_select_topic(topic)
        content = topic.show(session, profile_id, filter_q=None, api_req=False)
        state = content["state"]
        if state == "CREATED":
            return
        else:
            raise EngineException(
                f" {profile_id} is not in created state",
                HTTPStatus.UNPROCESSABLE_ENTITY,
            )

    def edit(self, session, _id, item, indata=None, kwargs=None):
        """Edit a cluster.

        When `item` is one of the profile list names, treat the edit as a
        profile attach/detach request; otherwise fall back to the generic
        ACMTopic edit with the cluster edit schema.

        NOTE(review): in the generic branch `indata=item` is passed to
        super().edit() — this looks like the caller packs the payload in
        `item`; confirm against the NBI routing layer.
        """
        if item not in (
            "infra_controller_profiles",
            "infra_config_profiles",
            "app_profiles",
            "resource_profiles",
        ):
            self.schema_edit = cluster_edit_schema
            super().edit(session, _id, indata=item, kwargs=kwargs, content=None)
        else:
            indata = self._remove_envelop(indata)
            indata = self._validate_input_edit(
                indata, content=None, force=session["force"]
            )
            if indata.get("add_profile"):
                self.add_profile(session, _id, item, indata)
            elif indata.get("remove_profile"):
                self.remove_profile(session, _id, item, indata)
            else:
                error_msg = "Add / remove operation is only applicable"
                raise EngineException(error_msg, HTTPStatus.UNPROCESSABLE_ENTITY)

    def edit_extra_before(self, session, _id, indata=None, kwargs=None, content=None):
        """On rename, enforce name uniqueness across cluster/profile
        collections and propagate the new name to all of them."""
        check = self.db.get_one(self.topic, {"_id": _id})
        if "name" in indata and check["name"] != indata["name"]:
            self.check_unique_name(session, indata["name"])
            _filter = {"name": indata["name"]}
            topic_list = [
                "k8sclusters",
                "k8sinfra_controller",
                "k8sinfra_config",
                "k8sapp",
                "k8sresource",
            ]
            # Check unique name for k8scluster and profiles
            for topic in topic_list:
                if self.db.get_one(
                    topic, _filter, fail_on_empty=False, fail_on_more=False
                ):
                    raise EngineException(
                        "name '{}' already exists for {}".format(indata["name"], topic),
                        HTTPStatus.CONFLICT,
                    )
            # Replace name in k8scluster and profiles
            for topic in topic_list:
                data = self.db.get_one(topic, {"name": check["name"]})
                data["name"] = indata["name"]
                self.db.replace(topic, data["_id"], data)
        return True

    def add_profile(self, session, _id, item, indata=None):
        """Attach an existing profile of type `item` to cluster `_id`.

        Registers an "add" operation on the cluster and publishes an "add"
        message; the actual attachment is performed asynchronously.
        """
        check = {"cluster": _id, item: indata["add_profile"][0]["id"]}
        self.check_dependency(check)
        indata = self._remove_envelop(indata)
        operation_params = indata
        profile_id = indata["add_profile"][0]["id"]
        # check state
        self.state_check(profile_id, session, item)
        filter_q = self._get_project_filter(session)
        filter_q[self.id_field(self.topic, _id)] = _id
        content = self.db.get_one(self.topic, filter_q)
        profile_list = content[item]

        if profile_id not in profile_list:
            content["operatingState"] = "PROCESSING"
            op_id = self.format_on_operation(
                content,
                "add",
                operation_params,
            )
            self.db.set_one("clusters", {"_id": content["_id"]}, content)
            self._send_msg(
                "add",
                {
                    "cluster_id": _id,
                    "profile_id": profile_id,
                    "profile_type": item,
                    "operation_id": op_id,
                },
            )
        else:
            raise EngineException(
                f"{item} {profile_id} already exists", HTTPStatus.UNPROCESSABLE_ENTITY
            )

    def _get_default_profiles(self, session, topic):
        """Return the _ids of all profiles of type `topic` flagged as default."""
        topic = self.to_select_topic(topic)
        existing_profiles = topic.list(session, filter_q=None, api_req=False)
        default_profiles = [
            profile["_id"]
            for profile in existing_profiles
            if profile.get("default", False)
        ]
        return default_profiles

    def remove_profile(self, session, _id, item, indata):
        """Detach a profile of type `item` from cluster `_id`.

        Default profiles cannot be removed. Registers a "remove" operation
        and publishes a "remove" message for asynchronous processing.
        """
        check = {"cluster": _id, item: indata["remove_profile"][0]["id"]}
        self.check_dependency(check)
        indata = self._remove_envelop(indata)
        operation_params = indata
        profile_id = indata["remove_profile"][0]["id"]
        filter_q = self._get_project_filter(session)
        filter_q[self.id_field(self.topic, _id)] = _id
        content = self.db.get_one(self.topic, filter_q)
        profile_list = content[item]

        default_profiles = self._get_default_profiles(session, item)

        if profile_id in default_profiles:
            raise EngineException(
                "Cannot remove default profile", HTTPStatus.UNPROCESSABLE_ENTITY
            )
        if profile_id in profile_list:
            op_id = self.format_on_operation(
                content,
                "remove",
                operation_params,
            )
            self.db.set_one("clusters", {"_id": content["_id"]}, content)
            self._send_msg(
                "remove",
                {
                    "cluster_id": _id,
                    "profile_id": profile_id,
                    "profile_type": item,
                    "operation_id": op_id,
                },
            )
        else:
            # NOTE(review): user-facing message has a typo ("does'nt") —
            # fix in a behavior-change pass, not here.
            raise EngineException(
                f"{item} {profile_id} does'nt exists", HTTPStatus.UNPROCESSABLE_ENTITY
            )

    def get_cluster_creds(self, session, _id, item):
        """Start an asynchronous credentials-retrieval operation.

        Returns the new operation id; the result is later read back via
        get_cluster_creds_file().
        """
        if not self.multiproject:
            filter_db = {}
        else:
            filter_db = self._get_project_filter(session)
        filter_db[BaseTopic.id_field(self.topic, _id)] = _id
        operation_params = None
        data = self.db.get_one(self.topic, filter_db)
        op_id = self.format_on_operation(data, item, operation_params)
        self.db.set_one(self.topic, {"_id": data["_id"]}, data)
        self._send_msg("get_creds", {"cluster_id": _id, "operation_id": op_id})
        return op_id

    def get_cluster_creds_file(self, session, _id, item, op_id):
        """Return (file handle, mime type) for the credentials of cluster `_id`.

        Looks up operation `op_id` in the cluster's operationHistory; only if
        that operation succeeded are the stored credentials dumped to a YAML
        file in shared storage and re-opened for reading.
        """
        if not self.multiproject:
            filter_db = {}
        else:
            filter_db = self._get_project_filter(session)
        filter_db[BaseTopic.id_field(self.topic, _id)] = _id

        data = self.db.get_one(self.topic, filter_db)
        creds_flag = None
        for operations in data["operationHistory"]:
            if operations["op_id"] == op_id:
                creds_flag = operations["result"]
                self.logger.info("Creds Flag: {}".format(creds_flag))

        if creds_flag is True:
            credentials = data["credentials"]

            file_pkg = None
            # The cluster _id doubles as the storage folder name.
            current_path = _id

            self.fs.file_delete(current_path, ignore_non_exist=True)
            self.fs.mkdir(current_path)
            filename = "credentials.yaml"
            file_path = (current_path, filename)
            self.logger.info("File path: {}".format(file_path))
            file_pkg = self.fs.file_open(file_path, "a+b")

            credentials_yaml = yaml.safe_dump(
                credentials, indent=4, default_flow_style=False
            )
            file_pkg.write(credentials_yaml.encode(encoding="utf-8"))

            if file_pkg:
                file_pkg.close()
                file_pkg = None
            self.fs.sync(from_path=current_path)

            return (
                self.fs.file_open((current_path, filename), "rb"),
                "text/plain",
            )
        else:
            raise EngineException(
                "Not possible to get the credentials of the cluster",
                HTTPStatus.UNPROCESSABLE_ENTITY,
            )

    def update_item(self, session, _id, item, indata):
        """Register an `item` operation (e.g. update/upgrade) on the cluster
        and publish it for asynchronous processing. Returns the operation id."""
        if not self.multiproject:
            filter_db = {}
        else:
            filter_db = self._get_project_filter(session)
        # To allow project&user addressing by name AS WELL AS _id
        filter_db[BaseTopic.id_field(self.topic, _id)] = _id
        validate_input(indata, cluster_update_schema)
        data = self.db.get_one(self.topic, filter_db)
        operation_params = {}
        data["operatingState"] = "PROCESSING"
        data["resourceState"] = "IN_PROGRESS"
        operation_params = indata
        op_id = self.format_on_operation(
            data,
            item,
            operation_params,
        )
        self.db.set_one(self.topic, {"_id": _id}, data)
        data = {"cluster_id": _id, "operation_id": op_id}
        self._send_msg(item, data)
        return op_id

    def delete_extra_before(self, session, _id, db_content, not_send_msg=None):
        # Build the payload for the deletion message published by the base delete.
        op_id = self.common_delete(_id, db_content)
        return {"cluster_id": _id, "operation_id": op_id, "force": session["force"]}

    def delete(self, session, _id, dry_run=False, not_send_msg=None):
        """Delete a created (not registered) cluster; registered clusters
        must be deregistered instead. Returns the current operation id."""
        check = {"cluster": _id}
        self.check_dependency(check, operation_type="delete")
        filter_q = self._get_project_filter(session)
        filter_q[self.id_field(self.topic, _id)] = _id
        check = self.db.get_one(self.topic, filter_q)
        op_id = check["current_operation"]
        # "created" is stored as a string; "false" marks a registered cluster.
        if check["created"] == "false":
            raise EngineException(
                "Cannot delete registered cluster. Please deregister.",
                HTTPStatus.UNPROCESSABLE_ENTITY,
            )
        super().delete(session, _id, dry_run, not_send_msg)
        return op_id
shrinithi28d887f2025-01-08 05:27:19 +0000643
644
class NodeGroupTopic(ACMTopic):
    """Topic handler for cluster node groups.

    Owns the "nodegroups" collection; creation/deletion/scaling are
    asynchronous — each method registers an operation on the record and
    publishes a message for the worker to act on.
    """

    topic = "nodegroups"
    topic_msg = "nodegroup"
    schema_new = node_create_new_schema
    schema_edit = node_edit_schema

    def __init__(self, db, fs, msg, auth):
        # Deliberately skips ACMTopic.__init__ and initializes BaseTopic
        # directly — presumably to avoid ACMTopic-specific setup; confirm.
        BaseTopic.__init__(self, db, fs, msg, auth)

    @staticmethod
    def format_on_new(content, project_id=None, make_public=False):
        # Base formatting plus the initial lifecycle/state fields.
        BaseTopic.format_on_new(content, project_id=project_id, make_public=make_public)
        content["current_operation"] = None
        content["state"] = "IN_CREATION"
        content["operatingState"] = "PROCESSING"
        content["resourceState"] = "IN_PROGRESS"

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create a node group attached to an existing cluster.

        Validates that at least one subnet is given and that every requested
        subnet belongs to the parent cluster, bumps the cluster's node_count,
        stores the record and publishes "add_nodegroup".
        :return: tuple (_id of the new nodegroup, operation id)
        """
        self.logger.info(f"Indata: {indata}")
        self.check_unique_name(session, indata["name"])

        indata = self._remove_envelop(indata)
        self._update_input_with_kwargs(indata, kwargs)
        if not indata.get("private_subnet") and not indata.get("public_subnet"):
            raise EngineException(
                "Please provide atleast one subnet",
                HTTPStatus.UNPROCESSABLE_ENTITY,
            )
        content = self._validate_input_new(indata, session["force"])

        self.logger.info(f"Indata: {indata}")
        self.logger.info(f"Content: {content}")
        cluster_id = content["cluster_id"]
        db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
        private_subnet = db_cluster.get("private_subnet")
        public_subnet = db_cluster.get("public_subnet")
        # Every subnet requested for the nodegroup must already be one of
        # the parent cluster's subnets.
        if content.get("private_subnet"):
            for subnet in content["private_subnet"]:
                if subnet not in private_subnet:
                    raise EngineException(
                        "No External subnet is used to add nodegroup",
                        HTTPStatus.UNPROCESSABLE_ENTITY,
                    )
        if content.get("public_subnet"):
            for subnet in content["public_subnet"]:
                if subnet not in public_subnet:
                    raise EngineException(
                        "No External subnet is used to add nodegroup",
                        HTTPStatus.UNPROCESSABLE_ENTITY,
                    )

        # Snapshot the validated input (before format_on_new mutates it)
        # as the operation parameters.
        operation_params = {}
        for content_key, content_value in content.items():
            operation_params[content_key] = content_value
        self.format_on_new(
            content, session["project_id"], make_public=session["public"]
        )
        content["git_name"] = self.create_gitname(content, session)
        self.logger.info(f"Operation Params: {operation_params}")
        op_id = self.format_on_operation(
            content,
            "create",
            operation_params,
        )
        # Reflect the extra node group in the parent cluster's node_count.
        node_count = db_cluster.get("node_count")
        new_node_count = node_count + 1
        self.logger.info(f"New Node count: {new_node_count}")
        db_cluster["node_count"] = new_node_count
        self.db.set_one("clusters", {"_id": cluster_id}, db_cluster)
        _id = self.db.create(self.topic, content)
        self._send_msg("add_nodegroup", {"nodegroup_id": _id, "operation_id": op_id})
        return _id, op_id

    def list(self, session, filter_q=None, api_req=False):
        """List node groups, optionally restricted to one cluster.

        With a "cluster_id" in filter_q, returns {"count": cluster node_count,
        "data": [...]}; otherwise returns the plain list.
        NOTE(review): filter_q is dereferenced unconditionally (filter_q.get),
        and the cluster lookup uses db_filter["cluster_id"] even when absent —
        a call without a cluster_id filter would raise; confirm callers
        always pass one.
        """
        db_filter = {}
        if filter_q.get("cluster_id"):
            db_filter["cluster_id"] = filter_q.get("cluster_id")
        data_list = self.db.get_list(self.topic, db_filter)
        cluster_data = self.db.get_one("clusters", {"_id": db_filter["cluster_id"]})
        self.logger.info(f"Cluster Data: {cluster_data}")
        self.logger.info(f"Data: {data_list}")
        if filter_q.get("cluster_id"):
            outdata = {}
            outdata["count"] = cluster_data["node_count"]
            outdata["data"] = data_list
            self.logger.info(f"Outdata: {outdata}")
            return outdata
        if api_req:
            data_list = [self.sol005_projection(inst) for inst in data_list]
        return data_list

    def delete(self, session, _id, dry_run=False, not_send_msg=None):
        """Mark the node group for deletion and publish "delete_nodegroup".

        Returns the operation id; actual removal happens asynchronously.
        """
        if not self.multiproject:
            filter_q = {}
        else:
            filter_q = self._get_project_filter(session)
        filter_q[self.id_field(self.topic, _id)] = _id
        item_content = self.db.get_one(self.topic, filter_q)
        item_content["state"] = "IN_DELETION"
        item_content["operatingState"] = "PROCESSING"
        item_content["resourceState"] = "IN_PROGRESS"
        self.check_conflict_on_del(session, _id, item_content)
        op_id = self.format_on_operation(
            item_content,
            "delete",
            None,
        )
        self.db.set_one(self.topic, {"_id": item_content["_id"]}, item_content)
        self._send_msg(
            "delete_nodegroup",
            {"nodegroup_id": _id, "operation_id": op_id},
            not_send_msg=not_send_msg,
        )
        return op_id

    def update_item(self, session, _id, item, indata):
        """Register a scale operation on the node group and publish
        "scale_nodegroup". Returns the operation id."""
        content = None
        try:
            if not content:
                content = self.db.get_one(self.topic, {"_id": _id})
            indata = self._validate_input_edit(indata, content, force=session["force"])
            _id = content.get("_id") or _id

            content = self.check_conflict_on_edit(session, content, indata, _id=_id)
            # format_on_edit's op_id is overwritten by the "scale" operation id
            # below — only the latter is returned.
            op_id = self.format_on_edit(content, indata)
            op_id = ACMTopic.format_on_operation(
                content,
                "scale",
                indata,
            )
            self.logger.info(f"op_id: {op_id}")
            content["operatingState"] = "PROCESSING"
            content["resourceState"] = "IN_PROGRESS"
            self.db.replace(self.topic, _id, content)
            self._send_msg(
                "scale_nodegroup", {"nodegroup_id": _id, "operation_id": op_id}
            )
            return op_id
        except ValidationError as e:
            raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
785
786 def edit(self, session, _id, indata, kwargs):
787 content = None
788
789 # Override descriptor with query string kwargs
790 if kwargs:
791 self._update_input_with_kwargs(indata, kwargs)
792 try:
793 if indata and session.get("set_project"):
794 raise EngineException(
795 "Cannot edit content and set to project (query string SET_PROJECT) at same time",
796 HTTPStatus.UNPROCESSABLE_ENTITY,
797 )
798 # TODO self._check_edition(session, indata, _id, force)
799 if not content:
800 content = self.db.get_one(self.topic, {"_id": _id})
801
802 indata = self._validate_input_edit(indata, content, force=session["force"])
803 self.logger.info(f"Indata: {indata}")
804
805 # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
806 _id = content.get("_id") or _id
807
808 content = self.check_conflict_on_edit(session, content, indata, _id=_id)
809 if "name" in indata and "description" in indata:
810 content["name"] = indata["name"]
811 content["description"] = indata["description"]
812 elif "name" in indata:
813 content["name"] = indata["name"]
814 elif "description" in indata:
815 content["description"] = indata["description"]
816 op_id = self.format_on_edit(content, indata)
817 self.db.set_one(self.topic, {"_id": _id}, content)
818 return op_id
819 except ValidationError as e:
820 raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
821
822
shrinithi28d887f2025-01-08 05:27:19 +0000823class ClusterOpsTopic(ACMTopic):
rshri2d386cb2024-07-05 14:35:51 +0000824 topic = "clusters"
825 topic_msg = "cluster"
garciadeblasf30e33d2025-09-15 15:42:15 +0200826 schema_new = cluster_registration_new_schema
rshri2d386cb2024-07-05 14:35:51 +0000827
828 def __init__(self, db, fs, msg, auth):
shrinithi28d887f2025-01-08 05:27:19 +0000829 super().__init__(db, fs, msg, auth)
garciadeblas3d5dc322025-04-03 23:57:04 +0200830 self.infra_contr_topic = InfraContTopic(db, fs, msg, auth)
831 self.infra_conf_topic = InfraConfTopic(db, fs, msg, auth)
832 self.resource_topic = ResourceTopic(db, fs, msg, auth)
garciadeblasb798f452025-08-05 18:21:26 +0200833 self.app_topic = AppProfileTopic(db, fs, msg, auth)
rshri2d386cb2024-07-05 14:35:51 +0000834
garciadeblasbecc7052024-11-20 12:04:53 +0100835 @staticmethod
836 def format_on_new(content, project_id=None, make_public=False):
shrinithi28d887f2025-01-08 05:27:19 +0000837 ACMTopic.format_on_new(content, project_id=project_id, make_public=make_public)
garciadeblasbecc7052024-11-20 12:04:53 +0100838 content["current_operation"] = None
839
rshri2d386cb2024-07-05 14:35:51 +0000840 def add(self, rollback, session, indata, kwargs=None, headers=None):
841 step = "checking quotas"
842 try:
843 self.check_quota(session)
844 step = "name unique check"
shrinithi28d887f2025-01-08 05:27:19 +0000845 self.cluster_unique_name_check(session, indata["name"])
846 # self.check_unique_name(session, indata["name"])
rshri2d386cb2024-07-05 14:35:51 +0000847 step = "validating input parameters"
848 cls_add_request = self._remove_envelop(indata)
849 self._update_input_with_kwargs(cls_add_request, kwargs)
850 cls_add_request = self._validate_input_new(
851 cls_add_request, session["force"]
852 )
853 operation_params = cls_add_request
854
855 step = "filling cluster details from input data"
garciadeblas3d5dc322025-04-03 23:57:04 +0200856 cls_add_request = self._add_cluster(
857 cls_add_request, rollback, session, indata, kwargs, headers
858 )
rshri2d386cb2024-07-05 14:35:51 +0000859
rshri17b09ec2024-11-07 05:48:12 +0000860 step = "registering the cluster at database"
rshri2d386cb2024-07-05 14:35:51 +0000861 self.format_on_new(
rshri17b09ec2024-11-07 05:48:12 +0000862 cls_add_request, session["project_id"], make_public=session["public"]
rshri2d386cb2024-07-05 14:35:51 +0000863 )
rshri2d386cb2024-07-05 14:35:51 +0000864 op_id = self.format_on_operation(
rshri17b09ec2024-11-07 05:48:12 +0000865 cls_add_request,
rshri2d386cb2024-07-05 14:35:51 +0000866 "register",
867 operation_params,
868 )
rshri17b09ec2024-11-07 05:48:12 +0000869 _id = self.db.create(self.topic, cls_add_request)
garciadeblas9d9d9262024-09-25 11:25:33 +0200870 pubkey, privkey = self._generate_age_key()
rshri17b09ec2024-11-07 05:48:12 +0000871 cls_add_request["age_pubkey"] = self.db.encrypt(
garciadeblas9d9d9262024-09-25 11:25:33 +0200872 pubkey, schema_version="1.11", salt=_id
873 )
rshri17b09ec2024-11-07 05:48:12 +0000874 cls_add_request["age_privkey"] = self.db.encrypt(
garciadeblas9d9d9262024-09-25 11:25:33 +0200875 privkey, schema_version="1.11", salt=_id
876 )
877 # TODO: set age_pubkey and age_privkey in the default profiles
rshri17b09ec2024-11-07 05:48:12 +0000878 self.db.set_one(self.topic, {"_id": _id}, cls_add_request)
rshri2d386cb2024-07-05 14:35:51 +0000879 rollback.append({"topic": self.topic, "_id": _id})
880 self._send_msg("register", {"cluster_id": _id, "operation_id": op_id})
rshri50e34dc2024-12-02 03:10:39 +0000881
882 # To add the content in old collection "k8sclusters"
883 self.add_to_old_collection(cls_add_request, session)
884
rshri2d386cb2024-07-05 14:35:51 +0000885 return _id, None
886 except (
887 ValidationError,
888 EngineException,
889 DbException,
890 MsgException,
891 FsException,
892 ) as e:
893 raise type(e)("{} while '{}'".format(e, step), http_code=e.http_code)
894
garciadeblas3d5dc322025-04-03 23:57:04 +0200895 def _add_cluster(self, cls_add_request, rollback, session, indata, kwargs, headers):
rshri2d386cb2024-07-05 14:35:51 +0000896 cls_add = {
897 "name": cls_add_request["name"],
rshri2d386cb2024-07-05 14:35:51 +0000898 "credentials": cls_add_request["credentials"],
899 "vim_account": cls_add_request["vim_account"],
rshri17b09ec2024-11-07 05:48:12 +0000900 "bootstrap": cls_add_request["bootstrap"],
garciadeblas616d4c72025-08-05 09:15:33 +0200901 "openshift": cls_add_request.get("openshift", False),
garciadeblas3d5dc322025-04-03 23:57:04 +0200902 "infra_controller_profiles": [
903 self._create_default_profiles(
904 rollback, session, indata, kwargs, headers, self.infra_contr_topic
905 )
906 ],
907 "infra_config_profiles": [
908 self._create_default_profiles(
909 rollback, session, indata, kwargs, headers, self.infra_conf_topic
910 )
911 ],
912 "resource_profiles": [
913 self._create_default_profiles(
914 rollback, session, indata, kwargs, headers, self.resource_topic
915 )
916 ],
917 "app_profiles": [
918 self._create_default_profiles(
919 rollback, session, indata, kwargs, headers, self.app_topic
920 )
921 ],
rshri2d386cb2024-07-05 14:35:51 +0000922 "created": "false",
923 "state": "IN_CREATION",
924 "operatingState": "PROCESSING",
925 "git_name": self.create_gitname(cls_add_request, session),
926 "resourceState": "IN_PROGRESS.REQUEST_RECEIVED",
927 }
rshri17b09ec2024-11-07 05:48:12 +0000928 # Add optional fields if they exist in the request
929 if "description" in cls_add_request:
930 cls_add["description"] = cls_add_request["description"]
rshri2d386cb2024-07-05 14:35:51 +0000931 return cls_add
932
garciadeblas3d5dc322025-04-03 23:57:04 +0200933 def check_vim(self, session, name):
934 try:
935 vim_account_details = self.db.get_one("vim_accounts", {"name": name})
936 if vim_account_details is not None:
937 return name
938 except ValidationError as e:
939 raise EngineException(
940 e,
941 HTTPStatus.UNPROCESSABLE_ENTITY,
942 )
943
944 def _create_default_profiles(
945 self, rollback, session, indata, kwargs, headers, topic
946 ):
947 topic = self.to_select_topic(topic)
948 default_profiles = topic.default(rollback, session, indata, kwargs, headers)
949 return default_profiles
950
951 def to_select_topic(self, topic):
952 if topic == "infra_controller_profiles":
953 topic = self.infra_contr_topic
954 elif topic == "infra_config_profiles":
955 topic = self.infra_conf_topic
956 elif topic == "resource_profiles":
957 topic = self.resource_topic
958 elif topic == "app_profiles":
959 topic = self.app_topic
960 return topic
961
rshri2d386cb2024-07-05 14:35:51 +0000962 def remove(self, session, _id, dry_run=False, not_send_msg=None):
963 """
964 Delete item by its internal _id
965 :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
966 :param _id: server internal id
967 :param dry_run: make checking but do not delete
968 :param not_send_msg: To not send message (False) or store content (list) instead
969 :return: operation id (None if there is not operation), raise exception if error or not found, conflict, ...
970 """
971
972 # To allow addressing projects and users by name AS WELL AS by _id
973 if not self.multiproject:
974 filter_q = {}
975 else:
976 filter_q = self._get_project_filter(session)
977 filter_q[self.id_field(self.topic, _id)] = _id
978 item_content = self.db.get_one(self.topic, filter_q)
979
rshri2d386cb2024-07-05 14:35:51 +0000980 op_id = self.format_on_operation(
981 item_content,
982 "deregister",
983 None,
984 )
985 self.db.set_one(self.topic, {"_id": _id}, item_content)
986
987 self.check_conflict_on_del(session, _id, item_content)
988 if dry_run:
989 return None
990
991 if self.multiproject and session["project_id"]:
992 # remove reference from project_read if there are more projects referencing it. If it last one,
993 # do not remove reference, but delete
994 other_projects_referencing = next(
995 (
996 p
997 for p in item_content["_admin"]["projects_read"]
998 if p not in session["project_id"] and p != "ANY"
999 ),
1000 None,
1001 )
1002
1003 # check if there are projects referencing it (apart from ANY, that means, public)....
1004 if other_projects_referencing:
1005 # remove references but not delete
1006 update_dict_pull = {
1007 "_admin.projects_read": session["project_id"],
1008 "_admin.projects_write": session["project_id"],
1009 }
1010 self.db.set_one(
1011 self.topic, filter_q, update_dict=None, pull_list=update_dict_pull
1012 )
1013 return None
1014 else:
1015 can_write = next(
1016 (
1017 p
1018 for p in item_content["_admin"]["projects_write"]
1019 if p == "ANY" or p in session["project_id"]
1020 ),
1021 None,
1022 )
1023 if not can_write:
1024 raise EngineException(
1025 "You have not write permission to delete it",
1026 http_code=HTTPStatus.UNAUTHORIZED,
1027 )
1028
1029 # delete
1030 self._send_msg(
1031 "deregister",
1032 {"cluster_id": _id, "operation_id": op_id},
1033 not_send_msg=not_send_msg,
1034 )
shrinithi75492bd2025-03-21 18:37:44 +00001035 return _id
yshah53cc9eb2024-07-05 13:06:31 +00001036
1037
shrinithi28d887f2025-01-08 05:27:19 +00001038class KsusTopic(ACMTopic):
yshah53cc9eb2024-07-05 13:06:31 +00001039 topic = "ksus"
1040 okapkg_topic = "okas"
yshah53cc9eb2024-07-05 13:06:31 +00001041 topic_msg = "ksu"
1042 schema_new = ksu_schema
1043 schema_edit = ksu_schema
yshahd23c6a52025-06-13 05:49:31 +00001044 MAP_PROFILE = {
1045 "infra_controller_profiles": "infra-controllers",
1046 "infra_config_profiles": "infra-configs",
1047 "resource_profiles": "managed_resources",
1048 "app_profiles": "apps",
1049 }
yshah53cc9eb2024-07-05 13:06:31 +00001050
1051 def __init__(self, db, fs, msg, auth):
shrinithi28d887f2025-01-08 05:27:19 +00001052 super().__init__(db, fs, msg, auth)
yshah53cc9eb2024-07-05 13:06:31 +00001053 self.logger = logging.getLogger("nbi.ksus")
1054
1055 @staticmethod
1056 def format_on_new(content, project_id=None, make_public=False):
1057 BaseTopic.format_on_new(content, project_id=project_id, make_public=make_public)
garciadeblasbecc7052024-11-20 12:04:53 +01001058 content["current_operation"] = None
yshah53cc9eb2024-07-05 13:06:31 +00001059 content["state"] = "IN_CREATION"
1060 content["operatingState"] = "PROCESSING"
1061 content["resourceState"] = "IN_PROGRESS"
1062
1063 def new(self, rollback, session, indata=None, kwargs=None, headers=None):
1064 _id_list = []
yshahf8f07632025-01-17 04:46:03 +00001065 for content in indata["ksus"]:
1066 check = {content["profile"]["profile_type"]: content["profile"]["_id"]}
yshah53cc9eb2024-07-05 13:06:31 +00001067 oka = content["oka"][0]
1068 oka_flag = ""
1069 if oka["_id"]:
yshahf8f07632025-01-17 04:46:03 +00001070 check["okas"] = []
yshah53cc9eb2024-07-05 13:06:31 +00001071 oka_flag = "_id"
shahithya8bded112024-10-15 08:01:44 +00001072 oka["sw_catalog_path"] = ""
yshah53cc9eb2024-07-05 13:06:31 +00001073 elif oka["sw_catalog_path"]:
1074 oka_flag = "sw_catalog_path"
1075
1076 for okas in content["oka"]:
yshahf8f07632025-01-17 04:46:03 +00001077 if okas.get("_id") is not None:
1078 check["okas"].append(okas["_id"])
yshah53cc9eb2024-07-05 13:06:31 +00001079 if okas["_id"] and okas["sw_catalog_path"]:
1080 raise EngineException(
1081 "Cannot create ksu with both OKA and SW catalog path",
1082 HTTPStatus.UNPROCESSABLE_ENTITY,
1083 )
1084 if not okas["sw_catalog_path"]:
1085 okas.pop("sw_catalog_path")
1086 elif not okas["_id"]:
1087 okas.pop("_id")
1088 if oka_flag not in okas.keys():
1089 raise EngineException(
1090 "Cannot create ksu. Give either OKA or SW catalog path for all oka in a KSU",
1091 HTTPStatus.UNPROCESSABLE_ENTITY,
1092 )
yshahf8f07632025-01-17 04:46:03 +00001093 self.check_dependency(check)
yshah53cc9eb2024-07-05 13:06:31 +00001094
1095 # Override descriptor with query string kwargs
1096 content = self._remove_envelop(content)
1097 self._update_input_with_kwargs(content, kwargs)
1098 content = self._validate_input_new(input=content, force=session["force"])
1099
1100 # Check for unique name
1101 self.check_unique_name(session, content["name"])
1102
1103 self.check_conflict_on_new(session, content)
1104
1105 operation_params = {}
1106 for content_key, content_value in content.items():
1107 operation_params[content_key] = content_value
1108 self.format_on_new(
1109 content, project_id=session["project_id"], make_public=session["public"]
1110 )
yshah53cc9eb2024-07-05 13:06:31 +00001111 op_id = self.format_on_operation(
1112 content,
1113 operation_type="create",
1114 operation_params=operation_params,
1115 )
1116 content["git_name"] = self.create_gitname(content, session)
1117
1118 # Update Oka_package usage state
1119 for okas in content["oka"]:
1120 if "_id" in okas.keys():
1121 self.update_usage_state(session, okas)
1122
yshahd23c6a52025-06-13 05:49:31 +00001123 profile_id = content["profile"].get("_id")
1124 profile_type = content["profile"].get("profile_type")
1125 db_cluster_list = self.db.get_list("clusters")
1126 for db_cluster in db_cluster_list:
1127 if db_cluster.get("created") == "true":
1128 profile_list = db_cluster[profile_type]
1129 if profile_id in profile_list:
1130 ksu_count = db_cluster.get("ksu_count")
1131 new_ksu_count = ksu_count + 1
1132 self.logger.info(f"New KSU count: {new_ksu_count}")
1133 db_cluster["ksu_count"] = new_ksu_count
1134 self.db.set_one(
1135 "clusters", {"_id": db_cluster["_id"]}, db_cluster
1136 )
1137
yshah53cc9eb2024-07-05 13:06:31 +00001138 _id = self.db.create(self.topic, content)
1139 rollback.append({"topic": self.topic, "_id": _id})
yshah53cc9eb2024-07-05 13:06:31 +00001140 _id_list.append(_id)
1141 data = {"ksus_list": _id_list, "operation_id": op_id}
1142 self._send_msg("create", data)
1143 return _id_list, op_id
1144
1145 def clone(self, rollback, session, _id, indata, kwargs, headers):
yshahf8f07632025-01-17 04:46:03 +00001146 check = {
1147 "ksu": _id,
1148 indata["profile"]["profile_type"]: indata["profile"]["_id"],
1149 }
1150 self.check_dependency(check)
yshah53cc9eb2024-07-05 13:06:31 +00001151 filter_db = self._get_project_filter(session)
1152 filter_db[BaseTopic.id_field(self.topic, _id)] = _id
1153 data = self.db.get_one(self.topic, filter_db)
1154
yshah53cc9eb2024-07-05 13:06:31 +00001155 op_id = self.format_on_operation(
1156 data,
1157 "clone",
1158 indata,
1159 )
1160 self.db.set_one(self.topic, {"_id": data["_id"]}, data)
1161 self._send_msg("clone", {"ksus_list": [data["_id"]], "operation_id": op_id})
1162 return op_id
1163
1164 def update_usage_state(self, session, oka_content):
1165 _id = oka_content["_id"]
1166 filter_db = self._get_project_filter(session)
1167 filter_db[BaseTopic.id_field(self.topic, _id)] = _id
1168
1169 data = self.db.get_one(self.okapkg_topic, filter_db)
1170 if data["_admin"]["usageState"] == "NOT_IN_USE":
1171 usage_state_update = {
1172 "_admin.usageState": "IN_USE",
1173 }
1174 self.db.set_one(
1175 self.okapkg_topic, {"_id": _id}, update_dict=usage_state_update
1176 )
1177
1178 def move_ksu(self, session, _id, indata=None, kwargs=None, content=None):
yshahf8f07632025-01-17 04:46:03 +00001179 check = {
1180 "ksu": _id,
1181 indata["profile"]["profile_type"]: indata["profile"]["_id"],
1182 }
1183 self.check_dependency(check)
yshah53cc9eb2024-07-05 13:06:31 +00001184 indata = self._remove_envelop(indata)
1185
1186 # Override descriptor with query string kwargs
1187 if kwargs:
1188 self._update_input_with_kwargs(indata, kwargs)
1189 try:
1190 if indata and session.get("set_project"):
1191 raise EngineException(
1192 "Cannot edit content and set to project (query string SET_PROJECT) at same time",
1193 HTTPStatus.UNPROCESSABLE_ENTITY,
1194 )
1195 # TODO self._check_edition(session, indata, _id, force)
1196 if not content:
1197 content = self.show(session, _id)
1198 indata = self._validate_input_edit(
1199 input=indata, content=content, force=session["force"]
1200 )
1201 operation_params = indata
1202 deep_update_rfc7396(content, indata)
1203
1204 # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
1205 _id = content.get("_id") or _id
yshahd0c876f2024-11-11 09:24:48 +00001206 op_id = self.format_on_operation(
yshah53cc9eb2024-07-05 13:06:31 +00001207 content,
1208 "move",
1209 operation_params,
1210 )
1211 if content.get("_admin"):
1212 now = time()
1213 content["_admin"]["modified"] = now
1214 content["operatingState"] = "PROCESSING"
1215 content["resourceState"] = "IN_PROGRESS"
1216
1217 self.db.replace(self.topic, _id, content)
yshah53cc9eb2024-07-05 13:06:31 +00001218 data = {"ksus_list": [content["_id"]], "operation_id": op_id}
1219 self._send_msg("move", data)
1220 return op_id
1221 except ValidationError as e:
1222 raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
1223
1224 def check_conflict_on_edit(self, session, final_content, edit_content, _id):
1225 if final_content["name"] != edit_content["name"]:
1226 self.check_unique_name(session, edit_content["name"])
1227 return final_content
1228
1229 @staticmethod
1230 def format_on_edit(final_content, edit_content):
shrinithi28d887f2025-01-08 05:27:19 +00001231 op_id = ACMTopic.format_on_operation(
yshah53cc9eb2024-07-05 13:06:31 +00001232 final_content,
1233 "update",
1234 edit_content,
1235 )
1236 final_content["operatingState"] = "PROCESSING"
1237 final_content["resourceState"] = "IN_PROGRESS"
1238 if final_content.get("_admin"):
1239 now = time()
1240 final_content["_admin"]["modified"] = now
yshahd0c876f2024-11-11 09:24:48 +00001241 return op_id
yshah53cc9eb2024-07-05 13:06:31 +00001242
1243 def edit(self, session, _id, indata, kwargs):
1244 _id_list = []
yshah53cc9eb2024-07-05 13:06:31 +00001245 if _id == "update":
1246 for ksus in indata["ksus"]:
1247 content = ksus
1248 _id = content["_id"]
1249 _id_list.append(_id)
1250 content.pop("_id")
yshahd0c876f2024-11-11 09:24:48 +00001251 op_id = self.edit_ksu(session, _id, content, kwargs)
yshah53cc9eb2024-07-05 13:06:31 +00001252 else:
1253 content = indata
1254 _id_list.append(_id)
yshahd0c876f2024-11-11 09:24:48 +00001255 op_id = self.edit_ksu(session, _id, content, kwargs)
yshah53cc9eb2024-07-05 13:06:31 +00001256
1257 data = {"ksus_list": _id_list, "operation_id": op_id}
1258 self._send_msg("edit", data)
yshah53cc9eb2024-07-05 13:06:31 +00001259
yshahd23c6a52025-06-13 05:49:31 +00001260 def cluster_list_ksu(self, session, filter_q=None, api_req=None):
1261 db_filter = {}
1262 if filter_q.get("cluster_id"):
1263 db_filter["_id"] = filter_q.get("cluster_id")
1264 ksu_data_list = []
1265
1266 cluster_data = self.db.get_one("clusters", db_filter)
1267 profiles_list = [
1268 "infra_controller_profiles",
1269 "infra_config_profiles",
1270 "app_profiles",
1271 "resource_profiles",
1272 ]
1273 for profile in profiles_list:
1274 data_list = []
1275 for profile_id in cluster_data[profile]:
1276 filter_q = {"profile": {"_id": profile_id, "profile_type": profile}}
1277 data_list = self.db.get_list(self.topic, filter_q)
1278 for ksu_data in data_list:
1279 ksu_data["package_name"] = []
1280 ksu_data["package_path"] = []
1281 for okas in ksu_data["operationHistory"][0]["operationParams"]["oka"]:
1282 sw_catalog_path = okas.get("sw_catalog_path")
1283 if sw_catalog_path:
1284 parts = sw_catalog_path.rsplit("/", 2)
1285 self.logger.info(f"Parts: {parts}")
1286 ksu_data["package_name"].append(parts[-2])
1287 ksu_data["package_path"].append("/".join(parts[:-1]))
1288 else:
1289 oka_id = okas["_id"]
1290 db_oka = self.db.get_one("okas", {"_id": oka_id})
1291 oka_type = self.MAP_PROFILE[
1292 db_oka.get("profile_type", "infra_controller_profiles")
1293 ]
1294 ksu_data["package_name"].append(db_oka["git_name"].lower())
1295 ksu_data["package_path"].append(
1296 f"{oka_type}/{db_oka['git_name'].lower()}"
1297 )
1298 ksu_data_list.append(ksu_data)
1299
1300 outdata = {}
1301 outdata["count"] = cluster_data["ksu_count"]
1302 outdata["data"] = ksu_data_list
1303 self.logger.info(f"Outdata: {outdata}")
1304 return outdata
1305
yshahd0c876f2024-11-11 09:24:48 +00001306 def edit_ksu(self, session, _id, indata, kwargs):
yshahf8f07632025-01-17 04:46:03 +00001307 check = {
1308 "ksu": _id,
1309 }
1310 if indata.get("profile"):
1311 check[indata["profile"]["profile_type"]] = indata["profile"]["_id"]
1312 if indata.get("oka"):
1313 check["okas"] = []
1314 for oka in indata["oka"]:
1315 if oka.get("_id") is not None:
1316 check["okas"].append(oka["_id"])
1317
1318 self.check_dependency(check)
yshah53cc9eb2024-07-05 13:06:31 +00001319 content = None
1320 indata = self._remove_envelop(indata)
1321
1322 # Override descriptor with query string kwargs
1323 if kwargs:
1324 self._update_input_with_kwargs(indata, kwargs)
1325 try:
1326 if indata and session.get("set_project"):
1327 raise EngineException(
1328 "Cannot edit content and set to project (query string SET_PROJECT) at same time",
1329 HTTPStatus.UNPROCESSABLE_ENTITY,
1330 )
1331 # TODO self._check_edition(session, indata, _id, force)
1332 if not content:
1333 content = self.show(session, _id)
1334
1335 for okas in indata["oka"]:
1336 if not okas["_id"]:
1337 okas.pop("_id")
1338 if not okas["sw_catalog_path"]:
1339 okas.pop("sw_catalog_path")
1340
1341 indata = self._validate_input_edit(indata, content, force=session["force"])
1342
1343 # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
1344 _id = content.get("_id") or _id
1345
1346 content = self.check_conflict_on_edit(session, content, indata, _id=_id)
yshah53cc9eb2024-07-05 13:06:31 +00001347 op_id = self.format_on_edit(content, indata)
1348 self.db.replace(self.topic, _id, content)
1349 return op_id
1350 except ValidationError as e:
1351 raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
1352
1353 def delete_ksu(self, session, _id, indata, dry_run=False, not_send_msg=None):
1354 _id_list = []
yshah53cc9eb2024-07-05 13:06:31 +00001355 if _id == "delete":
1356 for ksus in indata["ksus"]:
1357 content = ksus
1358 _id = content["_id"]
yshah53cc9eb2024-07-05 13:06:31 +00001359 content.pop("_id")
garciadeblasac285872024-12-05 12:21:09 +01001360 op_id, not_send_msg_ksu = self.delete(session, _id)
1361 if not not_send_msg_ksu:
1362 _id_list.append(_id)
yshah53cc9eb2024-07-05 13:06:31 +00001363 else:
garciadeblasac285872024-12-05 12:21:09 +01001364 op_id, not_send_msg_ksu = self.delete(session, _id)
1365 if not not_send_msg_ksu:
1366 _id_list.append(_id)
yshah53cc9eb2024-07-05 13:06:31 +00001367
garciadeblasac285872024-12-05 12:21:09 +01001368 if _id_list:
yshah781ce732025-02-28 08:56:13 +00001369 data = {
1370 "ksus_list": _id_list,
1371 "operation_id": op_id,
1372 "force": session["force"],
1373 }
garciadeblasac285872024-12-05 12:21:09 +01001374 self._send_msg("delete", data, not_send_msg)
yshah53cc9eb2024-07-05 13:06:31 +00001375 return op_id
1376
yshahd0c876f2024-11-11 09:24:48 +00001377 def delete(self, session, _id):
yshah53cc9eb2024-07-05 13:06:31 +00001378 if not self.multiproject:
1379 filter_q = {}
1380 else:
1381 filter_q = self._get_project_filter(session)
1382 filter_q[self.id_field(self.topic, _id)] = _id
1383 item_content = self.db.get_one(self.topic, filter_q)
yshahf8f07632025-01-17 04:46:03 +00001384
1385 check = {
1386 "ksu": _id,
1387 item_content["profile"]["profile_type"]: item_content["profile"]["_id"],
1388 }
1389 self.check_dependency(check, operation_type="delete")
1390
yshah53cc9eb2024-07-05 13:06:31 +00001391 item_content["state"] = "IN_DELETION"
1392 item_content["operatingState"] = "PROCESSING"
1393 item_content["resourceState"] = "IN_PROGRESS"
yshahd0c876f2024-11-11 09:24:48 +00001394 op_id = self.format_on_operation(
yshah53cc9eb2024-07-05 13:06:31 +00001395 item_content,
1396 "delete",
1397 None,
1398 )
1399 self.db.set_one(self.topic, {"_id": item_content["_id"]}, item_content)
1400
garciadeblasac285872024-12-05 12:21:09 +01001401 # Check if the profile exists. If it doesn't, no message should be sent to Kafka
1402 not_send_msg = None
1403 profile_id = item_content["profile"]["_id"]
1404 profile_type = item_content["profile"]["profile_type"]
1405 profile_collection_map = {
1406 "app_profiles": "k8sapp",
1407 "resource_profiles": "k8sresource",
1408 "infra_controller_profiles": "k8sinfra_controller",
1409 "infra_config_profiles": "k8sinfra_config",
1410 }
1411 profile_collection = profile_collection_map[profile_type]
1412 profile_content = self.db.get_one(
1413 profile_collection, {"_id": profile_id}, fail_on_empty=False
1414 )
1415 if not profile_content:
1416 self.db.del_one(self.topic, filter_q)
1417 not_send_msg = True
1418 return op_id, not_send_msg
yshah53cc9eb2024-07-05 13:06:31 +00001419
1420
garciadeblasb798f452025-08-05 18:21:26 +02001421class AppInstanceTopic(ACMTopic):
1422 topic = "appinstances"
1423 okapkg_topic = "okas"
1424 topic_msg = "appinstance"
1425 schema_new = app_instance_schema
1426 schema_edit = app_instance_schema
1427
    def __init__(self, db, fs, msg, auth):
        """Wire common topic dependencies and attach a dedicated logger."""
        super().__init__(db, fs, msg, auth)
        self.logger = logging.getLogger("nbi.appinstances")
1431
1432 @staticmethod
1433 def format_on_new(content, project_id=None, make_public=False):
1434 BaseTopic.format_on_new(content, project_id=project_id, make_public=make_public)
1435 content["current_operation"] = None
1436 content["state"] = "IN_CREATION"
1437 content["operatingState"] = "PROCESSING"
1438 content["resourceState"] = "IN_PROGRESS"
1439
1440 def new(self, rollback, session, indata=None, kwargs=None, headers=None):
1441 if indata.get("oka") and indata.get("sw_catalog_path"):
1442 raise EngineException(
1443 "Cannot create app instance with both OKA and SW catalog path",
1444 HTTPStatus.UNPROCESSABLE_ENTITY,
1445 )
1446
1447 # Override descriptor with query string kwargs
1448 content = self._remove_envelop(indata)
1449 self._update_input_with_kwargs(content, kwargs)
1450 content = self._validate_input_new(input=content, force=session["force"])
1451
1452 # Check for unique name
1453 self.check_unique_name(session, content["name"])
1454
1455 self.check_conflict_on_new(session, content)
1456
1457 operation_params = {}
1458 for content_key, content_value in content.items():
1459 operation_params[content_key] = content_value
1460 self.format_on_new(
1461 content, project_id=session["project_id"], make_public=session["public"]
1462 )
1463 op_id = self.format_on_operation(
1464 content,
1465 operation_type="create",
1466 operation_params=operation_params,
1467 )
1468 content["git_name"] = self.create_gitname(content, session)
1469
1470 oka_id = content.get("oka")
1471 if oka_id:
1472 self.update_oka_usage_state(session, oka_id)
1473
1474 _id = self.db.create(self.topic, content)
1475 rollback.append({"topic": self.topic, "_id": _id})
garciadeblasbe6498e2025-11-05 11:55:37 +01001476 self._send_msg("create", {"appinstance": _id, "operation_id": op_id})
garciadeblasb798f452025-08-05 18:21:26 +02001477 return _id, op_id
1478
1479 def update_oka_usage_state(self, session, oka_id):
1480 filter_db = self._get_project_filter(session)
1481 filter_db[BaseTopic.id_field(self.topic, oka_id)] = oka_id
1482
1483 data = self.db.get_one(self.okapkg_topic, filter_db)
1484 if data["_admin"]["usageState"] == "NOT_IN_USE":
1485 usage_state_update = {
1486 "_admin.usageState": "IN_USE",
1487 }
1488 self.db.set_one(
1489 self.okapkg_topic, {"_id": oka_id}, update_dict=usage_state_update
1490 )
1491
1492 def check_conflict_on_edit(self, session, final_content, edit_content, _id):
1493 if final_content["name"] != edit_content["name"]:
1494 self.check_unique_name(session, edit_content["name"])
1495 return final_content
1496
1497 @staticmethod
1498 def format_on_edit(final_content, edit_content):
1499 op_id = ACMTopic.format_on_operation(
1500 final_content,
1501 "update",
1502 edit_content,
1503 )
1504 final_content["operatingState"] = "PROCESSING"
1505 final_content["resourceState"] = "IN_PROGRESS"
1506 if final_content.get("_admin"):
1507 now = time()
1508 final_content["_admin"]["modified"] = now
1509 return op_id
1510
    def edit(self, session, _id, indata, kwargs):
        """Register an edit operation over an app instance.

        :param session: contains "username", "admin", "force", "public", "project_id"
        :param _id: app instance _id (or name)
        :param indata: descriptor changes requested by the user
        :param kwargs: query string parameters that override indata
        :return: id of the registered operation
        :raise EngineException: on validation errors or conflicts
        """
        # Kept None so the descriptor is always fetched below; mirrors the
        # shape of other edit() implementations that accept preloaded content
        content = None
        indata = self._remove_envelop(indata)

        # Override descriptor with query string kwargs
        if kwargs:
            self._update_input_with_kwargs(indata, kwargs)
        try:
            if indata and session.get("set_project"):
                raise EngineException(
                    "Cannot edit content and set to project (query string SET_PROJECT) at same time",
                    HTTPStatus.UNPROCESSABLE_ENTITY,
                )
            # TODO self._check_edition(session, indata, _id, force)
            if not content:
                content = self.show(session, _id)

            indata = self._validate_input_edit(indata, content, force=session["force"])

            # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
            _id = content.get("_id") or _id

            content = self.check_conflict_on_edit(session, content, indata, _id=_id)
            # format_on_edit records indata as the operation params and flags
            # the item as PROCESSING/IN_PROGRESS
            op_id = self.format_on_edit(content, indata)
            # NOTE(review): indata is never merged into content here (no
            # deep_update_rfc7396, unlike OkaTopic.edit) — presumably the async
            # worker applies the changes from the operation params; confirm
            self.db.replace(self.topic, _id, content)
            return op_id
        except ValidationError as e:
            raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
1539
1540 def update_appinstance(self, session, _id, item, indata):
1541 if not self.multiproject:
1542 filter_db = {}
1543 else:
1544 filter_db = self._get_project_filter(session)
1545 # To allow project&user addressing by name AS WELL AS _id
1546 filter_db[BaseTopic.id_field(self.topic, _id)] = _id
1547 validate_input(indata, app_instance_schema)
1548 data = self.db.get_one(self.topic, filter_db)
1549 operation_params = {}
1550 data["operatingState"] = "PROCESSING"
1551 data["resourceState"] = "IN_PROGRESS"
1552 operation_params = indata
1553 op_id = self.format_on_operation(
1554 data,
1555 item,
1556 operation_params,
1557 )
1558 self.db.set_one(self.topic, {"_id": _id}, data)
garciadeblasbe6498e2025-11-05 11:55:37 +01001559 self._send_msg(item, {"appinstance": _id, "operation_id": op_id})
garciadeblasb798f452025-08-05 18:21:26 +02001560 return op_id
1561
1562 def delete(self, session, _id, not_send_msg=None):
1563 if not self.multiproject:
1564 filter_q = {}
1565 else:
1566 filter_q = self._get_project_filter(session)
1567 filter_q[self.id_field(self.topic, _id)] = _id
1568 item_content = self.db.get_one(self.topic, filter_q)
1569 item_content["state"] = "IN_DELETION"
1570 item_content["operatingState"] = "PROCESSING"
1571 item_content["resourceState"] = "IN_PROGRESS"
1572 op_id = self.format_on_operation(
1573 item_content,
1574 "delete",
1575 None,
1576 )
1577 self.db.set_one(self.topic, {"_id": item_content["_id"]}, item_content)
1578
1579 # Check if the profile exists. If it doesn't, no message should be sent to Kafka
1580 not_send_msg2 = not_send_msg
1581 profile_id = item_content["profile"]["_id"]
1582 profile_type = item_content["profile"]["profile_type"]
1583 profile_collection_map = {
1584 "app_profiles": "k8sapp",
1585 "resource_profiles": "k8sresource",
1586 "infra_controller_profiles": "k8sinfra_controller",
1587 "infra_config_profiles": "k8sinfra_config",
1588 }
1589 profile_collection = profile_collection_map[profile_type]
1590 profile_content = self.db.get_one(
1591 profile_collection, {"_id": profile_id}, fail_on_empty=False
1592 )
1593 if not profile_content:
1594 self.db.del_one(self.topic, filter_q)
1595 not_send_msg2 = True
1596 self._send_msg(
1597 "delete",
1598 {"appinstance": _id, "operation_id": op_id, "force": session["force"]},
1599 not_send_msg=not_send_msg2,
1600 )
1601 return op_id
1602
1603
shrinithi28d887f2025-01-08 05:27:19 +00001604class OkaTopic(DescriptorTopic, ACMOperationTopic):
yshah53cc9eb2024-07-05 13:06:31 +00001605 topic = "okas"
1606 topic_msg = "oka"
1607 schema_new = oka_schema
1608 schema_edit = oka_schema
1609
    def __init__(self, db, fs, msg, auth):
        """Initialize the topic with database, filesystem, kafka and auth connectors."""
        super().__init__(db, fs, msg, auth)
        self.logger = logging.getLogger("nbi.oka")
1613
1614 @staticmethod
1615 def format_on_new(content, project_id=None, make_public=False):
1616 DescriptorTopic.format_on_new(
1617 content, project_id=project_id, make_public=make_public
1618 )
garciadeblasbecc7052024-11-20 12:04:53 +01001619 content["current_operation"] = None
yshah53cc9eb2024-07-05 13:06:31 +00001620 content["state"] = "PENDING_CONTENT"
1621 content["operatingState"] = "PROCESSING"
1622 content["resourceState"] = "IN_PROGRESS"
1623
1624 def check_conflict_on_del(self, session, _id, db_content):
1625 usage_state = db_content["_admin"]["usageState"]
1626 if usage_state == "IN_USE":
1627 raise EngineException(
1628 "There is a KSU using this package",
1629 http_code=HTTPStatus.CONFLICT,
1630 )
1631
1632 def check_conflict_on_edit(self, session, final_content, edit_content, _id):
yshah00cfe8b2025-01-17 04:05:45 +00001633 if "name" in edit_content:
1634 if final_content["name"] == edit_content["name"]:
1635 name = edit_content["name"]
1636 raise EngineException(
1637 f"No update, new name for the OKA is the same: {name}",
1638 http_code=HTTPStatus.CONFLICT,
1639 )
1640 else:
1641 self.check_unique_name(session, edit_content["name"])
1642 elif (
1643 "description" in edit_content
yshah53cc9eb2024-07-05 13:06:31 +00001644 and final_content["description"] == edit_content["description"]
1645 ):
yshah00cfe8b2025-01-17 04:05:45 +00001646 description = edit_content["description"]
yshah53cc9eb2024-07-05 13:06:31 +00001647 raise EngineException(
yshah00cfe8b2025-01-17 04:05:45 +00001648 f"No update, new description for the OKA is the same: {description}",
yshah53cc9eb2024-07-05 13:06:31 +00001649 http_code=HTTPStatus.CONFLICT,
1650 )
yshah53cc9eb2024-07-05 13:06:31 +00001651 return final_content
1652
    def edit(self, session, _id, indata=None, kwargs=None, content=None):
        """Modify an existing OKA package descriptor.

        :param session: contains "username", "admin", "force", "public", "project_id"
        :param _id: package _id (or name)
        :param indata: descriptor changes requested by the user
        :param kwargs: query string parameters that override indata
        :param content: preloaded descriptor; fetched from the db when None
        :return: id of the registered operation
        :raise EngineException: on validation errors or name/description conflicts
        """
        indata = self._remove_envelop(indata)

        # Override descriptor with query string kwargs
        if kwargs:
            self._update_input_with_kwargs(indata, kwargs)
        try:
            if indata and session.get("set_project"):
                raise EngineException(
                    "Cannot edit content and set to project (query string SET_PROJECT) at same time",
                    HTTPStatus.UNPROCESSABLE_ENTITY,
                )
            # TODO self._check_edition(session, indata, _id, force)
            if not content:
                content = self.show(session, _id)

            indata = self._validate_input_edit(indata, content, force=session["force"])

            # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
            _id = content.get("_id") or _id

            content = self.check_conflict_on_edit(session, content, indata, _id=_id)
            op_id = self.format_on_edit(content, indata)
            # Apply the requested changes (RFC 7396 merge-patch semantics)
            deep_update_rfc7396(content, indata)

            self.db.replace(self.topic, _id, content)
            return op_id
        except ValidationError as e:
            raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
1682
1683 def delete(self, session, _id, dry_run=False, not_send_msg=None):
yshahf8f07632025-01-17 04:46:03 +00001684 check = {"oka": _id}
1685 self.check_dependency(check, operation_type="delete")
yshah53cc9eb2024-07-05 13:06:31 +00001686 if not self.multiproject:
1687 filter_q = {}
1688 else:
1689 filter_q = self._get_project_filter(session)
1690 filter_q[self.id_field(self.topic, _id)] = _id
1691 item_content = self.db.get_one(self.topic, filter_q)
1692 item_content["state"] = "IN_DELETION"
1693 item_content["operatingState"] = "PROCESSING"
1694 self.check_conflict_on_del(session, _id, item_content)
yshahd0c876f2024-11-11 09:24:48 +00001695 op_id = self.format_on_operation(
yshah53cc9eb2024-07-05 13:06:31 +00001696 item_content,
1697 "delete",
1698 None,
1699 )
yshah53cc9eb2024-07-05 13:06:31 +00001700 self.db.set_one(self.topic, {"_id": item_content["_id"]}, item_content)
1701 self._send_msg(
yshah781ce732025-02-28 08:56:13 +00001702 "delete",
1703 {"oka_id": _id, "operation_id": op_id, "force": session["force"]},
1704 not_send_msg=not_send_msg,
yshah53cc9eb2024-07-05 13:06:31 +00001705 )
yshahffcac5f2024-08-19 12:49:07 +00001706 return op_id
yshah53cc9eb2024-07-05 13:06:31 +00001707
    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create a new OKA package entry (metadata only; content is uploaded later).

        :param rollback: list where created items are appended so they can be
            undone if a later step of the request fails
        :param session: contains "username", "admin", "force", "public", "project_id"
        :param indata: http body; may wrap the data in "userDefinedData"
        :param kwargs: query string parameters
        :param headers: http request headers (unused here)
        :return: (_id of the created entry, id of the "create" operation)
        """
        # _remove_envelop
        if indata:
            if "userDefinedData" in indata:
                indata = indata["userDefinedData"]

        content = {"_admin": {"userDefinedData": indata, "revision": 0}}

        self._update_input_with_kwargs(content, kwargs)
        # NOTE(review): content is rebuilt here from the validated kwargs,
        # discarding the _admin/userDefinedData envelope built above —
        # presumably the creation parameters arrive via the query string;
        # confirm against the NBI caller
        content = BaseTopic._validate_input_new(
            self, input=kwargs, force=session["force"]
        )

        self.check_unique_name(session, content["name"])
        # Keep a copy of the raw creation parameters for the operation history
        operation_params = {}
        for content_key, content_value in content.items():
            operation_params[content_key] = content_value
        self.format_on_new(
            content, session["project_id"], make_public=session["public"]
        )
        op_id = self.format_on_operation(
            content,
            operation_type="create",
            operation_params=operation_params,
        )
        content["git_name"] = self.create_gitname(content, session)
        _id = self.db.create(self.topic, content)
        rollback.append({"topic": self.topic, "_id": _id})
        return _id, op_id
yshah53cc9eb2024-07-05 13:06:31 +00001737
1738 def upload_content(self, session, _id, indata, kwargs, headers):
yshahf8f07632025-01-17 04:46:03 +00001739 if headers["Method"] in ("PUT", "PATCH"):
1740 check = {"oka": _id}
1741 self.check_dependency(check)
yshah53cc9eb2024-07-05 13:06:31 +00001742 current_desc = self.show(session, _id)
1743
1744 compressed = None
1745 content_type = headers.get("Content-Type")
1746 if (
1747 content_type
1748 and "application/gzip" in content_type
1749 or "application/x-gzip" in content_type
1750 ):
1751 compressed = "gzip"
1752 if content_type and "application/zip" in content_type:
1753 compressed = "zip"
1754 filename = headers.get("Content-Filename")
1755 if not filename and compressed:
1756 filename = "package.tar.gz" if compressed == "gzip" else "package.zip"
1757 elif not filename:
1758 filename = "package"
1759
1760 revision = 1
1761 if "revision" in current_desc["_admin"]:
1762 revision = current_desc["_admin"]["revision"] + 1
1763
1764 file_pkg = None
1765 fs_rollback = []
1766
1767 try:
1768 start = 0
1769 # Rather than using a temp folder, we will store the package in a folder based on
1770 # the current revision.
1771 proposed_revision_path = _id + ":" + str(revision)
1772 # all the content is upload here and if ok, it is rename from id_ to is folder
1773
1774 if start:
1775 if not self.fs.file_exists(proposed_revision_path, "dir"):
1776 raise EngineException(
1777 "invalid Transaction-Id header", HTTPStatus.NOT_FOUND
1778 )
1779 else:
1780 self.fs.file_delete(proposed_revision_path, ignore_non_exist=True)
1781 self.fs.mkdir(proposed_revision_path)
1782 fs_rollback.append(proposed_revision_path)
1783
1784 storage = self.fs.get_params()
1785 storage["folder"] = proposed_revision_path
yshah2c932bd2024-09-24 18:16:07 +00001786 storage["zipfile"] = filename
yshah53cc9eb2024-07-05 13:06:31 +00001787
1788 file_path = (proposed_revision_path, filename)
1789 file_pkg = self.fs.file_open(file_path, "a+b")
1790
yshah53cc9eb2024-07-05 13:06:31 +00001791 if isinstance(indata, dict):
1792 indata_text = yaml.safe_dump(indata, indent=4, default_flow_style=False)
1793 file_pkg.write(indata_text.encode(encoding="utf-8"))
1794 else:
1795 indata_len = 0
1796 indata = indata.file
1797 while True:
1798 indata_text = indata.read(4096)
1799 indata_len += len(indata_text)
1800 if not indata_text:
1801 break
1802 file_pkg.write(indata_text)
1803
yshah53cc9eb2024-07-05 13:06:31 +00001804 # Need to close the file package here so it can be copied from the
1805 # revision to the current, unrevisioned record
1806 if file_pkg:
1807 file_pkg.close()
1808 file_pkg = None
1809
1810 # Fetch both the incoming, proposed revision and the original revision so we
1811 # can call a validate method to compare them
1812 current_revision_path = _id + "/"
1813 self.fs.sync(from_path=current_revision_path)
1814 self.fs.sync(from_path=proposed_revision_path)
1815
garciadeblas807b8bf2024-09-23 13:03:00 +02001816 # Is this required?
yshah53cc9eb2024-07-05 13:06:31 +00001817 if revision > 1:
1818 try:
1819 self._validate_descriptor_changes(
1820 _id,
1821 filename,
1822 current_revision_path,
1823 proposed_revision_path,
1824 )
1825 except Exception as e:
1826 shutil.rmtree(
1827 self.fs.path + current_revision_path, ignore_errors=True
1828 )
1829 shutil.rmtree(
1830 self.fs.path + proposed_revision_path, ignore_errors=True
1831 )
1832 # Only delete the new revision. We need to keep the original version in place
1833 # as it has not been changed.
1834 self.fs.file_delete(proposed_revision_path, ignore_non_exist=True)
1835 raise e
1836
1837 indata = self._remove_envelop(indata)
1838
1839 # Override descriptor with query string kwargs
1840 if kwargs:
1841 self._update_input_with_kwargs(indata, kwargs)
1842
1843 current_desc["_admin"]["storage"] = storage
1844 current_desc["_admin"]["onboardingState"] = "ONBOARDED"
1845 current_desc["_admin"]["operationalState"] = "ENABLED"
1846 current_desc["_admin"]["modified"] = time()
1847 current_desc["_admin"]["revision"] = revision
1848
1849 deep_update_rfc7396(current_desc, indata)
1850
1851 # Copy the revision to the active package name by its original id
1852 shutil.rmtree(self.fs.path + current_revision_path, ignore_errors=True)
1853 os.rename(
1854 self.fs.path + proposed_revision_path,
1855 self.fs.path + current_revision_path,
1856 )
1857 self.fs.file_delete(current_revision_path, ignore_non_exist=True)
1858 self.fs.mkdir(current_revision_path)
1859 self.fs.reverse_sync(from_path=current_revision_path)
1860
1861 shutil.rmtree(self.fs.path + _id)
1862 kwargs = {}
1863 kwargs["package"] = filename
1864 if headers["Method"] == "POST":
1865 current_desc["state"] = "IN_CREATION"
garciadeblasbecc7052024-11-20 12:04:53 +01001866 op_id = current_desc.get("operationHistory", [{"op_id": None}])[-1].get(
1867 "op_id"
1868 )
yshah53cc9eb2024-07-05 13:06:31 +00001869 elif headers["Method"] in ("PUT", "PATCH"):
yshahd0c876f2024-11-11 09:24:48 +00001870 op_id = self.format_on_operation(
yshah53cc9eb2024-07-05 13:06:31 +00001871 current_desc,
1872 "update",
1873 kwargs,
1874 )
1875 current_desc["operatingState"] = "PROCESSING"
1876 current_desc["resourceState"] = "IN_PROGRESS"
1877
1878 self.db.replace(self.topic, _id, current_desc)
1879
1880 # Store a copy of the package as a point in time revision
1881 revision_desc = dict(current_desc)
1882 revision_desc["_id"] = _id + ":" + str(revision_desc["_admin"]["revision"])
1883 self.db.create(self.topic + "_revisions", revision_desc)
1884 fs_rollback = []
1885
yshah53cc9eb2024-07-05 13:06:31 +00001886 if headers["Method"] == "POST":
1887 self._send_msg("create", {"oka_id": _id, "operation_id": op_id})
1888 elif headers["Method"] == "PUT" or "PATCH":
1889 self._send_msg("edit", {"oka_id": _id, "operation_id": op_id})
1890
1891 return True
1892
1893 except EngineException:
1894 raise
1895 finally:
1896 if file_pkg:
1897 file_pkg.close()
1898 for file in fs_rollback:
1899 self.fs.file_delete(file, ignore_non_exist=True)