blob: 496b7372cf7bcc9a0c476ba3cbd04bf7b09c3bcc [file] [log] [blame]
rshri2d386cb2024-07-05 14:35:51 +00001# -*- coding: utf-8 -*-
2
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12# implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
yshah53cc9eb2024-07-05 13:06:31 +000016import logging
17import yaml
yshah53cc9eb2024-07-05 13:06:31 +000018import shutil
19import os
rshri2d386cb2024-07-05 14:35:51 +000020from http import HTTPStatus
21
yshah53cc9eb2024-07-05 13:06:31 +000022from time import time
rshri2d386cb2024-07-05 14:35:51 +000023from osm_nbi.base_topic import BaseTopic, EngineException
shrinithi28d887f2025-01-08 05:27:19 +000024from osm_nbi.acm_topic import ACMTopic, ACMOperationTopic, ProfileTopic
rshri2d386cb2024-07-05 14:35:51 +000025
yshah53cc9eb2024-07-05 13:06:31 +000026from osm_nbi.descriptor_topics import DescriptorTopic
rshri2d386cb2024-07-05 14:35:51 +000027from osm_nbi.validation import (
28 ValidationError,
yshah99122b82024-11-18 07:05:29 +000029 validate_input,
rshri2d386cb2024-07-05 14:35:51 +000030 clustercreation_new_schema,
yshah99122b82024-11-18 07:05:29 +000031 cluster_edit_schema,
32 cluster_update_schema,
rshri2d386cb2024-07-05 14:35:51 +000033 infra_controller_profile_create_new_schema,
34 infra_config_profile_create_new_schema,
35 app_profile_create_new_schema,
36 resource_profile_create_new_schema,
37 infra_controller_profile_create_edit_schema,
38 infra_config_profile_create_edit_schema,
39 app_profile_create_edit_schema,
40 resource_profile_create_edit_schema,
rshri17b09ec2024-11-07 05:48:12 +000041 clusterregistration_new_schema,
rshri2d386cb2024-07-05 14:35:51 +000042 attach_dettach_profile_schema,
yshah53cc9eb2024-07-05 13:06:31 +000043 ksu_schema,
44 oka_schema,
yshahd23c6a52025-06-13 05:49:31 +000045 node_create_new_schema,
46 node_edit_schema,
rshri2d386cb2024-07-05 14:35:51 +000047)
yshah53cc9eb2024-07-05 13:06:31 +000048from osm_common.dbbase import deep_update_rfc7396, DbException
rshri2d386cb2024-07-05 14:35:51 +000049from osm_common.msgbase import MsgException
50from osm_common.fsbase import FsException
51
yshah53cc9eb2024-07-05 13:06:31 +000052__author__ = (
53 "Shrinithi R <shrinithi.r@tataelxsi.co.in>",
54 "Shahithya Y <shahithya.y@tataelxsi.co.in>",
55)
rshri2d386cb2024-07-05 14:35:51 +000056
57
shrinithi28d887f2025-01-08 05:27:19 +000058class InfraContTopic(ProfileTopic):
rshri2d386cb2024-07-05 14:35:51 +000059 topic = "k8sinfra_controller"
60 topic_msg = "k8s_infra_controller"
61 schema_new = infra_controller_profile_create_new_schema
62 schema_edit = infra_controller_profile_create_edit_schema
63
64 def __init__(self, db, fs, msg, auth):
shrinithi75492bd2025-03-21 18:37:44 +000065 super().__init__(db, fs, msg, auth)
rshri2d386cb2024-07-05 14:35:51 +000066
67 def new(self, rollback, session, indata=None, kwargs=None, headers=None):
68 # To create the new infra controller profile
69 return self.new_profile(rollback, session, indata, kwargs, headers)
70
71 def default(self, rollback, session, indata=None, kwargs=None, headers=None):
72 # To create the default infra controller profile while creating the cluster
73 return self.default_profile(rollback, session, indata, kwargs, headers)
74
75 def delete(self, session, _id, dry_run=False, not_send_msg=None):
garciadeblas14fed6f2025-04-02 12:53:22 +020076 self.delete_profile(session, _id, dry_run, not_send_msg)
yshahffcac5f2024-08-19 12:49:07 +000077 return _id
rshri2d386cb2024-07-05 14:35:51 +000078
79
shrinithi28d887f2025-01-08 05:27:19 +000080class InfraConfTopic(ProfileTopic):
rshri2d386cb2024-07-05 14:35:51 +000081 topic = "k8sinfra_config"
82 topic_msg = "k8s_infra_config"
83 schema_new = infra_config_profile_create_new_schema
84 schema_edit = infra_config_profile_create_edit_schema
85
86 def __init__(self, db, fs, msg, auth):
shrinithi75492bd2025-03-21 18:37:44 +000087 super().__init__(db, fs, msg, auth)
rshri2d386cb2024-07-05 14:35:51 +000088
89 def new(self, rollback, session, indata=None, kwargs=None, headers=None):
90 # To create the new infra config profile
91 return self.new_profile(rollback, session, indata, kwargs, headers)
92
93 def default(self, rollback, session, indata=None, kwargs=None, headers=None):
94 # To create the default infra config profile while creating the cluster
95 return self.default_profile(rollback, session, indata, kwargs, headers)
96
97 def delete(self, session, _id, dry_run=False, not_send_msg=None):
garciadeblas14fed6f2025-04-02 12:53:22 +020098 self.delete_profile(session, _id, dry_run, not_send_msg)
yshahffcac5f2024-08-19 12:49:07 +000099 return _id
rshri2d386cb2024-07-05 14:35:51 +0000100
101
shrinithi28d887f2025-01-08 05:27:19 +0000102class AppTopic(ProfileTopic):
rshri2d386cb2024-07-05 14:35:51 +0000103 topic = "k8sapp"
104 topic_msg = "k8s_app"
105 schema_new = app_profile_create_new_schema
106 schema_edit = app_profile_create_edit_schema
107
108 def __init__(self, db, fs, msg, auth):
shrinithi75492bd2025-03-21 18:37:44 +0000109 super().__init__(db, fs, msg, auth)
rshri2d386cb2024-07-05 14:35:51 +0000110
111 def new(self, rollback, session, indata=None, kwargs=None, headers=None):
112 # To create the new app profile
113 return self.new_profile(rollback, session, indata, kwargs, headers)
114
115 def default(self, rollback, session, indata=None, kwargs=None, headers=None):
116 # To create the default app profile while creating the cluster
117 return self.default_profile(rollback, session, indata, kwargs, headers)
118
119 def delete(self, session, _id, dry_run=False, not_send_msg=None):
garciadeblas14fed6f2025-04-02 12:53:22 +0200120 self.delete_profile(session, _id, dry_run, not_send_msg)
yshahffcac5f2024-08-19 12:49:07 +0000121 return _id
rshri2d386cb2024-07-05 14:35:51 +0000122
123
shrinithi28d887f2025-01-08 05:27:19 +0000124class ResourceTopic(ProfileTopic):
rshri2d386cb2024-07-05 14:35:51 +0000125 topic = "k8sresource"
126 topic_msg = "k8s_resource"
127 schema_new = resource_profile_create_new_schema
128 schema_edit = resource_profile_create_edit_schema
129
130 def __init__(self, db, fs, msg, auth):
shrinithi75492bd2025-03-21 18:37:44 +0000131 super().__init__(db, fs, msg, auth)
rshri2d386cb2024-07-05 14:35:51 +0000132
133 def new(self, rollback, session, indata=None, kwargs=None, headers=None):
134 # To create the new resource profile
135 return self.new_profile(rollback, session, indata, kwargs, headers)
136
137 def default(self, rollback, session, indata=None, kwargs=None, headers=None):
138 # To create the default resource profile while creating the cluster
139 return self.default_profile(rollback, session, indata, kwargs, headers)
140
141 def delete(self, session, _id, dry_run=False, not_send_msg=None):
garciadeblas14fed6f2025-04-02 12:53:22 +0200142 self.delete_profile(session, _id, dry_run, not_send_msg)
yshahffcac5f2024-08-19 12:49:07 +0000143 return _id
rshri2d386cb2024-07-05 14:35:51 +0000144
145
shrinithi28d887f2025-01-08 05:27:19 +0000146class ClusterTopic(ACMTopic):
rshri2d386cb2024-07-05 14:35:51 +0000147 topic = "clusters"
148 topic_msg = "cluster"
149 schema_new = clustercreation_new_schema
150 schema_edit = attach_dettach_profile_schema
151
152 def __init__(self, db, fs, msg, auth):
shrinithi28d887f2025-01-08 05:27:19 +0000153 super().__init__(db, fs, msg, auth)
rshri2d386cb2024-07-05 14:35:51 +0000154 self.infra_contr_topic = InfraContTopic(db, fs, msg, auth)
155 self.infra_conf_topic = InfraConfTopic(db, fs, msg, auth)
156 self.resource_topic = ResourceTopic(db, fs, msg, auth)
157 self.app_topic = AppTopic(db, fs, msg, auth)
rshri2d386cb2024-07-05 14:35:51 +0000158
garciadeblasbecc7052024-11-20 12:04:53 +0100159 @staticmethod
160 def format_on_new(content, project_id=None, make_public=False):
shrinithi28d887f2025-01-08 05:27:19 +0000161 ACMTopic.format_on_new(content, project_id=project_id, make_public=make_public)
garciadeblasbecc7052024-11-20 12:04:53 +0100162 content["current_operation"] = None
163
rshri2d386cb2024-07-05 14:35:51 +0000164 def new(self, rollback, session, indata=None, kwargs=None, headers=None):
165 """
166 Creates a new k8scluster into database.
167 :param rollback: list to append the created items at database in case a rollback must be done
168 :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
169 :param indata: params to be used for the k8cluster
170 :param kwargs: used to override the indata
171 :param headers: http request headers
172 :return: the _id of k8scluster created at database. Or an exception of type
173 EngineException, ValidationError, DbException, FsException, MsgException.
174 Note: Exceptions are not captured on purpose. They should be captured at called
175 """
rshri41b2db92025-06-11 11:17:42 +0000176
rshri2d386cb2024-07-05 14:35:51 +0000177 step = "checking quotas" # first step must be defined outside try
178 try:
rshri41b2db92025-06-11 11:17:42 +0000179 if self.multiproject:
180 self.check_quota(session)
181
182 content = self._remove_envelop(indata)
183
rshri2d386cb2024-07-05 14:35:51 +0000184 step = "name unique check"
shrinithi28d887f2025-01-08 05:27:19 +0000185 self.cluster_unique_name_check(session, indata["name"])
rshri41b2db92025-06-11 11:17:42 +0000186
rshri2d386cb2024-07-05 14:35:51 +0000187 step = "validating input parameters"
rshri41b2db92025-06-11 11:17:42 +0000188 self._update_input_with_kwargs(content, kwargs)
189
190 content = self._validate_input_new(content, session, force=session["force"])
191
192 operation_params = indata.copy()
193
194 self.check_conflict_on_new(session, content)
195 self.format_on_new(
196 content, project_id=session["project_id"], make_public=session["public"]
197 )
rshri2d386cb2024-07-05 14:35:51 +0000198
199 step = "filling cluster details from input data"
rshri41b2db92025-06-11 11:17:42 +0000200 content = self._create_cluster(
201 content, rollback, session, indata, kwargs, headers
rshri2d386cb2024-07-05 14:35:51 +0000202 )
203
204 step = "creating cluster at database"
rshri41b2db92025-06-11 11:17:42 +0000205 _id = self.db.create(self.topic, content)
206
rshri2d386cb2024-07-05 14:35:51 +0000207 op_id = self.format_on_operation(
rshri41b2db92025-06-11 11:17:42 +0000208 content,
rshri2d386cb2024-07-05 14:35:51 +0000209 "create",
210 operation_params,
211 )
rshri41b2db92025-06-11 11:17:42 +0000212
garciadeblas6e88d9c2024-08-15 10:55:04 +0200213 pubkey, privkey = self._generate_age_key()
rshri41b2db92025-06-11 11:17:42 +0000214 content["age_pubkey"] = self.db.encrypt(
garciadeblas6e88d9c2024-08-15 10:55:04 +0200215 pubkey, schema_version="1.11", salt=_id
216 )
rshri41b2db92025-06-11 11:17:42 +0000217 content["age_privkey"] = self.db.encrypt(
garciadeblas6e88d9c2024-08-15 10:55:04 +0200218 privkey, schema_version="1.11", salt=_id
219 )
rshri41b2db92025-06-11 11:17:42 +0000220
garciadeblas6e88d9c2024-08-15 10:55:04 +0200221 # TODO: set age_pubkey and age_privkey in the default profiles
rshri2d386cb2024-07-05 14:35:51 +0000222 rollback.append({"topic": self.topic, "_id": _id})
rshri41b2db92025-06-11 11:17:42 +0000223 self.db.set_one("clusters", {"_id": _id}, content)
rshri2d386cb2024-07-05 14:35:51 +0000224 self._send_msg("create", {"cluster_id": _id, "operation_id": op_id})
225
rshri50e34dc2024-12-02 03:10:39 +0000226 # To add the content in old collection "k8sclusters"
rshri41b2db92025-06-11 11:17:42 +0000227 self.add_to_old_collection(content, session)
rshri50e34dc2024-12-02 03:10:39 +0000228
rshri2d386cb2024-07-05 14:35:51 +0000229 return _id, None
230 except (
231 ValidationError,
232 EngineException,
233 DbException,
234 MsgException,
235 FsException,
236 ) as e:
237 raise type(e)("{} while '{}'".format(e, step), http_code=e.http_code)
238
rshri41b2db92025-06-11 11:17:42 +0000239 def _validate_input_new(self, content, session, force=False):
240 # validating vim and checking the mandatory parameters
241 vim_type = self.check_vim(session, content["vim_account"])
242
243 # for aws
244 if vim_type == "aws":
245 self._aws_check(content)
246
247 # for azure and gcp
248 elif vim_type in ["azure", "gcp"]:
249 self._params_check(content)
250
251 return super()._validate_input_new(content, force=session["force"])
252
253 def _aws_check(self, indata):
254 if "node_count" in indata or "node_size" in indata:
255 raise ValueError("node_count and node_size are not allowed for AWS")
256 return
257
258 def _params_check(self, indata):
259 if "node_count" not in indata and "node_size" not in indata:
260 raise ValueError("node_count and node_size are mandatory parameter")
261 return
262
263 def _create_cluster(self, content, rollback, session, indata, kwargs, headers):
264 private_subnet = indata.get("private_subnet")
265 public_subnet = indata.get("public_subnet")
266
267 # Enforce: if private_subnet is provided, public_subnet must also be provided
268 if (private_subnet and not public_subnet) or (
269 public_subnet and not private_subnet
270 ):
271 raise ValueError(
272 "'public_subnet' must be provided if 'private_subnet' is given and viceversa."
273 )
274
275 # private Subnet validation
276 if private_subnet:
277 count = len(private_subnet)
278 if count != 2:
279 raise ValueError(
280 f"private_subnet must contain exactly 2 items, got {count}"
281 )
282
283 # public Subnet validation
284 public_subnet = indata.get("public_subnet")
285 if public_subnet:
286 count = len(public_subnet)
287 if count != 1:
288 raise ValueError(
289 f"public_subnet must contain exactly 1 items, got {count}"
290 )
291
292 content["infra_controller_profiles"] = [
293 self._create_default_profiles(
294 rollback, session, indata, kwargs, headers, self.infra_contr_topic
295 )
296 ]
297 content["infra_config_profiles"] = [
298 self._create_default_profiles(
299 rollback, session, indata, kwargs, headers, self.infra_conf_topic
300 )
301 ]
302 content["resource_profiles"] = [
303 self._create_default_profiles(
304 rollback, session, indata, kwargs, headers, self.resource_topic
305 )
306 ]
307 content["app_profiles"] = [
308 self._create_default_profiles(
309 rollback, session, indata, kwargs, headers, self.app_topic
310 )
311 ]
312 content["created"] = "true"
313 content["state"] = "IN_CREATION"
314 content["operatingState"] = "PROCESSING"
315 content["git_name"] = self.create_gitname(content, session)
316 content["resourceState"] = "IN_PROGRESS.REQUEST_RECEIVED"
317
rshri2d386cb2024-07-05 14:35:51 +0000318 # Get the vim_account details
319 vim_account_details = self.db.get_one(
rshri41b2db92025-06-11 11:17:42 +0000320 "vim_accounts", {"name": content["vim_account"]}
rshri2d386cb2024-07-05 14:35:51 +0000321 )
322
rshri17b09ec2024-11-07 05:48:12 +0000323 # Add optional fields if they exist in the request
rshri41b2db92025-06-11 11:17:42 +0000324
325 if "region_name" not in indata:
326 region_name = vim_account_details.get("config", {}).get("region_name")
327 if region_name:
328 content["region_name"] = region_name
329
330 if "resource_group" not in indata:
331 resource_group = vim_account_details.get("config", {}).get("resource_group")
332 if resource_group:
333 content["resource_group"] = resource_group
334
335 version = "k8s_version" in content
336 if not version:
337 content["k8s_version"] = "1.28"
338 content["node_count"] = indata.get("node_count", 0)
yshahd23c6a52025-06-13 05:49:31 +0000339 content["ksu_count"] = 0
rshri41b2db92025-06-11 11:17:42 +0000340 self.logger.info(f"cotent is : {content}")
341 return content
rshri2d386cb2024-07-05 14:35:51 +0000342
343 def check_vim(self, session, name):
344 try:
345 vim_account_details = self.db.get_one("vim_accounts", {"name": name})
346 if vim_account_details is not None:
rshri41b2db92025-06-11 11:17:42 +0000347 return vim_account_details["vim_type"]
rshri2d386cb2024-07-05 14:35:51 +0000348 except ValidationError as e:
349 raise EngineException(
350 e,
351 HTTPStatus.UNPROCESSABLE_ENTITY,
352 )
353
354 def _create_default_profiles(
355 self, rollback, session, indata, kwargs, headers, topic
356 ):
357 topic = self.to_select_topic(topic)
358 default_profiles = topic.default(rollback, session, indata, kwargs, headers)
359 return default_profiles
360
361 def to_select_topic(self, topic):
362 if topic == "infra_controller_profiles":
363 topic = self.infra_contr_topic
364 elif topic == "infra_config_profiles":
365 topic = self.infra_conf_topic
366 elif topic == "resource_profiles":
367 topic = self.resource_topic
368 elif topic == "app_profiles":
369 topic = self.app_topic
370 return topic
371
372 def show_one(self, session, _id, profile, filter_q=None, api_req=False):
373 try:
374 filter_q = self._get_project_filter(session)
375 filter_q[self.id_field(self.topic, _id)] = _id
376 content = self.db.get_one(self.topic, filter_q)
377 existing_profiles = []
378 topic = None
379 topic = self.to_select_topic(profile)
380 for profile_id in content[profile]:
381 data = topic.show(session, profile_id, filter_q, api_req)
382 existing_profiles.append(data)
383 return existing_profiles
384 except ValidationError as e:
385 raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
386
387 def state_check(self, profile_id, session, topic):
388 topic = self.to_select_topic(topic)
389 content = topic.show(session, profile_id, filter_q=None, api_req=False)
390 state = content["state"]
391 if state == "CREATED":
392 return
393 else:
394 raise EngineException(
395 f" {profile_id} is not in created state",
396 HTTPStatus.UNPROCESSABLE_ENTITY,
397 )
398
399 def edit(self, session, _id, item, indata=None, kwargs=None):
rshri50e34dc2024-12-02 03:10:39 +0000400 if item not in (
yshah99122b82024-11-18 07:05:29 +0000401 "infra_controller_profiles",
402 "infra_config_profiles",
403 "app_profiles",
404 "resource_profiles",
405 ):
406 self.schema_edit = cluster_edit_schema
407 super().edit(session, _id, indata=item, kwargs=kwargs, content=None)
rshri2d386cb2024-07-05 14:35:51 +0000408 else:
yshah99122b82024-11-18 07:05:29 +0000409 indata = self._remove_envelop(indata)
410 indata = self._validate_input_edit(
411 indata, content=None, force=session["force"]
412 )
413 if indata.get("add_profile"):
414 self.add_profile(session, _id, item, indata)
415 elif indata.get("remove_profile"):
416 self.remove_profile(session, _id, item, indata)
417 else:
418 error_msg = "Add / remove operation is only applicable"
419 raise EngineException(error_msg, HTTPStatus.UNPROCESSABLE_ENTITY)
rshri2d386cb2024-07-05 14:35:51 +0000420
yshah00a620a2025-01-16 12:06:40 +0000421 def edit_extra_before(self, session, _id, indata=None, kwargs=None, content=None):
422 check = self.db.get_one(self.topic, {"_id": _id})
423 if "name" in indata and check["name"] != indata["name"]:
424 self.check_unique_name(session, indata["name"])
425 _filter = {"name": indata["name"]}
426 topic_list = [
427 "k8sclusters",
428 "k8sinfra_controller",
429 "k8sinfra_config",
430 "k8sapp",
431 "k8sresource",
432 ]
433 # Check unique name for k8scluster and profiles
434 for topic in topic_list:
435 if self.db.get_one(
436 topic, _filter, fail_on_empty=False, fail_on_more=False
437 ):
438 raise EngineException(
439 "name '{}' already exists for {}".format(indata["name"], topic),
440 HTTPStatus.CONFLICT,
441 )
442 # Replace name in k8scluster and profiles
443 for topic in topic_list:
444 data = self.db.get_one(topic, {"name": check["name"]})
445 data["name"] = indata["name"]
446 self.db.replace(topic, data["_id"], data)
447 return True
448
rshri2d386cb2024-07-05 14:35:51 +0000449 def add_profile(self, session, _id, item, indata=None):
450 indata = self._remove_envelop(indata)
451 operation_params = indata
452 profile_id = indata["add_profile"][0]["id"]
453 # check state
454 self.state_check(profile_id, session, item)
455 filter_q = self._get_project_filter(session)
456 filter_q[self.id_field(self.topic, _id)] = _id
457 content = self.db.get_one(self.topic, filter_q)
458 profile_list = content[item]
459
460 if profile_id not in profile_list:
461 content["operatingState"] = "PROCESSING"
rshri2d386cb2024-07-05 14:35:51 +0000462 op_id = self.format_on_operation(
463 content,
464 "add",
465 operation_params,
466 )
467 self.db.set_one("clusters", {"_id": content["_id"]}, content)
468 self._send_msg(
469 "add",
470 {
471 "cluster_id": _id,
472 "profile_id": profile_id,
473 "profile_type": item,
474 "operation_id": op_id,
475 },
476 )
477 else:
478 raise EngineException(
479 f"{item} {profile_id} already exists", HTTPStatus.UNPROCESSABLE_ENTITY
480 )
481
482 def _get_default_profiles(self, session, topic):
483 topic = self.to_select_topic(topic)
484 existing_profiles = topic.list(session, filter_q=None, api_req=False)
485 default_profiles = [
486 profile["_id"]
487 for profile in existing_profiles
488 if profile.get("default", False)
489 ]
490 return default_profiles
491
492 def remove_profile(self, session, _id, item, indata):
493 indata = self._remove_envelop(indata)
494 operation_params = indata
495 profile_id = indata["remove_profile"][0]["id"]
496 filter_q = self._get_project_filter(session)
497 filter_q[self.id_field(self.topic, _id)] = _id
498 content = self.db.get_one(self.topic, filter_q)
499 profile_list = content[item]
500
501 default_profiles = self._get_default_profiles(session, item)
502
503 if profile_id in default_profiles:
504 raise EngineException(
505 "Cannot remove default profile", HTTPStatus.UNPROCESSABLE_ENTITY
506 )
507 if profile_id in profile_list:
rshri2d386cb2024-07-05 14:35:51 +0000508 op_id = self.format_on_operation(
509 content,
510 "remove",
511 operation_params,
512 )
513 self.db.set_one("clusters", {"_id": content["_id"]}, content)
514 self._send_msg(
515 "remove",
516 {
517 "cluster_id": _id,
518 "profile_id": profile_id,
519 "profile_type": item,
520 "operation_id": op_id,
521 },
522 )
523 else:
524 raise EngineException(
525 f"{item} {profile_id} does'nt exists", HTTPStatus.UNPROCESSABLE_ENTITY
526 )
527
shahithyab9eb4142024-10-17 05:51:39 +0000528 def get_cluster_creds(self, session, _id, item):
yshah53cc9eb2024-07-05 13:06:31 +0000529 if not self.multiproject:
530 filter_db = {}
531 else:
532 filter_db = self._get_project_filter(session)
yshah53cc9eb2024-07-05 13:06:31 +0000533 filter_db[BaseTopic.id_field(self.topic, _id)] = _id
garciadeblasbecc7052024-11-20 12:04:53 +0100534 operation_params = None
shahithyab9eb4142024-10-17 05:51:39 +0000535 data = self.db.get_one(self.topic, filter_db)
garciadeblasbecc7052024-11-20 12:04:53 +0100536 op_id = self.format_on_operation(data, item, operation_params)
shahithyab9eb4142024-10-17 05:51:39 +0000537 self.db.set_one(self.topic, {"_id": data["_id"]}, data)
538 self._send_msg("get_creds", {"cluster_id": _id, "operation_id": op_id})
539 return op_id
540
541 def get_cluster_creds_file(self, session, _id, item, op_id):
542 if not self.multiproject:
543 filter_db = {}
544 else:
545 filter_db = self._get_project_filter(session)
546 filter_db[BaseTopic.id_field(self.topic, _id)] = _id
shahithya8bded112024-10-15 08:01:44 +0000547
548 data = self.db.get_one(self.topic, filter_db)
shahithyab9eb4142024-10-17 05:51:39 +0000549 creds_flag = None
550 for operations in data["operationHistory"]:
551 if operations["op_id"] == op_id:
552 creds_flag = operations["result"]
553 self.logger.info("Creds Flag: {}".format(creds_flag))
shahithya8bded112024-10-15 08:01:44 +0000554
shahithyab9eb4142024-10-17 05:51:39 +0000555 if creds_flag is True:
556 credentials = data["credentials"]
shahithya8bded112024-10-15 08:01:44 +0000557
shahithyab9eb4142024-10-17 05:51:39 +0000558 file_pkg = None
559 current_path = _id
shahithya8bded112024-10-15 08:01:44 +0000560
shahithyab9eb4142024-10-17 05:51:39 +0000561 self.fs.file_delete(current_path, ignore_non_exist=True)
562 self.fs.mkdir(current_path)
563 filename = "credentials.yaml"
564 file_path = (current_path, filename)
565 self.logger.info("File path: {}".format(file_path))
566 file_pkg = self.fs.file_open(file_path, "a+b")
shahithya8bded112024-10-15 08:01:44 +0000567
shahithyab9eb4142024-10-17 05:51:39 +0000568 credentials_yaml = yaml.safe_dump(
569 credentials, indent=4, default_flow_style=False
570 )
571 file_pkg.write(credentials_yaml.encode(encoding="utf-8"))
shahithya8bded112024-10-15 08:01:44 +0000572
shahithyab9eb4142024-10-17 05:51:39 +0000573 if file_pkg:
574 file_pkg.close()
575 file_pkg = None
576 self.fs.sync(from_path=current_path)
shahithya8bded112024-10-15 08:01:44 +0000577
shahithyab9eb4142024-10-17 05:51:39 +0000578 return (
579 self.fs.file_open((current_path, filename), "rb"),
580 "text/plain",
581 )
582 else:
583 raise EngineException(
584 "Not possible to get the credentials of the cluster",
585 HTTPStatus.UNPROCESSABLE_ENTITY,
586 )
yshah53cc9eb2024-07-05 13:06:31 +0000587
yshahd23c6a52025-06-13 05:49:31 +0000588 def update_item(self, session, _id, item, indata):
yshah53cc9eb2024-07-05 13:06:31 +0000589 if not self.multiproject:
590 filter_db = {}
591 else:
592 filter_db = self._get_project_filter(session)
593 # To allow project&user addressing by name AS WELL AS _id
594 filter_db[BaseTopic.id_field(self.topic, _id)] = _id
yshah99122b82024-11-18 07:05:29 +0000595 validate_input(indata, cluster_update_schema)
yshah53cc9eb2024-07-05 13:06:31 +0000596 data = self.db.get_one(self.topic, filter_db)
yshah99122b82024-11-18 07:05:29 +0000597 operation_params = {}
yshah53cc9eb2024-07-05 13:06:31 +0000598 data["operatingState"] = "PROCESSING"
599 data["resourceState"] = "IN_PROGRESS"
600 operation_params = indata
yshahd0c876f2024-11-11 09:24:48 +0000601 op_id = self.format_on_operation(
yshah53cc9eb2024-07-05 13:06:31 +0000602 data,
603 item,
604 operation_params,
605 )
606 self.db.set_one(self.topic, {"_id": _id}, data)
yshah53cc9eb2024-07-05 13:06:31 +0000607 data = {"cluster_id": _id, "operation_id": op_id}
608 self._send_msg(item, data)
609 return op_id
610
shrinithi28d887f2025-01-08 05:27:19 +0000611 def delete_extra_before(self, session, _id, db_content, not_send_msg=None):
612 op_id = self.common_delete(_id, db_content)
yshah781ce732025-02-28 08:56:13 +0000613 return {"cluster_id": _id, "operation_id": op_id, "force": session["force"]}
rshri2d386cb2024-07-05 14:35:51 +0000614
shrinithi28d887f2025-01-08 05:27:19 +0000615 def delete(self, session, _id, dry_run=False, not_send_msg=None):
616 filter_q = self._get_project_filter(session)
617 filter_q[self.id_field(self.topic, _id)] = _id
618 check = self.db.get_one(self.topic, filter_q)
shrinithi75492bd2025-03-21 18:37:44 +0000619 op_id = check["current_operation"]
shrinithi28d887f2025-01-08 05:27:19 +0000620 if check["created"] == "false":
621 raise EngineException(
garciadeblas3d1d6272025-02-04 11:55:36 +0100622 "Cannot delete registered cluster. Please deregister.",
shrinithi28d887f2025-01-08 05:27:19 +0000623 HTTPStatus.UNPROCESSABLE_ENTITY,
624 )
garciadeblas14fed6f2025-04-02 12:53:22 +0200625 super().delete(session, _id, dry_run, not_send_msg)
shrinithi75492bd2025-03-21 18:37:44 +0000626 return op_id
shrinithi28d887f2025-01-08 05:27:19 +0000627
628
yshahd23c6a52025-06-13 05:49:31 +0000629class NodeGroupTopic(ACMTopic):
630 topic = "nodegroups"
631 topic_msg = "nodegroup"
632 schema_new = node_create_new_schema
633 schema_edit = node_edit_schema
634
635 def __init__(self, db, fs, msg, auth):
636 BaseTopic.__init__(self, db, fs, msg, auth)
637
638 @staticmethod
639 def format_on_new(content, project_id=None, make_public=False):
640 BaseTopic.format_on_new(content, project_id=project_id, make_public=make_public)
641 content["current_operation"] = None
642 content["state"] = "IN_CREATION"
643 content["operatingState"] = "PROCESSING"
644 content["resourceState"] = "IN_PROGRESS"
645
646 def new(self, rollback, session, indata=None, kwargs=None, headers=None):
647 self.logger.info(f"Indata: {indata}")
648 self.check_unique_name(session, indata["name"])
649
650 indata = self._remove_envelop(indata)
651 self._update_input_with_kwargs(indata, kwargs)
652 if not indata.get("private_subnet") and not indata.get("public_subnet"):
653 raise EngineException(
654 "Please provide atleast one subnet",
655 HTTPStatus.UNPROCESSABLE_ENTITY,
656 )
657 content = self._validate_input_new(indata, session["force"])
658
659 self.logger.info(f"Indata: {indata}")
660 self.logger.info(f"Content: {content}")
661 cluster_id = content["cluster_id"]
662 db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
663 private_subnet = db_cluster.get("private_subnet")
664 public_subnet = db_cluster.get("public_subnet")
665 if content.get("private_subnet"):
666 for subnet in content["private_subnet"]:
667 if subnet not in private_subnet:
668 raise EngineException(
669 "No External subnet is used to add nodegroup",
670 HTTPStatus.UNPROCESSABLE_ENTITY,
671 )
672 if content.get("public_subnet"):
673 for subnet in content["public_subnet"]:
674 if subnet not in public_subnet:
675 raise EngineException(
676 "No External subnet is used to add nodegroup",
677 HTTPStatus.UNPROCESSABLE_ENTITY,
678 )
679
680 operation_params = {}
681 for content_key, content_value in content.items():
682 operation_params[content_key] = content_value
683 self.format_on_new(
684 content, session["project_id"], make_public=session["public"]
685 )
686 content["git_name"] = self.create_gitname(content, session)
687 self.logger.info(f"Operation Params: {operation_params}")
688 op_id = self.format_on_operation(
689 content,
690 "create",
691 operation_params,
692 )
693 node_count = db_cluster.get("node_count")
694 new_node_count = node_count + 1
695 self.logger.info(f"New Node count: {new_node_count}")
696 db_cluster["node_count"] = new_node_count
697 self.db.set_one("clusters", {"_id": cluster_id}, db_cluster)
698 _id = self.db.create(self.topic, content)
699 self._send_msg("add_nodegroup", {"nodegroup_id": _id, "operation_id": op_id})
700 return _id, op_id
701
702 def list(self, session, filter_q=None, api_req=False):
703 db_filter = {}
704 if filter_q.get("cluster_id"):
705 db_filter["cluster_id"] = filter_q.get("cluster_id")
706 data_list = self.db.get_list(self.topic, db_filter)
707 cluster_data = self.db.get_one("clusters", {"_id": db_filter["cluster_id"]})
708 self.logger.info(f"Cluster Data: {cluster_data}")
709 self.logger.info(f"Data: {data_list}")
710 if filter_q.get("cluster_id"):
711 outdata = {}
712 outdata["count"] = cluster_data["node_count"]
713 outdata["data"] = data_list
714 self.logger.info(f"Outdata: {outdata}")
715 return outdata
716 if api_req:
717 data_list = [self.sol005_projection(inst) for inst in data_list]
718 return data_list
719
720 def delete(self, session, _id, dry_run=False, not_send_msg=None):
721 if not self.multiproject:
722 filter_q = {}
723 else:
724 filter_q = self._get_project_filter(session)
725 filter_q[self.id_field(self.topic, _id)] = _id
726 item_content = self.db.get_one(self.topic, filter_q)
727 item_content["state"] = "IN_DELETION"
728 item_content["operatingState"] = "PROCESSING"
729 item_content["resourceState"] = "IN_PROGRESS"
730 self.check_conflict_on_del(session, _id, item_content)
731 op_id = self.format_on_operation(
732 item_content,
733 "delete",
734 None,
735 )
736 self.db.set_one(self.topic, {"_id": item_content["_id"]}, item_content)
737 self._send_msg(
738 "delete_nodegroup",
739 {"nodegroup_id": _id, "operation_id": op_id},
740 not_send_msg=not_send_msg,
741 )
742 return op_id
743
744 def update_item(self, session, _id, item, indata):
745 content = None
746 try:
747 if not content:
748 content = self.db.get_one(self.topic, {"_id": _id})
749 indata = self._validate_input_edit(indata, content, force=session["force"])
750 _id = content.get("_id") or _id
751
752 content = self.check_conflict_on_edit(session, content, indata, _id=_id)
753 op_id = self.format_on_edit(content, indata)
754 op_id = ACMTopic.format_on_operation(
755 content,
756 "scale",
757 indata,
758 )
759 self.logger.info(f"op_id: {op_id}")
760 content["operatingState"] = "PROCESSING"
761 content["resourceState"] = "IN_PROGRESS"
762 self.db.replace(self.topic, _id, content)
763 self._send_msg(
764 "scale_nodegroup", {"nodegroup_id": _id, "operation_id": op_id}
765 )
766 return op_id
767 except ValidationError as e:
768 raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
769
770 def edit(self, session, _id, indata, kwargs):
771 content = None
772
773 # Override descriptor with query string kwargs
774 if kwargs:
775 self._update_input_with_kwargs(indata, kwargs)
776 try:
777 if indata and session.get("set_project"):
778 raise EngineException(
779 "Cannot edit content and set to project (query string SET_PROJECT) at same time",
780 HTTPStatus.UNPROCESSABLE_ENTITY,
781 )
782 # TODO self._check_edition(session, indata, _id, force)
783 if not content:
784 content = self.db.get_one(self.topic, {"_id": _id})
785
786 indata = self._validate_input_edit(indata, content, force=session["force"])
787 self.logger.info(f"Indata: {indata}")
788
789 # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
790 _id = content.get("_id") or _id
791
792 content = self.check_conflict_on_edit(session, content, indata, _id=_id)
793 if "name" in indata and "description" in indata:
794 content["name"] = indata["name"]
795 content["description"] = indata["description"]
796 elif "name" in indata:
797 content["name"] = indata["name"]
798 elif "description" in indata:
799 content["description"] = indata["description"]
800 op_id = self.format_on_edit(content, indata)
801 self.db.set_one(self.topic, {"_id": _id}, content)
802 return op_id
803 except ValidationError as e:
804 raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
805
806
shrinithi28d887f2025-01-08 05:27:19 +0000807class ClusterOpsTopic(ACMTopic):
rshri2d386cb2024-07-05 14:35:51 +0000808 topic = "clusters"
809 topic_msg = "cluster"
rshri17b09ec2024-11-07 05:48:12 +0000810 schema_new = clusterregistration_new_schema
rshri2d386cb2024-07-05 14:35:51 +0000811
812 def __init__(self, db, fs, msg, auth):
shrinithi28d887f2025-01-08 05:27:19 +0000813 super().__init__(db, fs, msg, auth)
garciadeblas3d5dc322025-04-03 23:57:04 +0200814 self.infra_contr_topic = InfraContTopic(db, fs, msg, auth)
815 self.infra_conf_topic = InfraConfTopic(db, fs, msg, auth)
816 self.resource_topic = ResourceTopic(db, fs, msg, auth)
817 self.app_topic = AppTopic(db, fs, msg, auth)
rshri2d386cb2024-07-05 14:35:51 +0000818
garciadeblasbecc7052024-11-20 12:04:53 +0100819 @staticmethod
820 def format_on_new(content, project_id=None, make_public=False):
shrinithi28d887f2025-01-08 05:27:19 +0000821 ACMTopic.format_on_new(content, project_id=project_id, make_public=make_public)
garciadeblasbecc7052024-11-20 12:04:53 +0100822 content["current_operation"] = None
823
rshri2d386cb2024-07-05 14:35:51 +0000824 def add(self, rollback, session, indata, kwargs=None, headers=None):
825 step = "checking quotas"
826 try:
827 self.check_quota(session)
828 step = "name unique check"
shrinithi28d887f2025-01-08 05:27:19 +0000829 self.cluster_unique_name_check(session, indata["name"])
830 # self.check_unique_name(session, indata["name"])
rshri2d386cb2024-07-05 14:35:51 +0000831 step = "validating input parameters"
832 cls_add_request = self._remove_envelop(indata)
833 self._update_input_with_kwargs(cls_add_request, kwargs)
834 cls_add_request = self._validate_input_new(
835 cls_add_request, session["force"]
836 )
837 operation_params = cls_add_request
838
839 step = "filling cluster details from input data"
garciadeblas3d5dc322025-04-03 23:57:04 +0200840 cls_add_request = self._add_cluster(
841 cls_add_request, rollback, session, indata, kwargs, headers
842 )
rshri2d386cb2024-07-05 14:35:51 +0000843
rshri17b09ec2024-11-07 05:48:12 +0000844 step = "registering the cluster at database"
rshri2d386cb2024-07-05 14:35:51 +0000845 self.format_on_new(
rshri17b09ec2024-11-07 05:48:12 +0000846 cls_add_request, session["project_id"], make_public=session["public"]
rshri2d386cb2024-07-05 14:35:51 +0000847 )
rshri2d386cb2024-07-05 14:35:51 +0000848 op_id = self.format_on_operation(
rshri17b09ec2024-11-07 05:48:12 +0000849 cls_add_request,
rshri2d386cb2024-07-05 14:35:51 +0000850 "register",
851 operation_params,
852 )
rshri17b09ec2024-11-07 05:48:12 +0000853 _id = self.db.create(self.topic, cls_add_request)
garciadeblas9d9d9262024-09-25 11:25:33 +0200854 pubkey, privkey = self._generate_age_key()
rshri17b09ec2024-11-07 05:48:12 +0000855 cls_add_request["age_pubkey"] = self.db.encrypt(
garciadeblas9d9d9262024-09-25 11:25:33 +0200856 pubkey, schema_version="1.11", salt=_id
857 )
rshri17b09ec2024-11-07 05:48:12 +0000858 cls_add_request["age_privkey"] = self.db.encrypt(
garciadeblas9d9d9262024-09-25 11:25:33 +0200859 privkey, schema_version="1.11", salt=_id
860 )
861 # TODO: set age_pubkey and age_privkey in the default profiles
rshri17b09ec2024-11-07 05:48:12 +0000862 self.db.set_one(self.topic, {"_id": _id}, cls_add_request)
rshri2d386cb2024-07-05 14:35:51 +0000863 rollback.append({"topic": self.topic, "_id": _id})
864 self._send_msg("register", {"cluster_id": _id, "operation_id": op_id})
rshri50e34dc2024-12-02 03:10:39 +0000865
866 # To add the content in old collection "k8sclusters"
867 self.add_to_old_collection(cls_add_request, session)
868
rshri2d386cb2024-07-05 14:35:51 +0000869 return _id, None
870 except (
871 ValidationError,
872 EngineException,
873 DbException,
874 MsgException,
875 FsException,
876 ) as e:
877 raise type(e)("{} while '{}'".format(e, step), http_code=e.http_code)
878
garciadeblas3d5dc322025-04-03 23:57:04 +0200879 def _add_cluster(self, cls_add_request, rollback, session, indata, kwargs, headers):
rshri2d386cb2024-07-05 14:35:51 +0000880 cls_add = {
881 "name": cls_add_request["name"],
rshri2d386cb2024-07-05 14:35:51 +0000882 "credentials": cls_add_request["credentials"],
883 "vim_account": cls_add_request["vim_account"],
rshri17b09ec2024-11-07 05:48:12 +0000884 "bootstrap": cls_add_request["bootstrap"],
garciadeblas3d5dc322025-04-03 23:57:04 +0200885 "infra_controller_profiles": [
886 self._create_default_profiles(
887 rollback, session, indata, kwargs, headers, self.infra_contr_topic
888 )
889 ],
890 "infra_config_profiles": [
891 self._create_default_profiles(
892 rollback, session, indata, kwargs, headers, self.infra_conf_topic
893 )
894 ],
895 "resource_profiles": [
896 self._create_default_profiles(
897 rollback, session, indata, kwargs, headers, self.resource_topic
898 )
899 ],
900 "app_profiles": [
901 self._create_default_profiles(
902 rollback, session, indata, kwargs, headers, self.app_topic
903 )
904 ],
rshri2d386cb2024-07-05 14:35:51 +0000905 "created": "false",
906 "state": "IN_CREATION",
907 "operatingState": "PROCESSING",
908 "git_name": self.create_gitname(cls_add_request, session),
909 "resourceState": "IN_PROGRESS.REQUEST_RECEIVED",
910 }
rshri17b09ec2024-11-07 05:48:12 +0000911 # Add optional fields if they exist in the request
912 if "description" in cls_add_request:
913 cls_add["description"] = cls_add_request["description"]
rshri2d386cb2024-07-05 14:35:51 +0000914 return cls_add
915
garciadeblas3d5dc322025-04-03 23:57:04 +0200916 def check_vim(self, session, name):
917 try:
918 vim_account_details = self.db.get_one("vim_accounts", {"name": name})
919 if vim_account_details is not None:
920 return name
921 except ValidationError as e:
922 raise EngineException(
923 e,
924 HTTPStatus.UNPROCESSABLE_ENTITY,
925 )
926
927 def _create_default_profiles(
928 self, rollback, session, indata, kwargs, headers, topic
929 ):
930 topic = self.to_select_topic(topic)
931 default_profiles = topic.default(rollback, session, indata, kwargs, headers)
932 return default_profiles
933
934 def to_select_topic(self, topic):
935 if topic == "infra_controller_profiles":
936 topic = self.infra_contr_topic
937 elif topic == "infra_config_profiles":
938 topic = self.infra_conf_topic
939 elif topic == "resource_profiles":
940 topic = self.resource_topic
941 elif topic == "app_profiles":
942 topic = self.app_topic
943 return topic
944
rshri2d386cb2024-07-05 14:35:51 +0000945 def remove(self, session, _id, dry_run=False, not_send_msg=None):
946 """
947 Delete item by its internal _id
948 :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
949 :param _id: server internal id
950 :param dry_run: make checking but do not delete
951 :param not_send_msg: To not send message (False) or store content (list) instead
952 :return: operation id (None if there is not operation), raise exception if error or not found, conflict, ...
953 """
954
955 # To allow addressing projects and users by name AS WELL AS by _id
956 if not self.multiproject:
957 filter_q = {}
958 else:
959 filter_q = self._get_project_filter(session)
960 filter_q[self.id_field(self.topic, _id)] = _id
961 item_content = self.db.get_one(self.topic, filter_q)
962
rshri2d386cb2024-07-05 14:35:51 +0000963 op_id = self.format_on_operation(
964 item_content,
965 "deregister",
966 None,
967 )
968 self.db.set_one(self.topic, {"_id": _id}, item_content)
969
970 self.check_conflict_on_del(session, _id, item_content)
971 if dry_run:
972 return None
973
974 if self.multiproject and session["project_id"]:
975 # remove reference from project_read if there are more projects referencing it. If it last one,
976 # do not remove reference, but delete
977 other_projects_referencing = next(
978 (
979 p
980 for p in item_content["_admin"]["projects_read"]
981 if p not in session["project_id"] and p != "ANY"
982 ),
983 None,
984 )
985
986 # check if there are projects referencing it (apart from ANY, that means, public)....
987 if other_projects_referencing:
988 # remove references but not delete
989 update_dict_pull = {
990 "_admin.projects_read": session["project_id"],
991 "_admin.projects_write": session["project_id"],
992 }
993 self.db.set_one(
994 self.topic, filter_q, update_dict=None, pull_list=update_dict_pull
995 )
996 return None
997 else:
998 can_write = next(
999 (
1000 p
1001 for p in item_content["_admin"]["projects_write"]
1002 if p == "ANY" or p in session["project_id"]
1003 ),
1004 None,
1005 )
1006 if not can_write:
1007 raise EngineException(
1008 "You have not write permission to delete it",
1009 http_code=HTTPStatus.UNAUTHORIZED,
1010 )
1011
1012 # delete
1013 self._send_msg(
1014 "deregister",
1015 {"cluster_id": _id, "operation_id": op_id},
1016 not_send_msg=not_send_msg,
1017 )
shrinithi75492bd2025-03-21 18:37:44 +00001018 return _id
yshah53cc9eb2024-07-05 13:06:31 +00001019
1020
shrinithi28d887f2025-01-08 05:27:19 +00001021class KsusTopic(ACMTopic):
yshah53cc9eb2024-07-05 13:06:31 +00001022 topic = "ksus"
1023 okapkg_topic = "okas"
1024 infra_topic = "k8sinfra"
1025 topic_msg = "ksu"
1026 schema_new = ksu_schema
1027 schema_edit = ksu_schema
yshahd23c6a52025-06-13 05:49:31 +00001028 MAP_PROFILE = {
1029 "infra_controller_profiles": "infra-controllers",
1030 "infra_config_profiles": "infra-configs",
1031 "resource_profiles": "managed_resources",
1032 "app_profiles": "apps",
1033 }
yshah53cc9eb2024-07-05 13:06:31 +00001034
1035 def __init__(self, db, fs, msg, auth):
shrinithi28d887f2025-01-08 05:27:19 +00001036 super().__init__(db, fs, msg, auth)
yshah53cc9eb2024-07-05 13:06:31 +00001037 self.logger = logging.getLogger("nbi.ksus")
1038
1039 @staticmethod
1040 def format_on_new(content, project_id=None, make_public=False):
1041 BaseTopic.format_on_new(content, project_id=project_id, make_public=make_public)
garciadeblasbecc7052024-11-20 12:04:53 +01001042 content["current_operation"] = None
yshah53cc9eb2024-07-05 13:06:31 +00001043 content["state"] = "IN_CREATION"
1044 content["operatingState"] = "PROCESSING"
1045 content["resourceState"] = "IN_PROGRESS"
1046
1047 def new(self, rollback, session, indata=None, kwargs=None, headers=None):
1048 _id_list = []
yshah53cc9eb2024-07-05 13:06:31 +00001049 for ksus in indata["ksus"]:
1050 content = ksus
1051 oka = content["oka"][0]
1052 oka_flag = ""
1053 if oka["_id"]:
1054 oka_flag = "_id"
shahithya8bded112024-10-15 08:01:44 +00001055 oka["sw_catalog_path"] = ""
yshah53cc9eb2024-07-05 13:06:31 +00001056 elif oka["sw_catalog_path"]:
1057 oka_flag = "sw_catalog_path"
1058
1059 for okas in content["oka"]:
1060 if okas["_id"] and okas["sw_catalog_path"]:
1061 raise EngineException(
1062 "Cannot create ksu with both OKA and SW catalog path",
1063 HTTPStatus.UNPROCESSABLE_ENTITY,
1064 )
1065 if not okas["sw_catalog_path"]:
1066 okas.pop("sw_catalog_path")
1067 elif not okas["_id"]:
1068 okas.pop("_id")
1069 if oka_flag not in okas.keys():
1070 raise EngineException(
1071 "Cannot create ksu. Give either OKA or SW catalog path for all oka in a KSU",
1072 HTTPStatus.UNPROCESSABLE_ENTITY,
1073 )
1074
1075 # Override descriptor with query string kwargs
1076 content = self._remove_envelop(content)
1077 self._update_input_with_kwargs(content, kwargs)
1078 content = self._validate_input_new(input=content, force=session["force"])
1079
1080 # Check for unique name
1081 self.check_unique_name(session, content["name"])
1082
1083 self.check_conflict_on_new(session, content)
1084
1085 operation_params = {}
1086 for content_key, content_value in content.items():
1087 operation_params[content_key] = content_value
1088 self.format_on_new(
1089 content, project_id=session["project_id"], make_public=session["public"]
1090 )
yshah53cc9eb2024-07-05 13:06:31 +00001091 op_id = self.format_on_operation(
1092 content,
1093 operation_type="create",
1094 operation_params=operation_params,
1095 )
1096 content["git_name"] = self.create_gitname(content, session)
1097
1098 # Update Oka_package usage state
1099 for okas in content["oka"]:
1100 if "_id" in okas.keys():
1101 self.update_usage_state(session, okas)
1102
yshahd23c6a52025-06-13 05:49:31 +00001103 profile_id = content["profile"].get("_id")
1104 profile_type = content["profile"].get("profile_type")
1105 db_cluster_list = self.db.get_list("clusters")
1106 for db_cluster in db_cluster_list:
1107 if db_cluster.get("created") == "true":
1108 profile_list = db_cluster[profile_type]
1109 if profile_id in profile_list:
1110 ksu_count = db_cluster.get("ksu_count")
1111 new_ksu_count = ksu_count + 1
1112 self.logger.info(f"New KSU count: {new_ksu_count}")
1113 db_cluster["ksu_count"] = new_ksu_count
1114 self.db.set_one(
1115 "clusters", {"_id": db_cluster["_id"]}, db_cluster
1116 )
1117
yshah53cc9eb2024-07-05 13:06:31 +00001118 _id = self.db.create(self.topic, content)
1119 rollback.append({"topic": self.topic, "_id": _id})
yshah53cc9eb2024-07-05 13:06:31 +00001120 _id_list.append(_id)
1121 data = {"ksus_list": _id_list, "operation_id": op_id}
1122 self._send_msg("create", data)
1123 return _id_list, op_id
1124
1125 def clone(self, rollback, session, _id, indata, kwargs, headers):
1126 filter_db = self._get_project_filter(session)
1127 filter_db[BaseTopic.id_field(self.topic, _id)] = _id
1128 data = self.db.get_one(self.topic, filter_db)
1129
yshah53cc9eb2024-07-05 13:06:31 +00001130 op_id = self.format_on_operation(
1131 data,
1132 "clone",
1133 indata,
1134 )
1135 self.db.set_one(self.topic, {"_id": data["_id"]}, data)
1136 self._send_msg("clone", {"ksus_list": [data["_id"]], "operation_id": op_id})
1137 return op_id
1138
1139 def update_usage_state(self, session, oka_content):
1140 _id = oka_content["_id"]
1141 filter_db = self._get_project_filter(session)
1142 filter_db[BaseTopic.id_field(self.topic, _id)] = _id
1143
1144 data = self.db.get_one(self.okapkg_topic, filter_db)
1145 if data["_admin"]["usageState"] == "NOT_IN_USE":
1146 usage_state_update = {
1147 "_admin.usageState": "IN_USE",
1148 }
1149 self.db.set_one(
1150 self.okapkg_topic, {"_id": _id}, update_dict=usage_state_update
1151 )
1152
1153 def move_ksu(self, session, _id, indata=None, kwargs=None, content=None):
1154 indata = self._remove_envelop(indata)
1155
1156 # Override descriptor with query string kwargs
1157 if kwargs:
1158 self._update_input_with_kwargs(indata, kwargs)
1159 try:
1160 if indata and session.get("set_project"):
1161 raise EngineException(
1162 "Cannot edit content and set to project (query string SET_PROJECT) at same time",
1163 HTTPStatus.UNPROCESSABLE_ENTITY,
1164 )
1165 # TODO self._check_edition(session, indata, _id, force)
1166 if not content:
1167 content = self.show(session, _id)
1168 indata = self._validate_input_edit(
1169 input=indata, content=content, force=session["force"]
1170 )
1171 operation_params = indata
1172 deep_update_rfc7396(content, indata)
1173
1174 # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
1175 _id = content.get("_id") or _id
yshahd0c876f2024-11-11 09:24:48 +00001176 op_id = self.format_on_operation(
yshah53cc9eb2024-07-05 13:06:31 +00001177 content,
1178 "move",
1179 operation_params,
1180 )
1181 if content.get("_admin"):
1182 now = time()
1183 content["_admin"]["modified"] = now
1184 content["operatingState"] = "PROCESSING"
1185 content["resourceState"] = "IN_PROGRESS"
1186
1187 self.db.replace(self.topic, _id, content)
yshah53cc9eb2024-07-05 13:06:31 +00001188 data = {"ksus_list": [content["_id"]], "operation_id": op_id}
1189 self._send_msg("move", data)
1190 return op_id
1191 except ValidationError as e:
1192 raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
1193
1194 def check_conflict_on_edit(self, session, final_content, edit_content, _id):
1195 if final_content["name"] != edit_content["name"]:
1196 self.check_unique_name(session, edit_content["name"])
1197 return final_content
1198
1199 @staticmethod
1200 def format_on_edit(final_content, edit_content):
shrinithi28d887f2025-01-08 05:27:19 +00001201 op_id = ACMTopic.format_on_operation(
yshah53cc9eb2024-07-05 13:06:31 +00001202 final_content,
1203 "update",
1204 edit_content,
1205 )
1206 final_content["operatingState"] = "PROCESSING"
1207 final_content["resourceState"] = "IN_PROGRESS"
1208 if final_content.get("_admin"):
1209 now = time()
1210 final_content["_admin"]["modified"] = now
yshahd0c876f2024-11-11 09:24:48 +00001211 return op_id
yshah53cc9eb2024-07-05 13:06:31 +00001212
1213 def edit(self, session, _id, indata, kwargs):
1214 _id_list = []
yshah53cc9eb2024-07-05 13:06:31 +00001215 if _id == "update":
1216 for ksus in indata["ksus"]:
1217 content = ksus
1218 _id = content["_id"]
1219 _id_list.append(_id)
1220 content.pop("_id")
yshahd0c876f2024-11-11 09:24:48 +00001221 op_id = self.edit_ksu(session, _id, content, kwargs)
yshah53cc9eb2024-07-05 13:06:31 +00001222 else:
1223 content = indata
1224 _id_list.append(_id)
yshahd0c876f2024-11-11 09:24:48 +00001225 op_id = self.edit_ksu(session, _id, content, kwargs)
yshah53cc9eb2024-07-05 13:06:31 +00001226
1227 data = {"ksus_list": _id_list, "operation_id": op_id}
1228 self._send_msg("edit", data)
yshah53cc9eb2024-07-05 13:06:31 +00001229
yshahd23c6a52025-06-13 05:49:31 +00001230 def cluster_list_ksu(self, session, filter_q=None, api_req=None):
1231 db_filter = {}
1232 if filter_q.get("cluster_id"):
1233 db_filter["_id"] = filter_q.get("cluster_id")
1234 ksu_data_list = []
1235
1236 cluster_data = self.db.get_one("clusters", db_filter)
1237 profiles_list = [
1238 "infra_controller_profiles",
1239 "infra_config_profiles",
1240 "app_profiles",
1241 "resource_profiles",
1242 ]
1243 for profile in profiles_list:
1244 data_list = []
1245 for profile_id in cluster_data[profile]:
1246 filter_q = {"profile": {"_id": profile_id, "profile_type": profile}}
1247 data_list = self.db.get_list(self.topic, filter_q)
1248 for ksu_data in data_list:
1249 ksu_data["package_name"] = []
1250 ksu_data["package_path"] = []
1251 for okas in ksu_data["operationHistory"][0]["operationParams"]["oka"]:
1252 sw_catalog_path = okas.get("sw_catalog_path")
1253 if sw_catalog_path:
1254 parts = sw_catalog_path.rsplit("/", 2)
1255 self.logger.info(f"Parts: {parts}")
1256 ksu_data["package_name"].append(parts[-2])
1257 ksu_data["package_path"].append("/".join(parts[:-1]))
1258 else:
1259 oka_id = okas["_id"]
1260 db_oka = self.db.get_one("okas", {"_id": oka_id})
1261 oka_type = self.MAP_PROFILE[
1262 db_oka.get("profile_type", "infra_controller_profiles")
1263 ]
1264 ksu_data["package_name"].append(db_oka["git_name"].lower())
1265 ksu_data["package_path"].append(
1266 f"{oka_type}/{db_oka['git_name'].lower()}"
1267 )
1268 ksu_data_list.append(ksu_data)
1269
1270 outdata = {}
1271 outdata["count"] = cluster_data["ksu_count"]
1272 outdata["data"] = ksu_data_list
1273 self.logger.info(f"Outdata: {outdata}")
1274 return outdata
1275
yshahd0c876f2024-11-11 09:24:48 +00001276 def edit_ksu(self, session, _id, indata, kwargs):
yshah53cc9eb2024-07-05 13:06:31 +00001277 content = None
1278 indata = self._remove_envelop(indata)
1279
1280 # Override descriptor with query string kwargs
1281 if kwargs:
1282 self._update_input_with_kwargs(indata, kwargs)
1283 try:
1284 if indata and session.get("set_project"):
1285 raise EngineException(
1286 "Cannot edit content and set to project (query string SET_PROJECT) at same time",
1287 HTTPStatus.UNPROCESSABLE_ENTITY,
1288 )
1289 # TODO self._check_edition(session, indata, _id, force)
1290 if not content:
1291 content = self.show(session, _id)
1292
1293 for okas in indata["oka"]:
1294 if not okas["_id"]:
1295 okas.pop("_id")
1296 if not okas["sw_catalog_path"]:
1297 okas.pop("sw_catalog_path")
1298
1299 indata = self._validate_input_edit(indata, content, force=session["force"])
1300
1301 # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
1302 _id = content.get("_id") or _id
1303
1304 content = self.check_conflict_on_edit(session, content, indata, _id=_id)
yshah53cc9eb2024-07-05 13:06:31 +00001305 op_id = self.format_on_edit(content, indata)
1306 self.db.replace(self.topic, _id, content)
1307 return op_id
1308 except ValidationError as e:
1309 raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
1310
1311 def delete_ksu(self, session, _id, indata, dry_run=False, not_send_msg=None):
1312 _id_list = []
yshah53cc9eb2024-07-05 13:06:31 +00001313 if _id == "delete":
1314 for ksus in indata["ksus"]:
1315 content = ksus
1316 _id = content["_id"]
yshah53cc9eb2024-07-05 13:06:31 +00001317 content.pop("_id")
garciadeblasac285872024-12-05 12:21:09 +01001318 op_id, not_send_msg_ksu = self.delete(session, _id)
1319 if not not_send_msg_ksu:
1320 _id_list.append(_id)
yshah53cc9eb2024-07-05 13:06:31 +00001321 else:
garciadeblasac285872024-12-05 12:21:09 +01001322 op_id, not_send_msg_ksu = self.delete(session, _id)
1323 if not not_send_msg_ksu:
1324 _id_list.append(_id)
yshah53cc9eb2024-07-05 13:06:31 +00001325
garciadeblasac285872024-12-05 12:21:09 +01001326 if _id_list:
yshah781ce732025-02-28 08:56:13 +00001327 data = {
1328 "ksus_list": _id_list,
1329 "operation_id": op_id,
1330 "force": session["force"],
1331 }
garciadeblasac285872024-12-05 12:21:09 +01001332 self._send_msg("delete", data, not_send_msg)
yshah53cc9eb2024-07-05 13:06:31 +00001333 return op_id
1334
yshahd0c876f2024-11-11 09:24:48 +00001335 def delete(self, session, _id):
yshah53cc9eb2024-07-05 13:06:31 +00001336 if not self.multiproject:
1337 filter_q = {}
1338 else:
1339 filter_q = self._get_project_filter(session)
1340 filter_q[self.id_field(self.topic, _id)] = _id
1341 item_content = self.db.get_one(self.topic, filter_q)
1342 item_content["state"] = "IN_DELETION"
1343 item_content["operatingState"] = "PROCESSING"
1344 item_content["resourceState"] = "IN_PROGRESS"
yshahd0c876f2024-11-11 09:24:48 +00001345 op_id = self.format_on_operation(
yshah53cc9eb2024-07-05 13:06:31 +00001346 item_content,
1347 "delete",
1348 None,
1349 )
1350 self.db.set_one(self.topic, {"_id": item_content["_id"]}, item_content)
1351
garciadeblasac285872024-12-05 12:21:09 +01001352 # Check if the profile exists. If it doesn't, no message should be sent to Kafka
1353 not_send_msg = None
1354 profile_id = item_content["profile"]["_id"]
1355 profile_type = item_content["profile"]["profile_type"]
1356 profile_collection_map = {
1357 "app_profiles": "k8sapp",
1358 "resource_profiles": "k8sresource",
1359 "infra_controller_profiles": "k8sinfra_controller",
1360 "infra_config_profiles": "k8sinfra_config",
1361 }
1362 profile_collection = profile_collection_map[profile_type]
1363 profile_content = self.db.get_one(
1364 profile_collection, {"_id": profile_id}, fail_on_empty=False
1365 )
1366 if not profile_content:
1367 self.db.del_one(self.topic, filter_q)
1368 not_send_msg = True
1369 return op_id, not_send_msg
yshah53cc9eb2024-07-05 13:06:31 +00001370
1371
shrinithi28d887f2025-01-08 05:27:19 +00001372class OkaTopic(DescriptorTopic, ACMOperationTopic):
yshah53cc9eb2024-07-05 13:06:31 +00001373 topic = "okas"
1374 topic_msg = "oka"
1375 schema_new = oka_schema
1376 schema_edit = oka_schema
1377
1378 def __init__(self, db, fs, msg, auth):
1379 super().__init__(db, fs, msg, auth)
1380 self.logger = logging.getLogger("nbi.oka")
1381
1382 @staticmethod
1383 def format_on_new(content, project_id=None, make_public=False):
1384 DescriptorTopic.format_on_new(
1385 content, project_id=project_id, make_public=make_public
1386 )
garciadeblasbecc7052024-11-20 12:04:53 +01001387 content["current_operation"] = None
yshah53cc9eb2024-07-05 13:06:31 +00001388 content["state"] = "PENDING_CONTENT"
1389 content["operatingState"] = "PROCESSING"
1390 content["resourceState"] = "IN_PROGRESS"
1391
1392 def check_conflict_on_del(self, session, _id, db_content):
1393 usage_state = db_content["_admin"]["usageState"]
1394 if usage_state == "IN_USE":
1395 raise EngineException(
1396 "There is a KSU using this package",
1397 http_code=HTTPStatus.CONFLICT,
1398 )
1399
1400 def check_conflict_on_edit(self, session, final_content, edit_content, _id):
yshah00cfe8b2025-01-17 04:05:45 +00001401 if "name" in edit_content:
1402 if final_content["name"] == edit_content["name"]:
1403 name = edit_content["name"]
1404 raise EngineException(
1405 f"No update, new name for the OKA is the same: {name}",
1406 http_code=HTTPStatus.CONFLICT,
1407 )
1408 else:
1409 self.check_unique_name(session, edit_content["name"])
1410 elif (
1411 "description" in edit_content
yshah53cc9eb2024-07-05 13:06:31 +00001412 and final_content["description"] == edit_content["description"]
1413 ):
yshah00cfe8b2025-01-17 04:05:45 +00001414 description = edit_content["description"]
yshah53cc9eb2024-07-05 13:06:31 +00001415 raise EngineException(
yshah00cfe8b2025-01-17 04:05:45 +00001416 f"No update, new description for the OKA is the same: {description}",
yshah53cc9eb2024-07-05 13:06:31 +00001417 http_code=HTTPStatus.CONFLICT,
1418 )
yshah53cc9eb2024-07-05 13:06:31 +00001419 return final_content
1420
1421 def edit(self, session, _id, indata=None, kwargs=None, content=None):
1422 indata = self._remove_envelop(indata)
1423
1424 # Override descriptor with query string kwargs
1425 if kwargs:
1426 self._update_input_with_kwargs(indata, kwargs)
1427 try:
1428 if indata and session.get("set_project"):
1429 raise EngineException(
1430 "Cannot edit content and set to project (query string SET_PROJECT) at same time",
1431 HTTPStatus.UNPROCESSABLE_ENTITY,
1432 )
1433 # TODO self._check_edition(session, indata, _id, force)
1434 if not content:
1435 content = self.show(session, _id)
1436
1437 indata = self._validate_input_edit(indata, content, force=session["force"])
1438
1439 # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
1440 _id = content.get("_id") or _id
1441
1442 content = self.check_conflict_on_edit(session, content, indata, _id=_id)
1443 op_id = self.format_on_edit(content, indata)
1444 deep_update_rfc7396(content, indata)
1445
1446 self.db.replace(self.topic, _id, content)
1447 return op_id
1448 except ValidationError as e:
1449 raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
1450
1451 def delete(self, session, _id, dry_run=False, not_send_msg=None):
1452 if not self.multiproject:
1453 filter_q = {}
1454 else:
1455 filter_q = self._get_project_filter(session)
1456 filter_q[self.id_field(self.topic, _id)] = _id
1457 item_content = self.db.get_one(self.topic, filter_q)
1458 item_content["state"] = "IN_DELETION"
1459 item_content["operatingState"] = "PROCESSING"
1460 self.check_conflict_on_del(session, _id, item_content)
yshahd0c876f2024-11-11 09:24:48 +00001461 op_id = self.format_on_operation(
yshah53cc9eb2024-07-05 13:06:31 +00001462 item_content,
1463 "delete",
1464 None,
1465 )
yshah53cc9eb2024-07-05 13:06:31 +00001466 self.db.set_one(self.topic, {"_id": item_content["_id"]}, item_content)
1467 self._send_msg(
yshah781ce732025-02-28 08:56:13 +00001468 "delete",
1469 {"oka_id": _id, "operation_id": op_id, "force": session["force"]},
1470 not_send_msg=not_send_msg,
yshah53cc9eb2024-07-05 13:06:31 +00001471 )
yshahffcac5f2024-08-19 12:49:07 +00001472 return op_id
yshah53cc9eb2024-07-05 13:06:31 +00001473
1474 def new(self, rollback, session, indata=None, kwargs=None, headers=None):
1475 # _remove_envelop
1476 if indata:
1477 if "userDefinedData" in indata:
1478 indata = indata["userDefinedData"]
1479
1480 content = {"_admin": {"userDefinedData": indata, "revision": 0}}
1481
1482 self._update_input_with_kwargs(content, kwargs)
1483 content = BaseTopic._validate_input_new(
1484 self, input=kwargs, force=session["force"]
1485 )
1486
1487 self.check_unique_name(session, content["name"])
1488 operation_params = {}
1489 for content_key, content_value in content.items():
1490 operation_params[content_key] = content_value
1491 self.format_on_new(
1492 content, session["project_id"], make_public=session["public"]
1493 )
yshahd0c876f2024-11-11 09:24:48 +00001494 op_id = self.format_on_operation(
yshah53cc9eb2024-07-05 13:06:31 +00001495 content,
1496 operation_type="create",
1497 operation_params=operation_params,
1498 )
1499 content["git_name"] = self.create_gitname(content, session)
1500 _id = self.db.create(self.topic, content)
1501 rollback.append({"topic": self.topic, "_id": _id})
yshahd0c876f2024-11-11 09:24:48 +00001502 return _id, op_id
yshah53cc9eb2024-07-05 13:06:31 +00001503
1504 def upload_content(self, session, _id, indata, kwargs, headers):
1505 current_desc = self.show(session, _id)
1506
1507 compressed = None
1508 content_type = headers.get("Content-Type")
1509 if (
1510 content_type
1511 and "application/gzip" in content_type
1512 or "application/x-gzip" in content_type
1513 ):
1514 compressed = "gzip"
1515 if content_type and "application/zip" in content_type:
1516 compressed = "zip"
1517 filename = headers.get("Content-Filename")
1518 if not filename and compressed:
1519 filename = "package.tar.gz" if compressed == "gzip" else "package.zip"
1520 elif not filename:
1521 filename = "package"
1522
1523 revision = 1
1524 if "revision" in current_desc["_admin"]:
1525 revision = current_desc["_admin"]["revision"] + 1
1526
1527 file_pkg = None
1528 fs_rollback = []
1529
1530 try:
1531 start = 0
1532 # Rather than using a temp folder, we will store the package in a folder based on
1533 # the current revision.
1534 proposed_revision_path = _id + ":" + str(revision)
1535 # all the content is upload here and if ok, it is rename from id_ to is folder
1536
1537 if start:
1538 if not self.fs.file_exists(proposed_revision_path, "dir"):
1539 raise EngineException(
1540 "invalid Transaction-Id header", HTTPStatus.NOT_FOUND
1541 )
1542 else:
1543 self.fs.file_delete(proposed_revision_path, ignore_non_exist=True)
1544 self.fs.mkdir(proposed_revision_path)
1545 fs_rollback.append(proposed_revision_path)
1546
1547 storage = self.fs.get_params()
1548 storage["folder"] = proposed_revision_path
yshah2c932bd2024-09-24 18:16:07 +00001549 storage["zipfile"] = filename
yshah53cc9eb2024-07-05 13:06:31 +00001550
1551 file_path = (proposed_revision_path, filename)
1552 file_pkg = self.fs.file_open(file_path, "a+b")
1553
yshah53cc9eb2024-07-05 13:06:31 +00001554 if isinstance(indata, dict):
1555 indata_text = yaml.safe_dump(indata, indent=4, default_flow_style=False)
1556 file_pkg.write(indata_text.encode(encoding="utf-8"))
1557 else:
1558 indata_len = 0
1559 indata = indata.file
1560 while True:
1561 indata_text = indata.read(4096)
1562 indata_len += len(indata_text)
1563 if not indata_text:
1564 break
1565 file_pkg.write(indata_text)
1566
yshah53cc9eb2024-07-05 13:06:31 +00001567 # Need to close the file package here so it can be copied from the
1568 # revision to the current, unrevisioned record
1569 if file_pkg:
1570 file_pkg.close()
1571 file_pkg = None
1572
1573 # Fetch both the incoming, proposed revision and the original revision so we
1574 # can call a validate method to compare them
1575 current_revision_path = _id + "/"
1576 self.fs.sync(from_path=current_revision_path)
1577 self.fs.sync(from_path=proposed_revision_path)
1578
garciadeblas807b8bf2024-09-23 13:03:00 +02001579 # Is this required?
yshah53cc9eb2024-07-05 13:06:31 +00001580 if revision > 1:
1581 try:
1582 self._validate_descriptor_changes(
1583 _id,
1584 filename,
1585 current_revision_path,
1586 proposed_revision_path,
1587 )
1588 except Exception as e:
1589 shutil.rmtree(
1590 self.fs.path + current_revision_path, ignore_errors=True
1591 )
1592 shutil.rmtree(
1593 self.fs.path + proposed_revision_path, ignore_errors=True
1594 )
1595 # Only delete the new revision. We need to keep the original version in place
1596 # as it has not been changed.
1597 self.fs.file_delete(proposed_revision_path, ignore_non_exist=True)
1598 raise e
1599
1600 indata = self._remove_envelop(indata)
1601
1602 # Override descriptor with query string kwargs
1603 if kwargs:
1604 self._update_input_with_kwargs(indata, kwargs)
1605
1606 current_desc["_admin"]["storage"] = storage
1607 current_desc["_admin"]["onboardingState"] = "ONBOARDED"
1608 current_desc["_admin"]["operationalState"] = "ENABLED"
1609 current_desc["_admin"]["modified"] = time()
1610 current_desc["_admin"]["revision"] = revision
1611
1612 deep_update_rfc7396(current_desc, indata)
1613
1614 # Copy the revision to the active package name by its original id
1615 shutil.rmtree(self.fs.path + current_revision_path, ignore_errors=True)
1616 os.rename(
1617 self.fs.path + proposed_revision_path,
1618 self.fs.path + current_revision_path,
1619 )
1620 self.fs.file_delete(current_revision_path, ignore_non_exist=True)
1621 self.fs.mkdir(current_revision_path)
1622 self.fs.reverse_sync(from_path=current_revision_path)
1623
1624 shutil.rmtree(self.fs.path + _id)
1625 kwargs = {}
1626 kwargs["package"] = filename
1627 if headers["Method"] == "POST":
1628 current_desc["state"] = "IN_CREATION"
garciadeblasbecc7052024-11-20 12:04:53 +01001629 op_id = current_desc.get("operationHistory", [{"op_id": None}])[-1].get(
1630 "op_id"
1631 )
yshah53cc9eb2024-07-05 13:06:31 +00001632 elif headers["Method"] in ("PUT", "PATCH"):
yshahd0c876f2024-11-11 09:24:48 +00001633 op_id = self.format_on_operation(
yshah53cc9eb2024-07-05 13:06:31 +00001634 current_desc,
1635 "update",
1636 kwargs,
1637 )
1638 current_desc["operatingState"] = "PROCESSING"
1639 current_desc["resourceState"] = "IN_PROGRESS"
1640
1641 self.db.replace(self.topic, _id, current_desc)
1642
1643 # Store a copy of the package as a point in time revision
1644 revision_desc = dict(current_desc)
1645 revision_desc["_id"] = _id + ":" + str(revision_desc["_admin"]["revision"])
1646 self.db.create(self.topic + "_revisions", revision_desc)
1647 fs_rollback = []
1648
yshah53cc9eb2024-07-05 13:06:31 +00001649 if headers["Method"] == "POST":
1650 self._send_msg("create", {"oka_id": _id, "operation_id": op_id})
1651 elif headers["Method"] == "PUT" or "PATCH":
1652 self._send_msg("edit", {"oka_id": _id, "operation_id": op_id})
1653
1654 return True
1655
1656 except EngineException:
1657 raise
1658 finally:
1659 if file_pkg:
1660 file_pkg.close()
1661 for file in fs_rollback:
1662 self.fs.file_delete(file, ignore_non_exist=True)