blob: dc6016cc09fc64271e5a78b802de2dcd3b826699 [file] [log] [blame]
rshri2d386cb2024-07-05 14:35:51 +00001# -*- coding: utf-8 -*-
2
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12# implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
yshah53cc9eb2024-07-05 13:06:31 +000016import logging
17import yaml
yshah53cc9eb2024-07-05 13:06:31 +000018import shutil
19import os
rshri2d386cb2024-07-05 14:35:51 +000020from http import HTTPStatus
21
yshah53cc9eb2024-07-05 13:06:31 +000022from time import time
rshri2d386cb2024-07-05 14:35:51 +000023from osm_nbi.base_topic import BaseTopic, EngineException
shrinithi28d887f2025-01-08 05:27:19 +000024from osm_nbi.acm_topic import ACMTopic, ACMOperationTopic, ProfileTopic
rshri2d386cb2024-07-05 14:35:51 +000025
yshah53cc9eb2024-07-05 13:06:31 +000026from osm_nbi.descriptor_topics import DescriptorTopic
rshri2d386cb2024-07-05 14:35:51 +000027from osm_nbi.validation import (
28 ValidationError,
yshah99122b82024-11-18 07:05:29 +000029 validate_input,
rshri2d386cb2024-07-05 14:35:51 +000030 clustercreation_new_schema,
yshah99122b82024-11-18 07:05:29 +000031 cluster_edit_schema,
32 cluster_update_schema,
rshri2d386cb2024-07-05 14:35:51 +000033 infra_controller_profile_create_new_schema,
34 infra_config_profile_create_new_schema,
35 app_profile_create_new_schema,
36 resource_profile_create_new_schema,
37 infra_controller_profile_create_edit_schema,
38 infra_config_profile_create_edit_schema,
39 app_profile_create_edit_schema,
40 resource_profile_create_edit_schema,
rshri17b09ec2024-11-07 05:48:12 +000041 clusterregistration_new_schema,
rshri2d386cb2024-07-05 14:35:51 +000042 attach_dettach_profile_schema,
yshah53cc9eb2024-07-05 13:06:31 +000043 ksu_schema,
44 oka_schema,
yshahd23c6a52025-06-13 05:49:31 +000045 node_create_new_schema,
46 node_edit_schema,
rshri2d386cb2024-07-05 14:35:51 +000047)
yshah53cc9eb2024-07-05 13:06:31 +000048from osm_common.dbbase import deep_update_rfc7396, DbException
rshri2d386cb2024-07-05 14:35:51 +000049from osm_common.msgbase import MsgException
50from osm_common.fsbase import FsException
51
yshah53cc9eb2024-07-05 13:06:31 +000052__author__ = (
53 "Shrinithi R <shrinithi.r@tataelxsi.co.in>",
54 "Shahithya Y <shahithya.y@tataelxsi.co.in>",
55)
rshri2d386cb2024-07-05 14:35:51 +000056
57
class InfraContTopic(ProfileTopic):
    """NBI topic for K8s infra-controller profiles (delegates to ProfileTopic)."""

    topic = "k8sinfra_controller"
    topic_msg = "k8s_infra_controller"
    schema_new = infra_controller_profile_create_new_schema
    schema_edit = infra_controller_profile_create_edit_schema

    def __init__(self, db, fs, msg, auth):
        super().__init__(db, fs, msg, auth)

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create a new infra-controller profile."""
        return self.new_profile(rollback, session, indata, kwargs, headers)

    def default(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create the default infra-controller profile during cluster creation."""
        return self.default_profile(rollback, session, indata, kwargs, headers)

    def delete(self, session, _id, dry_run=False, not_send_msg=None):
        """Delete the profile once no cluster operation depends on it."""
        self.check_dependency(
            {"infra_controller_profiles": _id}, operation_type="delete"
        )
        self.delete_profile(session, _id, dry_run, not_send_msg)
        return _id
rshri2d386cb2024-07-05 14:35:51 +000080
81
class InfraConfTopic(ProfileTopic):
    """NBI topic for K8s infra-config profiles (delegates to ProfileTopic)."""

    topic = "k8sinfra_config"
    topic_msg = "k8s_infra_config"
    schema_new = infra_config_profile_create_new_schema
    schema_edit = infra_config_profile_create_edit_schema

    def __init__(self, db, fs, msg, auth):
        super().__init__(db, fs, msg, auth)

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create a new infra-config profile."""
        return self.new_profile(rollback, session, indata, kwargs, headers)

    def default(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create the default infra-config profile during cluster creation."""
        return self.default_profile(rollback, session, indata, kwargs, headers)

    def delete(self, session, _id, dry_run=False, not_send_msg=None):
        """Delete the profile once no cluster operation depends on it."""
        self.check_dependency({"infra_config_profiles": _id}, operation_type="delete")
        self.delete_profile(session, _id, dry_run, not_send_msg)
        return _id
rshri2d386cb2024-07-05 14:35:51 +0000104
105
class AppTopic(ProfileTopic):
    """NBI topic for K8s application profiles (delegates to ProfileTopic)."""

    topic = "k8sapp"
    topic_msg = "k8s_app"
    schema_new = app_profile_create_new_schema
    schema_edit = app_profile_create_edit_schema

    def __init__(self, db, fs, msg, auth):
        super().__init__(db, fs, msg, auth)

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create a new app profile."""
        return self.new_profile(rollback, session, indata, kwargs, headers)

    def default(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create the default app profile during cluster creation."""
        return self.default_profile(rollback, session, indata, kwargs, headers)

    def delete(self, session, _id, dry_run=False, not_send_msg=None):
        """Delete the profile once no cluster operation depends on it."""
        self.check_dependency({"app_profiles": _id}, operation_type="delete")
        self.delete_profile(session, _id, dry_run, not_send_msg)
        return _id
rshri2d386cb2024-07-05 14:35:51 +0000128
129
class ResourceTopic(ProfileTopic):
    """NBI topic for K8s resource profiles (delegates to ProfileTopic)."""

    topic = "k8sresource"
    topic_msg = "k8s_resource"
    schema_new = resource_profile_create_new_schema
    schema_edit = resource_profile_create_edit_schema

    def __init__(self, db, fs, msg, auth):
        super().__init__(db, fs, msg, auth)

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create a new resource profile."""
        return self.new_profile(rollback, session, indata, kwargs, headers)

    def default(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create the default resource profile during cluster creation."""
        return self.default_profile(rollback, session, indata, kwargs, headers)

    def delete(self, session, _id, dry_run=False, not_send_msg=None):
        """Delete the profile once no cluster operation depends on it."""
        self.check_dependency({"resource_profiles": _id}, operation_type="delete")
        self.delete_profile(session, _id, dry_run, not_send_msg)
        return _id
rshri2d386cb2024-07-05 14:35:51 +0000152
153
class ClusterTopic(ACMTopic):
    """NBI topic for K8s clusters.

    Handles cluster creation/deletion, attach/detach of profiles, credential
    retrieval and scale/update operations, and keeps the legacy "k8sclusters"
    collection in sync.

    Fixes applied in this revision:
    - ``_aws_check`` / ``_params_check`` / ``_create_cluster`` used to raise
      ``ValueError``, which is NOT in the except tuple of ``new()`` and would
      escape as an unhandled server error; they now raise ``EngineException``
      with HTTP 422 so the error is wrapped with the failing step.
    - typo fixes in a log line ("cotent") and an error message ("does'nt
      exists").
    """

    topic = "clusters"
    topic_msg = "cluster"
    schema_new = clustercreation_new_schema
    schema_edit = attach_dettach_profile_schema

    def __init__(self, db, fs, msg, auth):
        super().__init__(db, fs, msg, auth)
        # Sub-topics used to create/look up the per-cluster default profiles.
        self.infra_contr_topic = InfraContTopic(db, fs, msg, auth)
        self.infra_conf_topic = InfraConfTopic(db, fs, msg, auth)
        self.resource_topic = ResourceTopic(db, fs, msg, auth)
        self.app_topic = AppTopic(db, fs, msg, auth)

    @staticmethod
    def format_on_new(content, project_id=None, make_public=False):
        """Fill common creation metadata and reset the current operation."""
        ACMTopic.format_on_new(content, project_id=project_id, make_public=make_public)
        content["current_operation"] = None

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """
        Creates a new k8scluster into database.
        :param rollback: list to append the created items at database in case a rollback must be done
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param indata: params to be used for the k8cluster
        :param kwargs: used to override the indata
        :param headers: http request headers
        :return: the _id of k8scluster created at database. Or an exception of type
            EngineException, ValidationError, DbException, FsException, MsgException.
            Note: Exceptions are not captured on purpose. They should be captured at called
        """
        step = "checking quotas"  # first step must be defined outside try
        try:
            if self.multiproject:
                self.check_quota(session)

            content = self._remove_envelop(indata)

            step = "name unique check"
            self.cluster_unique_name_check(session, indata["name"])

            step = "validating input parameters"
            self._update_input_with_kwargs(content, kwargs)

            content = self._validate_input_new(content, session, force=session["force"])

            # Keep the raw request for the operation history entry.
            operation_params = indata.copy()

            self.check_conflict_on_new(session, content)
            self.format_on_new(
                content, project_id=session["project_id"], make_public=session["public"]
            )

            step = "filling cluster details from input data"
            content = self._create_cluster(
                content, rollback, session, indata, kwargs, headers
            )

            step = "creating cluster at database"
            _id = self.db.create(self.topic, content)

            op_id = self.format_on_operation(
                content,
                "create",
                operation_params,
            )

            # Age key pair is generated after the _id exists because the
            # encryption salt is the cluster _id itself.
            pubkey, privkey = self._generate_age_key()
            content["age_pubkey"] = self.db.encrypt(
                pubkey, schema_version="1.11", salt=_id
            )
            content["age_privkey"] = self.db.encrypt(
                privkey, schema_version="1.11", salt=_id
            )

            # TODO: set age_pubkey and age_privkey in the default profiles
            rollback.append({"topic": self.topic, "_id": _id})
            self.db.set_one("clusters", {"_id": _id}, content)
            self._send_msg("create", {"cluster_id": _id, "operation_id": op_id})

            # To add the content in old collection "k8sclusters"
            self.add_to_old_collection(content, session)

            return _id, None
        except (
            ValidationError,
            EngineException,
            DbException,
            MsgException,
            FsException,
        ) as e:
            # Re-raise the same exception type with the failing step appended.
            raise type(e)("{} while '{}'".format(e, step), http_code=e.http_code)

    def _validate_input_new(self, content, session, force=False):
        """Validate VIM-dependent parameters, then run the schema validation."""
        # validating vim and checking the mandatory parameters
        vim_type = self.check_vim(session, content["vim_account"])

        # for aws
        if vim_type == "aws":
            self._aws_check(content)

        # for azure and gcp
        elif vim_type in ["azure", "gcp"]:
            self._params_check(content)

        return super()._validate_input_new(content, force=session["force"])

    def _aws_check(self, indata):
        """Reject node_count/node_size for AWS clusters.

        Raises EngineException (422) — previously ValueError, which was not
        caught by new()'s error handler.
        """
        if "node_count" in indata or "node_size" in indata:
            raise EngineException(
                "node_count and node_size are not allowed for AWS",
                HTTPStatus.UNPROCESSABLE_ENTITY,
            )

    def _params_check(self, indata):
        """Require node_count and node_size for azure/gcp clusters.

        Raises EngineException (422) — previously ValueError, which was not
        caught by new()'s error handler.
        """
        if "node_count" not in indata and "node_size" not in indata:
            raise EngineException(
                "node_count and node_size are mandatory parameter",
                HTTPStatus.UNPROCESSABLE_ENTITY,
            )

    def _create_cluster(self, content, rollback, session, indata, kwargs, headers):
        """Complete cluster content: subnets, default profiles, state, VIM data.

        Raises EngineException (422) on subnet validation failures
        (previously ValueError, uncaught by new()'s handler).
        """
        private_subnet = indata.get("private_subnet")
        public_subnet = indata.get("public_subnet")

        # Enforce: if private_subnet is provided, public_subnet must also be provided
        if (private_subnet and not public_subnet) or (
            public_subnet and not private_subnet
        ):
            raise EngineException(
                "'public_subnet' must be provided if 'private_subnet' is given and viceversa.",
                HTTPStatus.UNPROCESSABLE_ENTITY,
            )

        # private subnet validation
        if private_subnet:
            count = len(private_subnet)
            if count != 2:
                raise EngineException(
                    f"private_subnet must contain exactly 2 items, got {count}",
                    HTTPStatus.UNPROCESSABLE_ENTITY,
                )

        # public subnet validation
        if public_subnet:
            count = len(public_subnet)
            if count != 1:
                raise EngineException(
                    f"public_subnet must contain exactly 1 items, got {count}",
                    HTTPStatus.UNPROCESSABLE_ENTITY,
                )

        # One default profile of each kind is created alongside the cluster.
        content["infra_controller_profiles"] = [
            self._create_default_profiles(
                rollback, session, indata, kwargs, headers, self.infra_contr_topic
            )
        ]
        content["infra_config_profiles"] = [
            self._create_default_profiles(
                rollback, session, indata, kwargs, headers, self.infra_conf_topic
            )
        ]
        content["resource_profiles"] = [
            self._create_default_profiles(
                rollback, session, indata, kwargs, headers, self.resource_topic
            )
        ]
        content["app_profiles"] = [
            self._create_default_profiles(
                rollback, session, indata, kwargs, headers, self.app_topic
            )
        ]
        # "created" distinguishes OSM-created clusters from registered ones.
        content["created"] = "true"
        content["state"] = "IN_CREATION"
        content["operatingState"] = "PROCESSING"
        content["git_name"] = self.create_gitname(content, session)
        content["resourceState"] = "IN_PROGRESS.REQUEST_RECEIVED"

        # Get the vim_account details
        vim_account_details = self.db.get_one(
            "vim_accounts", {"name": content["vim_account"]}
        )

        # Add optional fields if they exist in the request; otherwise fall back
        # to the VIM account configuration.
        if "region_name" not in indata:
            region_name = vim_account_details.get("config", {}).get("region_name")
            if region_name:
                content["region_name"] = region_name

        if "resource_group" not in indata:
            resource_group = vim_account_details.get("config", {}).get("resource_group")
            if resource_group:
                content["resource_group"] = resource_group

        if "k8s_version" not in content:
            content["k8s_version"] = "1.28"  # default Kubernetes version
        content["node_count"] = indata.get("node_count", 0)
        content["ksu_count"] = 0
        self.logger.info(f"content is : {content}")  # fixed typo "cotent"
        return content

    def check_vim(self, session, name):
        """Return the vim_type of the VIM account with the given name.

        NOTE(review): db.get_one raises DbException (not ValidationError) when
        the account is missing, so that case propagates unwrapped; confirm
        whether that is intended.
        """
        try:
            vim_account_details = self.db.get_one("vim_accounts", {"name": name})
            if vim_account_details is not None:
                return vim_account_details["vim_type"]
        except ValidationError as e:
            raise EngineException(
                e,
                HTTPStatus.UNPROCESSABLE_ENTITY,
            )

    def _create_default_profiles(
        self, rollback, session, indata, kwargs, headers, topic
    ):
        """Create the default profile for the given profile topic/sub-topic."""
        topic = self.to_select_topic(topic)
        default_profiles = topic.default(rollback, session, indata, kwargs, headers)
        return default_profiles

    def to_select_topic(self, topic):
        """Map a profile-type name to its sub-topic; pass through otherwise."""
        if topic == "infra_controller_profiles":
            topic = self.infra_contr_topic
        elif topic == "infra_config_profiles":
            topic = self.infra_conf_topic
        elif topic == "resource_profiles":
            topic = self.resource_topic
        elif topic == "app_profiles":
            topic = self.app_topic
        return topic

    def show_one(self, session, _id, profile, filter_q=None, api_req=False):
        """Return the profile documents of one type attached to a cluster.

        NOTE(review): the filter_q parameter is overwritten with the project
        filter below, so any caller-supplied value is ignored — confirm.
        """
        try:
            filter_q = self._get_project_filter(session)
            filter_q[self.id_field(self.topic, _id)] = _id
            content = self.db.get_one(self.topic, filter_q)
            existing_profiles = []
            topic = self.to_select_topic(profile)
            for profile_id in content[profile]:
                data = topic.show(session, profile_id, filter_q, api_req)
                existing_profiles.append(data)
            return existing_profiles
        except ValidationError as e:
            raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)

    def state_check(self, profile_id, session, topic):
        """Raise 422 unless the given profile is in CREATED state."""
        topic = self.to_select_topic(topic)
        content = topic.show(session, profile_id, filter_q=None, api_req=False)
        state = content["state"]
        if state == "CREATED":
            return
        else:
            raise EngineException(
                f" {profile_id} is not in created state",
                HTTPStatus.UNPROCESSABLE_ENTITY,
            )

    def edit(self, session, _id, item, indata=None, kwargs=None):
        """Edit a cluster, or attach/detach a profile when item is a profile type."""
        if item not in (
            "infra_controller_profiles",
            "infra_config_profiles",
            "app_profiles",
            "resource_profiles",
        ):
            self.schema_edit = cluster_edit_schema
            # NOTE(review): `item` is forwarded as `indata` here — looks like
            # the payload travels in `item` for plain edits; confirm with the
            # NBI routing layer before changing.
            super().edit(session, _id, indata=item, kwargs=kwargs, content=None)
        else:
            indata = self._remove_envelop(indata)
            indata = self._validate_input_edit(
                indata, content=None, force=session["force"]
            )
            if indata.get("add_profile"):
                self.add_profile(session, _id, item, indata)
            elif indata.get("remove_profile"):
                self.remove_profile(session, _id, item, indata)
            else:
                error_msg = "Add / remove operation is only applicable"
                raise EngineException(error_msg, HTTPStatus.UNPROCESSABLE_ENTITY)

    def edit_extra_before(self, session, _id, indata=None, kwargs=None, content=None):
        """On rename, enforce name uniqueness and propagate the new name to the
        legacy k8scluster document and all profile collections."""
        check = self.db.get_one(self.topic, {"_id": _id})
        if "name" in indata and check["name"] != indata["name"]:
            self.check_unique_name(session, indata["name"])
            _filter = {"name": indata["name"]}
            topic_list = [
                "k8sclusters",
                "k8sinfra_controller",
                "k8sinfra_config",
                "k8sapp",
                "k8sresource",
            ]
            # Check unique name for k8scluster and profiles
            for topic in topic_list:
                if self.db.get_one(
                    topic, _filter, fail_on_empty=False, fail_on_more=False
                ):
                    raise EngineException(
                        "name '{}' already exists for {}".format(indata["name"], topic),
                        HTTPStatus.CONFLICT,
                    )
            # Replace name in k8scluster and profiles
            for topic in topic_list:
                data = self.db.get_one(topic, {"name": check["name"]})
                data["name"] = indata["name"]
                self.db.replace(topic, data["_id"], data)
        return True

    def add_profile(self, session, _id, item, indata=None):
        """Attach an existing profile of type `item` to the cluster and notify."""
        check = {"cluster": _id, item: indata["add_profile"][0]["id"]}
        self.check_dependency(check)
        indata = self._remove_envelop(indata)
        operation_params = indata
        profile_id = indata["add_profile"][0]["id"]
        # The profile must be fully created before it can be attached.
        self.state_check(profile_id, session, item)
        filter_q = self._get_project_filter(session)
        filter_q[self.id_field(self.topic, _id)] = _id
        content = self.db.get_one(self.topic, filter_q)
        profile_list = content[item]

        if profile_id not in profile_list:
            content["operatingState"] = "PROCESSING"
            op_id = self.format_on_operation(
                content,
                "add",
                operation_params,
            )
            self.db.set_one("clusters", {"_id": content["_id"]}, content)
            self._send_msg(
                "add",
                {
                    "cluster_id": _id,
                    "profile_id": profile_id,
                    "profile_type": item,
                    "operation_id": op_id,
                },
            )
        else:
            raise EngineException(
                f"{item} {profile_id} already exists", HTTPStatus.UNPROCESSABLE_ENTITY
            )

    def _get_default_profiles(self, session, topic):
        """Return the _ids of the default profiles for the given profile type."""
        topic = self.to_select_topic(topic)
        existing_profiles = topic.list(session, filter_q=None, api_req=False)
        default_profiles = [
            profile["_id"]
            for profile in existing_profiles
            if profile.get("default", False)
        ]
        return default_profiles

    def remove_profile(self, session, _id, item, indata):
        """Detach a non-default profile of type `item` from the cluster."""
        check = {"cluster": _id, item: indata["remove_profile"][0]["id"]}
        self.check_dependency(check)
        indata = self._remove_envelop(indata)
        operation_params = indata
        profile_id = indata["remove_profile"][0]["id"]
        filter_q = self._get_project_filter(session)
        filter_q[self.id_field(self.topic, _id)] = _id
        content = self.db.get_one(self.topic, filter_q)
        profile_list = content[item]

        default_profiles = self._get_default_profiles(session, item)

        if profile_id in default_profiles:
            raise EngineException(
                "Cannot remove default profile", HTTPStatus.UNPROCESSABLE_ENTITY
            )
        if profile_id in profile_list:
            op_id = self.format_on_operation(
                content,
                "remove",
                operation_params,
            )
            self.db.set_one("clusters", {"_id": content["_id"]}, content)
            self._send_msg(
                "remove",
                {
                    "cluster_id": _id,
                    "profile_id": profile_id,
                    "profile_type": item,
                    "operation_id": op_id,
                },
            )
        else:
            raise EngineException(
                f"{item} {profile_id} doesn't exist", HTTPStatus.UNPROCESSABLE_ENTITY
            )

    def get_cluster_creds(self, session, _id, item):
        """Trigger asynchronous retrieval of cluster credentials; return op_id."""
        if not self.multiproject:
            filter_db = {}
        else:
            filter_db = self._get_project_filter(session)
        filter_db[BaseTopic.id_field(self.topic, _id)] = _id
        operation_params = None
        data = self.db.get_one(self.topic, filter_db)
        op_id = self.format_on_operation(data, item, operation_params)
        self.db.set_one(self.topic, {"_id": data["_id"]}, data)
        self._send_msg("get_creds", {"cluster_id": _id, "operation_id": op_id})
        return op_id

    def get_cluster_creds_file(self, session, _id, item, op_id):
        """Return (file-object, mime-type) with the credentials fetched by op_id.

        Raises EngineException (422) if the operation result flag is not True.
        """
        if not self.multiproject:
            filter_db = {}
        else:
            filter_db = self._get_project_filter(session)
        filter_db[BaseTopic.id_field(self.topic, _id)] = _id

        data = self.db.get_one(self.topic, filter_db)
        creds_flag = None
        for operations in data["operationHistory"]:
            if operations["op_id"] == op_id:
                creds_flag = operations["result"]
        self.logger.info("Creds Flag: {}".format(creds_flag))

        if creds_flag is True:
            credentials = data["credentials"]

            file_pkg = None
            current_path = _id

            # Rebuild the per-cluster folder from scratch on every download.
            self.fs.file_delete(current_path, ignore_non_exist=True)
            self.fs.mkdir(current_path)
            filename = "credentials.yaml"
            file_path = (current_path, filename)
            self.logger.info("File path: {}".format(file_path))
            file_pkg = self.fs.file_open(file_path, "a+b")

            credentials_yaml = yaml.safe_dump(
                credentials, indent=4, default_flow_style=False
            )
            file_pkg.write(credentials_yaml.encode(encoding="utf-8"))

            if file_pkg:
                file_pkg.close()
                file_pkg = None
            self.fs.sync(from_path=current_path)

            return (
                self.fs.file_open((current_path, filename), "rb"),
                "text/plain",
            )
        else:
            raise EngineException(
                "Not possible to get the credentials of the cluster",
                HTTPStatus.UNPROCESSABLE_ENTITY,
            )

    def update_item(self, session, _id, item, indata):
        """Record an update operation of kind `item` on the cluster and notify."""
        if not self.multiproject:
            filter_db = {}
        else:
            filter_db = self._get_project_filter(session)
        # To allow project&user addressing by name AS WELL AS _id
        filter_db[BaseTopic.id_field(self.topic, _id)] = _id
        validate_input(indata, cluster_update_schema)
        data = self.db.get_one(self.topic, filter_db)
        data["operatingState"] = "PROCESSING"
        data["resourceState"] = "IN_PROGRESS"
        operation_params = indata  # removed dead `operation_params = {}` assignment
        op_id = self.format_on_operation(
            data,
            item,
            operation_params,
        )
        self.db.set_one(self.topic, {"_id": _id}, data)
        data = {"cluster_id": _id, "operation_id": op_id}
        self._send_msg(item, data)
        return op_id

    def delete_extra_before(self, session, _id, db_content, not_send_msg=None):
        """Build the message payload emitted when the cluster is deleted."""
        op_id = self.common_delete(_id, db_content)
        return {"cluster_id": _id, "operation_id": op_id, "force": session["force"]}

    def delete(self, session, _id, dry_run=False, not_send_msg=None):
        """Delete an OSM-created cluster; registered clusters must deregister."""
        self.check_dependency({"cluster": _id}, operation_type="delete")
        filter_q = self._get_project_filter(session)
        filter_q[self.id_field(self.topic, _id)] = _id
        db_cluster = self.db.get_one(self.topic, filter_q)
        op_id = db_cluster["current_operation"]
        if db_cluster["created"] == "false":
            raise EngineException(
                "Cannot delete registered cluster. Please deregister.",
                HTTPStatus.UNPROCESSABLE_ENTITY,
            )
        super().delete(session, _id, dry_run, not_send_msg)
        return op_id
shrinithi28d887f2025-01-08 05:27:19 +0000641
642
yshahd23c6a52025-06-13 05:49:31 +0000643class NodeGroupTopic(ACMTopic):
644 topic = "nodegroups"
645 topic_msg = "nodegroup"
646 schema_new = node_create_new_schema
647 schema_edit = node_edit_schema
648
649 def __init__(self, db, fs, msg, auth):
650 BaseTopic.__init__(self, db, fs, msg, auth)
651
652 @staticmethod
653 def format_on_new(content, project_id=None, make_public=False):
654 BaseTopic.format_on_new(content, project_id=project_id, make_public=make_public)
655 content["current_operation"] = None
656 content["state"] = "IN_CREATION"
657 content["operatingState"] = "PROCESSING"
658 content["resourceState"] = "IN_PROGRESS"
659
660 def new(self, rollback, session, indata=None, kwargs=None, headers=None):
661 self.logger.info(f"Indata: {indata}")
662 self.check_unique_name(session, indata["name"])
663
664 indata = self._remove_envelop(indata)
665 self._update_input_with_kwargs(indata, kwargs)
666 if not indata.get("private_subnet") and not indata.get("public_subnet"):
667 raise EngineException(
668 "Please provide atleast one subnet",
669 HTTPStatus.UNPROCESSABLE_ENTITY,
670 )
671 content = self._validate_input_new(indata, session["force"])
672
673 self.logger.info(f"Indata: {indata}")
674 self.logger.info(f"Content: {content}")
675 cluster_id = content["cluster_id"]
676 db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
677 private_subnet = db_cluster.get("private_subnet")
678 public_subnet = db_cluster.get("public_subnet")
679 if content.get("private_subnet"):
680 for subnet in content["private_subnet"]:
681 if subnet not in private_subnet:
682 raise EngineException(
683 "No External subnet is used to add nodegroup",
684 HTTPStatus.UNPROCESSABLE_ENTITY,
685 )
686 if content.get("public_subnet"):
687 for subnet in content["public_subnet"]:
688 if subnet not in public_subnet:
689 raise EngineException(
690 "No External subnet is used to add nodegroup",
691 HTTPStatus.UNPROCESSABLE_ENTITY,
692 )
693
694 operation_params = {}
695 for content_key, content_value in content.items():
696 operation_params[content_key] = content_value
697 self.format_on_new(
698 content, session["project_id"], make_public=session["public"]
699 )
700 content["git_name"] = self.create_gitname(content, session)
701 self.logger.info(f"Operation Params: {operation_params}")
702 op_id = self.format_on_operation(
703 content,
704 "create",
705 operation_params,
706 )
707 node_count = db_cluster.get("node_count")
708 new_node_count = node_count + 1
709 self.logger.info(f"New Node count: {new_node_count}")
710 db_cluster["node_count"] = new_node_count
711 self.db.set_one("clusters", {"_id": cluster_id}, db_cluster)
712 _id = self.db.create(self.topic, content)
713 self._send_msg("add_nodegroup", {"nodegroup_id": _id, "operation_id": op_id})
714 return _id, op_id
715
716 def list(self, session, filter_q=None, api_req=False):
717 db_filter = {}
718 if filter_q.get("cluster_id"):
719 db_filter["cluster_id"] = filter_q.get("cluster_id")
720 data_list = self.db.get_list(self.topic, db_filter)
721 cluster_data = self.db.get_one("clusters", {"_id": db_filter["cluster_id"]})
722 self.logger.info(f"Cluster Data: {cluster_data}")
723 self.logger.info(f"Data: {data_list}")
724 if filter_q.get("cluster_id"):
725 outdata = {}
726 outdata["count"] = cluster_data["node_count"]
727 outdata["data"] = data_list
728 self.logger.info(f"Outdata: {outdata}")
729 return outdata
730 if api_req:
731 data_list = [self.sol005_projection(inst) for inst in data_list]
732 return data_list
733
734 def delete(self, session, _id, dry_run=False, not_send_msg=None):
735 if not self.multiproject:
736 filter_q = {}
737 else:
738 filter_q = self._get_project_filter(session)
739 filter_q[self.id_field(self.topic, _id)] = _id
740 item_content = self.db.get_one(self.topic, filter_q)
741 item_content["state"] = "IN_DELETION"
742 item_content["operatingState"] = "PROCESSING"
743 item_content["resourceState"] = "IN_PROGRESS"
744 self.check_conflict_on_del(session, _id, item_content)
745 op_id = self.format_on_operation(
746 item_content,
747 "delete",
748 None,
749 )
750 self.db.set_one(self.topic, {"_id": item_content["_id"]}, item_content)
751 self._send_msg(
752 "delete_nodegroup",
753 {"nodegroup_id": _id, "operation_id": op_id},
754 not_send_msg=not_send_msg,
755 )
756 return op_id
757
    def update_item(self, session, _id, item, indata):
        """Scale a nodegroup: validate the edit, record a "scale" operation and
        notify the backend through the message bus.

        :param session: contains "username", "admin", "force", "public", "project_id"
        :param _id: internal id (or name) of the nodegroup to scale
        :param item: topic/collection name (not used in the body; kept for interface)
        :param indata: edit payload with the new scaling parameters
        :return: id of the recorded "scale" operation
        :raises EngineException: wrapping any ValidationError (HTTP 422)
        """
        content = None
        try:
            if not content:
                content = self.db.get_one(self.topic, {"_id": _id})
            indata = self._validate_input_edit(indata, content, force=session["force"])
            # To allow addressing by name as well as by _id
            _id = content.get("_id") or _id

            content = self.check_conflict_on_edit(session, content, indata, _id=_id)
            # NOTE(review): this op_id is immediately overwritten below; the call
            # is kept only for format_on_edit's side effects on `content` —
            # confirm those side effects are actually needed here.
            op_id = self.format_on_edit(content, indata)
            op_id = ACMTopic.format_on_operation(
                content,
                "scale",
                indata,
            )
            self.logger.info(f"op_id: {op_id}")
            # Mark the item as in-progress before persisting and messaging
            content["operatingState"] = "PROCESSING"
            content["resourceState"] = "IN_PROGRESS"
            self.db.replace(self.topic, _id, content)
            self._send_msg(
                "scale_nodegroup", {"nodegroup_id": _id, "operation_id": op_id}
            )
            return op_id
        except ValidationError as e:
            raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
783
784 def edit(self, session, _id, indata, kwargs):
785 content = None
786
787 # Override descriptor with query string kwargs
788 if kwargs:
789 self._update_input_with_kwargs(indata, kwargs)
790 try:
791 if indata and session.get("set_project"):
792 raise EngineException(
793 "Cannot edit content and set to project (query string SET_PROJECT) at same time",
794 HTTPStatus.UNPROCESSABLE_ENTITY,
795 )
796 # TODO self._check_edition(session, indata, _id, force)
797 if not content:
798 content = self.db.get_one(self.topic, {"_id": _id})
799
800 indata = self._validate_input_edit(indata, content, force=session["force"])
801 self.logger.info(f"Indata: {indata}")
802
803 # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
804 _id = content.get("_id") or _id
805
806 content = self.check_conflict_on_edit(session, content, indata, _id=_id)
807 if "name" in indata and "description" in indata:
808 content["name"] = indata["name"]
809 content["description"] = indata["description"]
810 elif "name" in indata:
811 content["name"] = indata["name"]
812 elif "description" in indata:
813 content["description"] = indata["description"]
814 op_id = self.format_on_edit(content, indata)
815 self.db.set_one(self.topic, {"_id": _id}, content)
816 return op_id
817 except ValidationError as e:
818 raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
819
820
shrinithi28d887f2025-01-08 05:27:19 +0000821class ClusterOpsTopic(ACMTopic):
rshri2d386cb2024-07-05 14:35:51 +0000822 topic = "clusters"
823 topic_msg = "cluster"
rshri17b09ec2024-11-07 05:48:12 +0000824 schema_new = clusterregistration_new_schema
rshri2d386cb2024-07-05 14:35:51 +0000825
826 def __init__(self, db, fs, msg, auth):
shrinithi28d887f2025-01-08 05:27:19 +0000827 super().__init__(db, fs, msg, auth)
garciadeblas3d5dc322025-04-03 23:57:04 +0200828 self.infra_contr_topic = InfraContTopic(db, fs, msg, auth)
829 self.infra_conf_topic = InfraConfTopic(db, fs, msg, auth)
830 self.resource_topic = ResourceTopic(db, fs, msg, auth)
831 self.app_topic = AppTopic(db, fs, msg, auth)
rshri2d386cb2024-07-05 14:35:51 +0000832
garciadeblasbecc7052024-11-20 12:04:53 +0100833 @staticmethod
834 def format_on_new(content, project_id=None, make_public=False):
shrinithi28d887f2025-01-08 05:27:19 +0000835 ACMTopic.format_on_new(content, project_id=project_id, make_public=make_public)
garciadeblasbecc7052024-11-20 12:04:53 +0100836 content["current_operation"] = None
837
rshri2d386cb2024-07-05 14:35:51 +0000838 def add(self, rollback, session, indata, kwargs=None, headers=None):
839 step = "checking quotas"
840 try:
841 self.check_quota(session)
842 step = "name unique check"
shrinithi28d887f2025-01-08 05:27:19 +0000843 self.cluster_unique_name_check(session, indata["name"])
844 # self.check_unique_name(session, indata["name"])
rshri2d386cb2024-07-05 14:35:51 +0000845 step = "validating input parameters"
846 cls_add_request = self._remove_envelop(indata)
847 self._update_input_with_kwargs(cls_add_request, kwargs)
848 cls_add_request = self._validate_input_new(
849 cls_add_request, session["force"]
850 )
851 operation_params = cls_add_request
852
853 step = "filling cluster details from input data"
garciadeblas3d5dc322025-04-03 23:57:04 +0200854 cls_add_request = self._add_cluster(
855 cls_add_request, rollback, session, indata, kwargs, headers
856 )
rshri2d386cb2024-07-05 14:35:51 +0000857
rshri17b09ec2024-11-07 05:48:12 +0000858 step = "registering the cluster at database"
rshri2d386cb2024-07-05 14:35:51 +0000859 self.format_on_new(
rshri17b09ec2024-11-07 05:48:12 +0000860 cls_add_request, session["project_id"], make_public=session["public"]
rshri2d386cb2024-07-05 14:35:51 +0000861 )
rshri2d386cb2024-07-05 14:35:51 +0000862 op_id = self.format_on_operation(
rshri17b09ec2024-11-07 05:48:12 +0000863 cls_add_request,
rshri2d386cb2024-07-05 14:35:51 +0000864 "register",
865 operation_params,
866 )
rshri17b09ec2024-11-07 05:48:12 +0000867 _id = self.db.create(self.topic, cls_add_request)
garciadeblas9d9d9262024-09-25 11:25:33 +0200868 pubkey, privkey = self._generate_age_key()
rshri17b09ec2024-11-07 05:48:12 +0000869 cls_add_request["age_pubkey"] = self.db.encrypt(
garciadeblas9d9d9262024-09-25 11:25:33 +0200870 pubkey, schema_version="1.11", salt=_id
871 )
rshri17b09ec2024-11-07 05:48:12 +0000872 cls_add_request["age_privkey"] = self.db.encrypt(
garciadeblas9d9d9262024-09-25 11:25:33 +0200873 privkey, schema_version="1.11", salt=_id
874 )
875 # TODO: set age_pubkey and age_privkey in the default profiles
rshri17b09ec2024-11-07 05:48:12 +0000876 self.db.set_one(self.topic, {"_id": _id}, cls_add_request)
rshri2d386cb2024-07-05 14:35:51 +0000877 rollback.append({"topic": self.topic, "_id": _id})
878 self._send_msg("register", {"cluster_id": _id, "operation_id": op_id})
rshri50e34dc2024-12-02 03:10:39 +0000879
880 # To add the content in old collection "k8sclusters"
881 self.add_to_old_collection(cls_add_request, session)
882
rshri2d386cb2024-07-05 14:35:51 +0000883 return _id, None
884 except (
885 ValidationError,
886 EngineException,
887 DbException,
888 MsgException,
889 FsException,
890 ) as e:
891 raise type(e)("{} while '{}'".format(e, step), http_code=e.http_code)
892
garciadeblas3d5dc322025-04-03 23:57:04 +0200893 def _add_cluster(self, cls_add_request, rollback, session, indata, kwargs, headers):
rshri2d386cb2024-07-05 14:35:51 +0000894 cls_add = {
895 "name": cls_add_request["name"],
rshri2d386cb2024-07-05 14:35:51 +0000896 "credentials": cls_add_request["credentials"],
897 "vim_account": cls_add_request["vim_account"],
rshri17b09ec2024-11-07 05:48:12 +0000898 "bootstrap": cls_add_request["bootstrap"],
garciadeblas3d5dc322025-04-03 23:57:04 +0200899 "infra_controller_profiles": [
900 self._create_default_profiles(
901 rollback, session, indata, kwargs, headers, self.infra_contr_topic
902 )
903 ],
904 "infra_config_profiles": [
905 self._create_default_profiles(
906 rollback, session, indata, kwargs, headers, self.infra_conf_topic
907 )
908 ],
909 "resource_profiles": [
910 self._create_default_profiles(
911 rollback, session, indata, kwargs, headers, self.resource_topic
912 )
913 ],
914 "app_profiles": [
915 self._create_default_profiles(
916 rollback, session, indata, kwargs, headers, self.app_topic
917 )
918 ],
rshri2d386cb2024-07-05 14:35:51 +0000919 "created": "false",
920 "state": "IN_CREATION",
921 "operatingState": "PROCESSING",
922 "git_name": self.create_gitname(cls_add_request, session),
923 "resourceState": "IN_PROGRESS.REQUEST_RECEIVED",
924 }
rshri17b09ec2024-11-07 05:48:12 +0000925 # Add optional fields if they exist in the request
926 if "description" in cls_add_request:
927 cls_add["description"] = cls_add_request["description"]
rshri2d386cb2024-07-05 14:35:51 +0000928 return cls_add
929
garciadeblas3d5dc322025-04-03 23:57:04 +0200930 def check_vim(self, session, name):
931 try:
932 vim_account_details = self.db.get_one("vim_accounts", {"name": name})
933 if vim_account_details is not None:
934 return name
935 except ValidationError as e:
936 raise EngineException(
937 e,
938 HTTPStatus.UNPROCESSABLE_ENTITY,
939 )
940
941 def _create_default_profiles(
942 self, rollback, session, indata, kwargs, headers, topic
943 ):
944 topic = self.to_select_topic(topic)
945 default_profiles = topic.default(rollback, session, indata, kwargs, headers)
946 return default_profiles
947
948 def to_select_topic(self, topic):
949 if topic == "infra_controller_profiles":
950 topic = self.infra_contr_topic
951 elif topic == "infra_config_profiles":
952 topic = self.infra_conf_topic
953 elif topic == "resource_profiles":
954 topic = self.resource_topic
955 elif topic == "app_profiles":
956 topic = self.app_topic
957 return topic
958
rshri2d386cb2024-07-05 14:35:51 +0000959 def remove(self, session, _id, dry_run=False, not_send_msg=None):
960 """
961 Delete item by its internal _id
962 :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
963 :param _id: server internal id
964 :param dry_run: make checking but do not delete
965 :param not_send_msg: To not send message (False) or store content (list) instead
966 :return: operation id (None if there is not operation), raise exception if error or not found, conflict, ...
967 """
968
969 # To allow addressing projects and users by name AS WELL AS by _id
970 if not self.multiproject:
971 filter_q = {}
972 else:
973 filter_q = self._get_project_filter(session)
974 filter_q[self.id_field(self.topic, _id)] = _id
975 item_content = self.db.get_one(self.topic, filter_q)
976
rshri2d386cb2024-07-05 14:35:51 +0000977 op_id = self.format_on_operation(
978 item_content,
979 "deregister",
980 None,
981 )
982 self.db.set_one(self.topic, {"_id": _id}, item_content)
983
984 self.check_conflict_on_del(session, _id, item_content)
985 if dry_run:
986 return None
987
988 if self.multiproject and session["project_id"]:
989 # remove reference from project_read if there are more projects referencing it. If it last one,
990 # do not remove reference, but delete
991 other_projects_referencing = next(
992 (
993 p
994 for p in item_content["_admin"]["projects_read"]
995 if p not in session["project_id"] and p != "ANY"
996 ),
997 None,
998 )
999
1000 # check if there are projects referencing it (apart from ANY, that means, public)....
1001 if other_projects_referencing:
1002 # remove references but not delete
1003 update_dict_pull = {
1004 "_admin.projects_read": session["project_id"],
1005 "_admin.projects_write": session["project_id"],
1006 }
1007 self.db.set_one(
1008 self.topic, filter_q, update_dict=None, pull_list=update_dict_pull
1009 )
1010 return None
1011 else:
1012 can_write = next(
1013 (
1014 p
1015 for p in item_content["_admin"]["projects_write"]
1016 if p == "ANY" or p in session["project_id"]
1017 ),
1018 None,
1019 )
1020 if not can_write:
1021 raise EngineException(
1022 "You have not write permission to delete it",
1023 http_code=HTTPStatus.UNAUTHORIZED,
1024 )
1025
1026 # delete
1027 self._send_msg(
1028 "deregister",
1029 {"cluster_id": _id, "operation_id": op_id},
1030 not_send_msg=not_send_msg,
1031 )
shrinithi75492bd2025-03-21 18:37:44 +00001032 return _id
yshah53cc9eb2024-07-05 13:06:31 +00001033
1034
shrinithi28d887f2025-01-08 05:27:19 +00001035class KsusTopic(ACMTopic):
yshah53cc9eb2024-07-05 13:06:31 +00001036 topic = "ksus"
1037 okapkg_topic = "okas"
1038 infra_topic = "k8sinfra"
1039 topic_msg = "ksu"
1040 schema_new = ksu_schema
1041 schema_edit = ksu_schema
yshahd23c6a52025-06-13 05:49:31 +00001042 MAP_PROFILE = {
1043 "infra_controller_profiles": "infra-controllers",
1044 "infra_config_profiles": "infra-configs",
1045 "resource_profiles": "managed_resources",
1046 "app_profiles": "apps",
1047 }
yshah53cc9eb2024-07-05 13:06:31 +00001048
1049 def __init__(self, db, fs, msg, auth):
shrinithi28d887f2025-01-08 05:27:19 +00001050 super().__init__(db, fs, msg, auth)
yshah53cc9eb2024-07-05 13:06:31 +00001051 self.logger = logging.getLogger("nbi.ksus")
1052
1053 @staticmethod
1054 def format_on_new(content, project_id=None, make_public=False):
1055 BaseTopic.format_on_new(content, project_id=project_id, make_public=make_public)
garciadeblasbecc7052024-11-20 12:04:53 +01001056 content["current_operation"] = None
yshah53cc9eb2024-07-05 13:06:31 +00001057 content["state"] = "IN_CREATION"
1058 content["operatingState"] = "PROCESSING"
1059 content["resourceState"] = "IN_PROGRESS"
1060
1061 def new(self, rollback, session, indata=None, kwargs=None, headers=None):
1062 _id_list = []
yshahf8f07632025-01-17 04:46:03 +00001063 for content in indata["ksus"]:
1064 check = {content["profile"]["profile_type"]: content["profile"]["_id"]}
yshah53cc9eb2024-07-05 13:06:31 +00001065 oka = content["oka"][0]
1066 oka_flag = ""
1067 if oka["_id"]:
yshahf8f07632025-01-17 04:46:03 +00001068 check["okas"] = []
yshah53cc9eb2024-07-05 13:06:31 +00001069 oka_flag = "_id"
shahithya8bded112024-10-15 08:01:44 +00001070 oka["sw_catalog_path"] = ""
yshah53cc9eb2024-07-05 13:06:31 +00001071 elif oka["sw_catalog_path"]:
1072 oka_flag = "sw_catalog_path"
1073
1074 for okas in content["oka"]:
yshahf8f07632025-01-17 04:46:03 +00001075 if okas.get("_id") is not None:
1076 check["okas"].append(okas["_id"])
yshah53cc9eb2024-07-05 13:06:31 +00001077 if okas["_id"] and okas["sw_catalog_path"]:
1078 raise EngineException(
1079 "Cannot create ksu with both OKA and SW catalog path",
1080 HTTPStatus.UNPROCESSABLE_ENTITY,
1081 )
1082 if not okas["sw_catalog_path"]:
1083 okas.pop("sw_catalog_path")
1084 elif not okas["_id"]:
1085 okas.pop("_id")
1086 if oka_flag not in okas.keys():
1087 raise EngineException(
1088 "Cannot create ksu. Give either OKA or SW catalog path for all oka in a KSU",
1089 HTTPStatus.UNPROCESSABLE_ENTITY,
1090 )
yshahf8f07632025-01-17 04:46:03 +00001091 self.check_dependency(check)
yshah53cc9eb2024-07-05 13:06:31 +00001092
1093 # Override descriptor with query string kwargs
1094 content = self._remove_envelop(content)
1095 self._update_input_with_kwargs(content, kwargs)
1096 content = self._validate_input_new(input=content, force=session["force"])
1097
1098 # Check for unique name
1099 self.check_unique_name(session, content["name"])
1100
1101 self.check_conflict_on_new(session, content)
1102
1103 operation_params = {}
1104 for content_key, content_value in content.items():
1105 operation_params[content_key] = content_value
1106 self.format_on_new(
1107 content, project_id=session["project_id"], make_public=session["public"]
1108 )
yshah53cc9eb2024-07-05 13:06:31 +00001109 op_id = self.format_on_operation(
1110 content,
1111 operation_type="create",
1112 operation_params=operation_params,
1113 )
1114 content["git_name"] = self.create_gitname(content, session)
1115
1116 # Update Oka_package usage state
1117 for okas in content["oka"]:
1118 if "_id" in okas.keys():
1119 self.update_usage_state(session, okas)
1120
yshahd23c6a52025-06-13 05:49:31 +00001121 profile_id = content["profile"].get("_id")
1122 profile_type = content["profile"].get("profile_type")
1123 db_cluster_list = self.db.get_list("clusters")
1124 for db_cluster in db_cluster_list:
1125 if db_cluster.get("created") == "true":
1126 profile_list = db_cluster[profile_type]
1127 if profile_id in profile_list:
1128 ksu_count = db_cluster.get("ksu_count")
1129 new_ksu_count = ksu_count + 1
1130 self.logger.info(f"New KSU count: {new_ksu_count}")
1131 db_cluster["ksu_count"] = new_ksu_count
1132 self.db.set_one(
1133 "clusters", {"_id": db_cluster["_id"]}, db_cluster
1134 )
1135
yshah53cc9eb2024-07-05 13:06:31 +00001136 _id = self.db.create(self.topic, content)
1137 rollback.append({"topic": self.topic, "_id": _id})
yshah53cc9eb2024-07-05 13:06:31 +00001138 _id_list.append(_id)
1139 data = {"ksus_list": _id_list, "operation_id": op_id}
1140 self._send_msg("create", data)
1141 return _id_list, op_id
1142
1143 def clone(self, rollback, session, _id, indata, kwargs, headers):
yshahf8f07632025-01-17 04:46:03 +00001144 check = {
1145 "ksu": _id,
1146 indata["profile"]["profile_type"]: indata["profile"]["_id"],
1147 }
1148 self.check_dependency(check)
yshah53cc9eb2024-07-05 13:06:31 +00001149 filter_db = self._get_project_filter(session)
1150 filter_db[BaseTopic.id_field(self.topic, _id)] = _id
1151 data = self.db.get_one(self.topic, filter_db)
1152
yshah53cc9eb2024-07-05 13:06:31 +00001153 op_id = self.format_on_operation(
1154 data,
1155 "clone",
1156 indata,
1157 )
1158 self.db.set_one(self.topic, {"_id": data["_id"]}, data)
1159 self._send_msg("clone", {"ksus_list": [data["_id"]], "operation_id": op_id})
1160 return op_id
1161
1162 def update_usage_state(self, session, oka_content):
1163 _id = oka_content["_id"]
1164 filter_db = self._get_project_filter(session)
1165 filter_db[BaseTopic.id_field(self.topic, _id)] = _id
1166
1167 data = self.db.get_one(self.okapkg_topic, filter_db)
1168 if data["_admin"]["usageState"] == "NOT_IN_USE":
1169 usage_state_update = {
1170 "_admin.usageState": "IN_USE",
1171 }
1172 self.db.set_one(
1173 self.okapkg_topic, {"_id": _id}, update_dict=usage_state_update
1174 )
1175
1176 def move_ksu(self, session, _id, indata=None, kwargs=None, content=None):
yshahf8f07632025-01-17 04:46:03 +00001177 check = {
1178 "ksu": _id,
1179 indata["profile"]["profile_type"]: indata["profile"]["_id"],
1180 }
1181 self.check_dependency(check)
yshah53cc9eb2024-07-05 13:06:31 +00001182 indata = self._remove_envelop(indata)
1183
1184 # Override descriptor with query string kwargs
1185 if kwargs:
1186 self._update_input_with_kwargs(indata, kwargs)
1187 try:
1188 if indata and session.get("set_project"):
1189 raise EngineException(
1190 "Cannot edit content and set to project (query string SET_PROJECT) at same time",
1191 HTTPStatus.UNPROCESSABLE_ENTITY,
1192 )
1193 # TODO self._check_edition(session, indata, _id, force)
1194 if not content:
1195 content = self.show(session, _id)
1196 indata = self._validate_input_edit(
1197 input=indata, content=content, force=session["force"]
1198 )
1199 operation_params = indata
1200 deep_update_rfc7396(content, indata)
1201
1202 # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
1203 _id = content.get("_id") or _id
yshahd0c876f2024-11-11 09:24:48 +00001204 op_id = self.format_on_operation(
yshah53cc9eb2024-07-05 13:06:31 +00001205 content,
1206 "move",
1207 operation_params,
1208 )
1209 if content.get("_admin"):
1210 now = time()
1211 content["_admin"]["modified"] = now
1212 content["operatingState"] = "PROCESSING"
1213 content["resourceState"] = "IN_PROGRESS"
1214
1215 self.db.replace(self.topic, _id, content)
yshah53cc9eb2024-07-05 13:06:31 +00001216 data = {"ksus_list": [content["_id"]], "operation_id": op_id}
1217 self._send_msg("move", data)
1218 return op_id
1219 except ValidationError as e:
1220 raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
1221
1222 def check_conflict_on_edit(self, session, final_content, edit_content, _id):
1223 if final_content["name"] != edit_content["name"]:
1224 self.check_unique_name(session, edit_content["name"])
1225 return final_content
1226
1227 @staticmethod
1228 def format_on_edit(final_content, edit_content):
shrinithi28d887f2025-01-08 05:27:19 +00001229 op_id = ACMTopic.format_on_operation(
yshah53cc9eb2024-07-05 13:06:31 +00001230 final_content,
1231 "update",
1232 edit_content,
1233 )
1234 final_content["operatingState"] = "PROCESSING"
1235 final_content["resourceState"] = "IN_PROGRESS"
1236 if final_content.get("_admin"):
1237 now = time()
1238 final_content["_admin"]["modified"] = now
yshahd0c876f2024-11-11 09:24:48 +00001239 return op_id
yshah53cc9eb2024-07-05 13:06:31 +00001240
1241 def edit(self, session, _id, indata, kwargs):
1242 _id_list = []
yshah53cc9eb2024-07-05 13:06:31 +00001243 if _id == "update":
1244 for ksus in indata["ksus"]:
1245 content = ksus
1246 _id = content["_id"]
1247 _id_list.append(_id)
1248 content.pop("_id")
yshahd0c876f2024-11-11 09:24:48 +00001249 op_id = self.edit_ksu(session, _id, content, kwargs)
yshah53cc9eb2024-07-05 13:06:31 +00001250 else:
1251 content = indata
1252 _id_list.append(_id)
yshahd0c876f2024-11-11 09:24:48 +00001253 op_id = self.edit_ksu(session, _id, content, kwargs)
yshah53cc9eb2024-07-05 13:06:31 +00001254
1255 data = {"ksus_list": _id_list, "operation_id": op_id}
1256 self._send_msg("edit", data)
yshah53cc9eb2024-07-05 13:06:31 +00001257
yshahd23c6a52025-06-13 05:49:31 +00001258 def cluster_list_ksu(self, session, filter_q=None, api_req=None):
1259 db_filter = {}
1260 if filter_q.get("cluster_id"):
1261 db_filter["_id"] = filter_q.get("cluster_id")
1262 ksu_data_list = []
1263
1264 cluster_data = self.db.get_one("clusters", db_filter)
1265 profiles_list = [
1266 "infra_controller_profiles",
1267 "infra_config_profiles",
1268 "app_profiles",
1269 "resource_profiles",
1270 ]
1271 for profile in profiles_list:
1272 data_list = []
1273 for profile_id in cluster_data[profile]:
1274 filter_q = {"profile": {"_id": profile_id, "profile_type": profile}}
1275 data_list = self.db.get_list(self.topic, filter_q)
1276 for ksu_data in data_list:
1277 ksu_data["package_name"] = []
1278 ksu_data["package_path"] = []
1279 for okas in ksu_data["operationHistory"][0]["operationParams"]["oka"]:
1280 sw_catalog_path = okas.get("sw_catalog_path")
1281 if sw_catalog_path:
1282 parts = sw_catalog_path.rsplit("/", 2)
1283 self.logger.info(f"Parts: {parts}")
1284 ksu_data["package_name"].append(parts[-2])
1285 ksu_data["package_path"].append("/".join(parts[:-1]))
1286 else:
1287 oka_id = okas["_id"]
1288 db_oka = self.db.get_one("okas", {"_id": oka_id})
1289 oka_type = self.MAP_PROFILE[
1290 db_oka.get("profile_type", "infra_controller_profiles")
1291 ]
1292 ksu_data["package_name"].append(db_oka["git_name"].lower())
1293 ksu_data["package_path"].append(
1294 f"{oka_type}/{db_oka['git_name'].lower()}"
1295 )
1296 ksu_data_list.append(ksu_data)
1297
1298 outdata = {}
1299 outdata["count"] = cluster_data["ksu_count"]
1300 outdata["data"] = ksu_data_list
1301 self.logger.info(f"Outdata: {outdata}")
1302 return outdata
1303
yshahd0c876f2024-11-11 09:24:48 +00001304 def edit_ksu(self, session, _id, indata, kwargs):
yshahf8f07632025-01-17 04:46:03 +00001305 check = {
1306 "ksu": _id,
1307 }
1308 if indata.get("profile"):
1309 check[indata["profile"]["profile_type"]] = indata["profile"]["_id"]
1310 if indata.get("oka"):
1311 check["okas"] = []
1312 for oka in indata["oka"]:
1313 if oka.get("_id") is not None:
1314 check["okas"].append(oka["_id"])
1315
1316 self.check_dependency(check)
yshah53cc9eb2024-07-05 13:06:31 +00001317 content = None
1318 indata = self._remove_envelop(indata)
1319
1320 # Override descriptor with query string kwargs
1321 if kwargs:
1322 self._update_input_with_kwargs(indata, kwargs)
1323 try:
1324 if indata and session.get("set_project"):
1325 raise EngineException(
1326 "Cannot edit content and set to project (query string SET_PROJECT) at same time",
1327 HTTPStatus.UNPROCESSABLE_ENTITY,
1328 )
1329 # TODO self._check_edition(session, indata, _id, force)
1330 if not content:
1331 content = self.show(session, _id)
1332
1333 for okas in indata["oka"]:
1334 if not okas["_id"]:
1335 okas.pop("_id")
1336 if not okas["sw_catalog_path"]:
1337 okas.pop("sw_catalog_path")
1338
1339 indata = self._validate_input_edit(indata, content, force=session["force"])
1340
1341 # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
1342 _id = content.get("_id") or _id
1343
1344 content = self.check_conflict_on_edit(session, content, indata, _id=_id)
yshah53cc9eb2024-07-05 13:06:31 +00001345 op_id = self.format_on_edit(content, indata)
1346 self.db.replace(self.topic, _id, content)
1347 return op_id
1348 except ValidationError as e:
1349 raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
1350
1351 def delete_ksu(self, session, _id, indata, dry_run=False, not_send_msg=None):
1352 _id_list = []
yshah53cc9eb2024-07-05 13:06:31 +00001353 if _id == "delete":
1354 for ksus in indata["ksus"]:
1355 content = ksus
1356 _id = content["_id"]
yshah53cc9eb2024-07-05 13:06:31 +00001357 content.pop("_id")
garciadeblasac285872024-12-05 12:21:09 +01001358 op_id, not_send_msg_ksu = self.delete(session, _id)
1359 if not not_send_msg_ksu:
1360 _id_list.append(_id)
yshah53cc9eb2024-07-05 13:06:31 +00001361 else:
garciadeblasac285872024-12-05 12:21:09 +01001362 op_id, not_send_msg_ksu = self.delete(session, _id)
1363 if not not_send_msg_ksu:
1364 _id_list.append(_id)
yshah53cc9eb2024-07-05 13:06:31 +00001365
garciadeblasac285872024-12-05 12:21:09 +01001366 if _id_list:
yshah781ce732025-02-28 08:56:13 +00001367 data = {
1368 "ksus_list": _id_list,
1369 "operation_id": op_id,
1370 "force": session["force"],
1371 }
garciadeblasac285872024-12-05 12:21:09 +01001372 self._send_msg("delete", data, not_send_msg)
yshah53cc9eb2024-07-05 13:06:31 +00001373 return op_id
1374
yshahd0c876f2024-11-11 09:24:48 +00001375 def delete(self, session, _id):
yshah53cc9eb2024-07-05 13:06:31 +00001376 if not self.multiproject:
1377 filter_q = {}
1378 else:
1379 filter_q = self._get_project_filter(session)
1380 filter_q[self.id_field(self.topic, _id)] = _id
1381 item_content = self.db.get_one(self.topic, filter_q)
yshahf8f07632025-01-17 04:46:03 +00001382
1383 check = {
1384 "ksu": _id,
1385 item_content["profile"]["profile_type"]: item_content["profile"]["_id"],
1386 }
1387 self.check_dependency(check, operation_type="delete")
1388
yshah53cc9eb2024-07-05 13:06:31 +00001389 item_content["state"] = "IN_DELETION"
1390 item_content["operatingState"] = "PROCESSING"
1391 item_content["resourceState"] = "IN_PROGRESS"
yshahd0c876f2024-11-11 09:24:48 +00001392 op_id = self.format_on_operation(
yshah53cc9eb2024-07-05 13:06:31 +00001393 item_content,
1394 "delete",
1395 None,
1396 )
1397 self.db.set_one(self.topic, {"_id": item_content["_id"]}, item_content)
1398
garciadeblasac285872024-12-05 12:21:09 +01001399 # Check if the profile exists. If it doesn't, no message should be sent to Kafka
1400 not_send_msg = None
1401 profile_id = item_content["profile"]["_id"]
1402 profile_type = item_content["profile"]["profile_type"]
1403 profile_collection_map = {
1404 "app_profiles": "k8sapp",
1405 "resource_profiles": "k8sresource",
1406 "infra_controller_profiles": "k8sinfra_controller",
1407 "infra_config_profiles": "k8sinfra_config",
1408 }
1409 profile_collection = profile_collection_map[profile_type]
1410 profile_content = self.db.get_one(
1411 profile_collection, {"_id": profile_id}, fail_on_empty=False
1412 )
1413 if not profile_content:
1414 self.db.del_one(self.topic, filter_q)
1415 not_send_msg = True
1416 return op_id, not_send_msg
yshah53cc9eb2024-07-05 13:06:31 +00001417
1418
shrinithi28d887f2025-01-08 05:27:19 +00001419class OkaTopic(DescriptorTopic, ACMOperationTopic):
yshah53cc9eb2024-07-05 13:06:31 +00001420 topic = "okas"
1421 topic_msg = "oka"
1422 schema_new = oka_schema
1423 schema_edit = oka_schema
1424
    def __init__(self, db, fs, msg, auth):
        """Initialize the OKA topic and its dedicated logger."""
        super().__init__(db, fs, msg, auth)
        self.logger = logging.getLogger("nbi.oka")
1428
1429 @staticmethod
1430 def format_on_new(content, project_id=None, make_public=False):
1431 DescriptorTopic.format_on_new(
1432 content, project_id=project_id, make_public=make_public
1433 )
garciadeblasbecc7052024-11-20 12:04:53 +01001434 content["current_operation"] = None
yshah53cc9eb2024-07-05 13:06:31 +00001435 content["state"] = "PENDING_CONTENT"
1436 content["operatingState"] = "PROCESSING"
1437 content["resourceState"] = "IN_PROGRESS"
1438
1439 def check_conflict_on_del(self, session, _id, db_content):
1440 usage_state = db_content["_admin"]["usageState"]
1441 if usage_state == "IN_USE":
1442 raise EngineException(
1443 "There is a KSU using this package",
1444 http_code=HTTPStatus.CONFLICT,
1445 )
1446
1447 def check_conflict_on_edit(self, session, final_content, edit_content, _id):
yshah00cfe8b2025-01-17 04:05:45 +00001448 if "name" in edit_content:
1449 if final_content["name"] == edit_content["name"]:
1450 name = edit_content["name"]
1451 raise EngineException(
1452 f"No update, new name for the OKA is the same: {name}",
1453 http_code=HTTPStatus.CONFLICT,
1454 )
1455 else:
1456 self.check_unique_name(session, edit_content["name"])
1457 elif (
1458 "description" in edit_content
yshah53cc9eb2024-07-05 13:06:31 +00001459 and final_content["description"] == edit_content["description"]
1460 ):
yshah00cfe8b2025-01-17 04:05:45 +00001461 description = edit_content["description"]
yshah53cc9eb2024-07-05 13:06:31 +00001462 raise EngineException(
yshah00cfe8b2025-01-17 04:05:45 +00001463 f"No update, new description for the OKA is the same: {description}",
yshah53cc9eb2024-07-05 13:06:31 +00001464 http_code=HTTPStatus.CONFLICT,
1465 )
yshah53cc9eb2024-07-05 13:06:31 +00001466 return final_content
1467
1468 def edit(self, session, _id, indata=None, kwargs=None, content=None):
1469 indata = self._remove_envelop(indata)
1470
1471 # Override descriptor with query string kwargs
1472 if kwargs:
1473 self._update_input_with_kwargs(indata, kwargs)
1474 try:
1475 if indata and session.get("set_project"):
1476 raise EngineException(
1477 "Cannot edit content and set to project (query string SET_PROJECT) at same time",
1478 HTTPStatus.UNPROCESSABLE_ENTITY,
1479 )
1480 # TODO self._check_edition(session, indata, _id, force)
1481 if not content:
1482 content = self.show(session, _id)
1483
1484 indata = self._validate_input_edit(indata, content, force=session["force"])
1485
1486 # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
1487 _id = content.get("_id") or _id
1488
1489 content = self.check_conflict_on_edit(session, content, indata, _id=_id)
1490 op_id = self.format_on_edit(content, indata)
1491 deep_update_rfc7396(content, indata)
1492
1493 self.db.replace(self.topic, _id, content)
1494 return op_id
1495 except ValidationError as e:
1496 raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
1497
1498 def delete(self, session, _id, dry_run=False, not_send_msg=None):
yshahf8f07632025-01-17 04:46:03 +00001499 check = {"oka": _id}
1500 self.check_dependency(check, operation_type="delete")
yshah53cc9eb2024-07-05 13:06:31 +00001501 if not self.multiproject:
1502 filter_q = {}
1503 else:
1504 filter_q = self._get_project_filter(session)
1505 filter_q[self.id_field(self.topic, _id)] = _id
1506 item_content = self.db.get_one(self.topic, filter_q)
1507 item_content["state"] = "IN_DELETION"
1508 item_content["operatingState"] = "PROCESSING"
1509 self.check_conflict_on_del(session, _id, item_content)
yshahd0c876f2024-11-11 09:24:48 +00001510 op_id = self.format_on_operation(
yshah53cc9eb2024-07-05 13:06:31 +00001511 item_content,
1512 "delete",
1513 None,
1514 )
yshah53cc9eb2024-07-05 13:06:31 +00001515 self.db.set_one(self.topic, {"_id": item_content["_id"]}, item_content)
1516 self._send_msg(
yshah781ce732025-02-28 08:56:13 +00001517 "delete",
1518 {"oka_id": _id, "operation_id": op_id, "force": session["force"]},
1519 not_send_msg=not_send_msg,
yshah53cc9eb2024-07-05 13:06:31 +00001520 )
yshahffcac5f2024-08-19 12:49:07 +00001521 return op_id
yshah53cc9eb2024-07-05 13:06:31 +00001522
    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create a new OKA package entry (content is uploaded separately).

        :param rollback: list to append the created item for rollback on failure
        :param session: contains "username", "admin", "force", "public", "project_id"
        :param indata: request body; only "userDefinedData" is kept if present
        :param kwargs: query-string parameters merged into the content
        :param headers: HTTP headers (unused here)
        :return: (created _id, operation id)
        """
        # _remove_envelop
        if indata:
            if "userDefinedData" in indata:
                indata = indata["userDefinedData"]

        content = {"_admin": {"userDefinedData": indata, "revision": 0}}

        self._update_input_with_kwargs(content, kwargs)
        # NOTE(review): the content dict built above is discarded here — the
        # validated kwargs become the content, so the "_admin" envelope with
        # userDefinedData/revision is lost; confirm this is intentional.
        content = BaseTopic._validate_input_new(
            self, input=kwargs, force=session["force"]
        )

        self.check_unique_name(session, content["name"])
        # Snapshot the validated content as the operation parameters
        operation_params = {}
        for content_key, content_value in content.items():
            operation_params[content_key] = content_value
        self.format_on_new(
            content, session["project_id"], make_public=session["public"]
        )
        op_id = self.format_on_operation(
            content,
            operation_type="create",
            operation_params=operation_params,
        )
        content["git_name"] = self.create_gitname(content, session)
        _id = self.db.create(self.topic, content)
        rollback.append({"topic": self.topic, "_id": _id})
        return _id, op_id
yshah53cc9eb2024-07-05 13:06:31 +00001552
1553 def upload_content(self, session, _id, indata, kwargs, headers):
yshahf8f07632025-01-17 04:46:03 +00001554 if headers["Method"] in ("PUT", "PATCH"):
1555 check = {"oka": _id}
1556 self.check_dependency(check)
yshah53cc9eb2024-07-05 13:06:31 +00001557 current_desc = self.show(session, _id)
1558
1559 compressed = None
1560 content_type = headers.get("Content-Type")
1561 if (
1562 content_type
1563 and "application/gzip" in content_type
1564 or "application/x-gzip" in content_type
1565 ):
1566 compressed = "gzip"
1567 if content_type and "application/zip" in content_type:
1568 compressed = "zip"
1569 filename = headers.get("Content-Filename")
1570 if not filename and compressed:
1571 filename = "package.tar.gz" if compressed == "gzip" else "package.zip"
1572 elif not filename:
1573 filename = "package"
1574
1575 revision = 1
1576 if "revision" in current_desc["_admin"]:
1577 revision = current_desc["_admin"]["revision"] + 1
1578
1579 file_pkg = None
1580 fs_rollback = []
1581
1582 try:
1583 start = 0
1584 # Rather than using a temp folder, we will store the package in a folder based on
1585 # the current revision.
1586 proposed_revision_path = _id + ":" + str(revision)
1587 # all the content is upload here and if ok, it is rename from id_ to is folder
1588
1589 if start:
1590 if not self.fs.file_exists(proposed_revision_path, "dir"):
1591 raise EngineException(
1592 "invalid Transaction-Id header", HTTPStatus.NOT_FOUND
1593 )
1594 else:
1595 self.fs.file_delete(proposed_revision_path, ignore_non_exist=True)
1596 self.fs.mkdir(proposed_revision_path)
1597 fs_rollback.append(proposed_revision_path)
1598
1599 storage = self.fs.get_params()
1600 storage["folder"] = proposed_revision_path
yshah2c932bd2024-09-24 18:16:07 +00001601 storage["zipfile"] = filename
yshah53cc9eb2024-07-05 13:06:31 +00001602
1603 file_path = (proposed_revision_path, filename)
1604 file_pkg = self.fs.file_open(file_path, "a+b")
1605
yshah53cc9eb2024-07-05 13:06:31 +00001606 if isinstance(indata, dict):
1607 indata_text = yaml.safe_dump(indata, indent=4, default_flow_style=False)
1608 file_pkg.write(indata_text.encode(encoding="utf-8"))
1609 else:
1610 indata_len = 0
1611 indata = indata.file
1612 while True:
1613 indata_text = indata.read(4096)
1614 indata_len += len(indata_text)
1615 if not indata_text:
1616 break
1617 file_pkg.write(indata_text)
1618
yshah53cc9eb2024-07-05 13:06:31 +00001619 # Need to close the file package here so it can be copied from the
1620 # revision to the current, unrevisioned record
1621 if file_pkg:
1622 file_pkg.close()
1623 file_pkg = None
1624
1625 # Fetch both the incoming, proposed revision and the original revision so we
1626 # can call a validate method to compare them
1627 current_revision_path = _id + "/"
1628 self.fs.sync(from_path=current_revision_path)
1629 self.fs.sync(from_path=proposed_revision_path)
1630
garciadeblas807b8bf2024-09-23 13:03:00 +02001631 # Is this required?
yshah53cc9eb2024-07-05 13:06:31 +00001632 if revision > 1:
1633 try:
1634 self._validate_descriptor_changes(
1635 _id,
1636 filename,
1637 current_revision_path,
1638 proposed_revision_path,
1639 )
1640 except Exception as e:
1641 shutil.rmtree(
1642 self.fs.path + current_revision_path, ignore_errors=True
1643 )
1644 shutil.rmtree(
1645 self.fs.path + proposed_revision_path, ignore_errors=True
1646 )
1647 # Only delete the new revision. We need to keep the original version in place
1648 # as it has not been changed.
1649 self.fs.file_delete(proposed_revision_path, ignore_non_exist=True)
1650 raise e
1651
1652 indata = self._remove_envelop(indata)
1653
1654 # Override descriptor with query string kwargs
1655 if kwargs:
1656 self._update_input_with_kwargs(indata, kwargs)
1657
1658 current_desc["_admin"]["storage"] = storage
1659 current_desc["_admin"]["onboardingState"] = "ONBOARDED"
1660 current_desc["_admin"]["operationalState"] = "ENABLED"
1661 current_desc["_admin"]["modified"] = time()
1662 current_desc["_admin"]["revision"] = revision
1663
1664 deep_update_rfc7396(current_desc, indata)
1665
1666 # Copy the revision to the active package name by its original id
1667 shutil.rmtree(self.fs.path + current_revision_path, ignore_errors=True)
1668 os.rename(
1669 self.fs.path + proposed_revision_path,
1670 self.fs.path + current_revision_path,
1671 )
1672 self.fs.file_delete(current_revision_path, ignore_non_exist=True)
1673 self.fs.mkdir(current_revision_path)
1674 self.fs.reverse_sync(from_path=current_revision_path)
1675
1676 shutil.rmtree(self.fs.path + _id)
1677 kwargs = {}
1678 kwargs["package"] = filename
1679 if headers["Method"] == "POST":
1680 current_desc["state"] = "IN_CREATION"
garciadeblasbecc7052024-11-20 12:04:53 +01001681 op_id = current_desc.get("operationHistory", [{"op_id": None}])[-1].get(
1682 "op_id"
1683 )
yshah53cc9eb2024-07-05 13:06:31 +00001684 elif headers["Method"] in ("PUT", "PATCH"):
yshahd0c876f2024-11-11 09:24:48 +00001685 op_id = self.format_on_operation(
yshah53cc9eb2024-07-05 13:06:31 +00001686 current_desc,
1687 "update",
1688 kwargs,
1689 )
1690 current_desc["operatingState"] = "PROCESSING"
1691 current_desc["resourceState"] = "IN_PROGRESS"
1692
1693 self.db.replace(self.topic, _id, current_desc)
1694
1695 # Store a copy of the package as a point in time revision
1696 revision_desc = dict(current_desc)
1697 revision_desc["_id"] = _id + ":" + str(revision_desc["_admin"]["revision"])
1698 self.db.create(self.topic + "_revisions", revision_desc)
1699 fs_rollback = []
1700
yshah53cc9eb2024-07-05 13:06:31 +00001701 if headers["Method"] == "POST":
1702 self._send_msg("create", {"oka_id": _id, "operation_id": op_id})
1703 elif headers["Method"] == "PUT" or "PATCH":
1704 self._send_msg("edit", {"oka_id": _id, "operation_id": op_id})
1705
1706 return True
1707
1708 except EngineException:
1709 raise
1710 finally:
1711 if file_pkg:
1712 file_pkg.close()
1713 for file in fs_rollback:
1714 self.fs.file_delete(file, ignore_non_exist=True)