blob: c9304725504dc88c63bf8fad917c26b10149731e [file] [log] [blame]
# -*- coding: utf-8 -*-

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
15
yshah53cc9eb2024-07-05 13:06:31 +000016import logging
17import yaml
yshah53cc9eb2024-07-05 13:06:31 +000018import shutil
19import os
rshri2d386cb2024-07-05 14:35:51 +000020from http import HTTPStatus
21
yshah53cc9eb2024-07-05 13:06:31 +000022from time import time
rshri2d386cb2024-07-05 14:35:51 +000023from osm_nbi.base_topic import BaseTopic, EngineException
shrinithi28d887f2025-01-08 05:27:19 +000024from osm_nbi.acm_topic import ACMTopic, ACMOperationTopic, ProfileTopic
rshri2d386cb2024-07-05 14:35:51 +000025
yshah53cc9eb2024-07-05 13:06:31 +000026from osm_nbi.descriptor_topics import DescriptorTopic
rshri2d386cb2024-07-05 14:35:51 +000027from osm_nbi.validation import (
28 ValidationError,
yshah99122b82024-11-18 07:05:29 +000029 validate_input,
rshri2d386cb2024-07-05 14:35:51 +000030 clustercreation_new_schema,
yshah99122b82024-11-18 07:05:29 +000031 cluster_edit_schema,
32 cluster_update_schema,
rshri2d386cb2024-07-05 14:35:51 +000033 infra_controller_profile_create_new_schema,
34 infra_config_profile_create_new_schema,
35 app_profile_create_new_schema,
36 resource_profile_create_new_schema,
37 infra_controller_profile_create_edit_schema,
38 infra_config_profile_create_edit_schema,
39 app_profile_create_edit_schema,
40 resource_profile_create_edit_schema,
rshri17b09ec2024-11-07 05:48:12 +000041 clusterregistration_new_schema,
rshri2d386cb2024-07-05 14:35:51 +000042 attach_dettach_profile_schema,
yshah53cc9eb2024-07-05 13:06:31 +000043 ksu_schema,
44 oka_schema,
rshri2d386cb2024-07-05 14:35:51 +000045)
yshah53cc9eb2024-07-05 13:06:31 +000046from osm_common.dbbase import deep_update_rfc7396, DbException
rshri2d386cb2024-07-05 14:35:51 +000047from osm_common.msgbase import MsgException
48from osm_common.fsbase import FsException
49
yshah53cc9eb2024-07-05 13:06:31 +000050__author__ = (
51 "Shrinithi R <shrinithi.r@tataelxsi.co.in>",
52 "Shahithya Y <shahithya.y@tataelxsi.co.in>",
53)
rshri2d386cb2024-07-05 14:35:51 +000054
55
class InfraContTopic(ProfileTopic):
    """Profile topic for infra controller profiles (``k8sinfra_controller``).

    Every operation is delegated to the generic helpers inherited from
    ``ProfileTopic``; this class only supplies the topic names and schemas.
    """

    topic = "k8sinfra_controller"
    topic_msg = "k8s_infra_controller"
    schema_new = infra_controller_profile_create_new_schema
    schema_edit = infra_controller_profile_create_edit_schema

    def __init__(self, db, fs, msg, auth):
        # NOTE(review): BaseTopic.__init__ is invoked directly instead of
        # super().__init__(), so any __init__ defined by ProfileTopic is
        # bypassed - confirm this is intended.
        BaseTopic.__init__(self, db, fs, msg, auth)

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create a new infra controller profile via ``new_profile``."""
        return self.new_profile(rollback, session, indata, kwargs, headers)

    def default(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create the default infra controller profile while creating a cluster."""
        return self.default_profile(rollback, session, indata, kwargs, headers)

    def delete(self, session, _id, dry_run=False, not_send_msg=None):
        """Delete the profile via ``delete_profile`` and return its ``_id``.

        :param session: session dictionary forwarded to ``delete_profile``
        :param _id: identifier of the profile to delete
        :param dry_run: forwarded unchanged to ``delete_profile``
        :param not_send_msg: forwarded unchanged to ``delete_profile``
        :return: the ``_id`` received
        """
        # Forward the caller's flags; they used to be replaced by the defaults,
        # which silently ignored a requested dry run.
        self.delete_profile(
            session, _id, dry_run=dry_run, not_send_msg=not_send_msg
        )
        return _id
rshri2d386cb2024-07-05 14:35:51 +000076
77
class InfraConfTopic(ProfileTopic):
    """Profile topic for infra config profiles (``k8sinfra_config``).

    Every operation is delegated to the generic helpers inherited from
    ``ProfileTopic``; this class only supplies the topic names and schemas.
    """

    topic = "k8sinfra_config"
    topic_msg = "k8s_infra_config"
    schema_new = infra_config_profile_create_new_schema
    schema_edit = infra_config_profile_create_edit_schema

    def __init__(self, db, fs, msg, auth):
        # NOTE(review): BaseTopic.__init__ is invoked directly instead of
        # super().__init__(), so any __init__ defined by ProfileTopic is
        # bypassed - confirm this is intended.
        BaseTopic.__init__(self, db, fs, msg, auth)

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create a new infra config profile via ``new_profile``."""
        return self.new_profile(rollback, session, indata, kwargs, headers)

    def default(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create the default infra config profile while creating a cluster."""
        return self.default_profile(rollback, session, indata, kwargs, headers)

    def delete(self, session, _id, dry_run=False, not_send_msg=None):
        """Delete the profile via ``delete_profile`` and return its ``_id``.

        :param session: session dictionary forwarded to ``delete_profile``
        :param _id: identifier of the profile to delete
        :param dry_run: forwarded unchanged to ``delete_profile``
        :param not_send_msg: forwarded unchanged to ``delete_profile``
        :return: the ``_id`` received
        """
        # Forward the caller's flags; they used to be replaced by the defaults,
        # which silently ignored a requested dry run.
        self.delete_profile(
            session, _id, dry_run=dry_run, not_send_msg=not_send_msg
        )
        return _id
rshri2d386cb2024-07-05 14:35:51 +000098
99
class AppTopic(ProfileTopic):
    """Profile topic for app profiles (``k8sapp``).

    Every operation is delegated to the generic helpers inherited from
    ``ProfileTopic``; this class only supplies the topic names and schemas.
    """

    topic = "k8sapp"
    topic_msg = "k8s_app"
    schema_new = app_profile_create_new_schema
    schema_edit = app_profile_create_edit_schema

    def __init__(self, db, fs, msg, auth):
        # NOTE(review): BaseTopic.__init__ is invoked directly instead of
        # super().__init__(), so any __init__ defined by ProfileTopic is
        # bypassed - confirm this is intended.
        BaseTopic.__init__(self, db, fs, msg, auth)

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create a new app profile via ``new_profile``."""
        return self.new_profile(rollback, session, indata, kwargs, headers)

    def default(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create the default app profile while creating a cluster."""
        return self.default_profile(rollback, session, indata, kwargs, headers)

    def delete(self, session, _id, dry_run=False, not_send_msg=None):
        """Delete the profile via ``delete_profile`` and return its ``_id``.

        :param session: session dictionary forwarded to ``delete_profile``
        :param _id: identifier of the profile to delete
        :param dry_run: forwarded unchanged to ``delete_profile``
        :param not_send_msg: forwarded unchanged to ``delete_profile``
        :return: the ``_id`` received
        """
        # Forward the caller's flags; they used to be replaced by the defaults,
        # which silently ignored a requested dry run.
        self.delete_profile(
            session, _id, dry_run=dry_run, not_send_msg=not_send_msg
        )
        return _id
rshri2d386cb2024-07-05 14:35:51 +0000120
121
class ResourceTopic(ProfileTopic):
    """Profile topic for resource profiles (``k8sresource``).

    Every operation is delegated to the generic helpers inherited from
    ``ProfileTopic``; this class only supplies the topic names and schemas.
    """

    topic = "k8sresource"
    topic_msg = "k8s_resource"
    schema_new = resource_profile_create_new_schema
    schema_edit = resource_profile_create_edit_schema

    def __init__(self, db, fs, msg, auth):
        # NOTE(review): BaseTopic.__init__ is invoked directly instead of
        # super().__init__(), so any __init__ defined by ProfileTopic is
        # bypassed - confirm this is intended.
        BaseTopic.__init__(self, db, fs, msg, auth)

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create a new resource profile via ``new_profile``."""
        return self.new_profile(rollback, session, indata, kwargs, headers)

    def default(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create the default resource profile while creating a cluster."""
        return self.default_profile(rollback, session, indata, kwargs, headers)

    def delete(self, session, _id, dry_run=False, not_send_msg=None):
        """Delete the profile via ``delete_profile`` and return its ``_id``.

        :param session: session dictionary forwarded to ``delete_profile``
        :param _id: identifier of the profile to delete
        :param dry_run: forwarded unchanged to ``delete_profile``
        :param not_send_msg: forwarded unchanged to ``delete_profile``
        :return: the ``_id`` received
        """
        # Forward the caller's flags; they used to be replaced by the defaults,
        # which silently ignored a requested dry run.
        self.delete_profile(
            session, _id, dry_run=dry_run, not_send_msg=not_send_msg
        )
        return _id
rshri2d386cb2024-07-05 14:35:51 +0000142
143
class ClusterTopic(ACMTopic):
    """Topic for clusters created through this API (``clusters`` topic).

    Besides creation, it handles attaching/detaching profiles, editing,
    credential retrieval, update operations and deletion. It owns one topic
    object per profile type, used to create the default profiles and to look
    profiles up.
    """

    topic = "clusters"
    topic_msg = "cluster"
    schema_new = clustercreation_new_schema
    schema_edit = attach_dettach_profile_schema

    # Values of ``item`` that edit() handles as profile attach/detach requests.
    _PROFILE_ITEMS = (
        "infra_controller_profiles",
        "infra_config_profiles",
        "app_profiles",
        "resource_profiles",
    )

    def __init__(self, db, fs, msg, auth):
        super().__init__(db, fs, msg, auth)
        self.infra_contr_topic = InfraContTopic(db, fs, msg, auth)
        self.infra_conf_topic = InfraConfTopic(db, fs, msg, auth)
        self.resource_topic = ResourceTopic(db, fs, msg, auth)
        self.app_topic = AppTopic(db, fs, msg, auth)

    @staticmethod
    def format_on_new(content, project_id=None, make_public=False):
        """Apply ``ACMTopic.format_on_new`` and initialise ``current_operation``."""
        ACMTopic.format_on_new(content, project_id=project_id, make_public=make_public)
        content["current_operation"] = None

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """
        Creates a new k8scluster into database.
        :param rollback: list to append the created items at database in case a rollback must be done
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param indata: params to be used for the k8cluster
        :param kwargs: used to override the indata
        :param headers: http request headers
        :return: tuple (_id of the k8scluster created at database, None)
        :raises: EngineException, ValidationError, DbException, FsException, MsgException;
            the message is extended with the step that was being executed
        """
        step = "checking quotas"  # first step must be defined outside try
        try:
            self.check_quota(session)
            step = "name unique check"
            self.cluster_unique_name_check(session, indata["name"])
            step = "validating input parameters"
            cls_request = self._remove_envelop(indata)
            self._update_input_with_kwargs(cls_request, kwargs)
            cls_request = self._validate_input_new(cls_request, session["force"])
            operation_params = cls_request

            step = "filling cluster details from input data"
            cls_create = self._create_cluster(
                cls_request, rollback, session, indata, kwargs, headers
            )

            step = "creating cluster at database"
            self.format_on_new(
                cls_create, session["project_id"], make_public=session["public"]
            )
            op_id = self.format_on_operation(cls_create, "create", operation_params)
            _id = self.db.create(self.topic, cls_create)
            # Register the rollback entry right after the insert so a failure
            # in the following steps cannot leave an orphan document behind.
            rollback.append({"topic": self.topic, "_id": _id})
            pubkey, privkey = self._generate_age_key()
            cls_create["age_pubkey"] = self.db.encrypt(
                pubkey, schema_version="1.11", salt=_id
            )
            cls_create["age_privkey"] = self.db.encrypt(
                privkey, schema_version="1.11", salt=_id
            )
            # TODO: set age_pubkey and age_privkey in the default profiles
            self.db.set_one(self.topic, {"_id": _id}, cls_create)
            self._send_msg("create", {"cluster_id": _id, "operation_id": op_id})

            # To add the content in old collection "k8sclusters"
            self.add_to_old_collection(cls_create, session)

            return _id, None
        except (
            ValidationError,
            EngineException,
            DbException,
            MsgException,
            FsException,
        ) as e:
            raise type(e)(
                "{} while '{}'".format(e, step), http_code=e.http_code
            ) from e

    def _create_cluster(self, cls_request, rollback, session, indata, kwargs, headers):
        """Build the cluster document to store, creating its default profiles.

        ``region_name`` and ``resource_group`` are taken from the request when
        the corresponding key is present in ``indata``; otherwise they are read
        from the ``config`` of the VIM account named in the request.

        :return: dictionary with the cluster content
        """
        # Get the vim_account details
        vim_account_details = self.db.get_one(
            "vim_accounts", {"name": cls_request["vim_account"]}
        )

        # Each value falls back independently to the VIM account configuration
        if "region_name" in indata:
            region_name = cls_request["region_name"]
        else:
            region_name = vim_account_details["config"]["region_name"]
        if "resource_group" in indata:
            resource_group = cls_request["resource_group"]
        else:
            resource_group = vim_account_details["config"]["resource_group"]

        cls_desc = {
            "name": cls_request["name"],
            "vim_account": self.check_vim(session, cls_request["vim_account"]),
            "k8s_version": cls_request["k8s_version"],
            "node_size": cls_request["node_size"],
            "node_count": cls_request["node_count"],
            "bootstrap": cls_request["bootstrap"],
            "region_name": region_name,
            "resource_group": resource_group,
            "infra_controller_profiles": [
                self._create_default_profiles(
                    rollback, session, indata, kwargs, headers, self.infra_contr_topic
                )
            ],
            "infra_config_profiles": [
                self._create_default_profiles(
                    rollback, session, indata, kwargs, headers, self.infra_conf_topic
                )
            ],
            "resource_profiles": [
                self._create_default_profiles(
                    rollback, session, indata, kwargs, headers, self.resource_topic
                )
            ],
            "app_profiles": [
                self._create_default_profiles(
                    rollback, session, indata, kwargs, headers, self.app_topic
                )
            ],
            "created": "true",
            "state": "IN_CREATION",
            "operatingState": "PROCESSING",
            "git_name": self.create_gitname(cls_request, session),
            "resourceState": "IN_PROGRESS.REQUEST_RECEIVED",
        }
        # Add optional fields if they exist in the request
        if "description" in cls_request:
            cls_desc["description"] = cls_request["description"]
        return cls_desc

    def check_vim(self, session, name):
        """Return ``name`` when a VIM account with that name is found.

        :return: ``name``, or None when the lookup returns None
        :raises EngineException: when the lookup raises ValidationError
        """
        try:
            vim_account_details = self.db.get_one("vim_accounts", {"name": name})
        except ValidationError as e:
            raise EngineException(
                e,
                HTTPStatus.UNPROCESSABLE_ENTITY,
            )
        if vim_account_details is not None:
            return name
        return None

    def _create_default_profiles(
        self, rollback, session, indata, kwargs, headers, topic
    ):
        """Call ``default()`` on the selected profile topic and return its result."""
        topic = self.to_select_topic(topic)
        return topic.default(rollback, session, indata, kwargs, headers)

    def to_select_topic(self, topic):
        """Translate a profile type name into the matching topic object.

        :param topic: one of the profile type names, or anything else
        :return: the topic object for a known name; otherwise ``topic`` unchanged
        """
        if not isinstance(topic, str):
            # Already a topic object (or some other value): nothing to map
            return topic
        topics_by_name = {
            "infra_controller_profiles": self.infra_contr_topic,
            "infra_config_profiles": self.infra_conf_topic,
            "resource_profiles": self.resource_topic,
            "app_profiles": self.app_topic,
        }
        return topics_by_name.get(topic, topic)

    def show_one(self, session, _id, profile, filter_q=None, api_req=False):
        """Return the profiles of type ``profile`` attached to a cluster.

        The ``filter_q`` argument is not used: the filter is always rebuilt
        from the session and ``_id``.

        :return: list with the result of ``show`` for every attached profile
        :raises EngineException: when a ValidationError is raised
        """
        try:
            filter_q = self._get_project_filter(session)
            filter_q[self.id_field(self.topic, _id)] = _id
            content = self.db.get_one(self.topic, filter_q)
            topic = self.to_select_topic(profile)
            return [
                topic.show(session, profile_id, filter_q, api_req)
                for profile_id in content[profile]
            ]
        except ValidationError as e:
            raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)

    def state_check(self, profile_id, session, topic):
        """Raise EngineException unless the profile state is "CREATED"."""
        topic = self.to_select_topic(topic)
        content = topic.show(session, profile_id, filter_q=None, api_req=False)
        if content["state"] != "CREATED":
            raise EngineException(
                f" {profile_id} is not in created state",
                HTTPStatus.UNPROCESSABLE_ENTITY,
            )

    def edit(self, session, _id, item, indata=None, kwargs=None):
        """Edit the cluster, or attach/detach one of its profiles.

        When ``item`` is a profile type name, ``indata`` must contain either
        ``add_profile`` or ``remove_profile``. For any other ``item`` the
        inherited ``edit`` is run with ``cluster_edit_schema``.

        :raises EngineException: profile request without add/remove operation
        """
        if item not in self._PROFILE_ITEMS:
            # NOTE(review): ``item`` is handed over as ``indata``, so on this
            # path it presumably carries the request body - confirm against
            # the callers.
            # The schema is swapped only for this call and always restored;
            # leaving it in place made every later attach/detach request
            # validate against cluster_edit_schema.
            previous_schema = self.schema_edit
            self.schema_edit = cluster_edit_schema
            try:
                super().edit(session, _id, indata=item, kwargs=kwargs, content=None)
            finally:
                self.schema_edit = previous_schema
            return

        indata = self._remove_envelop(indata)
        indata = self._validate_input_edit(indata, content=None, force=session["force"])
        if indata.get("add_profile"):
            self.add_profile(session, _id, item, indata)
        elif indata.get("remove_profile"):
            self.remove_profile(session, _id, item, indata)
        else:
            error_msg = "Add / remove operation is only applicable"
            raise EngineException(error_msg, HTTPStatus.UNPROCESSABLE_ENTITY)

    def edit_extra_before(self, session, _id, indata=None, kwargs=None, content=None):
        """Propagate a cluster rename to the legacy and profile collections.

        When ``indata`` carries a ``name`` different from the stored one, the
        new name is checked for uniqueness and every document named like the
        cluster in "k8sclusters" and the four profile collections is renamed.

        :return: True
        :raises EngineException: CONFLICT when the new name is already in use
        """
        check = self.db.get_one(self.topic, {"_id": _id})
        if indata and "name" in indata and check["name"] != indata["name"]:
            self.check_unique_name(session, indata["name"])
            _filter = {"name": indata["name"]}
            topic_list = [
                "k8sclusters",
                "k8sinfra_controller",
                "k8sinfra_config",
                "k8sapp",
                "k8sresource",
            ]
            # Check unique name for k8scluster and profiles
            for topic in topic_list:
                if self.db.get_one(
                    topic, _filter, fail_on_empty=False, fail_on_more=False
                ):
                    raise EngineException(
                        "name '{}' already exists for {}".format(indata["name"], topic),
                        HTTPStatus.CONFLICT,
                    )
            # Replace name in k8scluster and profiles
            for topic in topic_list:
                data = self.db.get_one(
                    topic, {"name": check["name"]}, fail_on_empty=False
                )
                if not data:
                    # Nothing with the old name here; skipping avoids aborting
                    # midway and leaving only some collections renamed.
                    continue
                data["name"] = indata["name"]
                self.db.replace(topic, data["_id"], data)
        return True

    def add_profile(self, session, _id, item, indata=None):
        """Register an "add" operation for a profile and notify it by message.

        :param item: profile type name, also the cluster key holding the list
        :param indata: request whose ``add_profile[0]["id"]`` is the profile id
        :raises EngineException: profile not in created state or already attached
        """
        indata = self._remove_envelop(indata)
        operation_params = indata
        profile_id = indata["add_profile"][0]["id"]
        # check state
        self.state_check(profile_id, session, item)
        filter_q = self._get_project_filter(session)
        filter_q[self.id_field(self.topic, _id)] = _id
        content = self.db.get_one(self.topic, filter_q)

        if profile_id in content[item]:
            raise EngineException(
                f"{item} {profile_id} already exists", HTTPStatus.UNPROCESSABLE_ENTITY
            )

        content["operatingState"] = "PROCESSING"
        op_id = self.format_on_operation(content, "add", operation_params)
        self.db.set_one(self.topic, {"_id": content["_id"]}, content)
        self._send_msg(
            "add",
            {
                "cluster_id": _id,
                "profile_id": profile_id,
                "profile_type": item,
                "operation_id": op_id,
            },
        )

    def _get_default_profiles(self, session, topic):
        """Return the ``_id`` of every listed profile flagged as ``default``."""
        topic = self.to_select_topic(topic)
        existing_profiles = topic.list(session, filter_q=None, api_req=False)
        return [
            profile["_id"]
            for profile in existing_profiles
            if profile.get("default", False)
        ]

    def remove_profile(self, session, _id, item, indata):
        """Register a "remove" operation for a profile and notify it by message.

        :param item: profile type name, also the cluster key holding the list
        :param indata: request whose ``remove_profile[0]["id"]`` is the profile id
        :raises EngineException: default profile, or profile not attached
        """
        indata = self._remove_envelop(indata)
        operation_params = indata
        profile_id = indata["remove_profile"][0]["id"]
        filter_q = self._get_project_filter(session)
        filter_q[self.id_field(self.topic, _id)] = _id
        content = self.db.get_one(self.topic, filter_q)
        profile_list = content[item]

        if profile_id in self._get_default_profiles(session, item):
            raise EngineException(
                "Cannot remove default profile", HTTPStatus.UNPROCESSABLE_ENTITY
            )
        if profile_id not in profile_list:
            raise EngineException(
                f"{item} {profile_id} doesn't exist", HTTPStatus.UNPROCESSABLE_ENTITY
            )

        op_id = self.format_on_operation(content, "remove", operation_params)
        self.db.set_one(self.topic, {"_id": content["_id"]}, content)
        self._send_msg(
            "remove",
            {
                "cluster_id": _id,
                "profile_id": profile_id,
                "profile_type": item,
                "operation_id": op_id,
            },
        )

    def _cluster_filter(self, session, _id):
        """Build the database filter selecting cluster ``_id``.

        The project filter of the session is included only when
        ``self.multiproject`` is set.
        """
        filter_db = self._get_project_filter(session) if self.multiproject else {}
        filter_db[BaseTopic.id_field(self.topic, _id)] = _id
        return filter_db

    def get_cluster_creds(self, session, _id, item):
        """Register an operation of type ``item`` and send a "get_creds" message.

        :return: identifier of the registered operation
        """
        filter_db = self._cluster_filter(session, _id)
        data = self.db.get_one(self.topic, filter_db)
        op_id = self.format_on_operation(data, item, None)
        self.db.set_one(self.topic, {"_id": data["_id"]}, data)
        self._send_msg("get_creds", {"cluster_id": _id, "operation_id": op_id})
        return op_id

    def get_cluster_creds_file(self, session, _id, item, op_id):
        """Return the stored cluster credentials as a YAML file.

        The credentials are served only when the ``result`` recorded in
        ``operationHistory`` for ``op_id`` is exactly True.

        :return: tuple (file object opened in "rb" mode, "text/plain")
        :raises EngineException: when the operation result is not True
        """
        data = self.db.get_one(self.topic, self._cluster_filter(session, _id))
        creds_flag = None
        # If several entries share the op_id, the last one wins
        for operation in data["operationHistory"]:
            if operation["op_id"] == op_id:
                creds_flag = operation["result"]
        self.logger.info("Creds Flag: {}".format(creds_flag))

        if creds_flag is not True:
            raise EngineException(
                "Not possible to get the credentials of the cluster",
                HTTPStatus.UNPROCESSABLE_ENTITY,
            )

        # Serialise before touching the storage, so a serialisation error
        # leaves neither an open handle nor a half-prepared folder.
        credentials_yaml = yaml.safe_dump(
            data["credentials"], indent=4, default_flow_style=False
        )

        current_path = _id
        filename = "credentials.yaml"
        self.fs.file_delete(current_path, ignore_non_exist=True)
        self.fs.mkdir(current_path)
        file_path = (current_path, filename)
        self.logger.info("File path: {}".format(file_path))
        file_pkg = self.fs.file_open(file_path, "a+b")
        try:
            file_pkg.write(credentials_yaml.encode(encoding="utf-8"))
        finally:
            # Always release the handle, also when the write fails
            file_pkg.close()
        self.fs.sync(from_path=current_path)

        return (
            self.fs.file_open((current_path, filename), "rb"),
            "text/plain",
        )

    def update_cluster(self, session, _id, item, indata):
        """Validate an update request, register the operation and notify it.

        :param item: operation type, also used as the message key
        :param indata: request validated against ``cluster_update_schema``
        :return: identifier of the registered operation
        """
        filter_db = self._cluster_filter(session, _id)
        validate_input(indata, cluster_update_schema)
        data = self.db.get_one(self.topic, filter_db)
        data["operatingState"] = "PROCESSING"
        data["resourceState"] = "IN_PROGRESS"
        op_id = self.format_on_operation(data, item, indata)
        self.db.set_one(self.topic, {"_id": _id}, data)
        self._send_msg(item, {"cluster_id": _id, "operation_id": op_id})
        return op_id

    def delete_extra_before(self, session, _id, db_content, not_send_msg=None):
        """Run ``common_delete`` and return the cluster and operation ids."""
        op_id = self.common_delete(_id, db_content)
        return {"cluster_id": _id, "operation_id": op_id}

    def delete(self, session, _id, dry_run=False, not_send_msg=None):
        """Delete a cluster, refusing those whose ``created`` field is "false".

        :param dry_run: forwarded unchanged to the inherited ``delete``
        :param not_send_msg: forwarded unchanged to the inherited ``delete``
        :raises EngineException: when ``created`` is "false" (registered cluster)
        """
        filter_q = self._get_project_filter(session)
        filter_q[self.id_field(self.topic, _id)] = _id
        check = self.db.get_one(self.topic, filter_q)
        if check["created"] == "false":
            raise EngineException(
                "Cannot delete registered cluster",
                HTTPStatus.UNPROCESSABLE_ENTITY,
            )
        # Forward the caller's flags; they used to be replaced by the defaults,
        # so a requested dry run was carried out as a real deletion.
        super().delete(session, _id, dry_run=dry_run, not_send_msg=not_send_msg)
572
573
class ClusterOpsTopic(ACMTopic):
    """Topic for registering and deregistering clusters (``clusters`` topic).

    Registered clusters are stored with ``created`` set to "false".
    """

    topic = "clusters"
    topic_msg = "cluster"
    schema_new = clusterregistration_new_schema

    def __init__(self, db, fs, msg, auth):
        super().__init__(db, fs, msg, auth)

    @staticmethod
    def format_on_new(content, project_id=None, make_public=False):
        """Apply ``ACMTopic.format_on_new`` and initialise ``current_operation``."""
        ACMTopic.format_on_new(content, project_id=project_id, make_public=make_public)
        content["current_operation"] = None

    def add(self, rollback, session, indata, kwargs=None, headers=None):
        """
        Registers a cluster into database.
        :param rollback: list to append the created items at database in case a rollback must be done
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param indata: params of the cluster to register
        :param kwargs: used to override the indata
        :param headers: http request headers
        :return: tuple (_id of the cluster created at database, None)
        :raises: EngineException, ValidationError, DbException, FsException, MsgException;
            the message is extended with the step that was being executed
        """
        step = "checking quotas"  # first step must be defined outside try
        try:
            self.check_quota(session)
            step = "name unique check"
            self.cluster_unique_name_check(session, indata["name"])
            step = "validating input parameters"
            cls_add_request = self._remove_envelop(indata)
            self._update_input_with_kwargs(cls_add_request, kwargs)
            cls_add_request = self._validate_input_new(
                cls_add_request, session["force"]
            )
            operation_params = cls_add_request

            step = "filling cluster details from input data"
            cls_add_request = self._add_cluster(cls_add_request, session)

            step = "registering the cluster at database"
            self.format_on_new(
                cls_add_request, session["project_id"], make_public=session["public"]
            )
            op_id = self.format_on_operation(
                cls_add_request, "register", operation_params
            )
            _id = self.db.create(self.topic, cls_add_request)
            # Register the rollback entry right after the insert so a failure
            # in the following steps cannot leave an orphan document behind.
            rollback.append({"topic": self.topic, "_id": _id})
            pubkey, privkey = self._generate_age_key()
            cls_add_request["age_pubkey"] = self.db.encrypt(
                pubkey, schema_version="1.11", salt=_id
            )
            cls_add_request["age_privkey"] = self.db.encrypt(
                privkey, schema_version="1.11", salt=_id
            )
            # TODO: set age_pubkey and age_privkey in the default profiles
            self.db.set_one(self.topic, {"_id": _id}, cls_add_request)
            self._send_msg("register", {"cluster_id": _id, "operation_id": op_id})

            # To add the content in old collection "k8sclusters"
            self.add_to_old_collection(cls_add_request, session)

            return _id, None
        except (
            ValidationError,
            EngineException,
            DbException,
            MsgException,
            FsException,
        ) as e:
            raise type(e)(
                "{} while '{}'".format(e, step), http_code=e.http_code
            ) from e

    def _add_cluster(self, cls_add_request, session):
        """Build the document to store for a registered cluster.

        :return: dictionary with the cluster content, ``created`` set to "false"
        """
        cls_add = {
            "name": cls_add_request["name"],
            "credentials": cls_add_request["credentials"],
            "vim_account": cls_add_request["vim_account"],
            "bootstrap": cls_add_request["bootstrap"],
            "created": "false",
            "state": "IN_CREATION",
            "operatingState": "PROCESSING",
            "git_name": self.create_gitname(cls_add_request, session),
            "resourceState": "IN_PROGRESS.REQUEST_RECEIVED",
        }
        # Add optional fields if they exist in the request
        if "description" in cls_add_request:
            cls_add["description"] = cls_add_request["description"]
        return cls_add

    def remove(self, session, _id, dry_run=False, not_send_msg=None):
        """
        Deregister a cluster by its internal _id
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param _id: server internal id
        :param dry_run: make checking but do not modify anything
        :param not_send_msg: forwarded to _send_msg together with the "deregister" message
        :return: None in all cases; raise exception if error or not found, conflict, ...
        """

        # To allow addressing projects and users by name AS WELL AS by _id
        if not self.multiproject:
            filter_q = {}
        else:
            filter_q = self._get_project_filter(session)
        filter_q[self.id_field(self.topic, _id)] = _id
        item_content = self.db.get_one(self.topic, filter_q)

        self.check_conflict_on_del(session, _id, item_content)
        if dry_run:
            return None

        # The operation is recorded only after the checks above: doing it
        # earlier made a dry run or a rejected request modify the cluster.
        op_id = self.format_on_operation(item_content, "deregister", None)
        self.db.set_one(self.topic, {"_id": _id}, item_content)

        if self.multiproject and session["project_id"]:
            # remove reference from project_read if there are more projects referencing it. If it last one,
            # do not remove reference, but delete
            other_projects_referencing = next(
                (
                    p
                    for p in item_content["_admin"]["projects_read"]
                    if p not in session["project_id"] and p != "ANY"
                ),
                None,
            )

            # check if there are projects referencing it (apart from ANY, that means, public)....
            if other_projects_referencing:
                # remove references but not delete
                update_dict_pull = {
                    "_admin.projects_read": session["project_id"],
                    "_admin.projects_write": session["project_id"],
                }
                self.db.set_one(
                    self.topic, filter_q, update_dict=None, pull_list=update_dict_pull
                )
                return None

            can_write = next(
                (
                    p
                    for p in item_content["_admin"]["projects_write"]
                    if p == "ANY" or p in session["project_id"]
                ),
                None,
            )
            if not can_write:
                raise EngineException(
                    "You have not write permission to delete it",
                    http_code=HTTPStatus.UNAUTHORIZED,
                )

        # delete
        self._send_msg(
            "deregister",
            {"cluster_id": _id, "operation_id": op_id},
            not_send_msg=not_send_msg,
        )
        return None
yshah53cc9eb2024-07-05 13:06:31 +0000731
732
class KsusTopic(ACMTopic):
    """Topic handler for KSU records kept in the ``ksus`` collection.

    Covers creation, cloning, moving, editing and deletion of KSUs. Bulk
    requests carry their items as a list under ``indata["ksus"]``.
    """

    topic = "ksus"
    okapkg_topic = "okas"
    infra_topic = "k8sinfra"
    topic_msg = "ksu"
    schema_new = ksu_schema
    schema_edit = ksu_schema

    def __init__(self, db, fs, msg, auth):
        super().__init__(db, fs, msg, auth)
        self.logger = logging.getLogger("nbi.ksus")

    @staticmethod
    def format_on_new(content, project_id=None, make_public=False):
        """Apply the base formatting and set the initial KSU state fields.

        :param content: KSU document, modified in place
        :param project_id: forwarded to ``BaseTopic.format_on_new``
        :param make_public: forwarded to ``BaseTopic.format_on_new``
        """
        BaseTopic.format_on_new(content, project_id=project_id, make_public=make_public)
        content["current_operation"] = None
        content["state"] = "IN_CREATION"
        content["operatingState"] = "PROCESSING"
        content["resourceState"] = "IN_PROGRESS"

    @staticmethod
    def _ksus_from_payload(indata):
        """Return the non-empty ``ksus`` list of a bulk payload.

        :raises EngineException: (422) when the list is missing or empty, so
            that bulk operations never run with nothing to operate on
        """
        ksus = indata.get("ksus") if isinstance(indata, dict) else None
        if not ksus:
            raise EngineException(
                "No KSU provided: 'ksus' must be a non-empty list",
                HTTPStatus.UNPROCESSABLE_ENTITY,
            )
        return ksus

    @staticmethod
    def _normalize_oka_references(ksu):
        """Check and clean the ``oka`` entries of a KSU that is being created.

        Every entry must reference either an OKA ``_id`` or a
        ``sw_catalog_path``, and all entries must use the same kind as the
        first one. Empty ``_id``/``sw_catalog_path`` keys are removed in place.

        :raises EngineException: (422) on a missing/empty ``oka`` list, on an
            entry giving both references, or on an entry of a different kind
        """
        okas = ksu.get("oka")
        if not okas:
            raise EngineException(
                "Cannot create ksu. Give either OKA or SW catalog path for all oka in a KSU",
                HTTPStatus.UNPROCESSABLE_ENTITY,
            )

        first = okas[0]
        if first.get("_id"):
            oka_flag = "_id"
            # For the first entry an OKA _id takes precedence: any catalog
            # path given alongside it is discarded instead of rejected.
            first["sw_catalog_path"] = ""
        elif first.get("sw_catalog_path"):
            oka_flag = "sw_catalog_path"
        else:
            oka_flag = ""

        for oka in okas:
            if oka.get("_id") and oka.get("sw_catalog_path"):
                raise EngineException(
                    "Cannot create ksu with both OKA and SW catalog path",
                    HTTPStatus.UNPROCESSABLE_ENTITY,
                )
            # Drop empty references so that only the one in use remains.
            for key in ("_id", "sw_catalog_path"):
                if not oka.get(key):
                    oka.pop(key, None)
            if oka_flag not in oka:
                raise EngineException(
                    "Cannot create ksu. Give either OKA or SW catalog path for all oka in a KSU",
                    HTTPStatus.UNPROCESSABLE_ENTITY,
                )

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create one database entry per KSU in ``indata["ksus"]``.

        A single "create" message listing all created ids is sent afterwards.

        :param rollback: list to which a ``{"topic", "_id"}`` entry is appended
            for every created KSU
        :param session: engine session; ``force``, ``project_id`` and
            ``public`` are read
        :param indata: payload with the ``ksus`` list
        :param kwargs: query-string overrides applied on every KSU
        :param headers: not used
        :return: tuple ``(list of created ids, op_id of the last KSU)``
        :raises EngineException: (422) on an empty payload or invalid OKA
            references
        """
        _id_list = []
        op_id = None
        for content in self._ksus_from_payload(indata):
            self._normalize_oka_references(content)

            # Override descriptor with query string kwargs
            content = self._remove_envelop(content)
            self._update_input_with_kwargs(content, kwargs)
            content = self._validate_input_new(input=content, force=session["force"])

            # Check for unique name
            self.check_unique_name(session, content["name"])

            self.check_conflict_on_new(session, content)

            # Shallow snapshot of the request, taken before format_on_new
            # adds its fields to the document.
            operation_params = dict(content)
            self.format_on_new(
                content, project_id=session["project_id"], make_public=session["public"]
            )
            op_id = self.format_on_operation(
                content,
                operation_type="create",
                operation_params=operation_params,
            )
            content["git_name"] = self.create_gitname(content, session)

            # Update Oka_package usage state
            for oka in content["oka"]:
                if "_id" in oka:
                    self.update_usage_state(session, oka)

            _id = self.db.create(self.topic, content)
            rollback.append({"topic": self.topic, "_id": _id})
            _id_list.append(_id)

        data = {"ksus_list": _id_list, "operation_id": op_id}
        self._send_msg("create", data)
        return _id_list, op_id

    def clone(self, rollback, session, _id, indata, kwargs, headers):
        """Register a "clone" operation on an existing KSU and send the message.

        :param rollback: not used
        :param session: engine session used to build the project filter
        :param _id: identity of the KSU to clone
        :param indata: stored as the operation parameters
        :param kwargs: not used
        :param headers: not used
        :return: op_id of the registered operation
        """
        filter_db = self._get_project_filter(session)
        filter_db[BaseTopic.id_field(self.topic, _id)] = _id
        data = self.db.get_one(self.topic, filter_db)

        op_id = self.format_on_operation(
            data,
            "clone",
            indata,
        )
        self.db.set_one(self.topic, {"_id": data["_id"]}, data)
        self._send_msg("clone", {"ksus_list": [data["_id"]], "operation_id": op_id})
        return op_id

    def update_usage_state(self, session, oka_content):
        """Mark the referenced OKA package as ``IN_USE`` if it was ``NOT_IN_USE``.

        :param session: engine session used to build the project filter
        :param oka_content: OKA reference of a KSU; its ``_id`` is read
        """
        _id = oka_content["_id"]
        filter_db = self._get_project_filter(session)
        # The lookup targets the OKA collection, so resolve the id field for it.
        filter_db[BaseTopic.id_field(self.okapkg_topic, _id)] = _id

        data = self.db.get_one(self.okapkg_topic, filter_db)
        if data["_admin"]["usageState"] == "NOT_IN_USE":
            usage_state_update = {
                "_admin.usageState": "IN_USE",
            }
            self.db.set_one(
                self.okapkg_topic, {"_id": _id}, update_dict=usage_state_update
            )

    def move_ksu(self, session, _id, indata=None, kwargs=None, content=None):
        """Merge ``indata`` into a KSU, register a "move" operation and notify.

        :param session: engine session; ``set_project`` and ``force`` are read
        :param _id: identity of the KSU
        :param indata: changes to merge into the stored document
        :param kwargs: query-string overrides applied on ``indata``
        :param content: current document; fetched with ``show`` when not given
        :return: op_id of the registered operation
        :raises EngineException: (422) on a validation error or when content is
            edited together with SET_PROJECT
        """
        indata = self._remove_envelop(indata)

        # Override descriptor with query string kwargs
        if kwargs:
            self._update_input_with_kwargs(indata, kwargs)
        try:
            if indata and session.get("set_project"):
                raise EngineException(
                    "Cannot edit content and set to project (query string SET_PROJECT) at same time",
                    HTTPStatus.UNPROCESSABLE_ENTITY,
                )
            # TODO self._check_edition(session, indata, _id, force)
            if not content:
                content = self.show(session, _id)
            indata = self._validate_input_edit(
                input=indata, content=content, force=session["force"]
            )
            operation_params = indata
            deep_update_rfc7396(content, indata)

            # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
            _id = content.get("_id") or _id
            op_id = self.format_on_operation(
                content,
                "move",
                operation_params,
            )
            if content.get("_admin"):
                content["_admin"]["modified"] = time()
            # Flag the KSU as being processed, independently of "_admin"
            # (same as format_on_edit does).
            content["operatingState"] = "PROCESSING"
            content["resourceState"] = "IN_PROGRESS"

            self.db.replace(self.topic, _id, content)
            data = {"ksus_list": [content["_id"]], "operation_id": op_id}
            self._send_msg("move", data)
            return op_id
        except ValidationError as e:
            raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY) from e

    def check_conflict_on_edit(self, session, final_content, edit_content, _id):
        """Ensure that a renamed KSU gets a unique name.

        An edit that carries no ``name`` cannot rename, so nothing is checked.

        :param session: engine session, forwarded to ``check_unique_name``
        :param final_content: stored document
        :param edit_content: validated edit payload
        :param _id: not used
        :return: ``final_content`` unchanged
        """
        if "name" in edit_content and final_content["name"] != edit_content["name"]:
            self.check_unique_name(session, edit_content["name"])
        return final_content

    @staticmethod
    def format_on_edit(final_content, edit_content):
        """Register an "update" operation and flag the KSU as being processed.

        :param final_content: stored document, modified in place
        :param edit_content: stored as the operation parameters
        :return: op_id of the registered operation
        """
        op_id = ACMTopic.format_on_operation(
            final_content,
            "update",
            edit_content,
        )
        final_content["operatingState"] = "PROCESSING"
        final_content["resourceState"] = "IN_PROGRESS"
        if final_content.get("_admin"):
            final_content["_admin"]["modified"] = time()
        return op_id

    def edit(self, session, _id, indata, kwargs):
        """Edit one KSU, or every KSU of ``indata["ksus"]`` when ``_id`` is "update".

        A single "edit" message listing all edited ids is sent afterwards.

        :param session: engine session
        :param _id: identity of the KSU, or the literal "update" for bulk mode
        :param indata: edit payload (bulk mode: items carry their own ``_id``)
        :param kwargs: query-string overrides
        :return: None
        :raises EngineException: (422) on an empty bulk payload
        """
        # NOTE(review): the op_id is computed but not returned, unlike
        # move_ksu/delete_ksu. Kept as is; confirm against the callers.
        _id_list = []
        op_id = None
        if _id == "update":
            for content in self._ksus_from_payload(indata):
                ksu_id = content.pop("_id")
                _id_list.append(ksu_id)
                op_id = self.edit_ksu(session, ksu_id, content, kwargs)
        else:
            _id_list.append(_id)
            op_id = self.edit_ksu(session, _id, indata, kwargs)

        data = {"ksus_list": _id_list, "operation_id": op_id}
        self._send_msg("edit", data)

    def edit_ksu(self, session, _id, indata, kwargs):
        """Validate an edit of a single KSU and register the "update" operation.

        The edit payload is passed to ``check_conflict_on_edit`` and
        ``format_on_edit``; it is not merged into the stored document here.

        :param session: engine session; ``set_project`` and ``force`` are read
        :param _id: identity of the KSU
        :param indata: edit payload
        :param kwargs: query-string overrides applied on ``indata``
        :return: op_id of the registered operation
        :raises EngineException: (422) on a validation error or when content is
            edited together with SET_PROJECT
        """
        indata = self._remove_envelop(indata)

        # Override descriptor with query string kwargs
        if kwargs:
            self._update_input_with_kwargs(indata, kwargs)
        try:
            if indata and session.get("set_project"):
                raise EngineException(
                    "Cannot edit content and set to project (query string SET_PROJECT) at same time",
                    HTTPStatus.UNPROCESSABLE_ENTITY,
                )
            # TODO self._check_edition(session, indata, _id, force)
            content = self.show(session, _id)

            # Drop empty OKA references; a missing "oka" list or key is left
            # for the schema validation to judge.
            for oka in indata.get("oka") or []:
                for key in ("_id", "sw_catalog_path"):
                    if not oka.get(key):
                        oka.pop(key, None)

            indata = self._validate_input_edit(indata, content, force=session["force"])

            # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
            _id = content.get("_id") or _id

            content = self.check_conflict_on_edit(session, content, indata, _id=_id)
            op_id = self.format_on_edit(content, indata)
            self.db.replace(self.topic, _id, content)
            return op_id
        except ValidationError as e:
            raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY) from e

    def delete_ksu(self, session, _id, indata, dry_run=False, not_send_msg=None):
        """Delete one KSU, or every KSU of ``indata["ksus"]`` when ``_id`` is "delete".

        KSUs that ``delete`` already removed from the database are left out of
        the "delete" message; no message is sent when none remains.

        :param session: engine session
        :param _id: identity of the KSU, or the literal "delete" for bulk mode
        :param indata: bulk payload; every item carries its ``_id``
        :param dry_run: accepted for interface compatibility; not used
        :param not_send_msg: forwarded to ``_send_msg``
        :return: op_id of the last processed KSU
        :raises EngineException: (422) on an empty bulk payload
        """
        _id_list = []
        op_id = None
        if _id == "delete":
            for content in self._ksus_from_payload(indata):
                ksu_id = content.pop("_id")
                op_id, not_send_msg_ksu = self.delete(session, ksu_id)
                if not not_send_msg_ksu:
                    _id_list.append(ksu_id)
        else:
            op_id, not_send_msg_ksu = self.delete(session, _id)
            if not not_send_msg_ksu:
                _id_list.append(_id)

        if _id_list:
            data = {"ksus_list": _id_list, "operation_id": op_id}
            self._send_msg("delete", data, not_send_msg)
        return op_id

    def _release_unused_okas(self, session, ksu):
        """Set to ``NOT_IN_USE`` every OKA of ``ksu`` that no other KSU references.

        Only the KSUs returned for the session's project filter are inspected.

        :param session: engine session used to build the project filter
        :param ksu: stored document of the KSU being deleted
        """
        # Ordered, duplicate-free list of all OKA ids used by this KSU.
        used_oka_ids = []
        for oka in ksu.get("oka") or []:
            oka_id = oka.get("_id")
            if oka_id and oka_id not in used_oka_ids:
                used_oka_ids.append(oka_id)
        if not used_oka_ids:
            return

        still_referenced = set()
        other_ksus = self.db.get_list(self.topic, self._get_project_filter(session))
        for other in other_ksus or []:
            if other["_id"] == ksu["_id"]:
                continue
            for oka in other.get("oka") or []:
                self.logger.info("OKA: {}".format(oka))
                if oka.get("sw_catalog_path", ""):
                    continue
                if oka.get("_id"):
                    still_referenced.add(oka["_id"])

        for oka_id in used_oka_ids:
            if oka_id not in still_referenced:
                self.db.set_one(
                    self.okapkg_topic,
                    {"_id": oka_id},
                    {"_admin.usageState": "NOT_IN_USE"},
                )

    def delete(self, session, _id):
        """Mark a KSU as being deleted and release the OKAs it no longer needs.

        When the profile referenced by the KSU is not found, the KSU is removed
        from the database right away and the caller is told not to send a
        message for it.

        :param session: engine session
        :param _id: identity of the KSU
        :return: tuple ``(op_id, not_send_msg)``; ``not_send_msg`` is True when
            the KSU was removed directly, None otherwise
        """
        if not self.multiproject:
            filter_q = {}
        else:
            filter_q = self._get_project_filter(session)
        filter_q[self.id_field(self.topic, _id)] = _id
        item_content = self.db.get_one(self.topic, filter_q)
        item_content["state"] = "IN_DELETION"
        item_content["operatingState"] = "PROCESSING"
        item_content["resourceState"] = "IN_PROGRESS"
        op_id = self.format_on_operation(
            item_content,
            "delete",
            None,
        )
        self.db.set_one(self.topic, {"_id": item_content["_id"]}, item_content)

        self._release_unused_okas(session, item_content)

        # Check if the profile exists. If it doesn't, no message should be sent to Kafka
        not_send_msg = None
        profile_id = item_content["profile"]["_id"]
        profile_type = item_content["profile"]["profile_type"]
        profile_collection_map = {
            "app_profiles": "k8sapp",
            "resource_profiles": "k8sresource",
            "infra_controller_profiles": "k8sinfra_controller",
            "infra_config_profiles": "k8sinfra_config",
        }
        profile_collection = profile_collection_map[profile_type]
        profile_content = self.db.get_one(
            profile_collection, {"_id": profile_id}, fail_on_empty=False
        )
        if not profile_content:
            self.db.del_one(self.topic, filter_q)
            not_send_msg = True
        return op_id, not_send_msg
yshah53cc9eb2024-07-05 13:06:31 +00001038
1039
class OkaTopic(DescriptorTopic, ACMOperationTopic):
    """Topic handler for OKA packages kept in the ``okas`` collection.

    Covers creation, editing, deletion and package upload of OKAs.
    """

    topic = "okas"
    topic_msg = "oka"
    schema_new = oka_schema
    schema_edit = oka_schema

    def __init__(self, db, fs, msg, auth):
        super().__init__(db, fs, msg, auth)
        self.logger = logging.getLogger("nbi.oka")

    @staticmethod
    def format_on_new(content, project_id=None, make_public=False):
        """Apply the descriptor formatting and set the initial OKA state fields.

        :param content: OKA document, modified in place
        :param project_id: forwarded to ``DescriptorTopic.format_on_new``
        :param make_public: forwarded to ``DescriptorTopic.format_on_new``
        """
        DescriptorTopic.format_on_new(
            content, project_id=project_id, make_public=make_public
        )
        content["current_operation"] = None
        content["state"] = "PENDING_CONTENT"
        content["operatingState"] = "PROCESSING"
        content["resourceState"] = "IN_PROGRESS"

    def check_conflict_on_del(self, session, _id, db_content):
        """Refuse the deletion of an OKA whose usage state is ``IN_USE``.

        :param session: not used
        :param _id: not used
        :param db_content: stored document; ``_admin.usageState`` is read
        :raises EngineException: (409) when the package is in use
        """
        usage_state = db_content["_admin"]["usageState"]
        if usage_state == "IN_USE":
            raise EngineException(
                "There is a KSU using this package",
                http_code=HTTPStatus.CONFLICT,
            )

    def check_conflict_on_edit(self, session, final_content, edit_content, _id):
        """Reject edits that change nothing and check renamed OKAs for uniqueness.

        An edit is a no-op when none of the ``name``/``description`` values it
        carries differs from the stored one. An edit that keeps the name but
        changes the description is accepted.

        :param session: engine session, forwarded to ``check_unique_name``
        :param final_content: stored document
        :param edit_content: validated edit payload
        :param _id: not used
        :return: ``final_content`` unchanged
        :raises EngineException: (409) when the edit is a no-op
        """
        name_given = "name" in edit_content
        description_given = "description" in edit_content
        name_changed = name_given and final_content["name"] != edit_content["name"]
        # The stored document may have no description yet, hence .get().
        description_changed = (
            description_given
            and final_content.get("description") != edit_content["description"]
        )

        if name_changed:
            self.check_unique_name(session, edit_content["name"])
        elif not description_changed:
            if name_given:
                name = edit_content["name"]
                raise EngineException(
                    f"No update, new name for the OKA is the same: {name}",
                    http_code=HTTPStatus.CONFLICT,
                )
            if description_given:
                description = edit_content["description"]
                raise EngineException(
                    f"No update, new description for the OKA is the same: {description}",
                    http_code=HTTPStatus.CONFLICT,
                )
        return final_content

    def edit(self, session, _id, indata=None, kwargs=None, content=None):
        """Validate an edit, register the operation and merge it into the OKA.

        :param session: engine session; ``set_project`` and ``force`` are read
        :param _id: identity of the OKA
        :param indata: changes to merge into the stored document
        :param kwargs: query-string overrides applied on ``indata``
        :param content: current document; fetched with ``show`` when not given
        :return: value returned by ``format_on_edit``
        :raises EngineException: (422) on a validation error or when content is
            edited together with SET_PROJECT; (409) from the conflict check
        """
        indata = self._remove_envelop(indata)

        # Override descriptor with query string kwargs
        if kwargs:
            self._update_input_with_kwargs(indata, kwargs)
        try:
            if indata and session.get("set_project"):
                raise EngineException(
                    "Cannot edit content and set to project (query string SET_PROJECT) at same time",
                    HTTPStatus.UNPROCESSABLE_ENTITY,
                )
            # TODO self._check_edition(session, indata, _id, force)
            if not content:
                content = self.show(session, _id)

            indata = self._validate_input_edit(indata, content, force=session["force"])

            # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
            _id = content.get("_id") or _id

            content = self.check_conflict_on_edit(session, content, indata, _id=_id)
            op_id = self.format_on_edit(content, indata)
            deep_update_rfc7396(content, indata)

            self.db.replace(self.topic, _id, content)
            return op_id
        except ValidationError as e:
            raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY) from e

    def delete(self, session, _id, dry_run=False, not_send_msg=None):
        """Mark an OKA as being deleted and send the "delete" message.

        :param session: engine session
        :param _id: identity of the OKA
        :param dry_run: when True only the conflict check is run; nothing is
            written and no message is sent
        :param not_send_msg: forwarded to ``_send_msg``
        :return: op_id of the registered operation, or None on a dry run
        :raises EngineException: (409) when the package is in use
        """
        if not self.multiproject:
            filter_q = {}
        else:
            filter_q = self._get_project_filter(session)
        filter_q[self.id_field(self.topic, _id)] = _id
        item_content = self.db.get_one(self.topic, filter_q)
        item_content["state"] = "IN_DELETION"
        item_content["operatingState"] = "PROCESSING"
        self.check_conflict_on_del(session, _id, item_content)
        if dry_run:
            # Nothing has been persisted so far; stop before any side effect.
            return None
        op_id = self.format_on_operation(
            item_content,
            "delete",
            None,
        )
        self.db.set_one(self.topic, {"_id": item_content["_id"]}, item_content)
        self._send_msg(
            "delete", {"oka_id": _id, "operation_id": op_id}, not_send_msg=not_send_msg
        )
        return op_id

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create the OKA database entry from the validated ``kwargs``.

        No message is sent here; ``upload_content`` sends "create" once the
        package has been uploaded with method POST.

        :param rollback: list to which a ``{"topic", "_id"}`` entry is appended
        :param session: engine session; ``force``, ``project_id`` and
            ``public`` are read
        :param indata: request body; only used for the discarded envelope below
        :param kwargs: OKA fields; this is what gets validated and stored
        :param headers: not used
        :return: tuple ``(_id, op_id)``
        """
        # _remove_envelop
        if indata and "userDefinedData" in indata:
            indata = indata["userDefinedData"]

        # NOTE(review): this envelope only serves as the target of
        # _update_input_with_kwargs; it is replaced by the validated kwargs
        # right below and is never stored.
        content = {"_admin": {"userDefinedData": indata, "revision": 0}}

        self._update_input_with_kwargs(content, kwargs)
        content = BaseTopic._validate_input_new(
            self, input=kwargs, force=session["force"]
        )

        self.check_unique_name(session, content["name"])
        # Shallow snapshot of the request, taken before format_on_new adds
        # its fields to the document.
        operation_params = dict(content)
        self.format_on_new(
            content, session["project_id"], make_public=session["public"]
        )
        op_id = self.format_on_operation(
            content,
            operation_type="create",
            operation_params=operation_params,
        )
        content["git_name"] = self.create_gitname(content, session)
        _id = self.db.create(self.topic, content)
        rollback.append({"topic": self.topic, "_id": _id})
        return _id, op_id

    def upload_content(self, session, _id, indata, kwargs, headers):
        """Store an uploaded OKA package as a new revision and notify about it.

        The payload is written under ``<_id>:<revision>``, moved over the
        active ``<_id>/`` folder, the descriptor is replaced in the database
        and a copy of it is created in ``<topic>_revisions``. A "create"
        message is sent for method POST and an "edit" message for PUT/PATCH.

        :param session: engine session
        :param _id: identity of the OKA whose content is uploaded
        :param indata: a dict (dumped as YAML) or an object exposing a readable
            ``file`` attribute
        :param kwargs: query-string overrides applied on ``indata``
        :param headers: request headers; ``Content-Type``, ``Content-Filename``
            and ``Method`` are read
        :return: True
        :raises EngineException: propagated from the helpers called here
        """
        current_desc = self.show(session, _id)

        # A request without Content-Type is treated as not compressed.
        content_type = headers.get("Content-Type") or ""
        compressed = None
        if "application/gzip" in content_type or "application/x-gzip" in content_type:
            compressed = "gzip"
        if "application/zip" in content_type:
            compressed = "zip"

        filename = headers.get("Content-Filename")
        if not filename:
            if compressed == "gzip":
                filename = "package.tar.gz"
            elif compressed:
                filename = "package.zip"
            else:
                filename = "package"

        revision = current_desc["_admin"].get("revision", 0) + 1

        file_pkg = None
        fs_rollback = []
        op_id = None

        try:
            # Rather than using a temp folder, we will store the package in a folder based on
            # the current revision.
            proposed_revision_path = _id + ":" + str(revision)
            # The upload always starts from scratch (no partial transfers).
            self.fs.file_delete(proposed_revision_path, ignore_non_exist=True)
            self.fs.mkdir(proposed_revision_path)
            fs_rollback.append(proposed_revision_path)

            storage = self.fs.get_params()
            storage["folder"] = proposed_revision_path
            storage["zipfile"] = filename

            file_path = (proposed_revision_path, filename)
            file_pkg = self.fs.file_open(file_path, "a+b")

            if isinstance(indata, dict):
                indata_text = yaml.safe_dump(indata, indent=4, default_flow_style=False)
                file_pkg.write(indata_text.encode(encoding="utf-8"))
            else:
                # NOTE(review): indata is rebound to the file object, which is
                # what _remove_envelop/deep_update_rfc7396 receive below.
                indata = indata.file
                shutil.copyfileobj(indata, file_pkg, 4096)

            # Need to close the file package here so it can be copied from the
            # revision to the current, unrevisioned record
            file_pkg.close()
            file_pkg = None

            # Fetch both the incoming, proposed revision and the original revision so we
            # can call a validate method to compare them
            current_revision_path = _id + "/"
            self.fs.sync(from_path=current_revision_path)
            self.fs.sync(from_path=proposed_revision_path)

            # Is this required?
            if revision > 1:
                try:
                    self._validate_descriptor_changes(
                        _id,
                        filename,
                        current_revision_path,
                        proposed_revision_path,
                    )
                except Exception:
                    shutil.rmtree(
                        self.fs.path + current_revision_path, ignore_errors=True
                    )
                    shutil.rmtree(
                        self.fs.path + proposed_revision_path, ignore_errors=True
                    )
                    # Only delete the new revision. We need to keep the original version in place
                    # as it has not been changed.
                    self.fs.file_delete(proposed_revision_path, ignore_non_exist=True)
                    raise

            indata = self._remove_envelop(indata)

            # Override descriptor with query string kwargs
            if kwargs:
                self._update_input_with_kwargs(indata, kwargs)

            current_desc["_admin"]["storage"] = storage
            current_desc["_admin"]["onboardingState"] = "ONBOARDED"
            current_desc["_admin"]["operationalState"] = "ENABLED"
            current_desc["_admin"]["modified"] = time()
            current_desc["_admin"]["revision"] = revision

            deep_update_rfc7396(current_desc, indata)

            # Copy the revision to the active package name by its original id
            shutil.rmtree(self.fs.path + current_revision_path, ignore_errors=True)
            os.rename(
                self.fs.path + proposed_revision_path,
                self.fs.path + current_revision_path,
            )
            self.fs.file_delete(current_revision_path, ignore_non_exist=True)
            self.fs.mkdir(current_revision_path)
            self.fs.reverse_sync(from_path=current_revision_path)

            shutil.rmtree(self.fs.path + _id)

            method = headers["Method"]
            is_update = method in ("PUT", "PATCH")
            if method == "POST":
                current_desc["state"] = "IN_CREATION"
                # Reuse the operation registered at creation time, if any.
                history = current_desc.get("operationHistory") or [{"op_id": None}]
                op_id = history[-1].get("op_id")
            elif is_update:
                op_id = self.format_on_operation(
                    current_desc,
                    "update",
                    {"package": filename},
                )
                current_desc["operatingState"] = "PROCESSING"
                current_desc["resourceState"] = "IN_PROGRESS"

            self.db.replace(self.topic, _id, current_desc)

            # Store a copy of the package as a point in time revision
            revision_desc = dict(current_desc)
            revision_desc["_id"] = _id + ":" + str(revision_desc["_admin"]["revision"])
            self.db.create(self.topic + "_revisions", revision_desc)
            # Everything is stored: nothing is left to roll back.
            fs_rollback = []

            if method == "POST":
                self._send_msg("create", {"oka_id": _id, "operation_id": op_id})
            elif is_update:
                self._send_msg("edit", {"oka_id": _id, "operation_id": op_id})

            return True

        finally:
            if file_pkg:
                file_pkg.close()
            for file in fs_rollback:
                self.fs.file_delete(file, ignore_non_exist=True)