blob: 8ce81f79a26b1ab54afcb37896b003d45da6f85f [file] [log] [blame]
rshri2d386cb2024-07-05 14:35:51 +00001# -*- coding: utf-8 -*-
2
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12# implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
yshah53cc9eb2024-07-05 13:06:31 +000016import logging
17import yaml
18import tarfile
19import shutil
20import os
rshri2d386cb2024-07-05 14:35:51 +000021from http import HTTPStatus
yshah53cc9eb2024-07-05 13:06:31 +000022from uuid import uuid4
rshri2d386cb2024-07-05 14:35:51 +000023
yshah53cc9eb2024-07-05 13:06:31 +000024from time import time
rshri2d386cb2024-07-05 14:35:51 +000025from osm_nbi.base_topic import BaseTopic, EngineException
26
yshah53cc9eb2024-07-05 13:06:31 +000027from osm_nbi.descriptor_topics import DescriptorTopic
rshri2d386cb2024-07-05 14:35:51 +000028from osm_nbi.validation import (
29 ValidationError,
30 clustercreation_new_schema,
31 infra_controller_profile_create_new_schema,
32 infra_config_profile_create_new_schema,
33 app_profile_create_new_schema,
34 resource_profile_create_new_schema,
35 infra_controller_profile_create_edit_schema,
36 infra_config_profile_create_edit_schema,
37 app_profile_create_edit_schema,
38 resource_profile_create_edit_schema,
39 k8scluster_new_schema,
40 attach_dettach_profile_schema,
yshah53cc9eb2024-07-05 13:06:31 +000041 ksu_schema,
42 oka_schema,
rshri2d386cb2024-07-05 14:35:51 +000043)
yshah53cc9eb2024-07-05 13:06:31 +000044from osm_common.dbbase import deep_update_rfc7396, DbException
rshri2d386cb2024-07-05 14:35:51 +000045from osm_common.msgbase import MsgException
46from osm_common.fsbase import FsException
47
# Authors of this module, each given as a "Name <email>" string.
__author__ = (
    "Shrinithi R <shrinithi.r@tataelxsi.co.in>",
    "Shahithya Y <shahithya.y@tataelxsi.co.in>",
)
rshri2d386cb2024-07-05 14:35:51 +000052
53
class InfraContTopic(BaseTopic):
    """Topic handling Kubernetes infra controller profiles.

    Records live in the ``k8sinfra_controller`` collection. Creation is
    delegated to the ``new_profile`` / ``default_profile`` helpers, which are
    not defined in this class (presumably inherited from ``BaseTopic``).
    """

    topic = "k8sinfra_controller"
    topic_msg = "k8s_infra_controller"
    schema_new = infra_controller_profile_create_new_schema
    schema_edit = infra_controller_profile_create_edit_schema

    def __init__(self, db, fs, msg, auth):
        super().__init__(db, fs, msg, auth)

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create a new infra controller profile.

        All arguments are forwarded unchanged to ``self.new_profile`` and its
        result is returned as is.
        """
        return self.new_profile(rollback, session, indata, kwargs, headers)

    def default(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create the default infra controller profile while creating a cluster.

        All arguments are forwarded unchanged to ``self.default_profile`` and
        its result is returned as is.
        """
        return self.default_profile(rollback, session, indata, kwargs, headers)

    def delete(self, session, _id, dry_run=False, not_send_msg=None):
        """Delete an infra controller profile.

        :param session: session information, forwarded to the helpers
        :param _id: internal id of the profile
        :param dry_run: when true nothing is detached and the flag is forwarded
            to the parent ``delete`` (assumes the parent accepts ``dry_run``
            with the usual "check but do not delete" meaning - confirm)
        :param not_send_msg: forwarded to the parent ``delete``
        :return: the ``_id`` received
        :raises EngineException: (422) if the profile is marked as default
        """
        item_content = self.db.get_one(self.topic, {"_id": _id})
        if item_content.get("default", False):
            raise EngineException(
                "Cannot delete item because it is marked as default",
                http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
            )
        if not dry_run:
            # Before deleting, detach the profile from the associated clusters.
            # Skipped on a dry run so that a check-only call changes nothing.
            self.detach(session, _id, profile_type="infra_controller_profiles")
        # To delete the infra controller profile
        super().delete(session, _id, dry_run=dry_run, not_send_msg=not_send_msg)
        return _id
rshri2d386cb2024-07-05 14:35:51 +000083
84
class InfraConfTopic(BaseTopic):
    """Topic handling Kubernetes infra config profiles.

    Records live in the ``k8sinfra_config`` collection. Creation is delegated
    to the ``new_profile`` / ``default_profile`` helpers, which are not
    defined in this class (presumably inherited from ``BaseTopic``).
    """

    topic = "k8sinfra_config"
    topic_msg = "k8s_infra_config"
    schema_new = infra_config_profile_create_new_schema
    schema_edit = infra_config_profile_create_edit_schema

    def __init__(self, db, fs, msg, auth):
        super().__init__(db, fs, msg, auth)

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create a new infra config profile.

        All arguments are forwarded unchanged to ``self.new_profile`` and its
        result is returned as is.
        """
        return self.new_profile(rollback, session, indata, kwargs, headers)

    def default(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create the default infra config profile while creating a cluster.

        All arguments are forwarded unchanged to ``self.default_profile`` and
        its result is returned as is.
        """
        return self.default_profile(rollback, session, indata, kwargs, headers)

    def delete(self, session, _id, dry_run=False, not_send_msg=None):
        """Delete an infra config profile.

        :param session: session information, forwarded to the helpers
        :param _id: internal id of the profile
        :param dry_run: when true nothing is detached and the flag is forwarded
            to the parent ``delete`` (assumes the parent accepts ``dry_run``
            with the usual "check but do not delete" meaning - confirm)
        :param not_send_msg: forwarded to the parent ``delete``
        :return: the ``_id`` received
        :raises EngineException: (422) if the profile is marked as default
        """
        item_content = self.db.get_one(self.topic, {"_id": _id})
        if item_content.get("default", False):
            raise EngineException(
                "Cannot delete item because it is marked as default",
                http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
            )
        if not dry_run:
            # Before deleting, detach the profile from the associated clusters.
            # Skipped on a dry run so that a check-only call changes nothing.
            self.detach(session, _id, profile_type="infra_config_profiles")
        # To delete the infra config profile
        super().delete(session, _id, dry_run=dry_run, not_send_msg=not_send_msg)
        return _id
rshri2d386cb2024-07-05 14:35:51 +0000114
115
class AppTopic(BaseTopic):
    """Topic handling Kubernetes app profiles.

    Records live in the ``k8sapp`` collection. Creation is delegated to the
    ``new_profile`` / ``default_profile`` helpers, which are not defined in
    this class (presumably inherited from ``BaseTopic``).
    """

    topic = "k8sapp"
    topic_msg = "k8s_app"
    schema_new = app_profile_create_new_schema
    schema_edit = app_profile_create_edit_schema

    def __init__(self, db, fs, msg, auth):
        super().__init__(db, fs, msg, auth)

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create a new app profile.

        All arguments are forwarded unchanged to ``self.new_profile`` and its
        result is returned as is.
        """
        return self.new_profile(rollback, session, indata, kwargs, headers)

    def default(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create the default app profile while creating a cluster.

        All arguments are forwarded unchanged to ``self.default_profile`` and
        its result is returned as is.
        """
        return self.default_profile(rollback, session, indata, kwargs, headers)

    def delete(self, session, _id, dry_run=False, not_send_msg=None):
        """Delete an app profile.

        :param session: session information, forwarded to the helpers
        :param _id: internal id of the profile
        :param dry_run: when true nothing is detached and the flag is forwarded
            to the parent ``delete`` (assumes the parent accepts ``dry_run``
            with the usual "check but do not delete" meaning - confirm)
        :param not_send_msg: forwarded to the parent ``delete``
        :return: the ``_id`` received
        :raises EngineException: (422) if the profile is marked as default
        """
        item_content = self.db.get_one(self.topic, {"_id": _id})
        if item_content.get("default", False):
            raise EngineException(
                "Cannot delete item because it is marked as default",
                http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
            )
        if not dry_run:
            # Before deleting, detach the profile from the associated clusters.
            # Skipped on a dry run so that a check-only call changes nothing.
            self.detach(session, _id, profile_type="app_profiles")
        # To delete the app profile
        super().delete(session, _id, dry_run=dry_run, not_send_msg=not_send_msg)
        return _id
rshri2d386cb2024-07-05 14:35:51 +0000145
146
class ResourceTopic(BaseTopic):
    """Topic handling Kubernetes resource profiles.

    Records live in the ``k8sresource`` collection. Creation is delegated to
    the ``new_profile`` / ``default_profile`` helpers, which are not defined
    in this class (presumably inherited from ``BaseTopic``).
    """

    topic = "k8sresource"
    topic_msg = "k8s_resource"
    schema_new = resource_profile_create_new_schema
    schema_edit = resource_profile_create_edit_schema

    def __init__(self, db, fs, msg, auth):
        super().__init__(db, fs, msg, auth)

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create a new resource profile.

        All arguments are forwarded unchanged to ``self.new_profile`` and its
        result is returned as is.
        """
        return self.new_profile(rollback, session, indata, kwargs, headers)

    def default(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create the default resource profile while creating a cluster.

        All arguments are forwarded unchanged to ``self.default_profile`` and
        its result is returned as is.
        """
        return self.default_profile(rollback, session, indata, kwargs, headers)

    def delete(self, session, _id, dry_run=False, not_send_msg=None):
        """Delete a resource profile.

        :param session: session information, forwarded to the helpers
        :param _id: internal id of the profile
        :param dry_run: when true nothing is detached and the flag is forwarded
            to the parent ``delete`` (assumes the parent accepts ``dry_run``
            with the usual "check but do not delete" meaning - confirm)
        :param not_send_msg: forwarded to the parent ``delete``
        :return: the ``_id`` received
        :raises EngineException: (422) if the profile is marked as default
        """
        item_content = self.db.get_one(self.topic, {"_id": _id})
        if item_content.get("default", False):
            raise EngineException(
                "Cannot delete item because it is marked as default",
                http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
            )
        if not dry_run:
            # Before deleting, detach the profile from the associated clusters.
            # Skipped on a dry run so that a check-only call changes nothing.
            self.detach(session, _id, profile_type="resource_profiles")
        # To delete the resource profile
        super().delete(session, _id, dry_run=dry_run, not_send_msg=not_send_msg)
        return _id
rshri2d386cb2024-07-05 14:35:51 +0000176
177
class K8sTopic(BaseTopic):
    """Topic handling cluster creation requests and cluster profile management.

    Records live in the ``clusters`` collection. One instance of each profile
    topic is kept so that default profiles can be created together with the
    cluster and profiles can later be listed, attached or detached.
    """

    topic = "clusters"
    topic_msg = "cluster"
    schema_new = clustercreation_new_schema
    schema_edit = attach_dettach_profile_schema

    def __init__(self, db, fs, msg, auth):
        super().__init__(db, fs, msg, auth)
        self.infra_contr_topic = InfraContTopic(db, fs, msg, auth)
        self.infra_conf_topic = InfraConfTopic(db, fs, msg, auth)
        self.resource_topic = ResourceTopic(db, fs, msg, auth)
        self.app_topic = AppTopic(db, fs, msg, auth)

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """
        Creates a new k8scluster into database.
        :param rollback: list to append the created items at database in case a rollback must be done
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param indata: params to be used for the k8cluster
        :param kwargs: used to override the indata
        :param headers: http request headers
        :return: tuple ``(_id, None)`` with the _id of the k8scluster created at database
        :raises: ValidationError, EngineException, DbException, MsgException or FsException,
            re-raised with the failing step appended to the message
        """
        step = "checking quotas"  # first step must be defined outside try
        try:
            self.check_quota(session)
            step = "name unique check"
            self.check_unique_name(session, indata["name"])
            step = "validating input parameters"
            cls_request = self._remove_envelop(indata)
            self._update_input_with_kwargs(cls_request, kwargs)
            cls_request = self._validate_input_new(cls_request, session["force"])
            operation_params = cls_request

            step = "filling cluster details from input data"
            cls_create = self._create_cluster(
                cls_request, rollback, session, indata, kwargs, headers
            )

            step = "creating cluster at database"
            self.format_on_new(
                cls_create, session["project_id"], make_public=session["public"]
            )
            cls_create["current_operation"] = None
            op_id = self.format_on_operation(cls_create, "create", operation_params)
            _id = self.db.create(self.topic, cls_create)
            # Register the record for rollback as soon as it exists, so that a
            # failure while generating/encrypting the keys does not leak it.
            rollback.append({"topic": self.topic, "_id": _id})

            pubkey, privkey = self._generate_age_key()
            cls_create["age_pubkey"] = self.db.encrypt(
                pubkey, schema_version="1.11", salt=_id
            )
            cls_create["age_privkey"] = self.db.encrypt(
                privkey, schema_version="1.11", salt=_id
            )
            # TODO: set age_pubkey and age_privkey in the default profiles
            self.db.set_one(self.topic, {"_id": _id}, cls_create)
            self._send_msg("create", {"cluster_id": _id, "operation_id": op_id})

            return _id, None
        except (
            ValidationError,
            EngineException,
            DbException,
            MsgException,
            FsException,
        ) as e:
            raise type(e)(
                "{} while '{}'".format(e, step), http_code=e.http_code
            ) from e

    def _create_cluster(self, cls_request, rollback, session, indata, kwargs, headers):
        """Build the cluster record to be stored from the validated request.

        ``region_name`` and ``resource_group`` are taken from the request when
        present and otherwise from the ``config`` of the VIM account named in
        the request. One default profile of each kind is created and its
        result is stored as the single element of the matching profile list.

        :param cls_request: validated request content
        :param rollback, session, indata, kwargs, headers: forwarded to the
            ``default`` method of each profile topic
        :return: dictionary with the cluster content
        """
        # Get the vim_account details
        vim_account_details = self.db.get_one(
            "vim_accounts", {"name": cls_request["vim_account"]}
        )

        # Presence is checked on cls_request, the same mapping the values are
        # read from, so both always agree. Each value falls back independently
        # on the VIM account configuration.
        if "region_name" in cls_request:
            region_name = cls_request["region_name"]
        else:
            region_name = vim_account_details["config"]["region_name"]
        if "resource_group" in cls_request:
            resource_group = cls_request["resource_group"]
        else:
            resource_group = vim_account_details["config"]["resource_group"]

        cls_desc = {
            "name": cls_request["name"],
            "vim_account": self.check_vim(session, cls_request["vim_account"]),
            "k8s_version": cls_request["k8s_version"],
            "node_size": cls_request["node_size"],
            "node_count": cls_request["node_count"],
            "description": cls_request["description"],
            "region_name": region_name,
            "resource_group": resource_group,
            "infra_controller_profiles": [
                self._create_default_profiles(
                    rollback, session, indata, kwargs, headers, self.infra_contr_topic
                )
            ],
            "infra_config_profiles": [
                self._create_default_profiles(
                    rollback, session, indata, kwargs, headers, self.infra_conf_topic
                )
            ],
            "resource_profiles": [
                self._create_default_profiles(
                    rollback, session, indata, kwargs, headers, self.resource_topic
                )
            ],
            "app_profiles": [
                self._create_default_profiles(
                    rollback, session, indata, kwargs, headers, self.app_topic
                )
            ],
            "created": "true",
            "state": "IN_CREATION",
            "operatingState": "PROCESSING",
            "git_name": self.create_gitname(cls_request, session),
            "resourceState": "IN_PROGRESS.REQUEST_RECEIVED",
        }
        return cls_desc

    def check_vim(self, session, name):
        """Return ``name`` when a VIM account with that name is found.

        ``None`` is returned implicitly when the lookup yields ``None``.
        NOTE(review): whether ``db.get_one`` returns ``None`` or raises for a
        missing account is not visible here - confirm.

        :raises EngineException: (422) when the lookup raises ValidationError
        """
        try:
            vim_account_details = self.db.get_one("vim_accounts", {"name": name})
            if vim_account_details is not None:
                return name
        except ValidationError as e:
            raise EngineException(
                e,
                HTTPStatus.UNPROCESSABLE_ENTITY,
            ) from e

    def _create_default_profiles(
        self, rollback, session, indata, kwargs, headers, topic
    ):
        """Call ``default`` on the selected profile topic and return its result.

        :param topic: a profile topic object or one of the profile type names
            understood by ``to_select_topic``
        """
        topic = self.to_select_topic(topic)
        return topic.default(rollback, session, indata, kwargs, headers)

    def to_select_topic(self, topic):
        """Translate a profile type name into the matching profile topic object.

        Any value that is not one of the four known names is returned
        unchanged, which lets callers pass a topic object directly.
        """
        topics_by_name = {
            "infra_controller_profiles": self.infra_contr_topic,
            "infra_config_profiles": self.infra_conf_topic,
            "resource_profiles": self.resource_topic,
            "app_profiles": self.app_topic,
        }
        if isinstance(topic, str):
            return topics_by_name.get(topic, topic)
        return topic

    def show_one(self, session, _id, profile, filter_q=None, api_req=False):
        """List the profiles of one kind attached to a cluster.

        :param session: session information used to build the project filter
        :param _id: cluster identifier
        :param profile: profile type name; also the key of the cluster record
            holding the profile ids
        :param filter_q: ignored on input; it is rebuilt from the session and
            the cluster id, and that filter is the one handed to each ``show``
        :param api_req: forwarded to the ``show`` of the profile topic
        :return: list with the result of ``show`` for every attached profile
        :raises EngineException: (422) when a ValidationError is raised
        """
        try:
            filter_q = self._get_project_filter(session)
            filter_q[self.id_field(self.topic, _id)] = _id
            content = self.db.get_one(self.topic, filter_q)
            topic = self.to_select_topic(profile)
            return [
                topic.show(session, profile_id, filter_q, api_req)
                for profile_id in content[profile]
            ]
        except ValidationError as e:
            raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY) from e

    def state_check(self, profile_id, session, topic):
        """Ensure the given profile is in ``CREATED`` state.

        :raises EngineException: (422) when the profile state is anything else
        """
        topic = self.to_select_topic(topic)
        content = topic.show(session, profile_id, filter_q=None, api_req=False)
        if content["state"] != "CREATED":
            raise EngineException(
                f" {profile_id} is not in created state",
                HTTPStatus.UNPROCESSABLE_ENTITY,
            )

    def edit(self, session, _id, item, indata=None, kwargs=None):
        """Attach a profile to, or detach it from, a cluster.

        The validated payload must carry a non-empty ``add_profile`` or
        ``remove_profile`` entry; ``add_profile`` wins when both are given.

        :param item: profile type name (key of the cluster record)
        :param kwargs: not used
        :raises EngineException: (422) when neither entry is present
        """
        indata = self._remove_envelop(indata)
        indata = self._validate_input_edit(indata, content=None, force=session["force"])
        if indata.get("add_profile"):
            self.add_profile(session, _id, item, indata)
        elif indata.get("remove_profile"):
            self.remove_profile(session, _id, item, indata)
        else:
            error_msg = "Add / remove operation is only applicable"
            raise EngineException(error_msg, HTTPStatus.UNPROCESSABLE_ENTITY)

    def add_profile(self, session, _id, item, indata=None):
        """Register an "add" operation for the first profile in ``add_profile``.

        The cluster is stored with ``operatingState`` ``PROCESSING`` and an
        "add" message is sent. The profile list itself is not modified here.

        :raises EngineException: (422) when the profile is not in ``CREATED``
            state or is already listed in the cluster
        """
        indata = self._remove_envelop(indata)
        operation_params = indata
        profile_id = indata["add_profile"][0]["id"]
        # check state
        self.state_check(profile_id, session, item)
        filter_q = self._get_project_filter(session)
        filter_q[self.id_field(self.topic, _id)] = _id
        content = self.db.get_one(self.topic, filter_q)

        if profile_id in content[item]:
            raise EngineException(
                f"{item} {profile_id} already exists", HTTPStatus.UNPROCESSABLE_ENTITY
            )

        content["operatingState"] = "PROCESSING"
        content["current_operation"] = None
        op_id = self.format_on_operation(content, "add", operation_params)
        self.db.set_one(self.topic, {"_id": content["_id"]}, content)
        self._send_msg(
            "add",
            {
                "cluster_id": _id,
                "profile_id": profile_id,
                "profile_type": item,
                "operation_id": op_id,
            },
        )

    def _get_default_profiles(self, session, topic):
        """Return the ``_id`` of every profile of the given kind flagged as default."""
        topic = self.to_select_topic(topic)
        existing_profiles = topic.list(session, filter_q=None, api_req=False)
        return [
            profile["_id"]
            for profile in existing_profiles
            if profile.get("default", False)
        ]

    def remove_profile(self, session, _id, item, indata):
        """Register a "remove" operation for the first profile in ``remove_profile``.

        The cluster is stored with ``operatingState`` ``PROCESSING`` and a
        "remove" message is sent. The profile list itself is not modified here.

        :raises EngineException: (422) when the profile is a default profile
            or is not listed in the cluster
        """
        indata = self._remove_envelop(indata)
        operation_params = indata
        profile_id = indata["remove_profile"][0]["id"]
        filter_q = self._get_project_filter(session)
        filter_q[self.id_field(self.topic, _id)] = _id
        content = self.db.get_one(self.topic, filter_q)
        profile_list = content[item]

        if profile_id in self._get_default_profiles(session, item):
            raise EngineException(
                "Cannot remove default profile", HTTPStatus.UNPROCESSABLE_ENTITY
            )
        if profile_id not in profile_list:
            raise EngineException(
                f"{item} {profile_id} does'nt exists", HTTPStatus.UNPROCESSABLE_ENTITY
            )

        content["operatingState"] = "PROCESSING"
        content["current_operation"] = None
        op_id = self.format_on_operation(content, "remove", operation_params)
        self.db.set_one(self.topic, {"_id": content["_id"]}, content)
        self._send_msg(
            "remove",
            {
                "cluster_id": _id,
                "profile_id": profile_id,
                "profile_type": item,
                "operation_id": op_id,
            },
        )

    def get_cluster_info(self, session, _id, item):
        """Return the cluster record and send a message named after ``item``.

        :param item: message command; sent with ``{"_id": _id}`` as payload
        :return: the cluster record read from database
        """
        if not self.multiproject:
            filter_db = {}
        else:
            filter_db = self._get_project_filter(session)
        # To allow project&user addressing by name AS WELL AS _id
        filter_db[BaseTopic.id_field(self.topic, _id)] = _id
        data = self.db.get_one(self.topic, filter_db)
        self._send_msg(item, {"_id": _id})
        return data

    def update_cluster(self, session, _id, item, indata):
        """Register the operation ``item`` on a cluster and notify it.

        The cluster is stored as ``PROCESSING`` / ``IN_PROGRESS`` and a
        message named after ``item`` is sent.

        :param item: operation type, also used as message command
        :param indata: recorded as the operation parameters
        :return: the ``current_operation`` value of the stored record
        """
        if not self.multiproject:
            filter_db = {}
        else:
            filter_db = self._get_project_filter(session)
        # To allow project&user addressing by name AS WELL AS _id
        filter_db[BaseTopic.id_field(self.topic, _id)] = _id
        data = self.db.get_one(self.topic, filter_db)
        data["operatingState"] = "PROCESSING"
        data["resourceState"] = "IN_PROGRESS"
        # NOTE(review): unlike add_profile/remove_profile, current_operation is
        # not reset before format_on_operation - confirm this is intended.
        self.format_on_operation(data, item, indata)
        self.db.set_one(self.topic, {"_id": _id}, data)
        op_id = data["current_operation"]
        self._send_msg(item, {"cluster_id": _id, "operation_id": op_id})
        return op_id
494
rshri2d386cb2024-07-05 14:35:51 +0000495
class K8saddTopic(BaseTopic):
    """Topic handling registration and deregistration of existing clusters.

    Records are stored in the ``clusters`` collection with ``created`` set to
    ``"false"``.
    """

    topic = "clusters"
    topic_msg = "cluster"
    schema_new = k8scluster_new_schema

    def __init__(self, db, fs, msg, auth):
        super().__init__(db, fs, msg, auth)

    def add(self, rollback, session, indata, kwargs=None, headers=None):
        """Register a cluster: validate, store it and send a "register" message.

        :param rollback: list to append the created items at database in case a rollback must be done
        :param session: contains "force", "public" and "project_id", among others
        :param indata: params to be used for the cluster
        :param kwargs: used to override the indata
        :param headers: not used
        :return: tuple ``(_id, None)`` with the _id of the stored cluster
        :raises: ValidationError, EngineException, DbException, MsgException or FsException,
            re-raised with the failing step appended to the message
        """
        step = "checking quotas"  # first step must be defined outside try
        try:
            self.check_quota(session)
            step = "name unique check"
            self.check_unique_name(session, indata["name"])
            step = "validating input parameters"
            cls_add_request = self._remove_envelop(indata)
            self._update_input_with_kwargs(cls_add_request, kwargs)
            cls_add_request = self._validate_input_new(
                cls_add_request, session["force"]
            )
            operation_params = cls_add_request

            step = "filling cluster details from input data"
            cls_add = self._add_cluster(cls_add_request, session)

            step = "creating cluster at database"
            self.format_on_new(
                cls_add, session["project_id"], make_public=session["public"]
            )
            cls_add["current_operation"] = None
            op_id = self.format_on_operation(cls_add, "register", operation_params)
            _id = self.db.create(self.topic, cls_add)
            # Register the record for rollback as soon as it exists, so that a
            # failure in the steps below does not leak it.
            rollback.append({"topic": self.topic, "_id": _id})
            # NOTE(review): cls_add is not modified after create, so this
            # second write looks redundant - kept pending confirmation.
            self.db.set_one(self.topic, {"_id": _id}, cls_add)
            self._send_msg("register", {"cluster_id": _id, "operation_id": op_id})
            return _id, None
        except (
            ValidationError,
            EngineException,
            DbException,
            MsgException,
            FsException,
        ) as e:
            raise type(e)(
                "{} while '{}'".format(e, step), http_code=e.http_code
            ) from e

    def _add_cluster(self, cls_add_request, session):
        """Build the cluster record to be stored from the validated request."""
        return {
            "name": cls_add_request["name"],
            "description": cls_add_request["description"],
            "credentials": cls_add_request["credentials"],
            "vim_account": cls_add_request["vim_account"],
            "k8s_version": cls_add_request["k8s_version"],
            "nets": cls_add_request["nets"],
            "created": "false",
            "state": "IN_CREATION",
            "operatingState": "PROCESSING",
            "git_name": self.create_gitname(cls_add_request, session),
            "resourceState": "IN_PROGRESS.REQUEST_RECEIVED",
        }

    def remove(self, session, _id, dry_run=False, not_send_msg=None):
        """
        Deregister a cluster addressed by its internal _id.

        When other projects still reference the cluster, only the references of
        the session projects are removed. Otherwise, if the session has write
        permission, a "deregister" operation is recorded and notified.

        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param _id: server internal id
        :param dry_run: make checking but do not delete
        :param not_send_msg: To not send message (False) or store content (list) instead
        :return: always None; raise exception if error or not found, conflict, ...
        """
        # To allow addressing projects and users by name AS WELL AS by _id
        if not self.multiproject:
            filter_q = {}
        else:
            filter_q = self._get_project_filter(session)
        filter_q[self.id_field(self.topic, _id)] = _id
        item_content = self.db.get_one(self.topic, filter_q)

        # Every check runs before anything is written, so that a dry run, a
        # conflict or a permission error leaves the stored record untouched.
        self.check_conflict_on_del(session, _id, item_content)
        if dry_run:
            return None

        if self.multiproject and session["project_id"]:
            admin = item_content["_admin"]
            # remove reference from project_read if there are more projects referencing it. If it last one,
            # do not remove reference, but delete
            other_projects_referencing = any(
                p not in session["project_id"] and p != "ANY"
                for p in admin["projects_read"]
            )

            # check if there are projects referencing it (apart from ANY, that means, public)....
            if other_projects_referencing:
                # remove references but not delete
                update_dict_pull = {
                    "_admin.projects_read": session["project_id"],
                    "_admin.projects_write": session["project_id"],
                }
                self.db.set_one(
                    self.topic, filter_q, update_dict=None, pull_list=update_dict_pull
                )
                return None

            can_write = any(
                p == "ANY" or p in session["project_id"]
                for p in admin["projects_write"]
            )
            if not can_write:
                raise EngineException(
                    "You have not write permission to delete it",
                    http_code=HTTPStatus.UNAUTHORIZED,
                )

        # delete: record the operation and notify it
        item_content["operatingState"] = "PROCESSING"
        item_content["current_operation"] = None
        op_id = self.format_on_operation(item_content, "deregister", None)
        # The stored _id is used because the caller may address the cluster by
        # an alternative identifier (see id_field above).
        self.db.set_one(self.topic, {"_id": item_content["_id"]}, item_content)
        self._send_msg(
            "deregister",
            {"cluster_id": _id, "operation_id": op_id},
            not_send_msg=not_send_msg,
        )
        return None
yshah53cc9eb2024-07-05 13:06:31 +0000637
638
639class KsusTopic(BaseTopic):
640 topic = "ksus"
641 okapkg_topic = "okas"
642 infra_topic = "k8sinfra"
643 topic_msg = "ksu"
644 schema_new = ksu_schema
645 schema_edit = ksu_schema
646
647 def __init__(self, db, fs, msg, auth):
648 BaseTopic.__init__(self, db, fs, msg, auth)
649 self.logger = logging.getLogger("nbi.ksus")
650
651 @staticmethod
652 def format_on_new(content, project_id=None, make_public=False):
653 BaseTopic.format_on_new(content, project_id=project_id, make_public=make_public)
654 content["state"] = "IN_CREATION"
655 content["operatingState"] = "PROCESSING"
656 content["resourceState"] = "IN_PROGRESS"
657
658 def new(self, rollback, session, indata=None, kwargs=None, headers=None):
659 _id_list = []
660 op_id = str(uuid4())
661 for ksus in indata["ksus"]:
662 content = ksus
663 oka = content["oka"][0]
664 oka_flag = ""
665 if oka["_id"]:
666 oka_flag = "_id"
667 elif oka["sw_catalog_path"]:
668 oka_flag = "sw_catalog_path"
669
670 for okas in content["oka"]:
671 if okas["_id"] and okas["sw_catalog_path"]:
672 raise EngineException(
673 "Cannot create ksu with both OKA and SW catalog path",
674 HTTPStatus.UNPROCESSABLE_ENTITY,
675 )
676 if not okas["sw_catalog_path"]:
677 okas.pop("sw_catalog_path")
678 elif not okas["_id"]:
679 okas.pop("_id")
680 if oka_flag not in okas.keys():
681 raise EngineException(
682 "Cannot create ksu. Give either OKA or SW catalog path for all oka in a KSU",
683 HTTPStatus.UNPROCESSABLE_ENTITY,
684 )
685
686 # Override descriptor with query string kwargs
687 content = self._remove_envelop(content)
688 self._update_input_with_kwargs(content, kwargs)
689 content = self._validate_input_new(input=content, force=session["force"])
690
691 # Check for unique name
692 self.check_unique_name(session, content["name"])
693
694 self.check_conflict_on_new(session, content)
695
696 operation_params = {}
697 for content_key, content_value in content.items():
698 operation_params[content_key] = content_value
699 self.format_on_new(
700 content, project_id=session["project_id"], make_public=session["public"]
701 )
702 content["current_operation"] = op_id
703 op_id = self.format_on_operation(
704 content,
705 operation_type="create",
706 operation_params=operation_params,
707 )
708 content["git_name"] = self.create_gitname(content, session)
709
710 # Update Oka_package usage state
711 for okas in content["oka"]:
712 if "_id" in okas.keys():
713 self.update_usage_state(session, okas)
714
715 _id = self.db.create(self.topic, content)
716 rollback.append({"topic": self.topic, "_id": _id})
717
718 if not op_id:
719 op_id = content["current_operation"]
720 _id_list.append(_id)
721 data = {"ksus_list": _id_list, "operation_id": op_id}
722 self._send_msg("create", data)
723 return _id_list, op_id
724
725 def clone(self, rollback, session, _id, indata, kwargs, headers):
726 filter_db = self._get_project_filter(session)
727 filter_db[BaseTopic.id_field(self.topic, _id)] = _id
728 data = self.db.get_one(self.topic, filter_db)
729
730 data["current_operation"] = None
731 op_id = self.format_on_operation(
732 data,
733 "clone",
734 indata,
735 )
736 self.db.set_one(self.topic, {"_id": data["_id"]}, data)
737 self._send_msg("clone", {"ksus_list": [data["_id"]], "operation_id": op_id})
738 return op_id
739
740 def update_usage_state(self, session, oka_content):
741 _id = oka_content["_id"]
742 filter_db = self._get_project_filter(session)
743 filter_db[BaseTopic.id_field(self.topic, _id)] = _id
744
745 data = self.db.get_one(self.okapkg_topic, filter_db)
746 if data["_admin"]["usageState"] == "NOT_IN_USE":
747 usage_state_update = {
748 "_admin.usageState": "IN_USE",
749 }
750 self.db.set_one(
751 self.okapkg_topic, {"_id": _id}, update_dict=usage_state_update
752 )
753
754 def move_ksu(self, session, _id, indata=None, kwargs=None, content=None):
755 indata = self._remove_envelop(indata)
756
757 # Override descriptor with query string kwargs
758 if kwargs:
759 self._update_input_with_kwargs(indata, kwargs)
760 try:
761 if indata and session.get("set_project"):
762 raise EngineException(
763 "Cannot edit content and set to project (query string SET_PROJECT) at same time",
764 HTTPStatus.UNPROCESSABLE_ENTITY,
765 )
766 # TODO self._check_edition(session, indata, _id, force)
767 if not content:
768 content = self.show(session, _id)
769 indata = self._validate_input_edit(
770 input=indata, content=content, force=session["force"]
771 )
772 operation_params = indata
773 deep_update_rfc7396(content, indata)
774
775 # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
776 _id = content.get("_id") or _id
777 content["current_operation"] = None
778 self.format_on_operation(
779 content,
780 "move",
781 operation_params,
782 )
783 if content.get("_admin"):
784 now = time()
785 content["_admin"]["modified"] = now
786 content["operatingState"] = "PROCESSING"
787 content["resourceState"] = "IN_PROGRESS"
788
789 self.db.replace(self.topic, _id, content)
790
791 op_id = content["current_operation"]
792 data = {"ksus_list": [content["_id"]], "operation_id": op_id}
793 self._send_msg("move", data)
794 return op_id
795 except ValidationError as e:
796 raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
797
798 def check_conflict_on_edit(self, session, final_content, edit_content, _id):
799 if final_content["name"] != edit_content["name"]:
800 self.check_unique_name(session, edit_content["name"])
801 return final_content
802
803 @staticmethod
804 def format_on_edit(final_content, edit_content):
805 BaseTopic.format_on_operation(
806 final_content,
807 "update",
808 edit_content,
809 )
810 final_content["operatingState"] = "PROCESSING"
811 final_content["resourceState"] = "IN_PROGRESS"
812 if final_content.get("_admin"):
813 now = time()
814 final_content["_admin"]["modified"] = now
815 return final_content["current_operation"]
816
817 def edit(self, session, _id, indata, kwargs):
818 _id_list = []
819 op_id = str(uuid4())
820 if _id == "update":
821 for ksus in indata["ksus"]:
822 content = ksus
823 _id = content["_id"]
824 _id_list.append(_id)
825 content.pop("_id")
826 op_id = self.edit_ksu(session, _id, op_id, content, kwargs)
827 else:
828 content = indata
829 _id_list.append(_id)
830 op_id = self.edit_ksu(session, _id, op_id, content, kwargs)
831
832 data = {"ksus_list": _id_list, "operation_id": op_id}
833 self._send_msg("edit", data)
yshah53cc9eb2024-07-05 13:06:31 +0000834
835 def edit_ksu(self, session, _id, op_id, indata, kwargs):
836 content = None
837 indata = self._remove_envelop(indata)
838
839 # Override descriptor with query string kwargs
840 if kwargs:
841 self._update_input_with_kwargs(indata, kwargs)
842 try:
843 if indata and session.get("set_project"):
844 raise EngineException(
845 "Cannot edit content and set to project (query string SET_PROJECT) at same time",
846 HTTPStatus.UNPROCESSABLE_ENTITY,
847 )
848 # TODO self._check_edition(session, indata, _id, force)
849 if not content:
850 content = self.show(session, _id)
851
852 for okas in indata["oka"]:
853 if not okas["_id"]:
854 okas.pop("_id")
855 if not okas["sw_catalog_path"]:
856 okas.pop("sw_catalog_path")
857
858 indata = self._validate_input_edit(indata, content, force=session["force"])
859
860 # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
861 _id = content.get("_id") or _id
862
863 content = self.check_conflict_on_edit(session, content, indata, _id=_id)
864 content["current_operation"] = op_id
865 op_id = self.format_on_edit(content, indata)
866 self.db.replace(self.topic, _id, content)
867 return op_id
868 except ValidationError as e:
869 raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
870
def delete_ksu(self, session, _id, indata, dry_run=False, not_send_msg=None):
    """
    Mark one KSU, or a batch of KSUs, for deletion and announce it with one "delete" message.

    :param session: session information, forwarded unchanged to self.delete
    :param _id: identifier of the KSU to delete, or the literal "delete" to process every
        entry listed under indata["ksus"]
    :param indata: in batch mode, a dict whose "ksus" list holds dicts carrying an "_id"
    :param dry_run: accepted for signature compatibility; this method does not act on it
    :param not_send_msg: forwarded to self._send_msg together with the message
    :return: the operation id returned by the last self.delete call
    """
    ksu_ids = []
    op_id = str(uuid4())
    if _id == "delete":
        for ksu_content in indata["ksus"]:
            ksu_id = ksu_content.pop("_id")
            ksu_ids.append(ksu_id)
            op_id = self.delete(session, ksu_id, op_id)
    else:
        ksu_ids.append(_id)
        op_id = self.delete(session, _id, op_id)

    # Forward not_send_msg so the caller's choice reaches _send_msg instead of being dropped
    self._send_msg(
        "delete",
        {"ksus_list": ksu_ids, "operation_id": op_id},
        not_send_msg=not_send_msg,
    )
    return op_id
888
def delete(self, session, _id, op_id):
    """
    Flag a KSU as being deleted and release the OKA packages that no other KSU references.

    :param session: session information, used to build the project filter
    :param _id: identifier of the KSU to delete
    :param op_id: operation id stored as "current_operation" of the KSU
    :return: op_id, unchanged
    """
    filter_q = self._get_project_filter(session) if self.multiproject else {}
    filter_q[self.id_field(self.topic, _id)] = _id
    item_content = self.db.get_one(self.topic, filter_q)
    item_content["state"] = "IN_DELETION"
    item_content["operatingState"] = "PROCESSING"
    item_content["resourceState"] = "IN_PROGRESS"
    item_content["current_operation"] = op_id
    self.format_on_operation(
        item_content,
        "delete",
        None,
    )
    self.db.set_one(self.topic, {"_id": item_content["_id"]}, item_content)

    # Every OKA referenced by _id in this KSU is a candidate for release; entries without
    # an "_id" (or a missing/empty "oka" list) are skipped instead of raising.
    candidate_oka_ids = []
    for oka in item_content.get("oka") or ():
        oka_id = oka.get("_id")
        if oka_id and oka_id not in candidate_oka_ids:
            candidate_oka_ids.append(oka_id)
    if not candidate_oka_ids:
        return op_id

    # OKAs still referenced by any other KSU visible through the project filter
    still_used = set()
    project_filter = self._get_project_filter(session)
    for ksu in self.db.get_list(self.topic, project_filter) or ():
        if ksu["_id"] == item_content["_id"]:
            continue
        for oka in ksu.get("oka") or ():
            if oka.get("_id"):
                still_used.add(oka["_id"])

    for oka_id in candidate_oka_ids:
        if oka_id not in still_used:
            self.db.set_one(
                self.okapkg_topic,
                {"_id": oka_id},
                {"_admin.usageState": "NOT_IN_USE"},
            )
    return op_id
932
933
class OkaTopic(DescriptorTopic):
    """NBI topic for OKA packages; oka_schema is assigned as both schema_new and schema_edit."""

    # Name passed as the topic in the database calls made by this class
    topic = "okas"
    topic_msg = "oka"
    schema_new = oka_schema
    schema_edit = oka_schema
939
def __init__(self, db, fs, msg, auth):
    """
    Set the topic up through the parent constructor and attach the "nbi.oka" logger.

    :param db: passed through to the parent constructor
    :param fs: passed through to the parent constructor
    :param msg: passed through to the parent constructor
    :param auth: passed through to the parent constructor
    """
    super().__init__(db, fs, msg, auth)
    oka_logger = logging.getLogger("nbi.oka")
    self.logger = oka_logger
943
@staticmethod
def format_on_new(content, project_id=None, make_public=False):
    """
    Apply the DescriptorTopic formatting, then flag the new OKA as waiting for its content.

    :param content: descriptor being created; modified in place
    :param project_id: forwarded to DescriptorTopic.format_on_new
    :param make_public: forwarded to DescriptorTopic.format_on_new
    :return: None
    """
    DescriptorTopic.format_on_new(
        content, project_id=project_id, make_public=make_public
    )
    initial_states = (
        ("state", "PENDING_CONTENT"),
        ("operatingState", "PROCESSING"),
        ("resourceState", "IN_PROGRESS"),
    )
    for state_key, state_value in initial_states:
        content[state_key] = state_value
952
def check_conflict_on_del(self, session, _id, db_content):
    """
    Refuse the deletion of an OKA package whose usage state is "IN_USE".

    :param session: not read by this check
    :param _id: not read by this check
    :param db_content: stored OKA record; "_admin" -> "usageState" is inspected
    :return: None when the package is not in use
    :raises EngineException: with HTTP CONFLICT when the usage state is "IN_USE"
    """
    if db_content["_admin"]["usageState"] != "IN_USE":
        return
    raise EngineException(
        "There is a KSU using this package",
        http_code=HTTPStatus.CONFLICT,
    )
960
def check_conflict_on_edit(self, session, final_content, edit_content, _id):
    """
    Reject edits that change neither name nor description, and check renamed OKAs for clashes.

    A field absent from edit_content is treated as unchanged, so a partial edit (for example
    only "description") is evaluated instead of failing with KeyError.

    :param session: session information, forwarded to check_unique_name
    :param final_content: stored OKA record
    :param edit_content: requested changes
    :param _id: not read by this check
    :return: final_content, unchanged
    :raises EngineException: with HTTP CONFLICT when name and description are both unchanged
    """
    current_name = final_content.get("name")
    current_description = final_content.get("description")
    new_name = edit_content.get("name", current_name)
    new_description = edit_content.get("description", current_description)

    if new_name == current_name and new_description == current_description:
        raise EngineException(
            "No update",
            http_code=HTTPStatus.CONFLICT,
        )
    if new_name != current_name:
        self.check_unique_name(session, new_name)
    return final_content
973
def edit(self, session, _id, indata=None, kwargs=None, content=None):
    """
    Validate an OKA edition, merge it into the stored record and save the result.

    :param session: session information; "set_project" and "force" are read here
    :param _id: identifier used to look the OKA up when content is not supplied
    :param indata: requested changes
    :param kwargs: query-string overrides applied on top of indata
    :param content: stored record, when the caller already holds it; fetched through
        self.show otherwise
    :return: the value returned by self.format_on_edit
    :raises EngineException: if content and SET_PROJECT are supplied together, or if the
        input does not validate
    """
    indata = self._remove_envelop(indata)
    if kwargs:
        # Override descriptor with query string kwargs
        self._update_input_with_kwargs(indata, kwargs)

    try:
        if indata and session.get("set_project"):
            raise EngineException(
                "Cannot edit content and set to project (query string SET_PROJECT) at same time",
                HTTPStatus.UNPROCESSABLE_ENTITY,
            )
        # TODO self._check_edition(session, indata, _id, force)
        content = content or self.show(session, _id)
        indata = self._validate_input_edit(indata, content, force=session["force"])

        # The item may have been addressed by name: prefer the _id of the stored record
        _id = content.get("_id") or _id

        content = self.check_conflict_on_edit(session, content, indata, _id=_id)
        op_id = self.format_on_edit(content, indata)
        deep_update_rfc7396(content, indata)
        self.db.replace(self.topic, _id, content)
    except ValidationError as e:
        raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
    return op_id
1003
def delete(self, session, _id, dry_run=False, not_send_msg=None):
    """
    Flag an OKA as being deleted and send the "delete" message.

    :param session: session information, used to build the project filter
    :param _id: identifier of the OKA to delete
    :param dry_run: when true, only the conflict check is run; nothing is stored or sent
    :param not_send_msg: forwarded to self._send_msg together with the message
    :return: the "current_operation" value left by format_on_operation, or None on a dry run
    """
    filter_q = self._get_project_filter(session) if self.multiproject else {}
    filter_q[self.id_field(self.topic, _id)] = _id
    item_content = self.db.get_one(self.topic, filter_q)
    item_content["state"] = "IN_DELETION"
    item_content["operatingState"] = "PROCESSING"
    self.check_conflict_on_del(session, _id, item_content)
    if dry_run:
        # The state changes above only touched the fetched copy; nothing was persisted
        return None

    item_content["current_operation"] = None
    self.format_on_operation(
        item_content,
        "delete",
        None,
    )
    op_id = item_content["current_operation"]
    self.db.set_one(self.topic, {"_id": item_content["_id"]}, item_content)
    self._send_msg(
        "delete", {"oka_id": _id, "operation_id": op_id}, not_send_msg=not_send_msg
    )
    return op_id
yshah53cc9eb2024-07-05 13:06:31 +00001026
def new(self, rollback, session, indata=None, kwargs=None, headers=None):
    """
    Create the database record of a new OKA from the query-string arguments.

    :param rollback: list that receives a {"topic", "_id"} entry for the created record
    :param session: session information; "force", "project_id" and "public" are read here
    :param indata: request body; its "userDefinedData" member is used when present
    :param kwargs: query-string arguments; these are what gets validated and stored
    :param headers: not read by this method
    :return: tuple (_id of the created record, None)
    """
    # _remove_envelop
    if indata and "userDefinedData" in indata:
        indata = indata["userDefinedData"]

    # NOTE(review): this dict is only used for the kwargs update below; the validated kwargs
    # replace it right after, so "userDefinedData" and "revision" are not carried over.
    content = {"_admin": {"userDefinedData": indata, "revision": 0}}
    self._update_input_with_kwargs(content, kwargs)
    content = BaseTopic._validate_input_new(
        self, input=kwargs, force=session["force"]
    )

    self.check_unique_name(session, content["name"])
    # Snapshot of the validated input, taken before the bookkeeping fields are added
    operation_params = {key: value for key, value in content.items()}
    self.format_on_new(
        content, session["project_id"], make_public=session["public"]
    )
    content["current_operation"] = None
    self.format_on_operation(
        content,
        operation_type="create",
        operation_params=operation_params,
    )
    content["git_name"] = self.create_gitname(content, session)

    new_id = self.db.create(self.topic, content)
    rollback.append({"topic": self.topic, "_id": new_id})
    return new_id, None
1057
def upload_content(self, session, _id, indata, kwargs, headers):
    """
    Store the uploaded OKA package as a new revision and flag the OKA for processing.

    The package is written under "<_id>:<revision>", extracted when it is a gzip tarball,
    validated against the current content when a previous revision exists, and then moved
    onto "<_id>/". The database record and a copy in "<topic>_revisions" are written, and a
    "create" (POST) or "edit" (PUT/PATCH) message is sent.

    :param session: session information, used to read the current record through self.show
    :param _id: identifier of the OKA whose content is uploaded
    :param indata: uploaded content: either a dict, which is dumped as YAML, or an object
        exposing "filename" and "file" attributes whose stream is copied in 4096-byte chunks
    :param kwargs: query-string overrides applied to indata
    :param headers: request headers; "Content-Type", "Content-Filename" and "Method" are read
    :return: True once the content has been stored
    """
    current_desc = self.show(session, _id)

    compressed = None
    content_type = headers.get("Content-Type")
    # Parenthesised so that a missing Content-Type header never reaches the "in" tests
    if content_type and (
        "application/gzip" in content_type or "application/x-gzip" in content_type
    ):
        compressed = "gzip"
    if content_type and "application/zip" in content_type:
        compressed = "zip"
    filename = headers.get("Content-Filename")
    if not filename and compressed:
        filename = "package.tar.gz" if compressed == "gzip" else "package.zip"
    elif not filename:
        filename = "package"

    revision = 1
    if "revision" in current_desc["_admin"]:
        revision = current_desc["_admin"]["revision"] + 1

    file_pkg = None
    fs_rollback = []

    try:
        # Rather than using a temp folder, we will store the package in a folder based on
        # the current revision.
        proposed_revision_path = _id + ":" + str(revision)

        self.fs.file_delete(proposed_revision_path, ignore_non_exist=True)
        self.fs.mkdir(proposed_revision_path)
        fs_rollback.append(proposed_revision_path)

        storage = self.fs.get_params()
        storage["folder"] = proposed_revision_path

        file_path = (proposed_revision_path, filename)
        file_pkg = self.fs.file_open(file_path, "a+b")

        if isinstance(indata, dict):
            indata_text = yaml.safe_dump(indata, indent=4, default_flow_style=False)
            file_pkg.write(indata_text.encode(encoding="utf-8"))
        else:
            # Only upload objects carry a filename; a dict keeps the header-derived name
            filename = indata.filename
            indata = indata.file
            while True:
                chunk = indata.read(4096)
                if not chunk:
                    break
                file_pkg.write(chunk)

        # PACKAGE UPLOADED
        file_pkg.seek(0, 0)
        if compressed == "gzip":
            # NOTE(review): the archive comes from the client and member names are only
            # logged here - confirm that fs.file_extract guards against path traversal.
            with tarfile.open(mode="r", fileobj=file_pkg) as tar:
                for tarinfo in tar:
                    tarname = tarinfo.name
                    tarname_path = tarname.split("/")
                    self.logger.debug(
                        "Tarname: {} Tarname Path: {}".format(tarname, tarname_path)
                    )
                storage["zipfile"] = filename
                self.fs.file_extract(tar, proposed_revision_path)
        else:
            content = file_pkg.read()
            self.logger.debug("Content: {}".format(content))

        # Need to close the file package here so it can be copied from the
        # revision to the current, unrevisioned record
        if file_pkg:
            file_pkg.close()
        file_pkg = None

        # Fetch both the incoming, proposed revision and the original revision so we
        # can call a validate method to compare them
        current_revision_path = _id + "/"
        self.fs.sync(from_path=current_revision_path)
        self.fs.sync(from_path=proposed_revision_path)

        if revision > 1:
            try:
                self._validate_descriptor_changes(
                    _id,
                    filename,
                    current_revision_path,
                    proposed_revision_path,
                )
            except Exception:
                shutil.rmtree(
                    self.fs.path + current_revision_path, ignore_errors=True
                )
                shutil.rmtree(
                    self.fs.path + proposed_revision_path, ignore_errors=True
                )
                # Only delete the new revision. We need to keep the original version in place
                # as it has not been changed.
                self.fs.file_delete(proposed_revision_path, ignore_non_exist=True)
                raise

        indata = self._remove_envelop(indata)

        # Override descriptor with query string kwargs
        if kwargs:
            self._update_input_with_kwargs(indata, kwargs)

        current_desc["_admin"]["storage"] = storage
        current_desc["_admin"]["onboardingState"] = "ONBOARDED"
        current_desc["_admin"]["operationalState"] = "ENABLED"
        current_desc["_admin"]["modified"] = time()
        current_desc["_admin"]["revision"] = revision

        deep_update_rfc7396(current_desc, indata)

        # Copy the revision to the active package name by its original id
        shutil.rmtree(self.fs.path + current_revision_path, ignore_errors=True)
        os.rename(
            self.fs.path + proposed_revision_path,
            self.fs.path + current_revision_path,
        )
        self.fs.file_delete(current_revision_path, ignore_non_exist=True)
        self.fs.mkdir(current_revision_path)
        self.fs.reverse_sync(from_path=current_revision_path)

        shutil.rmtree(self.fs.path + _id)
        kwargs = {}
        kwargs["package"] = filename
        method = headers["Method"]
        if method == "POST":
            current_desc["state"] = "IN_CREATION"
        elif method in ("PUT", "PATCH"):
            current_desc["current_operation"] = None
            self.format_on_operation(
                current_desc,
                "update",
                kwargs,
            )
        current_desc["operatingState"] = "PROCESSING"
        current_desc["resourceState"] = "IN_PROGRESS"

        self.db.replace(self.topic, _id, current_desc)

        # Store a copy of the package as a point in time revision
        revision_desc = dict(current_desc)
        revision_desc["_id"] = _id + ":" + str(revision_desc["_admin"]["revision"])
        self.db.create(self.topic + "_revisions", revision_desc)
        # Everything is stored: nothing is left for the finally clause to roll back
        fs_rollback = []

        op_id = current_desc["current_operation"]
        if method == "POST":
            self._send_msg("create", {"oka_id": _id, "operation_id": op_id})
        elif method in ("PUT", "PATCH"):
            # A membership test: `== "PUT" or "PATCH"` would be true for every method
            self._send_msg("edit", {"oka_id": _id, "operation_id": op_id})

        return True

    finally:
        if file_pkg:
            file_pkg.close()
        for leftover_path in fs_rollback:
            self.fs.file_delete(leftover_path, ignore_non_exist=True)