blob: d4ff80b6abeda676e26cba8f1d908507aae65f33 [file] [log] [blame]
# -*- coding: utf-8 -*-

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
15
yshah53cc9eb2024-07-05 13:06:31 +000016import logging
17import yaml
yshah53cc9eb2024-07-05 13:06:31 +000018import shutil
19import os
rshri2d386cb2024-07-05 14:35:51 +000020from http import HTTPStatus
21
yshah53cc9eb2024-07-05 13:06:31 +000022from time import time
rshri2d386cb2024-07-05 14:35:51 +000023from osm_nbi.base_topic import BaseTopic, EngineException
shrinithi28d887f2025-01-08 05:27:19 +000024from osm_nbi.acm_topic import ACMTopic, ACMOperationTopic, ProfileTopic
rshri2d386cb2024-07-05 14:35:51 +000025
yshah53cc9eb2024-07-05 13:06:31 +000026from osm_nbi.descriptor_topics import DescriptorTopic
rshri2d386cb2024-07-05 14:35:51 +000027from osm_nbi.validation import (
28 ValidationError,
yshah99122b82024-11-18 07:05:29 +000029 validate_input,
rshri2d386cb2024-07-05 14:35:51 +000030 clustercreation_new_schema,
yshah99122b82024-11-18 07:05:29 +000031 cluster_edit_schema,
32 cluster_update_schema,
rshri2d386cb2024-07-05 14:35:51 +000033 infra_controller_profile_create_new_schema,
34 infra_config_profile_create_new_schema,
35 app_profile_create_new_schema,
36 resource_profile_create_new_schema,
37 infra_controller_profile_create_edit_schema,
38 infra_config_profile_create_edit_schema,
39 app_profile_create_edit_schema,
40 resource_profile_create_edit_schema,
rshri17b09ec2024-11-07 05:48:12 +000041 clusterregistration_new_schema,
rshri2d386cb2024-07-05 14:35:51 +000042 attach_dettach_profile_schema,
yshah53cc9eb2024-07-05 13:06:31 +000043 ksu_schema,
44 oka_schema,
rshri2d386cb2024-07-05 14:35:51 +000045)
yshah53cc9eb2024-07-05 13:06:31 +000046from osm_common.dbbase import deep_update_rfc7396, DbException
rshri2d386cb2024-07-05 14:35:51 +000047from osm_common.msgbase import MsgException
48from osm_common.fsbase import FsException
49
# Module authors; kept as a tuple because there is more than one maintainer.
__author__ = (
    "Shrinithi R <shrinithi.r@tataelxsi.co.in>",
    "Shahithya Y <shahithya.y@tataelxsi.co.in>",
)
rshri2d386cb2024-07-05 14:35:51 +000054
55
class InfraContTopic(ProfileTopic):
    """Topic handler for Kubernetes infra-controller profiles.

    Every operation is delegated to the generic profile helpers inherited
    from ``ProfileTopic``; this class only binds the collection name, the
    message topic and the validation schemas.
    """

    topic = "k8sinfra_controller"
    topic_msg = "k8s_infra_controller"
    schema_new = infra_controller_profile_create_new_schema
    schema_edit = infra_controller_profile_create_edit_schema

    def __init__(self, db, fs, msg, auth):
        # Initialised through BaseTopic directly, as in the original code.
        BaseTopic.__init__(self, db, fs, msg, auth)

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create a new infra-controller profile via ``new_profile``."""
        return self.new_profile(rollback, session, indata, kwargs, headers)

    def default(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create the default infra-controller profile used at cluster creation."""
        return self.default_profile(rollback, session, indata, kwargs, headers)

    def delete(self, session, _id, dry_run=False, not_send_msg=None):
        """Delete the profile and return its ``_id``.

        :param session: session information of the requester
        :param _id: identifier of the profile to delete
        :param dry_run: forwarded to ``delete_profile``
        :param not_send_msg: forwarded to ``delete_profile``
        :return: the ``_id`` received
        """
        # The caller's flags are forwarded; they used to be replaced by
        # hard-coded defaults, so a dry run performed a real deletion.
        self.delete_profile(
            session, _id, dry_run=dry_run, not_send_msg=not_send_msg
        )
        return _id
rshri2d386cb2024-07-05 14:35:51 +000076
77
class InfraConfTopic(ProfileTopic):
    """Topic handler for Kubernetes infra-config profiles.

    Every operation is delegated to the generic profile helpers inherited
    from ``ProfileTopic``; this class only binds the collection name, the
    message topic and the validation schemas.
    """

    topic = "k8sinfra_config"
    topic_msg = "k8s_infra_config"
    schema_new = infra_config_profile_create_new_schema
    schema_edit = infra_config_profile_create_edit_schema

    def __init__(self, db, fs, msg, auth):
        # Initialised through BaseTopic directly, as in the original code.
        BaseTopic.__init__(self, db, fs, msg, auth)

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create a new infra-config profile via ``new_profile``."""
        return self.new_profile(rollback, session, indata, kwargs, headers)

    def default(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create the default infra-config profile used at cluster creation."""
        return self.default_profile(rollback, session, indata, kwargs, headers)

    def delete(self, session, _id, dry_run=False, not_send_msg=None):
        """Delete the profile and return its ``_id``.

        :param session: session information of the requester
        :param _id: identifier of the profile to delete
        :param dry_run: forwarded to ``delete_profile``
        :param not_send_msg: forwarded to ``delete_profile``
        :return: the ``_id`` received
        """
        # The caller's flags are forwarded; they used to be replaced by
        # hard-coded defaults, so a dry run performed a real deletion.
        self.delete_profile(
            session, _id, dry_run=dry_run, not_send_msg=not_send_msg
        )
        return _id
rshri2d386cb2024-07-05 14:35:51 +000098
99
class AppTopic(ProfileTopic):
    """Topic handler for Kubernetes application profiles.

    Every operation is delegated to the generic profile helpers inherited
    from ``ProfileTopic``; this class only binds the collection name, the
    message topic and the validation schemas.
    """

    topic = "k8sapp"
    topic_msg = "k8s_app"
    schema_new = app_profile_create_new_schema
    schema_edit = app_profile_create_edit_schema

    def __init__(self, db, fs, msg, auth):
        # Initialised through BaseTopic directly, as in the original code.
        BaseTopic.__init__(self, db, fs, msg, auth)

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create a new app profile via ``new_profile``."""
        return self.new_profile(rollback, session, indata, kwargs, headers)

    def default(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create the default app profile used at cluster creation."""
        return self.default_profile(rollback, session, indata, kwargs, headers)

    def delete(self, session, _id, dry_run=False, not_send_msg=None):
        """Delete the profile and return its ``_id``.

        :param session: session information of the requester
        :param _id: identifier of the profile to delete
        :param dry_run: forwarded to ``delete_profile``
        :param not_send_msg: forwarded to ``delete_profile``
        :return: the ``_id`` received
        """
        # The caller's flags are forwarded; they used to be replaced by
        # hard-coded defaults, so a dry run performed a real deletion.
        self.delete_profile(
            session, _id, dry_run=dry_run, not_send_msg=not_send_msg
        )
        return _id
rshri2d386cb2024-07-05 14:35:51 +0000120
121
class ResourceTopic(ProfileTopic):
    """Topic handler for Kubernetes resource profiles.

    Every operation is delegated to the generic profile helpers inherited
    from ``ProfileTopic``; this class only binds the collection name, the
    message topic and the validation schemas.
    """

    topic = "k8sresource"
    topic_msg = "k8s_resource"
    schema_new = resource_profile_create_new_schema
    schema_edit = resource_profile_create_edit_schema

    def __init__(self, db, fs, msg, auth):
        # Initialised through BaseTopic directly, as in the original code.
        BaseTopic.__init__(self, db, fs, msg, auth)

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create a new resource profile via ``new_profile``."""
        return self.new_profile(rollback, session, indata, kwargs, headers)

    def default(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create the default resource profile used at cluster creation."""
        return self.default_profile(rollback, session, indata, kwargs, headers)

    def delete(self, session, _id, dry_run=False, not_send_msg=None):
        """Delete the profile and return its ``_id``.

        :param session: session information of the requester
        :param _id: identifier of the profile to delete
        :param dry_run: forwarded to ``delete_profile``
        :param not_send_msg: forwarded to ``delete_profile``
        :return: the ``_id`` received
        """
        # The caller's flags are forwarded; they used to be replaced by
        # hard-coded defaults, so a dry run performed a real deletion.
        self.delete_profile(
            session, _id, dry_run=dry_run, not_send_msg=not_send_msg
        )
        return _id
rshri2d386cb2024-07-05 14:35:51 +0000142
143
class ClusterTopic(ACMTopic):
    """Topic handler for clusters created through this API.

    It stores cluster documents in the ``clusters`` collection and keeps one
    handler per profile type, so default profiles can be created together
    with a cluster and profiles can be attached to or detached from it.
    """

    topic = "clusters"
    topic_msg = "cluster"
    schema_new = clustercreation_new_schema
    # Schema used by the profile attach/detach branch of ``edit``.
    schema_edit = attach_dettach_profile_schema

    # Cluster document fields that hold lists of profile ids.
    _PROFILE_ITEMS = (
        "infra_controller_profiles",
        "infra_config_profiles",
        "app_profiles",
        "resource_profiles",
    )

    def __init__(self, db, fs, msg, auth):
        super().__init__(db, fs, msg, auth)
        self.infra_contr_topic = InfraContTopic(db, fs, msg, auth)
        self.infra_conf_topic = InfraConfTopic(db, fs, msg, auth)
        self.resource_topic = ResourceTopic(db, fs, msg, auth)
        self.app_topic = AppTopic(db, fs, msg, auth)

    @staticmethod
    def format_on_new(content, project_id=None, make_public=False):
        """Apply the ``ACMTopic`` formatting and reset ``current_operation``."""
        ACMTopic.format_on_new(content, project_id=project_id, make_public=make_public)
        content["current_operation"] = None

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """
        Creates a new k8scluster into database.
        :param rollback: list to append the created items at database in case a rollback must be done
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param indata: params to be used for the k8cluster
        :param kwargs: used to override the indata
        :param headers: http request headers
        :return: tuple (_id of the k8scluster created at database, None)
        :raises: ValidationError, EngineException, DbException, MsgException or
            FsException, re-raised with the failing step appended to the message
        """
        step = "checking quotas"  # first step must be defined outside try
        try:
            self.check_quota(session)
            step = "name unique check"
            self.cluster_unique_name_check(session, indata["name"])
            step = "validating input parameters"
            cls_request = self._remove_envelop(indata)
            self._update_input_with_kwargs(cls_request, kwargs)
            cls_request = self._validate_input_new(cls_request, session["force"])
            operation_params = cls_request

            step = "filling cluster details from input data"
            cls_create = self._create_cluster(
                cls_request, rollback, session, indata, kwargs, headers
            )

            step = "creating cluster at database"
            self.format_on_new(
                cls_create, session["project_id"], make_public=session["public"]
            )
            op_id = self.format_on_operation(cls_create, "create", operation_params)
            _id = self.db.create(self.topic, cls_create)
            # Register the rollback entry as soon as the document exists, so
            # a failure while generating/encrypting the keys does not leave
            # an orphan cluster behind.
            rollback.append({"topic": self.topic, "_id": _id})

            pubkey, privkey = self._generate_age_key()
            cls_create["age_pubkey"] = self.db.encrypt(
                pubkey, schema_version="1.11", salt=_id
            )
            cls_create["age_privkey"] = self.db.encrypt(
                privkey, schema_version="1.11", salt=_id
            )
            # TODO: set age_pubkey and age_privkey in the default profiles
            self.db.set_one("clusters", {"_id": _id}, cls_create)
            self._send_msg("create", {"cluster_id": _id, "operation_id": op_id})

            # To add the content in old collection "k8sclusters"
            self.add_to_old_collection(cls_create, session)

            return _id, None
        except (
            ValidationError,
            EngineException,
            DbException,
            MsgException,
            FsException,
        ) as e:
            # Same exception type and http code, annotated with the step;
            # the original exception is kept as the cause.
            raise type(e)(
                "{} while '{}'".format(e, step), http_code=e.http_code
            ) from e

    def _create_cluster(self, cls_request, rollback, session, indata, kwargs, headers):
        """Build the cluster document to store from the validated request.

        Region name and resource group come from the request when the
        corresponding key is present in ``indata``; otherwise from the
        ``config`` of the referenced VIM account. One default profile of each
        type is created and referenced from the document.
        """
        region_given = "region_name" in indata
        resource_group_given = "resource_group" in indata

        # Get the vim_account details
        vim_account_details = self.db.get_one(
            "vim_accounts", {"name": cls_request["vim_account"]}
        )

        # The VIM account config is only read for the values that were not
        # supplied in the request.
        region_name = (
            cls_request["region_name"]
            if region_given
            else vim_account_details["config"]["region_name"]
        )
        resource_group = (
            cls_request["resource_group"]
            if resource_group_given
            else vim_account_details["config"]["resource_group"]
        )

        cls_desc = {
            "name": cls_request["name"],
            "vim_account": self.check_vim(session, cls_request["vim_account"]),
            "k8s_version": cls_request["k8s_version"],
            "node_size": cls_request["node_size"],
            "node_count": cls_request["node_count"],
            "bootstrap": cls_request["bootstrap"],
            "region_name": region_name,
            "resource_group": resource_group,
            "infra_controller_profiles": [
                self._create_default_profiles(
                    rollback, session, indata, kwargs, headers, self.infra_contr_topic
                )
            ],
            "infra_config_profiles": [
                self._create_default_profiles(
                    rollback, session, indata, kwargs, headers, self.infra_conf_topic
                )
            ],
            "resource_profiles": [
                self._create_default_profiles(
                    rollback, session, indata, kwargs, headers, self.resource_topic
                )
            ],
            "app_profiles": [
                self._create_default_profiles(
                    rollback, session, indata, kwargs, headers, self.app_topic
                )
            ],
            # "true" marks a cluster created (not registered) through the API.
            "created": "true",
            "state": "IN_CREATION",
            "operatingState": "PROCESSING",
            "git_name": self.create_gitname(cls_request, session),
            "resourceState": "IN_PROGRESS.REQUEST_RECEIVED",
        }
        # Add optional fields if they exist in the request
        if "description" in cls_request:
            cls_desc["description"] = cls_request["description"]
        return cls_desc

    def check_vim(self, session, name):
        """Return ``name`` when a VIM account with that name is found.

        :return: ``name``, or None when the lookup returns None
        :raises EngineException: (422) when the lookup raises ValidationError
        """
        try:
            vim_account_details = self.db.get_one("vim_accounts", {"name": name})
        except ValidationError as e:
            raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY) from e
        if vim_account_details is not None:
            return name
        return None

    def _create_default_profiles(
        self, rollback, session, indata, kwargs, headers, topic
    ):
        """Create the default profile through ``topic`` and return its result."""
        topic = self.to_select_topic(topic)
        return topic.default(rollback, session, indata, kwargs, headers)

    def to_select_topic(self, topic):
        """Map a profile item name to its topic handler.

        Anything that is not one of the four known item names (for instance
        an already resolved handler) is returned unchanged.
        """
        if not isinstance(topic, str):
            return topic
        handlers = {
            "infra_controller_profiles": self.infra_contr_topic,
            "infra_config_profiles": self.infra_conf_topic,
            "resource_profiles": self.resource_topic,
            "app_profiles": self.app_topic,
        }
        return handlers.get(topic, topic)

    def show_one(self, session, _id, profile, filter_q=None, api_req=False):
        """Return the profiles of type ``profile`` referenced by a cluster.

        :param profile: name of the cluster field holding the profile ids
        :param filter_q: ignored; the filter is rebuilt from the session
        :return: list with the result of ``show`` for every referenced profile
        :raises EngineException: (422) on ValidationError
        """
        try:
            filter_q = self._get_project_filter(session)
            filter_q[self.id_field(self.topic, _id)] = _id
            content = self.db.get_one(self.topic, filter_q)
            topic = self.to_select_topic(profile)
            # NOTE(review): the cluster filter is handed to the profile
            # ``show`` exactly as before - confirm that is intended.
            return [
                topic.show(session, profile_id, filter_q, api_req)
                for profile_id in content[profile]
            ]
        except ValidationError as e:
            raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY) from e

    def state_check(self, profile_id, session, topic):
        """Raise EngineException (422) unless the profile state is "CREATED"."""
        topic = self.to_select_topic(topic)
        content = topic.show(session, profile_id, filter_q=None, api_req=False)
        if content["state"] != "CREATED":
            raise EngineException(
                f" {profile_id} is not in created state",
                HTTPStatus.UNPROCESSABLE_ENTITY,
            )

    def edit(self, session, _id, item, indata=None, kwargs=None):
        """Edit a cluster, or attach/detach a profile.

        When ``item`` is not one of the profile item names the request is
        handled by the parent ``edit`` (with ``item`` passed as its
        ``indata``) using ``cluster_edit_schema``. Otherwise ``indata`` must
        contain ``add_profile`` or ``remove_profile``.

        :raises EngineException: (422) when neither add nor remove is requested
        """
        if item not in self._PROFILE_ITEMS:
            # Use the cluster edit schema only for this call. It used to be
            # left on the instance, so later attach/detach requests were
            # validated against the wrong schema.
            previous_schema = self.schema_edit
            self.schema_edit = cluster_edit_schema
            try:
                super().edit(session, _id, indata=item, kwargs=kwargs, content=None)
            finally:
                self.schema_edit = previous_schema
            return

        indata = self._remove_envelop(indata)
        indata = self._validate_input_edit(
            indata, content=None, force=session["force"]
        )
        if indata.get("add_profile"):
            self.add_profile(session, _id, item, indata)
        elif indata.get("remove_profile"):
            self.remove_profile(session, _id, item, indata)
        else:
            error_msg = "Add / remove operation is only applicable"
            raise EngineException(error_msg, HTTPStatus.UNPROCESSABLE_ENTITY)

    def add_profile(self, session, _id, item, indata=None):
        """Record an "add" operation for a profile and notify via message.

        Only the first entry of ``indata["add_profile"]`` is processed.

        :raises EngineException: (422) when the profile is not in created
            state or is already referenced by the cluster
        """
        indata = self._remove_envelop(indata)
        operation_params = indata
        profile_id = indata["add_profile"][0]["id"]
        # check state
        self.state_check(profile_id, session, item)
        filter_q = self._get_project_filter(session)
        filter_q[self.id_field(self.topic, _id)] = _id
        content = self.db.get_one(self.topic, filter_q)

        if profile_id in content[item]:
            raise EngineException(
                f"{item} {profile_id} already exists", HTTPStatus.UNPROCESSABLE_ENTITY
            )

        content["operatingState"] = "PROCESSING"
        op_id = self.format_on_operation(content, "add", operation_params)
        self.db.set_one("clusters", {"_id": content["_id"]}, content)
        self._send_msg(
            "add",
            {
                "cluster_id": _id,
                "profile_id": profile_id,
                "profile_type": item,
                "operation_id": op_id,
            },
        )

    def _get_default_profiles(self, session, topic):
        """Return the ``_id`` of every listed profile flagged as ``default``."""
        topic = self.to_select_topic(topic)
        existing_profiles = topic.list(session, filter_q=None, api_req=False)
        return [
            profile["_id"]
            for profile in existing_profiles
            if profile.get("default", False)
        ]

    def remove_profile(self, session, _id, item, indata):
        """Record a "remove" operation for a profile and notify via message.

        Only the first entry of ``indata["remove_profile"]`` is processed.

        :raises EngineException: (422) when the profile is a default profile
            or is not referenced by the cluster
        """
        indata = self._remove_envelop(indata)
        operation_params = indata
        profile_id = indata["remove_profile"][0]["id"]
        filter_q = self._get_project_filter(session)
        filter_q[self.id_field(self.topic, _id)] = _id
        content = self.db.get_one(self.topic, filter_q)
        profile_list = content[item]

        if profile_id in self._get_default_profiles(session, item):
            raise EngineException(
                "Cannot remove default profile", HTTPStatus.UNPROCESSABLE_ENTITY
            )
        if profile_id not in profile_list:
            raise EngineException(
                f"{item} {profile_id} does'nt exists", HTTPStatus.UNPROCESSABLE_ENTITY
            )

        op_id = self.format_on_operation(content, "remove", operation_params)
        self.db.set_one("clusters", {"_id": content["_id"]}, content)
        self._send_msg(
            "remove",
            {
                "cluster_id": _id,
                "profile_id": profile_id,
                "profile_type": item,
                "operation_id": op_id,
            },
        )

    def _cluster_filter(self, session, _id):
        """Build the database filter for one cluster, honouring multiproject."""
        filter_db = self._get_project_filter(session) if self.multiproject else {}
        # To allow project&user addressing by name AS WELL AS _id
        filter_db[BaseTopic.id_field(self.topic, _id)] = _id
        return filter_db

    def get_cluster_creds(self, session, _id, item):
        """Record an operation named ``item`` and send a "get_creds" message.

        :return: identifier of the operation recorded on the cluster
        """
        data = self.db.get_one(self.topic, self._cluster_filter(session, _id))
        op_id = self.format_on_operation(data, item, None)
        self.db.set_one(self.topic, {"_id": data["_id"]}, data)
        self._send_msg("get_creds", {"cluster_id": _id, "operation_id": op_id})
        return op_id

    def get_cluster_creds_file(self, session, _id, item, op_id):
        """Dump the stored cluster credentials to a YAML file and open it.

        The credentials are served only when the operation ``op_id`` in the
        cluster ``operationHistory`` has ``result`` set to True.

        :return: tuple (file object opened in "rb" mode, "text/plain")
        :raises EngineException: (422) when the operation result is not True
        """
        data = self.db.get_one(self.topic, self._cluster_filter(session, _id))

        creds_flag = None
        for operation in data["operationHistory"]:
            if operation["op_id"] == op_id:
                # An operation without a result yet is treated as not ready
                # instead of failing with KeyError.
                creds_flag = operation.get("result")
        self.logger.info("Creds Flag: {}".format(creds_flag))

        if creds_flag is not True:
            raise EngineException(
                "Not possible to get the credentials of the cluster",
                HTTPStatus.UNPROCESSABLE_ENTITY,
            )

        current_path = _id
        filename = "credentials.yaml"
        file_path = (current_path, filename)

        # Start from an empty folder so old credentials are never served.
        self.fs.file_delete(current_path, ignore_non_exist=True)
        self.fs.mkdir(current_path)
        self.logger.info("File path: {}".format(file_path))

        credentials_yaml = yaml.safe_dump(
            data["credentials"], indent=4, default_flow_style=False
        )
        file_pkg = self.fs.file_open(file_path, "a+b")
        try:
            file_pkg.write(credentials_yaml.encode(encoding="utf-8"))
        finally:
            # Always release the handle, also when the write fails.
            file_pkg.close()
        self.fs.sync(from_path=current_path)

        return (
            self.fs.file_open(file_path, "rb"),
            "text/plain",
        )

    def update_cluster(self, session, _id, item, indata):
        """Validate an update request, record the operation and notify.

        :param item: operation name, used both for the operation record and
            as the message command
        :return: identifier of the operation recorded on the cluster
        """
        filter_db = self._cluster_filter(session, _id)
        validate_input(indata, cluster_update_schema)
        data = self.db.get_one(self.topic, filter_db)
        data["operatingState"] = "PROCESSING"
        data["resourceState"] = "IN_PROGRESS"
        op_id = self.format_on_operation(data, item, indata)
        self.db.set_one(self.topic, {"_id": _id}, data)
        self._send_msg(item, {"cluster_id": _id, "operation_id": op_id})
        return op_id

    def delete_extra_before(self, session, _id, db_content, not_send_msg=None):
        """Run ``common_delete`` and return the cluster/operation identifiers."""
        op_id = self.common_delete(_id, db_content)
        return {"cluster_id": _id, "operation_id": op_id}

    def delete(self, session, _id, dry_run=False, not_send_msg=None):
        """Delete a created cluster through the parent ``delete``.

        :param dry_run: forwarded to the parent ``delete``
        :param not_send_msg: forwarded to the parent ``delete``
        :raises EngineException: (422) when the cluster has ``created`` set
            to "false", i.e. it was registered rather than created
        """
        filter_q = self._get_project_filter(session)
        filter_q[self.id_field(self.topic, _id)] = _id
        check = self.db.get_one(self.topic, filter_q)
        if check["created"] == "false":
            raise EngineException(
                "Cannot delete registered cluster",
                HTTPStatus.UNPROCESSABLE_ENTITY,
            )
        # The caller's flags are forwarded; they used to be replaced by
        # hard-coded defaults, so a dry run performed a real deletion.
        super().delete(session, _id, dry_run=dry_run, not_send_msg=not_send_msg)
544
545
class ClusterOpsTopic(ACMTopic):
    """Topic handler for registering and deregistering existing clusters.

    It works on the same ``clusters`` collection as cluster creation, but
    the stored documents carry ``created`` set to "false".
    """

    topic = "clusters"
    topic_msg = "cluster"
    schema_new = clusterregistration_new_schema

    def __init__(self, db, fs, msg, auth):
        super().__init__(db, fs, msg, auth)

    @staticmethod
    def format_on_new(content, project_id=None, make_public=False):
        """Apply the ``ACMTopic`` formatting and reset ``current_operation``."""
        ACMTopic.format_on_new(content, project_id=project_id, make_public=make_public)
        content["current_operation"] = None

    def add(self, rollback, session, indata, kwargs=None, headers=None):
        """Register an existing cluster.

        :param rollback: list to append the created items at database in case a rollback must be done
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param indata: registration parameters
        :param kwargs: used to override the indata
        :param headers: http request headers (unused here)
        :return: tuple (_id of the registered cluster, None)
        :raises: ValidationError, EngineException, DbException, MsgException or
            FsException, re-raised with the failing step appended to the message
        """
        step = "checking quotas"  # first step must be defined outside try
        try:
            self.check_quota(session)
            step = "name unique check"
            self.cluster_unique_name_check(session, indata["name"])
            step = "validating input parameters"
            cls_add_request = self._remove_envelop(indata)
            self._update_input_with_kwargs(cls_add_request, kwargs)
            cls_add_request = self._validate_input_new(
                cls_add_request, session["force"]
            )
            operation_params = cls_add_request

            step = "filling cluster details from input data"
            cls_add_request = self._add_cluster(cls_add_request, session)

            step = "registering the cluster at database"
            self.format_on_new(
                cls_add_request, session["project_id"], make_public=session["public"]
            )
            op_id = self.format_on_operation(
                cls_add_request, "register", operation_params
            )
            _id = self.db.create(self.topic, cls_add_request)
            # Register the rollback entry as soon as the document exists, so
            # a failure while generating/encrypting the keys does not leave
            # an orphan cluster behind.
            rollback.append({"topic": self.topic, "_id": _id})

            pubkey, privkey = self._generate_age_key()
            cls_add_request["age_pubkey"] = self.db.encrypt(
                pubkey, schema_version="1.11", salt=_id
            )
            cls_add_request["age_privkey"] = self.db.encrypt(
                privkey, schema_version="1.11", salt=_id
            )
            # TODO: set age_pubkey and age_privkey in the default profiles
            self.db.set_one(self.topic, {"_id": _id}, cls_add_request)
            self._send_msg("register", {"cluster_id": _id, "operation_id": op_id})

            # To add the content in old collection "k8sclusters"
            self.add_to_old_collection(cls_add_request, session)

            return _id, None
        except (
            ValidationError,
            EngineException,
            DbException,
            MsgException,
            FsException,
        ) as e:
            # Same exception type and http code, annotated with the step;
            # the original exception is kept as the cause.
            raise type(e)(
                "{} while '{}'".format(e, step), http_code=e.http_code
            ) from e

    def _add_cluster(self, cls_add_request, session):
        """Build the document to store for a registered cluster."""
        cls_add = {
            "name": cls_add_request["name"],
            "credentials": cls_add_request["credentials"],
            "vim_account": cls_add_request["vim_account"],
            "bootstrap": cls_add_request["bootstrap"],
            # "false" marks a cluster that was registered, not created.
            "created": "false",
            "state": "IN_CREATION",
            "operatingState": "PROCESSING",
            "git_name": self.create_gitname(cls_add_request, session),
            "resourceState": "IN_PROGRESS.REQUEST_RECEIVED",
        }
        # Add optional fields if they exist in the request
        if "description" in cls_add_request:
            cls_add["description"] = cls_add_request["description"]
        return cls_add

    def remove(self, session, _id, dry_run=False, not_send_msg=None):
        """
        Deregister a cluster by its internal _id
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param _id: server internal id
        :param dry_run: make checking but do not deregister
        :param not_send_msg: To not send message (False) or store content (list) instead
        :return: None; raise exception if error or not found, conflict, ...
        """

        # To allow addressing projects and users by name AS WELL AS by _id
        if not self.multiproject:
            filter_q = {}
        else:
            filter_q = self._get_project_filter(session)
        filter_q[self.id_field(self.topic, _id)] = _id
        item_content = self.db.get_one(self.topic, filter_q)

        self.check_conflict_on_del(session, _id, item_content)
        if dry_run:
            return None

        if self.multiproject and session["project_id"]:
            # remove reference from project_read if there are more projects referencing it. If it last one,
            # do not remove reference, but delete
            other_projects_referencing = next(
                (
                    p
                    for p in item_content["_admin"]["projects_read"]
                    if p not in session["project_id"] and p != "ANY"
                ),
                None,
            )

            # check if there are projects referencing it (apart from ANY, that means, public)....
            if other_projects_referencing:
                # remove references but not delete
                update_dict_pull = {
                    "_admin.projects_read": session["project_id"],
                    "_admin.projects_write": session["project_id"],
                }
                self.db.set_one(
                    self.topic, filter_q, update_dict=None, pull_list=update_dict_pull
                )
                return None

            can_write = next(
                (
                    p
                    for p in item_content["_admin"]["projects_write"]
                    if p == "ANY" or p in session["project_id"]
                ),
                None,
            )
            if not can_write:
                raise EngineException(
                    "You have not write permission to delete it",
                    http_code=HTTPStatus.UNAUTHORIZED,
                )

        # Record the operation only when the deregistration is really going
        # to be requested. It used to be written before the conflict check,
        # the dry-run return and the permission check, so those paths
        # modified the stored document too.
        op_id = self.format_on_operation(item_content, "deregister", None)
        self.db.set_one(self.topic, {"_id": _id}, item_content)

        self._send_msg(
            "deregister",
            {"cluster_id": _id, "operation_id": op_id},
            not_send_msg=not_send_msg,
        )
        return None
yshah53cc9eb2024-07-05 13:06:31 +0000703
704
shrinithi28d887f2025-01-08 05:27:19 +0000705class KsusTopic(ACMTopic):
yshah53cc9eb2024-07-05 13:06:31 +0000706 topic = "ksus"
707 okapkg_topic = "okas"
708 infra_topic = "k8sinfra"
709 topic_msg = "ksu"
710 schema_new = ksu_schema
711 schema_edit = ksu_schema
712
def __init__(self, db, fs, msg, auth):
    """Initialise the parent topic and bind the KSU-specific logger."""
    super().__init__(db, fs, msg, auth)
    # Assigned after the parent initialiser so this logger is the one kept.
    ksu_logger = logging.getLogger("nbi.ksus")
    self.logger = ksu_logger
716
@staticmethod
def format_on_new(content, project_id=None, make_public=False):
    """Apply the ``BaseTopic`` formatting and set the initial KSU states.

    :param content: KSU document, modified in place
    :param project_id: forwarded to ``BaseTopic.format_on_new``
    :param make_public: forwarded to ``BaseTopic.format_on_new``
    """
    BaseTopic.format_on_new(content, project_id=project_id, make_public=make_public)
    # A new KSU has no operation yet and starts in the "in creation" states.
    content.update(
        {
            "current_operation": None,
            "state": "IN_CREATION",
            "operatingState": "PROCESSING",
            "resourceState": "IN_PROGRESS",
        }
    )
724
def new(self, rollback, session, indata=None, kwargs=None, headers=None):
    """Create one KSU document per entry of ``indata["ksus"]``.

    Within a KSU every ``oka`` entry must reference the same kind of
    source as the first one: either an OKA ``_id`` or a ``sw_catalog_path``.

    :param rollback: list to append the created items at database in case a rollback must be done
    :param session: contains "force", "public", "project_id", ...
    :param indata: request body with the "ksus" list
    :param kwargs: used to override every KSU descriptor
    :param headers: http request headers (unused here)
    :return: tuple (list of created ``_id``, operation id of the last KSU)
    :raises EngineException: (422) when no KSU is given, a KSU has no oka,
        an oka gives both ``_id`` and ``sw_catalog_path``, or the oka entries
        of a KSU mix both kinds
    """
    ksus_requested = (indata or {}).get("ksus")
    if not ksus_requested:
        # An empty list used to end in UnboundLocalError, because the
        # operation id is only assigned inside the loop.
        raise EngineException(
            "Cannot create ksu. No KSU has been provided",
            HTTPStatus.UNPROCESSABLE_ENTITY,
        )

    _id_list = []
    op_id = None
    for content in ksus_requested:
        oka_list = content.get("oka")
        if not oka_list:
            raise EngineException(
                "Cannot create ksu. Give either OKA or SW catalog path for all oka in a KSU",
                HTTPStatus.UNPROCESSABLE_ENTITY,
            )

        # The first oka decides which kind of reference the KSU uses.
        # ``.get`` tolerates requests that omit the unused key.
        first_oka = oka_list[0]
        oka_flag = ""
        if first_oka.get("_id"):
            oka_flag = "_id"
            first_oka["sw_catalog_path"] = ""
        elif first_oka.get("sw_catalog_path"):
            oka_flag = "sw_catalog_path"

        for oka in oka_list:
            if oka.get("_id") and oka.get("sw_catalog_path"):
                raise EngineException(
                    "Cannot create ksu with both OKA and SW catalog path",
                    HTTPStatus.UNPROCESSABLE_ENTITY,
                )
            # Drop the empty alternative so only the used key is stored.
            if not oka.get("sw_catalog_path"):
                oka.pop("sw_catalog_path", None)
            elif not oka.get("_id"):
                oka.pop("_id", None)
            if oka_flag not in oka:
                raise EngineException(
                    "Cannot create ksu. Give either OKA or SW catalog path for all oka in a KSU",
                    HTTPStatus.UNPROCESSABLE_ENTITY,
                )

        # Override descriptor with query string kwargs
        content = self._remove_envelop(content)
        self._update_input_with_kwargs(content, kwargs)
        content = self._validate_input_new(input=content, force=session["force"])

        # Check for unique name
        self.check_unique_name(session, content["name"])

        self.check_conflict_on_new(session, content)

        # Shallow copy taken before the document gets its admin/state fields.
        operation_params = dict(content.items())
        self.format_on_new(
            content, project_id=session["project_id"], make_public=session["public"]
        )
        op_id = self.format_on_operation(
            content,
            operation_type="create",
            operation_params=operation_params,
        )
        content["git_name"] = self.create_gitname(content, session)

        # Update Oka_package usage state
        for oka in content["oka"]:
            if "_id" in oka:
                self.update_usage_state(session, oka)

        _id = self.db.create(self.topic, content)
        rollback.append({"topic": self.topic, "_id": _id})
        _id_list.append(_id)

    # NOTE(review): each KSU records its own operation id, but only the last
    # one is sent and returned - confirm this is what consumers expect.
    data = {"ksus_list": _id_list, "operation_id": op_id}
    self._send_msg("create", data)
    return _id_list, op_id
787
788 def clone(self, rollback, session, _id, indata, kwargs, headers):
789 filter_db = self._get_project_filter(session)
790 filter_db[BaseTopic.id_field(self.topic, _id)] = _id
791 data = self.db.get_one(self.topic, filter_db)
792
yshah53cc9eb2024-07-05 13:06:31 +0000793 op_id = self.format_on_operation(
794 data,
795 "clone",
796 indata,
797 )
798 self.db.set_one(self.topic, {"_id": data["_id"]}, data)
799 self._send_msg("clone", {"ksus_list": [data["_id"]], "operation_id": op_id})
800 return op_id
801
802 def update_usage_state(self, session, oka_content):
803 _id = oka_content["_id"]
804 filter_db = self._get_project_filter(session)
805 filter_db[BaseTopic.id_field(self.topic, _id)] = _id
806
807 data = self.db.get_one(self.okapkg_topic, filter_db)
808 if data["_admin"]["usageState"] == "NOT_IN_USE":
809 usage_state_update = {
810 "_admin.usageState": "IN_USE",
811 }
812 self.db.set_one(
813 self.okapkg_topic, {"_id": _id}, update_dict=usage_state_update
814 )
815
816 def move_ksu(self, session, _id, indata=None, kwargs=None, content=None):
817 indata = self._remove_envelop(indata)
818
819 # Override descriptor with query string kwargs
820 if kwargs:
821 self._update_input_with_kwargs(indata, kwargs)
822 try:
823 if indata and session.get("set_project"):
824 raise EngineException(
825 "Cannot edit content and set to project (query string SET_PROJECT) at same time",
826 HTTPStatus.UNPROCESSABLE_ENTITY,
827 )
828 # TODO self._check_edition(session, indata, _id, force)
829 if not content:
830 content = self.show(session, _id)
831 indata = self._validate_input_edit(
832 input=indata, content=content, force=session["force"]
833 )
834 operation_params = indata
835 deep_update_rfc7396(content, indata)
836
837 # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
838 _id = content.get("_id") or _id
yshahd0c876f2024-11-11 09:24:48 +0000839 op_id = self.format_on_operation(
yshah53cc9eb2024-07-05 13:06:31 +0000840 content,
841 "move",
842 operation_params,
843 )
844 if content.get("_admin"):
845 now = time()
846 content["_admin"]["modified"] = now
847 content["operatingState"] = "PROCESSING"
848 content["resourceState"] = "IN_PROGRESS"
849
850 self.db.replace(self.topic, _id, content)
yshah53cc9eb2024-07-05 13:06:31 +0000851 data = {"ksus_list": [content["_id"]], "operation_id": op_id}
852 self._send_msg("move", data)
853 return op_id
854 except ValidationError as e:
855 raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
856
857 def check_conflict_on_edit(self, session, final_content, edit_content, _id):
858 if final_content["name"] != edit_content["name"]:
859 self.check_unique_name(session, edit_content["name"])
860 return final_content
861
862 @staticmethod
863 def format_on_edit(final_content, edit_content):
shrinithi28d887f2025-01-08 05:27:19 +0000864 op_id = ACMTopic.format_on_operation(
yshah53cc9eb2024-07-05 13:06:31 +0000865 final_content,
866 "update",
867 edit_content,
868 )
869 final_content["operatingState"] = "PROCESSING"
870 final_content["resourceState"] = "IN_PROGRESS"
871 if final_content.get("_admin"):
872 now = time()
873 final_content["_admin"]["modified"] = now
yshahd0c876f2024-11-11 09:24:48 +0000874 return op_id
yshah53cc9eb2024-07-05 13:06:31 +0000875
876 def edit(self, session, _id, indata, kwargs):
877 _id_list = []
yshah53cc9eb2024-07-05 13:06:31 +0000878 if _id == "update":
879 for ksus in indata["ksus"]:
880 content = ksus
881 _id = content["_id"]
882 _id_list.append(_id)
883 content.pop("_id")
yshahd0c876f2024-11-11 09:24:48 +0000884 op_id = self.edit_ksu(session, _id, content, kwargs)
yshah53cc9eb2024-07-05 13:06:31 +0000885 else:
886 content = indata
887 _id_list.append(_id)
yshahd0c876f2024-11-11 09:24:48 +0000888 op_id = self.edit_ksu(session, _id, content, kwargs)
yshah53cc9eb2024-07-05 13:06:31 +0000889
890 data = {"ksus_list": _id_list, "operation_id": op_id}
891 self._send_msg("edit", data)
yshah53cc9eb2024-07-05 13:06:31 +0000892
yshahd0c876f2024-11-11 09:24:48 +0000893 def edit_ksu(self, session, _id, indata, kwargs):
yshah53cc9eb2024-07-05 13:06:31 +0000894 content = None
895 indata = self._remove_envelop(indata)
896
897 # Override descriptor with query string kwargs
898 if kwargs:
899 self._update_input_with_kwargs(indata, kwargs)
900 try:
901 if indata and session.get("set_project"):
902 raise EngineException(
903 "Cannot edit content and set to project (query string SET_PROJECT) at same time",
904 HTTPStatus.UNPROCESSABLE_ENTITY,
905 )
906 # TODO self._check_edition(session, indata, _id, force)
907 if not content:
908 content = self.show(session, _id)
909
910 for okas in indata["oka"]:
911 if not okas["_id"]:
912 okas.pop("_id")
913 if not okas["sw_catalog_path"]:
914 okas.pop("sw_catalog_path")
915
916 indata = self._validate_input_edit(indata, content, force=session["force"])
917
918 # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
919 _id = content.get("_id") or _id
920
921 content = self.check_conflict_on_edit(session, content, indata, _id=_id)
yshah53cc9eb2024-07-05 13:06:31 +0000922 op_id = self.format_on_edit(content, indata)
923 self.db.replace(self.topic, _id, content)
924 return op_id
925 except ValidationError as e:
926 raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
927
928 def delete_ksu(self, session, _id, indata, dry_run=False, not_send_msg=None):
929 _id_list = []
yshah53cc9eb2024-07-05 13:06:31 +0000930 if _id == "delete":
931 for ksus in indata["ksus"]:
932 content = ksus
933 _id = content["_id"]
yshah53cc9eb2024-07-05 13:06:31 +0000934 content.pop("_id")
garciadeblasac285872024-12-05 12:21:09 +0100935 op_id, not_send_msg_ksu = self.delete(session, _id)
936 if not not_send_msg_ksu:
937 _id_list.append(_id)
yshah53cc9eb2024-07-05 13:06:31 +0000938 else:
garciadeblasac285872024-12-05 12:21:09 +0100939 op_id, not_send_msg_ksu = self.delete(session, _id)
940 if not not_send_msg_ksu:
941 _id_list.append(_id)
yshah53cc9eb2024-07-05 13:06:31 +0000942
garciadeblasac285872024-12-05 12:21:09 +0100943 if _id_list:
944 data = {"ksus_list": _id_list, "operation_id": op_id}
945 self._send_msg("delete", data, not_send_msg)
yshah53cc9eb2024-07-05 13:06:31 +0000946 return op_id
947
yshahd0c876f2024-11-11 09:24:48 +0000948 def delete(self, session, _id):
yshah53cc9eb2024-07-05 13:06:31 +0000949 if not self.multiproject:
950 filter_q = {}
951 else:
952 filter_q = self._get_project_filter(session)
953 filter_q[self.id_field(self.topic, _id)] = _id
954 item_content = self.db.get_one(self.topic, filter_q)
955 item_content["state"] = "IN_DELETION"
956 item_content["operatingState"] = "PROCESSING"
957 item_content["resourceState"] = "IN_PROGRESS"
yshahd0c876f2024-11-11 09:24:48 +0000958 op_id = self.format_on_operation(
yshah53cc9eb2024-07-05 13:06:31 +0000959 item_content,
960 "delete",
961 None,
962 )
963 self.db.set_one(self.topic, {"_id": item_content["_id"]}, item_content)
964
965 if item_content["oka"][0].get("_id"):
966 used_oka = {}
967 existing_oka = []
968 for okas in item_content["oka"]:
969 used_oka["_id"] = okas["_id"]
970
971 filter = self._get_project_filter(session)
972 data = self.db.get_list(self.topic, filter)
973
974 if data:
975 for ksus in data:
976 if ksus["_id"] != _id:
977 for okas in ksus["oka"]:
shahithya8bded112024-10-15 08:01:44 +0000978 self.logger.info("OKA: {}".format(okas))
979 if okas.get("sw_catalog_path", ""):
980 continue
981 elif okas["_id"] not in existing_oka:
yshah53cc9eb2024-07-05 13:06:31 +0000982 existing_oka.append(okas["_id"])
983
984 if used_oka:
985 for oka, oka_id in used_oka.items():
986 if oka_id not in existing_oka:
987 self.db.set_one(
988 self.okapkg_topic,
989 {"_id": oka_id},
990 {"_admin.usageState": "NOT_IN_USE"},
991 )
garciadeblasac285872024-12-05 12:21:09 +0100992 # Check if the profile exists. If it doesn't, no message should be sent to Kafka
993 not_send_msg = None
994 profile_id = item_content["profile"]["_id"]
995 profile_type = item_content["profile"]["profile_type"]
996 profile_collection_map = {
997 "app_profiles": "k8sapp",
998 "resource_profiles": "k8sresource",
999 "infra_controller_profiles": "k8sinfra_controller",
1000 "infra_config_profiles": "k8sinfra_config",
1001 }
1002 profile_collection = profile_collection_map[profile_type]
1003 profile_content = self.db.get_one(
1004 profile_collection, {"_id": profile_id}, fail_on_empty=False
1005 )
1006 if not profile_content:
1007 self.db.del_one(self.topic, filter_q)
1008 not_send_msg = True
1009 return op_id, not_send_msg
yshah53cc9eb2024-07-05 13:06:31 +00001010
1011
shrinithi28d887f2025-01-08 05:27:19 +00001012class OkaTopic(DescriptorTopic, ACMOperationTopic):
yshah53cc9eb2024-07-05 13:06:31 +00001013 topic = "okas"
1014 topic_msg = "oka"
1015 schema_new = oka_schema
1016 schema_edit = oka_schema
1017
1018 def __init__(self, db, fs, msg, auth):
1019 super().__init__(db, fs, msg, auth)
1020 self.logger = logging.getLogger("nbi.oka")
1021
1022 @staticmethod
1023 def format_on_new(content, project_id=None, make_public=False):
1024 DescriptorTopic.format_on_new(
1025 content, project_id=project_id, make_public=make_public
1026 )
garciadeblasbecc7052024-11-20 12:04:53 +01001027 content["current_operation"] = None
yshah53cc9eb2024-07-05 13:06:31 +00001028 content["state"] = "PENDING_CONTENT"
1029 content["operatingState"] = "PROCESSING"
1030 content["resourceState"] = "IN_PROGRESS"
1031
1032 def check_conflict_on_del(self, session, _id, db_content):
1033 usage_state = db_content["_admin"]["usageState"]
1034 if usage_state == "IN_USE":
1035 raise EngineException(
1036 "There is a KSU using this package",
1037 http_code=HTTPStatus.CONFLICT,
1038 )
1039
1040 def check_conflict_on_edit(self, session, final_content, edit_content, _id):
yshah00cfe8b2025-01-17 04:05:45 +00001041 if "name" in edit_content:
1042 if final_content["name"] == edit_content["name"]:
1043 name = edit_content["name"]
1044 raise EngineException(
1045 f"No update, new name for the OKA is the same: {name}",
1046 http_code=HTTPStatus.CONFLICT,
1047 )
1048 else:
1049 self.check_unique_name(session, edit_content["name"])
1050 elif (
1051 "description" in edit_content
yshah53cc9eb2024-07-05 13:06:31 +00001052 and final_content["description"] == edit_content["description"]
1053 ):
yshah00cfe8b2025-01-17 04:05:45 +00001054 description = edit_content["description"]
yshah53cc9eb2024-07-05 13:06:31 +00001055 raise EngineException(
yshah00cfe8b2025-01-17 04:05:45 +00001056 f"No update, new description for the OKA is the same: {description}",
yshah53cc9eb2024-07-05 13:06:31 +00001057 http_code=HTTPStatus.CONFLICT,
1058 )
yshah53cc9eb2024-07-05 13:06:31 +00001059 return final_content
1060
1061 def edit(self, session, _id, indata=None, kwargs=None, content=None):
1062 indata = self._remove_envelop(indata)
1063
1064 # Override descriptor with query string kwargs
1065 if kwargs:
1066 self._update_input_with_kwargs(indata, kwargs)
1067 try:
1068 if indata and session.get("set_project"):
1069 raise EngineException(
1070 "Cannot edit content and set to project (query string SET_PROJECT) at same time",
1071 HTTPStatus.UNPROCESSABLE_ENTITY,
1072 )
1073 # TODO self._check_edition(session, indata, _id, force)
1074 if not content:
1075 content = self.show(session, _id)
1076
1077 indata = self._validate_input_edit(indata, content, force=session["force"])
1078
1079 # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
1080 _id = content.get("_id") or _id
1081
1082 content = self.check_conflict_on_edit(session, content, indata, _id=_id)
1083 op_id = self.format_on_edit(content, indata)
1084 deep_update_rfc7396(content, indata)
1085
1086 self.db.replace(self.topic, _id, content)
1087 return op_id
1088 except ValidationError as e:
1089 raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
1090
1091 def delete(self, session, _id, dry_run=False, not_send_msg=None):
1092 if not self.multiproject:
1093 filter_q = {}
1094 else:
1095 filter_q = self._get_project_filter(session)
1096 filter_q[self.id_field(self.topic, _id)] = _id
1097 item_content = self.db.get_one(self.topic, filter_q)
1098 item_content["state"] = "IN_DELETION"
1099 item_content["operatingState"] = "PROCESSING"
1100 self.check_conflict_on_del(session, _id, item_content)
yshahd0c876f2024-11-11 09:24:48 +00001101 op_id = self.format_on_operation(
yshah53cc9eb2024-07-05 13:06:31 +00001102 item_content,
1103 "delete",
1104 None,
1105 )
yshah53cc9eb2024-07-05 13:06:31 +00001106 self.db.set_one(self.topic, {"_id": item_content["_id"]}, item_content)
1107 self._send_msg(
1108 "delete", {"oka_id": _id, "operation_id": op_id}, not_send_msg=not_send_msg
1109 )
yshahffcac5f2024-08-19 12:49:07 +00001110 return op_id
yshah53cc9eb2024-07-05 13:06:31 +00001111
1112 def new(self, rollback, session, indata=None, kwargs=None, headers=None):
1113 # _remove_envelop
1114 if indata:
1115 if "userDefinedData" in indata:
1116 indata = indata["userDefinedData"]
1117
1118 content = {"_admin": {"userDefinedData": indata, "revision": 0}}
1119
1120 self._update_input_with_kwargs(content, kwargs)
1121 content = BaseTopic._validate_input_new(
1122 self, input=kwargs, force=session["force"]
1123 )
1124
1125 self.check_unique_name(session, content["name"])
1126 operation_params = {}
1127 for content_key, content_value in content.items():
1128 operation_params[content_key] = content_value
1129 self.format_on_new(
1130 content, session["project_id"], make_public=session["public"]
1131 )
yshahd0c876f2024-11-11 09:24:48 +00001132 op_id = self.format_on_operation(
yshah53cc9eb2024-07-05 13:06:31 +00001133 content,
1134 operation_type="create",
1135 operation_params=operation_params,
1136 )
1137 content["git_name"] = self.create_gitname(content, session)
1138 _id = self.db.create(self.topic, content)
1139 rollback.append({"topic": self.topic, "_id": _id})
yshahd0c876f2024-11-11 09:24:48 +00001140 return _id, op_id
yshah53cc9eb2024-07-05 13:06:31 +00001141
1142 def upload_content(self, session, _id, indata, kwargs, headers):
1143 current_desc = self.show(session, _id)
1144
1145 compressed = None
1146 content_type = headers.get("Content-Type")
1147 if (
1148 content_type
1149 and "application/gzip" in content_type
1150 or "application/x-gzip" in content_type
1151 ):
1152 compressed = "gzip"
1153 if content_type and "application/zip" in content_type:
1154 compressed = "zip"
1155 filename = headers.get("Content-Filename")
1156 if not filename and compressed:
1157 filename = "package.tar.gz" if compressed == "gzip" else "package.zip"
1158 elif not filename:
1159 filename = "package"
1160
1161 revision = 1
1162 if "revision" in current_desc["_admin"]:
1163 revision = current_desc["_admin"]["revision"] + 1
1164
1165 file_pkg = None
1166 fs_rollback = []
1167
1168 try:
1169 start = 0
1170 # Rather than using a temp folder, we will store the package in a folder based on
1171 # the current revision.
1172 proposed_revision_path = _id + ":" + str(revision)
1173 # all the content is upload here and if ok, it is rename from id_ to is folder
1174
1175 if start:
1176 if not self.fs.file_exists(proposed_revision_path, "dir"):
1177 raise EngineException(
1178 "invalid Transaction-Id header", HTTPStatus.NOT_FOUND
1179 )
1180 else:
1181 self.fs.file_delete(proposed_revision_path, ignore_non_exist=True)
1182 self.fs.mkdir(proposed_revision_path)
1183 fs_rollback.append(proposed_revision_path)
1184
1185 storage = self.fs.get_params()
1186 storage["folder"] = proposed_revision_path
yshah2c932bd2024-09-24 18:16:07 +00001187 storage["zipfile"] = filename
yshah53cc9eb2024-07-05 13:06:31 +00001188
1189 file_path = (proposed_revision_path, filename)
1190 file_pkg = self.fs.file_open(file_path, "a+b")
1191
yshah53cc9eb2024-07-05 13:06:31 +00001192 if isinstance(indata, dict):
1193 indata_text = yaml.safe_dump(indata, indent=4, default_flow_style=False)
1194 file_pkg.write(indata_text.encode(encoding="utf-8"))
1195 else:
1196 indata_len = 0
1197 indata = indata.file
1198 while True:
1199 indata_text = indata.read(4096)
1200 indata_len += len(indata_text)
1201 if not indata_text:
1202 break
1203 file_pkg.write(indata_text)
1204
yshah53cc9eb2024-07-05 13:06:31 +00001205 # Need to close the file package here so it can be copied from the
1206 # revision to the current, unrevisioned record
1207 if file_pkg:
1208 file_pkg.close()
1209 file_pkg = None
1210
1211 # Fetch both the incoming, proposed revision and the original revision so we
1212 # can call a validate method to compare them
1213 current_revision_path = _id + "/"
1214 self.fs.sync(from_path=current_revision_path)
1215 self.fs.sync(from_path=proposed_revision_path)
1216
garciadeblas807b8bf2024-09-23 13:03:00 +02001217 # Is this required?
yshah53cc9eb2024-07-05 13:06:31 +00001218 if revision > 1:
1219 try:
1220 self._validate_descriptor_changes(
1221 _id,
1222 filename,
1223 current_revision_path,
1224 proposed_revision_path,
1225 )
1226 except Exception as e:
1227 shutil.rmtree(
1228 self.fs.path + current_revision_path, ignore_errors=True
1229 )
1230 shutil.rmtree(
1231 self.fs.path + proposed_revision_path, ignore_errors=True
1232 )
1233 # Only delete the new revision. We need to keep the original version in place
1234 # as it has not been changed.
1235 self.fs.file_delete(proposed_revision_path, ignore_non_exist=True)
1236 raise e
1237
1238 indata = self._remove_envelop(indata)
1239
1240 # Override descriptor with query string kwargs
1241 if kwargs:
1242 self._update_input_with_kwargs(indata, kwargs)
1243
1244 current_desc["_admin"]["storage"] = storage
1245 current_desc["_admin"]["onboardingState"] = "ONBOARDED"
1246 current_desc["_admin"]["operationalState"] = "ENABLED"
1247 current_desc["_admin"]["modified"] = time()
1248 current_desc["_admin"]["revision"] = revision
1249
1250 deep_update_rfc7396(current_desc, indata)
1251
1252 # Copy the revision to the active package name by its original id
1253 shutil.rmtree(self.fs.path + current_revision_path, ignore_errors=True)
1254 os.rename(
1255 self.fs.path + proposed_revision_path,
1256 self.fs.path + current_revision_path,
1257 )
1258 self.fs.file_delete(current_revision_path, ignore_non_exist=True)
1259 self.fs.mkdir(current_revision_path)
1260 self.fs.reverse_sync(from_path=current_revision_path)
1261
1262 shutil.rmtree(self.fs.path + _id)
1263 kwargs = {}
1264 kwargs["package"] = filename
1265 if headers["Method"] == "POST":
1266 current_desc["state"] = "IN_CREATION"
garciadeblasbecc7052024-11-20 12:04:53 +01001267 op_id = current_desc.get("operationHistory", [{"op_id": None}])[-1].get(
1268 "op_id"
1269 )
yshah53cc9eb2024-07-05 13:06:31 +00001270 elif headers["Method"] in ("PUT", "PATCH"):
yshahd0c876f2024-11-11 09:24:48 +00001271 op_id = self.format_on_operation(
yshah53cc9eb2024-07-05 13:06:31 +00001272 current_desc,
1273 "update",
1274 kwargs,
1275 )
1276 current_desc["operatingState"] = "PROCESSING"
1277 current_desc["resourceState"] = "IN_PROGRESS"
1278
1279 self.db.replace(self.topic, _id, current_desc)
1280
1281 # Store a copy of the package as a point in time revision
1282 revision_desc = dict(current_desc)
1283 revision_desc["_id"] = _id + ":" + str(revision_desc["_admin"]["revision"])
1284 self.db.create(self.topic + "_revisions", revision_desc)
1285 fs_rollback = []
1286
yshah53cc9eb2024-07-05 13:06:31 +00001287 if headers["Method"] == "POST":
1288 self._send_msg("create", {"oka_id": _id, "operation_id": op_id})
1289 elif headers["Method"] == "PUT" or "PATCH":
1290 self._send_msg("edit", {"oka_id": _id, "operation_id": op_id})
1291
1292 return True
1293
1294 except EngineException:
1295 raise
1296 finally:
1297 if file_pkg:
1298 file_pkg.close()
1299 for file in fs_rollback:
1300 self.fs.file_delete(file, ignore_non_exist=True)