blob: 6e5e66a7607924d868297748cd397d301d821f32 [file] [log] [blame]
rshri2d386cb2024-07-05 14:35:51 +00001# -*- coding: utf-8 -*-
2
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12# implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
yshah53cc9eb2024-07-05 13:06:31 +000016import logging
17import yaml
yshah53cc9eb2024-07-05 13:06:31 +000018import shutil
19import os
rshri2d386cb2024-07-05 14:35:51 +000020from http import HTTPStatus
yshah53cc9eb2024-07-05 13:06:31 +000021from uuid import uuid4
rshri2d386cb2024-07-05 14:35:51 +000022
yshah53cc9eb2024-07-05 13:06:31 +000023from time import time
rshri2d386cb2024-07-05 14:35:51 +000024from osm_nbi.base_topic import BaseTopic, EngineException
25
yshah53cc9eb2024-07-05 13:06:31 +000026from osm_nbi.descriptor_topics import DescriptorTopic
rshri2d386cb2024-07-05 14:35:51 +000027from osm_nbi.validation import (
28 ValidationError,
29 clustercreation_new_schema,
30 infra_controller_profile_create_new_schema,
31 infra_config_profile_create_new_schema,
32 app_profile_create_new_schema,
33 resource_profile_create_new_schema,
34 infra_controller_profile_create_edit_schema,
35 infra_config_profile_create_edit_schema,
36 app_profile_create_edit_schema,
37 resource_profile_create_edit_schema,
38 k8scluster_new_schema,
39 attach_dettach_profile_schema,
yshah53cc9eb2024-07-05 13:06:31 +000040 ksu_schema,
41 oka_schema,
rshri2d386cb2024-07-05 14:35:51 +000042)
yshah53cc9eb2024-07-05 13:06:31 +000043from osm_common.dbbase import deep_update_rfc7396, DbException
rshri2d386cb2024-07-05 14:35:51 +000044from osm_common.msgbase import MsgException
45from osm_common.fsbase import FsException
46
yshah53cc9eb2024-07-05 13:06:31 +000047__author__ = (
48 "Shrinithi R <shrinithi.r@tataelxsi.co.in>",
49 "Shahithya Y <shahithya.y@tataelxsi.co.in>",
50)
rshri2d386cb2024-07-05 14:35:51 +000051
52
53class InfraContTopic(BaseTopic):
54 topic = "k8sinfra_controller"
55 topic_msg = "k8s_infra_controller"
56 schema_new = infra_controller_profile_create_new_schema
57 schema_edit = infra_controller_profile_create_edit_schema
58
59 def __init__(self, db, fs, msg, auth):
60 BaseTopic.__init__(self, db, fs, msg, auth)
rshri2d386cb2024-07-05 14:35:51 +000061
62 def new(self, rollback, session, indata=None, kwargs=None, headers=None):
63 # To create the new infra controller profile
64 return self.new_profile(rollback, session, indata, kwargs, headers)
65
66 def default(self, rollback, session, indata=None, kwargs=None, headers=None):
67 # To create the default infra controller profile while creating the cluster
68 return self.default_profile(rollback, session, indata, kwargs, headers)
69
70 def delete(self, session, _id, dry_run=False, not_send_msg=None):
71 item_content = self.db.get_one(self.topic, {"_id": _id})
72 if item_content.get("default", False):
73 raise EngineException(
74 "Cannot delete item because it is marked as default",
75 http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
76 )
77 # Before deleting, detach the profile from the associated clusters.
78 self.detach(session, _id, profile_type="infra_controller_profiles")
79 # To delete the infra controller profile
80 super().delete(session, _id, not_send_msg=not_send_msg)
yshahffcac5f2024-08-19 12:49:07 +000081 return _id
rshri2d386cb2024-07-05 14:35:51 +000082
83
84class InfraConfTopic(BaseTopic):
85 topic = "k8sinfra_config"
86 topic_msg = "k8s_infra_config"
87 schema_new = infra_config_profile_create_new_schema
88 schema_edit = infra_config_profile_create_edit_schema
89
90 def __init__(self, db, fs, msg, auth):
91 BaseTopic.__init__(self, db, fs, msg, auth)
rshri2d386cb2024-07-05 14:35:51 +000092
93 def new(self, rollback, session, indata=None, kwargs=None, headers=None):
94 # To create the new infra config profile
95 return self.new_profile(rollback, session, indata, kwargs, headers)
96
97 def default(self, rollback, session, indata=None, kwargs=None, headers=None):
98 # To create the default infra config profile while creating the cluster
99 return self.default_profile(rollback, session, indata, kwargs, headers)
100
101 def delete(self, session, _id, dry_run=False, not_send_msg=None):
102 item_content = self.db.get_one(self.topic, {"_id": _id})
103 if item_content.get("default", False):
104 raise EngineException(
105 "Cannot delete item because it is marked as default",
106 http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
107 )
108 # Before deleting, detach the profile from the associated clusters.
109 self.detach(session, _id, profile_type="infra_config_profiles")
110 # To delete the infra config profile
111 super().delete(session, _id, not_send_msg=not_send_msg)
yshahffcac5f2024-08-19 12:49:07 +0000112 return _id
rshri2d386cb2024-07-05 14:35:51 +0000113
114
115class AppTopic(BaseTopic):
116 topic = "k8sapp"
117 topic_msg = "k8s_app"
118 schema_new = app_profile_create_new_schema
119 schema_edit = app_profile_create_edit_schema
120
121 def __init__(self, db, fs, msg, auth):
122 BaseTopic.__init__(self, db, fs, msg, auth)
rshri2d386cb2024-07-05 14:35:51 +0000123
124 def new(self, rollback, session, indata=None, kwargs=None, headers=None):
125 # To create the new app profile
126 return self.new_profile(rollback, session, indata, kwargs, headers)
127
128 def default(self, rollback, session, indata=None, kwargs=None, headers=None):
129 # To create the default app profile while creating the cluster
130 return self.default_profile(rollback, session, indata, kwargs, headers)
131
132 def delete(self, session, _id, dry_run=False, not_send_msg=None):
133 item_content = self.db.get_one(self.topic, {"_id": _id})
134 if item_content.get("default", False):
135 raise EngineException(
136 "Cannot delete item because it is marked as default",
137 http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
138 )
139 # Before deleting, detach the profile from the associated clusters.
140 self.detach(session, _id, profile_type="app_profiles")
141 # To delete the app profile
142 super().delete(session, _id, not_send_msg=not_send_msg)
yshahffcac5f2024-08-19 12:49:07 +0000143 return _id
rshri2d386cb2024-07-05 14:35:51 +0000144
145
146class ResourceTopic(BaseTopic):
147 topic = "k8sresource"
148 topic_msg = "k8s_resource"
149 schema_new = resource_profile_create_new_schema
150 schema_edit = resource_profile_create_edit_schema
151
152 def __init__(self, db, fs, msg, auth):
153 BaseTopic.__init__(self, db, fs, msg, auth)
rshri2d386cb2024-07-05 14:35:51 +0000154
155 def new(self, rollback, session, indata=None, kwargs=None, headers=None):
156 # To create the new resource profile
157 return self.new_profile(rollback, session, indata, kwargs, headers)
158
159 def default(self, rollback, session, indata=None, kwargs=None, headers=None):
160 # To create the default resource profile while creating the cluster
161 return self.default_profile(rollback, session, indata, kwargs, headers)
162
163 def delete(self, session, _id, dry_run=False, not_send_msg=None):
164 item_content = self.db.get_one(self.topic, {"_id": _id})
165 if item_content.get("default", False):
166 raise EngineException(
167 "Cannot delete item because it is marked as default",
168 http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
169 )
170 # Before deleting, detach the profile from the associated clusters.
171 self.detach(session, _id, profile_type="resource_profiles")
172 # To delete the resource profile
173 super().delete(session, _id, not_send_msg=not_send_msg)
yshahffcac5f2024-08-19 12:49:07 +0000174 return _id
rshri2d386cb2024-07-05 14:35:51 +0000175
176
177class K8sTopic(BaseTopic):
178 topic = "clusters"
179 topic_msg = "cluster"
180 schema_new = clustercreation_new_schema
181 schema_edit = attach_dettach_profile_schema
182
183 def __init__(self, db, fs, msg, auth):
184 BaseTopic.__init__(self, db, fs, msg, auth)
185 self.infra_contr_topic = InfraContTopic(db, fs, msg, auth)
186 self.infra_conf_topic = InfraConfTopic(db, fs, msg, auth)
187 self.resource_topic = ResourceTopic(db, fs, msg, auth)
188 self.app_topic = AppTopic(db, fs, msg, auth)
rshri2d386cb2024-07-05 14:35:51 +0000189
190 def new(self, rollback, session, indata=None, kwargs=None, headers=None):
191 """
192 Creates a new k8scluster into database.
193 :param rollback: list to append the created items at database in case a rollback must be done
194 :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
195 :param indata: params to be used for the k8cluster
196 :param kwargs: used to override the indata
197 :param headers: http request headers
198 :return: the _id of k8scluster created at database. Or an exception of type
199 EngineException, ValidationError, DbException, FsException, MsgException.
200 Note: Exceptions are not captured on purpose. They should be captured at called
201 """
202 step = "checking quotas" # first step must be defined outside try
203 try:
204 self.check_quota(session)
205 step = "name unique check"
206 self.check_unique_name(session, indata["name"])
207 step = "validating input parameters"
208 cls_request = self._remove_envelop(indata)
209 self._update_input_with_kwargs(cls_request, kwargs)
210 cls_request = self._validate_input_new(cls_request, session["force"])
211 operation_params = cls_request
212
213 step = "filling cluster details from input data"
214 cls_create = self._create_cluster(
215 cls_request, rollback, session, indata, kwargs, headers
216 )
217
218 step = "creating cluster at database"
219 self.format_on_new(
220 cls_create, session["project_id"], make_public=session["public"]
221 )
222 cls_create["current_operation"] = None
223 op_id = self.format_on_operation(
224 cls_create,
225 "create",
226 operation_params,
227 )
228 _id = self.db.create(self.topic, cls_create)
garciadeblas6e88d9c2024-08-15 10:55:04 +0200229 pubkey, privkey = self._generate_age_key()
230 cls_create["age_pubkey"] = self.db.encrypt(
231 pubkey, schema_version="1.11", salt=_id
232 )
233 cls_create["age_privkey"] = self.db.encrypt(
234 privkey, schema_version="1.11", salt=_id
235 )
236 # TODO: set age_pubkey and age_privkey in the default profiles
rshri2d386cb2024-07-05 14:35:51 +0000237 rollback.append({"topic": self.topic, "_id": _id})
238 self.db.set_one("clusters", {"_id": _id}, cls_create)
239 self._send_msg("create", {"cluster_id": _id, "operation_id": op_id})
240
241 return _id, None
242 except (
243 ValidationError,
244 EngineException,
245 DbException,
246 MsgException,
247 FsException,
248 ) as e:
249 raise type(e)("{} while '{}'".format(e, step), http_code=e.http_code)
250
251 def _create_cluster(self, cls_request, rollback, session, indata, kwargs, headers):
252 # Check whether the region name and resource group have been given
garciadeblasc3a6c492024-08-15 10:00:42 +0200253 region_given = "region_name" in indata
254 resource_group_given = "resource_group" in indata
rshri2d386cb2024-07-05 14:35:51 +0000255
256 # Get the vim_account details
257 vim_account_details = self.db.get_one(
258 "vim_accounts", {"name": cls_request["vim_account"]}
259 )
260
garciadeblasc3a6c492024-08-15 10:00:42 +0200261 # Check whether the region name and resource group have been given
262 if not region_given and not resource_group_given:
rshri2d386cb2024-07-05 14:35:51 +0000263 region_name = vim_account_details["config"]["region_name"]
264 resource_group = vim_account_details["config"]["resource_group"]
garciadeblasc3a6c492024-08-15 10:00:42 +0200265 elif region_given and not resource_group_given:
rshri2d386cb2024-07-05 14:35:51 +0000266 region_name = cls_request["region_name"]
267 resource_group = vim_account_details["config"]["resource_group"]
garciadeblasc3a6c492024-08-15 10:00:42 +0200268 elif not region_given and resource_group_given:
rshri2d386cb2024-07-05 14:35:51 +0000269 region_name = vim_account_details["config"]["region_name"]
270 resource_group = cls_request["resource_group"]
271 else:
272 region_name = cls_request["region_name"]
273 resource_group = cls_request["resource_group"]
274
275 cls_desc = {
276 "name": cls_request["name"],
277 "vim_account": self.check_vim(session, cls_request["vim_account"]),
278 "k8s_version": cls_request["k8s_version"],
279 "node_size": cls_request["node_size"],
280 "node_count": cls_request["node_count"],
281 "description": cls_request["description"],
282 "region_name": region_name,
283 "resource_group": resource_group,
284 "infra_controller_profiles": [
285 self._create_default_profiles(
286 rollback, session, indata, kwargs, headers, self.infra_contr_topic
287 )
288 ],
289 "infra_config_profiles": [
290 self._create_default_profiles(
291 rollback, session, indata, kwargs, headers, self.infra_conf_topic
292 )
293 ],
294 "resource_profiles": [
295 self._create_default_profiles(
296 rollback, session, indata, kwargs, headers, self.resource_topic
297 )
298 ],
299 "app_profiles": [
300 self._create_default_profiles(
301 rollback, session, indata, kwargs, headers, self.app_topic
302 )
303 ],
304 "created": "true",
305 "state": "IN_CREATION",
306 "operatingState": "PROCESSING",
307 "git_name": self.create_gitname(cls_request, session),
308 "resourceState": "IN_PROGRESS.REQUEST_RECEIVED",
309 }
310 return cls_desc
311
312 def check_vim(self, session, name):
313 try:
314 vim_account_details = self.db.get_one("vim_accounts", {"name": name})
315 if vim_account_details is not None:
316 return name
317 except ValidationError as e:
318 raise EngineException(
319 e,
320 HTTPStatus.UNPROCESSABLE_ENTITY,
321 )
322
323 def _create_default_profiles(
324 self, rollback, session, indata, kwargs, headers, topic
325 ):
326 topic = self.to_select_topic(topic)
327 default_profiles = topic.default(rollback, session, indata, kwargs, headers)
328 return default_profiles
329
330 def to_select_topic(self, topic):
331 if topic == "infra_controller_profiles":
332 topic = self.infra_contr_topic
333 elif topic == "infra_config_profiles":
334 topic = self.infra_conf_topic
335 elif topic == "resource_profiles":
336 topic = self.resource_topic
337 elif topic == "app_profiles":
338 topic = self.app_topic
339 return topic
340
341 def show_one(self, session, _id, profile, filter_q=None, api_req=False):
342 try:
343 filter_q = self._get_project_filter(session)
344 filter_q[self.id_field(self.topic, _id)] = _id
345 content = self.db.get_one(self.topic, filter_q)
346 existing_profiles = []
347 topic = None
348 topic = self.to_select_topic(profile)
349 for profile_id in content[profile]:
350 data = topic.show(session, profile_id, filter_q, api_req)
351 existing_profiles.append(data)
352 return existing_profiles
353 except ValidationError as e:
354 raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
355
356 def state_check(self, profile_id, session, topic):
357 topic = self.to_select_topic(topic)
358 content = topic.show(session, profile_id, filter_q=None, api_req=False)
359 state = content["state"]
360 if state == "CREATED":
361 return
362 else:
363 raise EngineException(
364 f" {profile_id} is not in created state",
365 HTTPStatus.UNPROCESSABLE_ENTITY,
366 )
367
368 def edit(self, session, _id, item, indata=None, kwargs=None):
369 indata = self._remove_envelop(indata)
370 indata = self._validate_input_edit(indata, content=None, force=session["force"])
371 if indata.get("add_profile"):
372 self.add_profile(session, _id, item, indata)
373 elif indata.get("remove_profile"):
374 self.remove_profile(session, _id, item, indata)
375 else:
376 error_msg = "Add / remove operation is only applicable"
377 raise EngineException(error_msg, HTTPStatus.UNPROCESSABLE_ENTITY)
378
379 def add_profile(self, session, _id, item, indata=None):
380 indata = self._remove_envelop(indata)
381 operation_params = indata
382 profile_id = indata["add_profile"][0]["id"]
383 # check state
384 self.state_check(profile_id, session, item)
385 filter_q = self._get_project_filter(session)
386 filter_q[self.id_field(self.topic, _id)] = _id
387 content = self.db.get_one(self.topic, filter_q)
388 profile_list = content[item]
389
390 if profile_id not in profile_list:
391 content["operatingState"] = "PROCESSING"
392 content["current_operation"] = None
393 op_id = self.format_on_operation(
394 content,
395 "add",
396 operation_params,
397 )
398 self.db.set_one("clusters", {"_id": content["_id"]}, content)
399 self._send_msg(
400 "add",
401 {
402 "cluster_id": _id,
403 "profile_id": profile_id,
404 "profile_type": item,
405 "operation_id": op_id,
406 },
407 )
408 else:
409 raise EngineException(
410 f"{item} {profile_id} already exists", HTTPStatus.UNPROCESSABLE_ENTITY
411 )
412
413 def _get_default_profiles(self, session, topic):
414 topic = self.to_select_topic(topic)
415 existing_profiles = topic.list(session, filter_q=None, api_req=False)
416 default_profiles = [
417 profile["_id"]
418 for profile in existing_profiles
419 if profile.get("default", False)
420 ]
421 return default_profiles
422
423 def remove_profile(self, session, _id, item, indata):
424 indata = self._remove_envelop(indata)
425 operation_params = indata
426 profile_id = indata["remove_profile"][0]["id"]
427 filter_q = self._get_project_filter(session)
428 filter_q[self.id_field(self.topic, _id)] = _id
429 content = self.db.get_one(self.topic, filter_q)
430 profile_list = content[item]
431
432 default_profiles = self._get_default_profiles(session, item)
433
434 if profile_id in default_profiles:
435 raise EngineException(
436 "Cannot remove default profile", HTTPStatus.UNPROCESSABLE_ENTITY
437 )
438 if profile_id in profile_list:
439 content["operatingState"] = "PROCESSING"
440 content["current_operation"] = None
441 op_id = self.format_on_operation(
442 content,
443 "remove",
444 operation_params,
445 )
446 self.db.set_one("clusters", {"_id": content["_id"]}, content)
447 self._send_msg(
448 "remove",
449 {
450 "cluster_id": _id,
451 "profile_id": profile_id,
452 "profile_type": item,
453 "operation_id": op_id,
454 },
455 )
456 else:
457 raise EngineException(
458 f"{item} {profile_id} does'nt exists", HTTPStatus.UNPROCESSABLE_ENTITY
459 )
460
yshah53cc9eb2024-07-05 13:06:31 +0000461 def get_cluster_info(self, session, _id, item):
462 if not self.multiproject:
463 filter_db = {}
464 else:
465 filter_db = self._get_project_filter(session)
466 # To allow project&user addressing by name AS WELL AS _id
467 filter_db[BaseTopic.id_field(self.topic, _id)] = _id
468 data = self.db.get_one(self.topic, filter_db)
469 self._send_msg(item, {"_id": _id})
470 return data
471
472 def update_cluster(self, session, _id, item, indata):
473 if not self.multiproject:
474 filter_db = {}
475 else:
476 filter_db = self._get_project_filter(session)
477 # To allow project&user addressing by name AS WELL AS _id
478 filter_db[BaseTopic.id_field(self.topic, _id)] = _id
479 data = self.db.get_one(self.topic, filter_db)
480 data["operatingState"] = "PROCESSING"
481 data["resourceState"] = "IN_PROGRESS"
482 operation_params = indata
483 self.format_on_operation(
484 data,
485 item,
486 operation_params,
487 )
488 self.db.set_one(self.topic, {"_id": _id}, data)
489 op_id = data["current_operation"]
490 data = {"cluster_id": _id, "operation_id": op_id}
491 self._send_msg(item, data)
492 return op_id
493
rshri2d386cb2024-07-05 14:35:51 +0000494
495class K8saddTopic(BaseTopic):
496 topic = "clusters"
497 topic_msg = "cluster"
498 schema_new = k8scluster_new_schema
499
500 def __init__(self, db, fs, msg, auth):
501 BaseTopic.__init__(self, db, fs, msg, auth)
502
503 def add(self, rollback, session, indata, kwargs=None, headers=None):
504 step = "checking quotas"
505 try:
506 self.check_quota(session)
507 step = "name unique check"
508 self.check_unique_name(session, indata["name"])
509 step = "validating input parameters"
510 cls_add_request = self._remove_envelop(indata)
511 self._update_input_with_kwargs(cls_add_request, kwargs)
512 cls_add_request = self._validate_input_new(
513 cls_add_request, session["force"]
514 )
515 operation_params = cls_add_request
516
517 step = "filling cluster details from input data"
518 cls_add = self._add_cluster(cls_add_request, session)
519
520 step = "creating cluster at database"
521 self.format_on_new(
522 cls_add, session["project_id"], make_public=session["public"]
523 )
524 cls_add["current_operation"] = None
525 op_id = self.format_on_operation(
526 cls_add,
527 "register",
528 operation_params,
529 )
530 _id = self.db.create(self.topic, cls_add)
garciadeblas866cebd2024-09-25 11:25:33 +0200531 pubkey, privkey = self._generate_age_key()
532 cls_add["age_pubkey"] = self.db.encrypt(
533 pubkey, schema_version="1.11", salt=_id
534 )
535 cls_add["age_privkey"] = self.db.encrypt(
536 privkey, schema_version="1.11", salt=_id
537 )
538 # TODO: set age_pubkey and age_privkey in the default profiles
rshri2d386cb2024-07-05 14:35:51 +0000539 self.db.set_one(self.topic, {"_id": _id}, cls_add)
540 rollback.append({"topic": self.topic, "_id": _id})
541 self._send_msg("register", {"cluster_id": _id, "operation_id": op_id})
542 return _id, None
543 except (
544 ValidationError,
545 EngineException,
546 DbException,
547 MsgException,
548 FsException,
549 ) as e:
550 raise type(e)("{} while '{}'".format(e, step), http_code=e.http_code)
551
552 def _add_cluster(self, cls_add_request, session):
553 cls_add = {
554 "name": cls_add_request["name"],
555 "description": cls_add_request["description"],
556 "credentials": cls_add_request["credentials"],
557 "vim_account": cls_add_request["vim_account"],
558 "k8s_version": cls_add_request["k8s_version"],
559 "nets": cls_add_request["nets"],
560 "created": "false",
561 "state": "IN_CREATION",
562 "operatingState": "PROCESSING",
563 "git_name": self.create_gitname(cls_add_request, session),
564 "resourceState": "IN_PROGRESS.REQUEST_RECEIVED",
565 }
566 return cls_add
567
568 def remove(self, session, _id, dry_run=False, not_send_msg=None):
569 """
570 Delete item by its internal _id
571 :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
572 :param _id: server internal id
573 :param dry_run: make checking but do not delete
574 :param not_send_msg: To not send message (False) or store content (list) instead
575 :return: operation id (None if there is not operation), raise exception if error or not found, conflict, ...
576 """
577
578 # To allow addressing projects and users by name AS WELL AS by _id
579 if not self.multiproject:
580 filter_q = {}
581 else:
582 filter_q = self._get_project_filter(session)
583 filter_q[self.id_field(self.topic, _id)] = _id
584 item_content = self.db.get_one(self.topic, filter_q)
585
586 item_content["operatingState"] = "PROCESSING"
587 item_content["current_operation"] = None
588 op_id = self.format_on_operation(
589 item_content,
590 "deregister",
591 None,
592 )
593 self.db.set_one(self.topic, {"_id": _id}, item_content)
594
595 self.check_conflict_on_del(session, _id, item_content)
596 if dry_run:
597 return None
598
599 if self.multiproject and session["project_id"]:
600 # remove reference from project_read if there are more projects referencing it. If it last one,
601 # do not remove reference, but delete
602 other_projects_referencing = next(
603 (
604 p
605 for p in item_content["_admin"]["projects_read"]
606 if p not in session["project_id"] and p != "ANY"
607 ),
608 None,
609 )
610
611 # check if there are projects referencing it (apart from ANY, that means, public)....
612 if other_projects_referencing:
613 # remove references but not delete
614 update_dict_pull = {
615 "_admin.projects_read": session["project_id"],
616 "_admin.projects_write": session["project_id"],
617 }
618 self.db.set_one(
619 self.topic, filter_q, update_dict=None, pull_list=update_dict_pull
620 )
621 return None
622 else:
623 can_write = next(
624 (
625 p
626 for p in item_content["_admin"]["projects_write"]
627 if p == "ANY" or p in session["project_id"]
628 ),
629 None,
630 )
631 if not can_write:
632 raise EngineException(
633 "You have not write permission to delete it",
634 http_code=HTTPStatus.UNAUTHORIZED,
635 )
636
637 # delete
638 self._send_msg(
639 "deregister",
640 {"cluster_id": _id, "operation_id": op_id},
641 not_send_msg=not_send_msg,
642 )
643 return None
yshah53cc9eb2024-07-05 13:06:31 +0000644
645
646class KsusTopic(BaseTopic):
647 topic = "ksus"
648 okapkg_topic = "okas"
649 infra_topic = "k8sinfra"
650 topic_msg = "ksu"
651 schema_new = ksu_schema
652 schema_edit = ksu_schema
653
654 def __init__(self, db, fs, msg, auth):
655 BaseTopic.__init__(self, db, fs, msg, auth)
656 self.logger = logging.getLogger("nbi.ksus")
657
658 @staticmethod
659 def format_on_new(content, project_id=None, make_public=False):
660 BaseTopic.format_on_new(content, project_id=project_id, make_public=make_public)
661 content["state"] = "IN_CREATION"
662 content["operatingState"] = "PROCESSING"
663 content["resourceState"] = "IN_PROGRESS"
664
665 def new(self, rollback, session, indata=None, kwargs=None, headers=None):
666 _id_list = []
667 op_id = str(uuid4())
668 for ksus in indata["ksus"]:
669 content = ksus
670 oka = content["oka"][0]
671 oka_flag = ""
672 if oka["_id"]:
673 oka_flag = "_id"
674 elif oka["sw_catalog_path"]:
675 oka_flag = "sw_catalog_path"
676
677 for okas in content["oka"]:
678 if okas["_id"] and okas["sw_catalog_path"]:
679 raise EngineException(
680 "Cannot create ksu with both OKA and SW catalog path",
681 HTTPStatus.UNPROCESSABLE_ENTITY,
682 )
683 if not okas["sw_catalog_path"]:
684 okas.pop("sw_catalog_path")
685 elif not okas["_id"]:
686 okas.pop("_id")
687 if oka_flag not in okas.keys():
688 raise EngineException(
689 "Cannot create ksu. Give either OKA or SW catalog path for all oka in a KSU",
690 HTTPStatus.UNPROCESSABLE_ENTITY,
691 )
692
693 # Override descriptor with query string kwargs
694 content = self._remove_envelop(content)
695 self._update_input_with_kwargs(content, kwargs)
696 content = self._validate_input_new(input=content, force=session["force"])
697
698 # Check for unique name
699 self.check_unique_name(session, content["name"])
700
701 self.check_conflict_on_new(session, content)
702
703 operation_params = {}
704 for content_key, content_value in content.items():
705 operation_params[content_key] = content_value
706 self.format_on_new(
707 content, project_id=session["project_id"], make_public=session["public"]
708 )
709 content["current_operation"] = op_id
710 op_id = self.format_on_operation(
711 content,
712 operation_type="create",
713 operation_params=operation_params,
714 )
715 content["git_name"] = self.create_gitname(content, session)
716
717 # Update Oka_package usage state
718 for okas in content["oka"]:
719 if "_id" in okas.keys():
720 self.update_usage_state(session, okas)
721
722 _id = self.db.create(self.topic, content)
723 rollback.append({"topic": self.topic, "_id": _id})
724
725 if not op_id:
726 op_id = content["current_operation"]
727 _id_list.append(_id)
728 data = {"ksus_list": _id_list, "operation_id": op_id}
729 self._send_msg("create", data)
730 return _id_list, op_id
731
732 def clone(self, rollback, session, _id, indata, kwargs, headers):
733 filter_db = self._get_project_filter(session)
734 filter_db[BaseTopic.id_field(self.topic, _id)] = _id
735 data = self.db.get_one(self.topic, filter_db)
736
737 data["current_operation"] = None
738 op_id = self.format_on_operation(
739 data,
740 "clone",
741 indata,
742 )
743 self.db.set_one(self.topic, {"_id": data["_id"]}, data)
744 self._send_msg("clone", {"ksus_list": [data["_id"]], "operation_id": op_id})
745 return op_id
746
747 def update_usage_state(self, session, oka_content):
748 _id = oka_content["_id"]
749 filter_db = self._get_project_filter(session)
750 filter_db[BaseTopic.id_field(self.topic, _id)] = _id
751
752 data = self.db.get_one(self.okapkg_topic, filter_db)
753 if data["_admin"]["usageState"] == "NOT_IN_USE":
754 usage_state_update = {
755 "_admin.usageState": "IN_USE",
756 }
757 self.db.set_one(
758 self.okapkg_topic, {"_id": _id}, update_dict=usage_state_update
759 )
760
761 def move_ksu(self, session, _id, indata=None, kwargs=None, content=None):
762 indata = self._remove_envelop(indata)
763
764 # Override descriptor with query string kwargs
765 if kwargs:
766 self._update_input_with_kwargs(indata, kwargs)
767 try:
768 if indata and session.get("set_project"):
769 raise EngineException(
770 "Cannot edit content and set to project (query string SET_PROJECT) at same time",
771 HTTPStatus.UNPROCESSABLE_ENTITY,
772 )
773 # TODO self._check_edition(session, indata, _id, force)
774 if not content:
775 content = self.show(session, _id)
776 indata = self._validate_input_edit(
777 input=indata, content=content, force=session["force"]
778 )
779 operation_params = indata
780 deep_update_rfc7396(content, indata)
781
782 # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
783 _id = content.get("_id") or _id
784 content["current_operation"] = None
785 self.format_on_operation(
786 content,
787 "move",
788 operation_params,
789 )
790 if content.get("_admin"):
791 now = time()
792 content["_admin"]["modified"] = now
793 content["operatingState"] = "PROCESSING"
794 content["resourceState"] = "IN_PROGRESS"
795
796 self.db.replace(self.topic, _id, content)
797
798 op_id = content["current_operation"]
799 data = {"ksus_list": [content["_id"]], "operation_id": op_id}
800 self._send_msg("move", data)
801 return op_id
802 except ValidationError as e:
803 raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
804
805 def check_conflict_on_edit(self, session, final_content, edit_content, _id):
806 if final_content["name"] != edit_content["name"]:
807 self.check_unique_name(session, edit_content["name"])
808 return final_content
809
810 @staticmethod
811 def format_on_edit(final_content, edit_content):
812 BaseTopic.format_on_operation(
813 final_content,
814 "update",
815 edit_content,
816 )
817 final_content["operatingState"] = "PROCESSING"
818 final_content["resourceState"] = "IN_PROGRESS"
819 if final_content.get("_admin"):
820 now = time()
821 final_content["_admin"]["modified"] = now
822 return final_content["current_operation"]
823
824 def edit(self, session, _id, indata, kwargs):
825 _id_list = []
826 op_id = str(uuid4())
827 if _id == "update":
828 for ksus in indata["ksus"]:
829 content = ksus
830 _id = content["_id"]
831 _id_list.append(_id)
832 content.pop("_id")
833 op_id = self.edit_ksu(session, _id, op_id, content, kwargs)
834 else:
835 content = indata
836 _id_list.append(_id)
837 op_id = self.edit_ksu(session, _id, op_id, content, kwargs)
838
839 data = {"ksus_list": _id_list, "operation_id": op_id}
840 self._send_msg("edit", data)
yshah53cc9eb2024-07-05 13:06:31 +0000841
842 def edit_ksu(self, session, _id, op_id, indata, kwargs):
843 content = None
844 indata = self._remove_envelop(indata)
845
846 # Override descriptor with query string kwargs
847 if kwargs:
848 self._update_input_with_kwargs(indata, kwargs)
849 try:
850 if indata and session.get("set_project"):
851 raise EngineException(
852 "Cannot edit content and set to project (query string SET_PROJECT) at same time",
853 HTTPStatus.UNPROCESSABLE_ENTITY,
854 )
855 # TODO self._check_edition(session, indata, _id, force)
856 if not content:
857 content = self.show(session, _id)
858
859 for okas in indata["oka"]:
860 if not okas["_id"]:
861 okas.pop("_id")
862 if not okas["sw_catalog_path"]:
863 okas.pop("sw_catalog_path")
864
865 indata = self._validate_input_edit(indata, content, force=session["force"])
866
867 # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
868 _id = content.get("_id") or _id
869
870 content = self.check_conflict_on_edit(session, content, indata, _id=_id)
871 content["current_operation"] = op_id
872 op_id = self.format_on_edit(content, indata)
873 self.db.replace(self.topic, _id, content)
874 return op_id
875 except ValidationError as e:
876 raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
877
878 def delete_ksu(self, session, _id, indata, dry_run=False, not_send_msg=None):
879 _id_list = []
880 op_id = str(uuid4())
881 if _id == "delete":
882 for ksus in indata["ksus"]:
883 content = ksus
884 _id = content["_id"]
885 _id_list.append(_id)
886 content.pop("_id")
887 op_id = self.delete(session, _id, op_id)
888 else:
889 _id_list.append(_id)
890 op_id = self.delete(session, _id, op_id)
891
892 data = {"ksus_list": _id_list, "operation_id": op_id}
893 self._send_msg("delete", data)
894 return op_id
895
896 def delete(self, session, _id, op_id):
897 if not self.multiproject:
898 filter_q = {}
899 else:
900 filter_q = self._get_project_filter(session)
901 filter_q[self.id_field(self.topic, _id)] = _id
902 item_content = self.db.get_one(self.topic, filter_q)
903 item_content["state"] = "IN_DELETION"
904 item_content["operatingState"] = "PROCESSING"
905 item_content["resourceState"] = "IN_PROGRESS"
906 item_content["current_operation"] = op_id
907 self.format_on_operation(
908 item_content,
909 "delete",
910 None,
911 )
912 self.db.set_one(self.topic, {"_id": item_content["_id"]}, item_content)
913
914 if item_content["oka"][0].get("_id"):
915 used_oka = {}
916 existing_oka = []
917 for okas in item_content["oka"]:
918 used_oka["_id"] = okas["_id"]
919
920 filter = self._get_project_filter(session)
921 data = self.db.get_list(self.topic, filter)
922
923 if data:
924 for ksus in data:
925 if ksus["_id"] != _id:
926 for okas in ksus["oka"]:
927 if okas["_id"] not in existing_oka:
928 existing_oka.append(okas["_id"])
929
930 if used_oka:
931 for oka, oka_id in used_oka.items():
932 if oka_id not in existing_oka:
933 self.db.set_one(
934 self.okapkg_topic,
935 {"_id": oka_id},
936 {"_admin.usageState": "NOT_IN_USE"},
937 )
938 return op_id
939
940
941class OkaTopic(DescriptorTopic):
942 topic = "okas"
943 topic_msg = "oka"
944 schema_new = oka_schema
945 schema_edit = oka_schema
946
947 def __init__(self, db, fs, msg, auth):
948 super().__init__(db, fs, msg, auth)
949 self.logger = logging.getLogger("nbi.oka")
950
951 @staticmethod
952 def format_on_new(content, project_id=None, make_public=False):
953 DescriptorTopic.format_on_new(
954 content, project_id=project_id, make_public=make_public
955 )
956 content["state"] = "PENDING_CONTENT"
957 content["operatingState"] = "PROCESSING"
958 content["resourceState"] = "IN_PROGRESS"
959
960 def check_conflict_on_del(self, session, _id, db_content):
961 usage_state = db_content["_admin"]["usageState"]
962 if usage_state == "IN_USE":
963 raise EngineException(
964 "There is a KSU using this package",
965 http_code=HTTPStatus.CONFLICT,
966 )
967
968 def check_conflict_on_edit(self, session, final_content, edit_content, _id):
969 if (
970 final_content["name"] == edit_content["name"]
971 and final_content["description"] == edit_content["description"]
972 ):
973 raise EngineException(
974 "No update",
975 http_code=HTTPStatus.CONFLICT,
976 )
977 if final_content["name"] != edit_content["name"]:
978 self.check_unique_name(session, edit_content["name"])
979 return final_content
980
981 def edit(self, session, _id, indata=None, kwargs=None, content=None):
982 indata = self._remove_envelop(indata)
983
984 # Override descriptor with query string kwargs
985 if kwargs:
986 self._update_input_with_kwargs(indata, kwargs)
987 try:
988 if indata and session.get("set_project"):
989 raise EngineException(
990 "Cannot edit content and set to project (query string SET_PROJECT) at same time",
991 HTTPStatus.UNPROCESSABLE_ENTITY,
992 )
993 # TODO self._check_edition(session, indata, _id, force)
994 if not content:
995 content = self.show(session, _id)
996
997 indata = self._validate_input_edit(indata, content, force=session["force"])
998
999 # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
1000 _id = content.get("_id") or _id
1001
1002 content = self.check_conflict_on_edit(session, content, indata, _id=_id)
1003 op_id = self.format_on_edit(content, indata)
1004 deep_update_rfc7396(content, indata)
1005
1006 self.db.replace(self.topic, _id, content)
1007 return op_id
1008 except ValidationError as e:
1009 raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
1010
1011 def delete(self, session, _id, dry_run=False, not_send_msg=None):
1012 if not self.multiproject:
1013 filter_q = {}
1014 else:
1015 filter_q = self._get_project_filter(session)
1016 filter_q[self.id_field(self.topic, _id)] = _id
1017 item_content = self.db.get_one(self.topic, filter_q)
1018 item_content["state"] = "IN_DELETION"
1019 item_content["operatingState"] = "PROCESSING"
1020 self.check_conflict_on_del(session, _id, item_content)
1021 item_content["current_operation"] = None
1022 self.format_on_operation(
1023 item_content,
1024 "delete",
1025 None,
1026 )
1027 op_id = item_content["current_operation"]
1028 self.db.set_one(self.topic, {"_id": item_content["_id"]}, item_content)
1029 self._send_msg(
1030 "delete", {"oka_id": _id, "operation_id": op_id}, not_send_msg=not_send_msg
1031 )
yshahffcac5f2024-08-19 12:49:07 +00001032 return op_id
yshah53cc9eb2024-07-05 13:06:31 +00001033
1034 def new(self, rollback, session, indata=None, kwargs=None, headers=None):
1035 # _remove_envelop
1036 if indata:
1037 if "userDefinedData" in indata:
1038 indata = indata["userDefinedData"]
1039
1040 content = {"_admin": {"userDefinedData": indata, "revision": 0}}
1041
1042 self._update_input_with_kwargs(content, kwargs)
1043 content = BaseTopic._validate_input_new(
1044 self, input=kwargs, force=session["force"]
1045 )
1046
1047 self.check_unique_name(session, content["name"])
1048 operation_params = {}
1049 for content_key, content_value in content.items():
1050 operation_params[content_key] = content_value
1051 self.format_on_new(
1052 content, session["project_id"], make_public=session["public"]
1053 )
1054 content["current_operation"] = None
1055 self.format_on_operation(
1056 content,
1057 operation_type="create",
1058 operation_params=operation_params,
1059 )
1060 content["git_name"] = self.create_gitname(content, session)
1061 _id = self.db.create(self.topic, content)
1062 rollback.append({"topic": self.topic, "_id": _id})
1063 return _id, None
1064
1065 def upload_content(self, session, _id, indata, kwargs, headers):
1066 current_desc = self.show(session, _id)
1067
1068 compressed = None
1069 content_type = headers.get("Content-Type")
1070 if (
1071 content_type
1072 and "application/gzip" in content_type
1073 or "application/x-gzip" in content_type
1074 ):
1075 compressed = "gzip"
1076 if content_type and "application/zip" in content_type:
1077 compressed = "zip"
1078 filename = headers.get("Content-Filename")
1079 if not filename and compressed:
1080 filename = "package.tar.gz" if compressed == "gzip" else "package.zip"
1081 elif not filename:
1082 filename = "package"
1083
1084 revision = 1
1085 if "revision" in current_desc["_admin"]:
1086 revision = current_desc["_admin"]["revision"] + 1
1087
1088 file_pkg = None
1089 fs_rollback = []
1090
1091 try:
1092 start = 0
1093 # Rather than using a temp folder, we will store the package in a folder based on
1094 # the current revision.
1095 proposed_revision_path = _id + ":" + str(revision)
1096 # all the content is upload here and if ok, it is rename from id_ to is folder
1097
1098 if start:
1099 if not self.fs.file_exists(proposed_revision_path, "dir"):
1100 raise EngineException(
1101 "invalid Transaction-Id header", HTTPStatus.NOT_FOUND
1102 )
1103 else:
1104 self.fs.file_delete(proposed_revision_path, ignore_non_exist=True)
1105 self.fs.mkdir(proposed_revision_path)
1106 fs_rollback.append(proposed_revision_path)
1107
1108 storage = self.fs.get_params()
1109 storage["folder"] = proposed_revision_path
yshah74eb26a2024-09-24 18:16:07 +00001110 storage["zipfile"] = filename
yshah53cc9eb2024-07-05 13:06:31 +00001111
1112 file_path = (proposed_revision_path, filename)
1113 file_pkg = self.fs.file_open(file_path, "a+b")
1114
yshah53cc9eb2024-07-05 13:06:31 +00001115 if isinstance(indata, dict):
1116 indata_text = yaml.safe_dump(indata, indent=4, default_flow_style=False)
1117 file_pkg.write(indata_text.encode(encoding="utf-8"))
1118 else:
1119 indata_len = 0
1120 indata = indata.file
1121 while True:
1122 indata_text = indata.read(4096)
1123 indata_len += len(indata_text)
1124 if not indata_text:
1125 break
1126 file_pkg.write(indata_text)
1127
yshah53cc9eb2024-07-05 13:06:31 +00001128 # Need to close the file package here so it can be copied from the
1129 # revision to the current, unrevisioned record
1130 if file_pkg:
1131 file_pkg.close()
1132 file_pkg = None
1133
1134 # Fetch both the incoming, proposed revision and the original revision so we
1135 # can call a validate method to compare them
1136 current_revision_path = _id + "/"
1137 self.fs.sync(from_path=current_revision_path)
1138 self.fs.sync(from_path=proposed_revision_path)
1139
garciadeblasf36540c2024-09-23 13:03:00 +02001140 # Is this required?
yshah53cc9eb2024-07-05 13:06:31 +00001141 if revision > 1:
1142 try:
1143 self._validate_descriptor_changes(
1144 _id,
1145 filename,
1146 current_revision_path,
1147 proposed_revision_path,
1148 )
1149 except Exception as e:
1150 shutil.rmtree(
1151 self.fs.path + current_revision_path, ignore_errors=True
1152 )
1153 shutil.rmtree(
1154 self.fs.path + proposed_revision_path, ignore_errors=True
1155 )
1156 # Only delete the new revision. We need to keep the original version in place
1157 # as it has not been changed.
1158 self.fs.file_delete(proposed_revision_path, ignore_non_exist=True)
1159 raise e
1160
1161 indata = self._remove_envelop(indata)
1162
1163 # Override descriptor with query string kwargs
1164 if kwargs:
1165 self._update_input_with_kwargs(indata, kwargs)
1166
1167 current_desc["_admin"]["storage"] = storage
1168 current_desc["_admin"]["onboardingState"] = "ONBOARDED"
1169 current_desc["_admin"]["operationalState"] = "ENABLED"
1170 current_desc["_admin"]["modified"] = time()
1171 current_desc["_admin"]["revision"] = revision
1172
1173 deep_update_rfc7396(current_desc, indata)
1174
1175 # Copy the revision to the active package name by its original id
1176 shutil.rmtree(self.fs.path + current_revision_path, ignore_errors=True)
1177 os.rename(
1178 self.fs.path + proposed_revision_path,
1179 self.fs.path + current_revision_path,
1180 )
1181 self.fs.file_delete(current_revision_path, ignore_non_exist=True)
1182 self.fs.mkdir(current_revision_path)
1183 self.fs.reverse_sync(from_path=current_revision_path)
1184
1185 shutil.rmtree(self.fs.path + _id)
1186 kwargs = {}
1187 kwargs["package"] = filename
1188 if headers["Method"] == "POST":
1189 current_desc["state"] = "IN_CREATION"
1190 elif headers["Method"] in ("PUT", "PATCH"):
1191 current_desc["current_operation"] = None
1192 self.format_on_operation(
1193 current_desc,
1194 "update",
1195 kwargs,
1196 )
1197 current_desc["operatingState"] = "PROCESSING"
1198 current_desc["resourceState"] = "IN_PROGRESS"
1199
1200 self.db.replace(self.topic, _id, current_desc)
1201
1202 # Store a copy of the package as a point in time revision
1203 revision_desc = dict(current_desc)
1204 revision_desc["_id"] = _id + ":" + str(revision_desc["_admin"]["revision"])
1205 self.db.create(self.topic + "_revisions", revision_desc)
1206 fs_rollback = []
1207
1208 op_id = current_desc["current_operation"]
1209 if headers["Method"] == "POST":
1210 self._send_msg("create", {"oka_id": _id, "operation_id": op_id})
1211 elif headers["Method"] == "PUT" or "PATCH":
1212 self._send_msg("edit", {"oka_id": _id, "operation_id": op_id})
1213
1214 return True
1215
1216 except EngineException:
1217 raise
1218 finally:
1219 if file_pkg:
1220 file_pkg.close()
1221 for file in fs_rollback:
1222 self.fs.file_delete(file, ignore_non_exist=True)