Coverage for osm_nbi/k8s_topics.py: 16%
745 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-12 20:04 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-12 20:04 +0000
1# -*- coding: utf-8 -*-
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12# implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
16import logging
17import yaml
18import shutil
19import os
20from http import HTTPStatus
22from time import time
23from osm_nbi.base_topic import BaseTopic, EngineException
24from osm_nbi.acm_topic import ACMTopic, ACMOperationTopic, ProfileTopic
26from osm_nbi.descriptor_topics import DescriptorTopic
27from osm_nbi.validation import (
28 ValidationError,
29 validate_input,
30 clustercreation_new_schema,
31 cluster_edit_schema,
32 cluster_update_schema,
33 infra_controller_profile_create_new_schema,
34 infra_config_profile_create_new_schema,
35 app_profile_create_new_schema,
36 resource_profile_create_new_schema,
37 infra_controller_profile_create_edit_schema,
38 infra_config_profile_create_edit_schema,
39 app_profile_create_edit_schema,
40 resource_profile_create_edit_schema,
41 clusterregistration_new_schema,
42 attach_dettach_profile_schema,
43 ksu_schema,
44 oka_schema,
45)
46from osm_common.dbbase import deep_update_rfc7396, DbException
47from osm_common.msgbase import MsgException
48from osm_common.fsbase import FsException
50__author__ = (
51 "Shrinithi R <shrinithi.r@tataelxsi.co.in>",
52 "Shahithya Y <shahithya.y@tataelxsi.co.in>",
53)
class InfraContTopic(ProfileTopic):
    """Topic for Kubernetes infra controller profiles.

    Creation and deletion are delegated to ``self.new_profile``,
    ``self.default_profile`` and ``self.delete_profile``.
    """

    topic = "k8sinfra_controller"
    topic_msg = "k8s_infra_controller"
    schema_new = infra_controller_profile_create_new_schema
    schema_edit = infra_controller_profile_create_edit_schema

    def __init__(self, db, fs, msg, auth):
        BaseTopic.__init__(self, db, fs, msg, auth)

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create a new infra controller profile.

        :return: whatever ``self.new_profile`` returns
        """
        return self.new_profile(rollback, session, indata, kwargs, headers)

    def default(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create the default infra controller profile while creating a cluster.

        :return: whatever ``self.default_profile`` returns
        """
        return self.default_profile(rollback, session, indata, kwargs, headers)

    def delete(self, session, _id, dry_run=False, not_send_msg=None):
        """Delete the profile identified by ``_id``.

        :param dry_run: forwarded to ``self.delete_profile``
        :param not_send_msg: forwarded to ``self.delete_profile``
        :return: the ``_id`` received
        """
        # Forward the caller's flags; they used to be replaced by the defaults.
        self.delete_profile(
            session, _id, dry_run=dry_run, not_send_msg=not_send_msg
        )
        return _id
class InfraConfTopic(ProfileTopic):
    """Topic for Kubernetes infra config profiles.

    Creation and deletion are delegated to ``self.new_profile``,
    ``self.default_profile`` and ``self.delete_profile``.
    """

    topic = "k8sinfra_config"
    topic_msg = "k8s_infra_config"
    schema_new = infra_config_profile_create_new_schema
    schema_edit = infra_config_profile_create_edit_schema

    def __init__(self, db, fs, msg, auth):
        BaseTopic.__init__(self, db, fs, msg, auth)

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create a new infra config profile.

        :return: whatever ``self.new_profile`` returns
        """
        return self.new_profile(rollback, session, indata, kwargs, headers)

    def default(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create the default infra config profile while creating a cluster.

        :return: whatever ``self.default_profile`` returns
        """
        return self.default_profile(rollback, session, indata, kwargs, headers)

    def delete(self, session, _id, dry_run=False, not_send_msg=None):
        """Delete the profile identified by ``_id``.

        :param dry_run: forwarded to ``self.delete_profile``
        :param not_send_msg: forwarded to ``self.delete_profile``
        :return: the ``_id`` received
        """
        # Forward the caller's flags; they used to be replaced by the defaults.
        self.delete_profile(
            session, _id, dry_run=dry_run, not_send_msg=not_send_msg
        )
        return _id
class AppTopic(ProfileTopic):
    """Topic for Kubernetes app profiles.

    Creation and deletion are delegated to ``self.new_profile``,
    ``self.default_profile`` and ``self.delete_profile``.
    """

    topic = "k8sapp"
    topic_msg = "k8s_app"
    schema_new = app_profile_create_new_schema
    schema_edit = app_profile_create_edit_schema

    def __init__(self, db, fs, msg, auth):
        BaseTopic.__init__(self, db, fs, msg, auth)

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create a new app profile.

        :return: whatever ``self.new_profile`` returns
        """
        return self.new_profile(rollback, session, indata, kwargs, headers)

    def default(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create the default app profile while creating a cluster.

        :return: whatever ``self.default_profile`` returns
        """
        return self.default_profile(rollback, session, indata, kwargs, headers)

    def delete(self, session, _id, dry_run=False, not_send_msg=None):
        """Delete the profile identified by ``_id``.

        :param dry_run: forwarded to ``self.delete_profile``
        :param not_send_msg: forwarded to ``self.delete_profile``
        :return: the ``_id`` received
        """
        # Forward the caller's flags; they used to be replaced by the defaults.
        self.delete_profile(
            session, _id, dry_run=dry_run, not_send_msg=not_send_msg
        )
        return _id
class ResourceTopic(ProfileTopic):
    """Topic for Kubernetes resource profiles.

    Creation and deletion are delegated to ``self.new_profile``,
    ``self.default_profile`` and ``self.delete_profile``.
    """

    topic = "k8sresource"
    topic_msg = "k8s_resource"
    schema_new = resource_profile_create_new_schema
    schema_edit = resource_profile_create_edit_schema

    def __init__(self, db, fs, msg, auth):
        BaseTopic.__init__(self, db, fs, msg, auth)

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create a new resource profile.

        :return: whatever ``self.new_profile`` returns
        """
        return self.new_profile(rollback, session, indata, kwargs, headers)

    def default(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create the default resource profile while creating a cluster.

        :return: whatever ``self.default_profile`` returns
        """
        return self.default_profile(rollback, session, indata, kwargs, headers)

    def delete(self, session, _id, dry_run=False, not_send_msg=None):
        """Delete the profile identified by ``_id``.

        :param dry_run: forwarded to ``self.delete_profile``
        :param not_send_msg: forwarded to ``self.delete_profile``
        :return: the ``_id`` received
        """
        # Forward the caller's flags; they used to be replaced by the defaults.
        self.delete_profile(
            session, _id, dry_run=dry_run, not_send_msg=not_send_msg
        )
        return _id
class ClusterTopic(ACMTopic):
    """Topic for clusters created through the NBI (collection ``clusters``).

    Besides the cluster record itself, it drives the four kinds of profiles
    attached to a cluster through one helper topic per profile kind.
    """

    topic = "clusters"
    topic_msg = "cluster"
    schema_new = clustercreation_new_schema
    schema_edit = attach_dettach_profile_schema

    # Items of a cluster that designate a list of profiles.
    _PROFILE_ITEMS = (
        "infra_controller_profiles",
        "infra_config_profiles",
        "app_profiles",
        "resource_profiles",
    )

    def __init__(self, db, fs, msg, auth):
        super().__init__(db, fs, msg, auth)
        self.infra_contr_topic = InfraContTopic(db, fs, msg, auth)
        self.infra_conf_topic = InfraConfTopic(db, fs, msg, auth)
        self.resource_topic = ResourceTopic(db, fs, msg, auth)
        self.app_topic = AppTopic(db, fs, msg, auth)

    @staticmethod
    def format_on_new(content, project_id=None, make_public=False):
        """Apply ``ACMTopic.format_on_new`` and clear ``current_operation``."""
        ACMTopic.format_on_new(content, project_id=project_id, make_public=make_public)
        content["current_operation"] = None

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """
        Creates a new k8scluster into database.
        :param rollback: list to append the created items at database in case a rollback must be done
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param indata: params to be used for the k8cluster
        :param kwargs: used to override the indata
        :param headers: http request headers
        :return: tuple (_id of the k8scluster created at database, None)
        :raises: ValidationError, EngineException, DbException, MsgException or FsException,
            re-raised with the failing step appended to the message
        """
        step = "checking quotas"  # first step must be defined outside try
        try:
            self.check_quota(session)
            step = "name unique check"
            self.cluster_unique_name_check(session, indata["name"])
            step = "validating input parameters"
            cls_request = self._remove_envelop(indata)
            self._update_input_with_kwargs(cls_request, kwargs)
            cls_request = self._validate_input_new(cls_request, session["force"])
            operation_params = cls_request

            step = "filling cluster details from input data"
            cls_create = self._create_cluster(
                cls_request, rollback, session, indata, kwargs, headers
            )

            step = "creating cluster at database"
            self.format_on_new(
                cls_create, session["project_id"], make_public=session["public"]
            )
            op_id = self.format_on_operation(cls_create, "create", operation_params)
            _id = self.db.create(self.topic, cls_create)
            # Register the rollback entry as soon as the record exists, so a
            # failure while generating/encrypting the keys can still be undone.
            rollback.append({"topic": self.topic, "_id": _id})
            pubkey, privkey = self._generate_age_key()
            cls_create["age_pubkey"] = self.db.encrypt(
                pubkey, schema_version="1.11", salt=_id
            )
            cls_create["age_privkey"] = self.db.encrypt(
                privkey, schema_version="1.11", salt=_id
            )
            # TODO: set age_pubkey and age_privkey in the default profiles
            self.db.set_one("clusters", {"_id": _id}, cls_create)
            self._send_msg("create", {"cluster_id": _id, "operation_id": op_id})

            # To add the content in old collection "k8sclusters"
            self.add_to_old_collection(cls_create, session)

            return _id, None
        except (
            ValidationError,
            EngineException,
            DbException,
            MsgException,
            FsException,
        ) as e:
            # Same exception type, message enriched with the step; keep the cause.
            raise type(e)(
                "{} while '{}'".format(e, step), http_code=e.http_code
            ) from e

    def _create_cluster(self, cls_request, rollback, session, indata, kwargs, headers):
        """Build the cluster document to be stored from the validated request.

        ``region_name`` and ``resource_group`` come from the request when the
        corresponding key is present in ``indata``; otherwise from the
        ``config`` of the VIM account named in the request. One default
        profile of each kind is created along the way.

        :return: dict with the cluster content
        """
        # Presence is looked up in the raw indata, values are read from cls_request.
        region_given = "region_name" in indata
        resource_group_given = "resource_group" in indata

        # Get the vim_account details
        vim_account_details = self.db.get_one(
            "vim_accounts", {"name": cls_request["vim_account"]}
        )

        if region_given:
            region_name = cls_request["region_name"]
        else:
            region_name = vim_account_details["config"]["region_name"]
        if resource_group_given:
            resource_group = cls_request["resource_group"]
        else:
            resource_group = vim_account_details["config"]["resource_group"]

        cls_desc = {
            "name": cls_request["name"],
            "vim_account": self.check_vim(session, cls_request["vim_account"]),
            "k8s_version": cls_request["k8s_version"],
            "node_size": cls_request["node_size"],
            "node_count": cls_request["node_count"],
            "bootstrap": cls_request["bootstrap"],
            "region_name": region_name,
            "resource_group": resource_group,
            "infra_controller_profiles": [
                self._create_default_profiles(
                    rollback, session, indata, kwargs, headers, self.infra_contr_topic
                )
            ],
            "infra_config_profiles": [
                self._create_default_profiles(
                    rollback, session, indata, kwargs, headers, self.infra_conf_topic
                )
            ],
            "resource_profiles": [
                self._create_default_profiles(
                    rollback, session, indata, kwargs, headers, self.resource_topic
                )
            ],
            "app_profiles": [
                self._create_default_profiles(
                    rollback, session, indata, kwargs, headers, self.app_topic
                )
            ],
            "created": "true",
            "state": "IN_CREATION",
            "operatingState": "PROCESSING",
            "git_name": self.create_gitname(cls_request, session),
            "resourceState": "IN_PROGRESS.REQUEST_RECEIVED",
        }
        # Add optional fields if they exist in the request
        if "description" in cls_request:
            cls_desc["description"] = cls_request["description"]
        return cls_desc

    def check_vim(self, session, name):
        """Look up the VIM account called ``name``.

        :return: ``name`` when the lookup returns a record, otherwise None
        :raises EngineException: when the lookup raises ValidationError
        """
        try:
            vim_account_details = self.db.get_one("vim_accounts", {"name": name})
            if vim_account_details is not None:
                return name
        except ValidationError as e:
            raise EngineException(
                e,
                HTTPStatus.UNPROCESSABLE_ENTITY,
            ) from e
        return None

    def _create_default_profiles(
        self, rollback, session, indata, kwargs, headers, topic
    ):
        """Create a default profile through ``topic`` (a profile item name or a topic object)."""
        topic = self.to_select_topic(topic)
        return topic.default(rollback, session, indata, kwargs, headers)

    def to_select_topic(self, topic):
        """Map a profile item name to its helper topic.

        Any value that is not one of the four profile item names is returned
        unchanged.
        """
        if not isinstance(topic, str):
            return topic
        selected = {
            "infra_controller_profiles": self.infra_contr_topic,
            "infra_config_profiles": self.infra_conf_topic,
            "resource_profiles": self.resource_topic,
            "app_profiles": self.app_topic,
        }
        return selected.get(topic, topic)

    def show_one(self, session, _id, profile, filter_q=None, api_req=False):
        """Return the profiles of kind ``profile`` attached to cluster ``_id``.

        The ``filter_q`` argument is not used as received: the filter is
        rebuilt from the session and the cluster id.

        :return: list with the content of every referenced profile
        :raises EngineException: when a ValidationError is raised underneath
        """
        try:
            filter_q = self._get_project_filter(session)
            filter_q[self.id_field(self.topic, _id)] = _id
            content = self.db.get_one(self.topic, filter_q)
            topic = self.to_select_topic(profile)
            return [
                topic.show(session, profile_id, filter_q, api_req)
                for profile_id in content[profile]
            ]
        except ValidationError as e:
            raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)

    def state_check(self, profile_id, session, topic):
        """Raise EngineException unless the profile state is ``CREATED``."""
        topic = self.to_select_topic(topic)
        content = topic.show(session, profile_id, filter_q=None, api_req=False)
        if content["state"] != "CREATED":
            raise EngineException(
                f" {profile_id} is not in created state",
                HTTPStatus.UNPROCESSABLE_ENTITY,
            )

    def edit(self, session, _id, item, indata=None, kwargs=None):
        """Edit a cluster, or attach/detach a profile.

        When ``item`` is not a profile item name it is treated as the edit
        payload and handed to the parent ``edit``. Otherwise ``indata`` must
        carry either ``add_profile`` or ``remove_profile``.

        :raises EngineException: when neither add nor remove is requested
        """
        if item not in self._PROFILE_ITEMS:
            # The schema is swapped only for the duration of the parent call.
            # It used to stay replaced on the instance, so later attach/detach
            # requests would see cluster_edit_schema through self.schema_edit.
            previous_schema = self.schema_edit
            self.schema_edit = cluster_edit_schema
            try:
                super().edit(session, _id, indata=item, kwargs=kwargs, content=None)
            finally:
                self.schema_edit = previous_schema
            return

        indata = self._remove_envelop(indata)
        indata = self._validate_input_edit(
            indata, content=None, force=session["force"]
        )
        if indata.get("add_profile"):
            self.add_profile(session, _id, item, indata)
        elif indata.get("remove_profile"):
            self.remove_profile(session, _id, item, indata)
        else:
            error_msg = "Add / remove operation is only applicable"
            raise EngineException(error_msg, HTTPStatus.UNPROCESSABLE_ENTITY)

    def edit_extra_before(self, session, _id, indata=None, kwargs=None, content=None):
        """Propagate a cluster rename to the old collection and the profiles.

        When ``indata`` brings a name different from the stored one, the new
        name must be unused in ``k8sclusters`` and in the four profile
        collections; then the record found under the old name in each of those
        collections is renamed.

        :return: True
        :raises EngineException: (CONFLICT) when the new name is already in use
        """
        check = self.db.get_one(self.topic, {"_id": _id})
        if "name" in indata and check["name"] != indata["name"]:
            self.check_unique_name(session, indata["name"])
            _filter = {"name": indata["name"]}
            topic_list = [
                "k8sclusters",
                "k8sinfra_controller",
                "k8sinfra_config",
                "k8sapp",
                "k8sresource",
            ]
            # Check unique name for k8scluster and profiles
            for topic in topic_list:
                if self.db.get_one(
                    topic, _filter, fail_on_empty=False, fail_on_more=False
                ):
                    raise EngineException(
                        "name '{}' already exists for {}".format(indata["name"], topic),
                        HTTPStatus.CONFLICT,
                    )
            # Replace name in k8scluster and profiles
            for topic in topic_list:
                data = self.db.get_one(topic, {"name": check["name"]})
                data["name"] = indata["name"]
                self.db.replace(topic, data["_id"], data)
        return True

    def add_profile(self, session, _id, item, indata=None):
        """Register an "add" operation attaching a profile to the cluster.

        Uses the id of the first entry of ``indata["add_profile"]``.

        :raises EngineException: when the profile is not in CREATED state or
            is already listed in the cluster
        """
        indata = self._remove_envelop(indata)
        operation_params = indata
        profile_id = indata["add_profile"][0]["id"]
        # check state
        self.state_check(profile_id, session, item)
        filter_q = self._get_project_filter(session)
        filter_q[self.id_field(self.topic, _id)] = _id
        content = self.db.get_one(self.topic, filter_q)

        if profile_id in content[item]:
            raise EngineException(
                f"{item} {profile_id} already exists", HTTPStatus.UNPROCESSABLE_ENTITY
            )

        content["operatingState"] = "PROCESSING"
        op_id = self.format_on_operation(content, "add", operation_params)
        self.db.set_one("clusters", {"_id": content["_id"]}, content)
        self._send_msg(
            "add",
            {
                "cluster_id": _id,
                "profile_id": profile_id,
                "profile_type": item,
                "operation_id": op_id,
            },
        )

    def _get_default_profiles(self, session, topic):
        """Return the ids of the profiles of this kind flagged as ``default``."""
        topic = self.to_select_topic(topic)
        existing_profiles = topic.list(session, filter_q=None, api_req=False)
        return [
            profile["_id"]
            for profile in existing_profiles
            if profile.get("default", False)
        ]

    def remove_profile(self, session, _id, item, indata):
        """Register a "remove" operation detaching a profile from the cluster.

        Uses the id of the first entry of ``indata["remove_profile"]``.

        :raises EngineException: when the profile is a default one or is not
            listed in the cluster
        """
        indata = self._remove_envelop(indata)
        operation_params = indata
        profile_id = indata["remove_profile"][0]["id"]
        filter_q = self._get_project_filter(session)
        filter_q[self.id_field(self.topic, _id)] = _id
        content = self.db.get_one(self.topic, filter_q)
        profile_list = content[item]

        default_profiles = self._get_default_profiles(session, item)

        if profile_id in default_profiles:
            raise EngineException(
                "Cannot remove default profile", HTTPStatus.UNPROCESSABLE_ENTITY
            )
        if profile_id not in profile_list:
            raise EngineException(
                f"{item} {profile_id} does'nt exists", HTTPStatus.UNPROCESSABLE_ENTITY
            )

        op_id = self.format_on_operation(content, "remove", operation_params)
        self.db.set_one("clusters", {"_id": content["_id"]}, content)
        self._send_msg(
            "remove",
            {
                "cluster_id": _id,
                "profile_id": profile_id,
                "profile_type": item,
                "operation_id": op_id,
            },
        )

    def get_cluster_creds(self, session, _id, item):
        """Register an operation named ``item`` and request the cluster credentials.

        :return: id of the registered operation
        """
        filter_db = self._get_project_filter(session) if self.multiproject else {}
        filter_db[BaseTopic.id_field(self.topic, _id)] = _id
        data = self.db.get_one(self.topic, filter_db)
        op_id = self.format_on_operation(data, item, None)
        self.db.set_one(self.topic, {"_id": data["_id"]}, data)
        self._send_msg("get_creds", {"cluster_id": _id, "operation_id": op_id})
        return op_id

    def get_cluster_creds_file(self, session, _id, item, op_id):
        """Write the stored credentials to ``<_id>/credentials.yaml`` and open it.

        The credentials are served only when the entry of
        ``operationHistory`` whose ``op_id`` matches has ``result`` set to True.

        :return: tuple (file object opened in "rb" mode, "text/plain")
        :raises EngineException: when the operation result is not True
        """
        filter_db = self._get_project_filter(session) if self.multiproject else {}
        filter_db[BaseTopic.id_field(self.topic, _id)] = _id

        data = self.db.get_one(self.topic, filter_db)
        creds_flag = None
        for operation in data["operationHistory"]:
            if operation["op_id"] == op_id:
                creds_flag = operation["result"]
        self.logger.info("Creds Flag: {}".format(creds_flag))

        if creds_flag is not True:
            raise EngineException(
                "Not possible to get the credentials of the cluster",
                HTTPStatus.UNPROCESSABLE_ENTITY,
            )

        credentials = data["credentials"]
        current_path = _id
        filename = "credentials.yaml"
        file_path = (current_path, filename)

        # Start from an empty folder so the file never accumulates old content.
        self.fs.file_delete(current_path, ignore_non_exist=True)
        self.fs.mkdir(current_path)
        self.logger.info("File path: {}".format(file_path))
        file_pkg = self.fs.file_open(file_path, "a+b")
        try:
            credentials_yaml = yaml.safe_dump(
                credentials, indent=4, default_flow_style=False
            )
            file_pkg.write(credentials_yaml.encode(encoding="utf-8"))
        finally:
            # Close the handle even when dumping or writing fails.
            file_pkg.close()
        self.fs.sync(from_path=current_path)

        return (
            self.fs.file_open(file_path, "rb"),
            "text/plain",
        )

    def update_cluster(self, session, _id, item, indata):
        """Validate ``indata``, register an operation named ``item`` and notify it.

        :return: id of the registered operation
        """
        filter_db = self._get_project_filter(session) if self.multiproject else {}
        # To allow project&user addressing by name AS WELL AS _id
        filter_db[BaseTopic.id_field(self.topic, _id)] = _id
        validate_input(indata, cluster_update_schema)
        data = self.db.get_one(self.topic, filter_db)
        data["operatingState"] = "PROCESSING"
        data["resourceState"] = "IN_PROGRESS"
        op_id = self.format_on_operation(data, item, indata)
        self.db.set_one(self.topic, {"_id": _id}, data)
        self._send_msg(item, {"cluster_id": _id, "operation_id": op_id})
        return op_id

    def delete_extra_before(self, session, _id, db_content, not_send_msg=None):
        """Run ``common_delete`` and return the payload describing the deletion."""
        op_id = self.common_delete(_id, db_content)
        return {"cluster_id": _id, "operation_id": op_id, "force": session["force"]}

    def delete(self, session, _id, dry_run=False, not_send_msg=None):
        """Delete a cluster that was created (not registered) through the NBI.

        :param dry_run: forwarded to the parent ``delete``
        :param not_send_msg: forwarded to the parent ``delete``
        :raises EngineException: when the cluster has ``created == "false"``,
            i.e. it was registered and must be deregistered instead
        """
        filter_q = self._get_project_filter(session)
        filter_q[self.id_field(self.topic, _id)] = _id
        check = self.db.get_one(self.topic, filter_q)
        if check["created"] == "false":
            raise EngineException(
                "Cannot delete registered cluster. Please deregister.",
                HTTPStatus.UNPROCESSABLE_ENTITY,
            )
        # Forward the caller's flags; they used to be replaced by the defaults.
        super().delete(session, _id, dry_run=dry_run, not_send_msg=not_send_msg)
class ClusterOpsTopic(ACMTopic):
    """Topic for registering and deregistering existing clusters.

    It works on the same ``clusters`` collection as cluster creation, but the
    stored records carry ``created == "false"``.
    """

    topic = "clusters"
    topic_msg = "cluster"
    schema_new = clusterregistration_new_schema

    def __init__(self, db, fs, msg, auth):
        super().__init__(db, fs, msg, auth)

    @staticmethod
    def format_on_new(content, project_id=None, make_public=False):
        """Apply ``ACMTopic.format_on_new`` and clear ``current_operation``."""
        ACMTopic.format_on_new(content, project_id=project_id, make_public=make_public)
        content["current_operation"] = None

    def add(self, rollback, session, indata, kwargs=None, headers=None):
        """Register an existing cluster in the database.

        :param rollback: list to append the created items at database in case a rollback must be done
        :param session: contains "force", "public", "project_id", among others
        :param indata: registration parameters
        :param kwargs: used to override the indata
        :param headers: http request headers (not used here)
        :return: tuple (_id of the registered cluster, None)
        :raises: ValidationError, EngineException, DbException, MsgException or FsException,
            re-raised with the failing step appended to the message
        """
        step = "checking quotas"
        try:
            self.check_quota(session)
            step = "name unique check"
            self.cluster_unique_name_check(session, indata["name"])
            step = "validating input parameters"
            cls_add_request = self._remove_envelop(indata)
            self._update_input_with_kwargs(cls_add_request, kwargs)
            cls_add_request = self._validate_input_new(
                cls_add_request, session["force"]
            )
            operation_params = cls_add_request

            step = "filling cluster details from input data"
            cls_add_request = self._add_cluster(cls_add_request, session)

            step = "registering the cluster at database"
            self.format_on_new(
                cls_add_request, session["project_id"], make_public=session["public"]
            )
            op_id = self.format_on_operation(
                cls_add_request, "register", operation_params
            )
            _id = self.db.create(self.topic, cls_add_request)
            # Register the rollback entry as soon as the record exists, so a
            # failure while generating/encrypting the keys can still be undone.
            rollback.append({"topic": self.topic, "_id": _id})
            pubkey, privkey = self._generate_age_key()
            cls_add_request["age_pubkey"] = self.db.encrypt(
                pubkey, schema_version="1.11", salt=_id
            )
            cls_add_request["age_privkey"] = self.db.encrypt(
                privkey, schema_version="1.11", salt=_id
            )
            # TODO: set age_pubkey and age_privkey in the default profiles
            self.db.set_one(self.topic, {"_id": _id}, cls_add_request)
            self._send_msg("register", {"cluster_id": _id, "operation_id": op_id})

            # To add the content in old collection "k8sclusters"
            self.add_to_old_collection(cls_add_request, session)

            return _id, None
        except (
            ValidationError,
            EngineException,
            DbException,
            MsgException,
            FsException,
        ) as e:
            # Same exception type, message enriched with the step; keep the cause.
            raise type(e)(
                "{} while '{}'".format(e, step), http_code=e.http_code
            ) from e

    def _add_cluster(self, cls_add_request, session):
        """Build the document stored for a registered cluster.

        :return: dict with the cluster content, flagged ``created: "false"``
        """
        cls_add = {
            "name": cls_add_request["name"],
            "credentials": cls_add_request["credentials"],
            "vim_account": cls_add_request["vim_account"],
            "bootstrap": cls_add_request["bootstrap"],
            "created": "false",
            "state": "IN_CREATION",
            "operatingState": "PROCESSING",
            "git_name": self.create_gitname(cls_add_request, session),
            "resourceState": "IN_PROGRESS.REQUEST_RECEIVED",
        }
        # Add optional fields if they exist in the request
        if "description" in cls_add_request:
            cls_add["description"] = cls_add_request["description"]
        return cls_add

    def remove(self, session, _id, dry_run=False, not_send_msg=None):
        """
        Deregister a cluster by its internal _id
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param _id: server internal id
        :param dry_run: make checking but do not deregister nor store anything
        :param not_send_msg: To not send message (False) or store content (list) instead
        :return: None. Raises exception if error or not found, conflict, ...
        """
        # To allow addressing projects and users by name AS WELL AS by _id
        filter_q = self._get_project_filter(session) if self.multiproject else {}
        filter_q[self.id_field(self.topic, _id)] = _id
        item_content = self.db.get_one(self.topic, filter_q)

        # Run the checks first: neither a conflict nor a dry run may leave a
        # "deregister" operation stored in the database.
        self.check_conflict_on_del(session, _id, item_content)
        if dry_run:
            return None

        op_id = self.format_on_operation(item_content, "deregister", None)
        self.db.set_one(self.topic, {"_id": _id}, item_content)

        if self.multiproject and session["project_id"]:
            # remove reference from project_read if there are more projects referencing it. If it last one,
            # do not remove reference, but delete
            other_projects_referencing = next(
                (
                    p
                    for p in item_content["_admin"]["projects_read"]
                    if p not in session["project_id"] and p != "ANY"
                ),
                None,
            )

            # check if there are projects referencing it (apart from ANY, that means, public)....
            if other_projects_referencing:
                # remove references but not delete
                update_dict_pull = {
                    "_admin.projects_read": session["project_id"],
                    "_admin.projects_write": session["project_id"],
                }
                self.db.set_one(
                    self.topic, filter_q, update_dict=None, pull_list=update_dict_pull
                )
                return None

            can_write = next(
                (
                    p
                    for p in item_content["_admin"]["projects_write"]
                    if p == "ANY" or p in session["project_id"]
                ),
                None,
            )
            if not can_write:
                raise EngineException(
                    "You have not write permission to delete it",
                    http_code=HTTPStatus.UNAUTHORIZED,
                )

        # delete
        self._send_msg(
            "deregister",
            {"cluster_id": _id, "operation_id": op_id},
            not_send_msg=not_send_msg,
        )
        return None
733class KsusTopic(ACMTopic):
734 topic = "ksus"
735 okapkg_topic = "okas"
736 infra_topic = "k8sinfra"
737 topic_msg = "ksu"
738 schema_new = ksu_schema
739 schema_edit = ksu_schema
def __init__(self, db, fs, msg, auth):
    """Initialise the parent topic and attach the "nbi.ksus" logger."""
    super().__init__(db, fs, msg, auth)
    ksus_logger = logging.getLogger("nbi.ksus")
    self.logger = ksus_logger
@staticmethod
def format_on_new(content, project_id=None, make_public=False):
    """Apply ``BaseTopic.format_on_new`` and set the initial KSU state fields."""
    BaseTopic.format_on_new(content, project_id=project_id, make_public=make_public)
    initial_fields = {
        "current_operation": None,
        "state": "IN_CREATION",
        "operatingState": "PROCESSING",
        "resourceState": "IN_PROGRESS",
    }
    content.update(initial_fields)
def new(self, rollback, session, indata=None, kwargs=None, headers=None):
    """Create one KSU per entry of ``indata["ksus"]``.

    Every oka of a KSU must be given in the same way as the first one:
    either by ``_id`` or by ``sw_catalog_path``, never both. The empty
    alternative is removed from each oka before validation.

    :param rollback: list to append the created items at database in case a rollback must be done
    :param session: contains "force", "public", "project_id", among others
    :param indata: dict with the list of KSUs under the key "ksus"
    :param kwargs: used to override each KSU content
    :param headers: http request headers (not used here)
    :return: tuple (list of created _id, operation id of the last KSU)
    :raises EngineException: when no KSU is provided or the oka entries are inconsistent
    """
    ksu_requests = (indata or {}).get("ksus")
    if not ksu_requests:
        # Without this guard op_id below would be unbound (UnboundLocalError).
        raise EngineException(
            "Cannot create ksu. No KSU has been provided",
            HTTPStatus.UNPROCESSABLE_ENTITY,
        )

    _id_list = []
    op_id = None
    for content in ksu_requests:
        # The first oka decides which of the two keys all okas must carry.
        first_oka = content["oka"][0]
        oka_flag = ""
        if first_oka["_id"]:
            oka_flag = "_id"
            first_oka["sw_catalog_path"] = ""
        elif first_oka["sw_catalog_path"]:
            oka_flag = "sw_catalog_path"

        for okas in content["oka"]:
            if okas["_id"] and okas["sw_catalog_path"]:
                raise EngineException(
                    "Cannot create ksu with both OKA and SW catalog path",
                    HTTPStatus.UNPROCESSABLE_ENTITY,
                )
            if not okas["sw_catalog_path"]:
                okas.pop("sw_catalog_path")
            elif not okas["_id"]:
                okas.pop("_id")
            if oka_flag not in okas:
                raise EngineException(
                    "Cannot create ksu. Give either OKA or SW catalog path for all oka in a KSU",
                    HTTPStatus.UNPROCESSABLE_ENTITY,
                )

        # Override descriptor with query string kwargs
        content = self._remove_envelop(content)
        self._update_input_with_kwargs(content, kwargs)
        content = self._validate_input_new(input=content, force=session["force"])

        # Check for unique name
        self.check_unique_name(session, content["name"])

        self.check_conflict_on_new(session, content)

        # Shallow copy taken before the bookkeeping fields are added.
        operation_params = dict(content)
        self.format_on_new(
            content, project_id=session["project_id"], make_public=session["public"]
        )
        op_id = self.format_on_operation(
            content,
            operation_type="create",
            operation_params=operation_params,
        )
        content["git_name"] = self.create_gitname(content, session)

        # Update Oka_package usage state
        for okas in content["oka"]:
            if "_id" in okas:
                self.update_usage_state(session, okas)

        _id = self.db.create(self.topic, content)
        rollback.append({"topic": self.topic, "_id": _id})
        _id_list.append(_id)

    self._send_msg("create", {"ksus_list": _id_list, "operation_id": op_id})
    return _id_list, op_id
def clone(self, rollback, session, _id, indata, kwargs, headers):
    """Register a "clone" operation on the KSU and notify it.

    :return: id of the registered operation
    """
    query = self._get_project_filter(session)
    query[BaseTopic.id_field(self.topic, _id)] = _id
    ksu = self.db.get_one(self.topic, query)

    op_id = self.format_on_operation(ksu, "clone", indata)
    self.db.set_one(self.topic, {"_id": ksu["_id"]}, ksu)
    message = {"ksus_list": [ksu["_id"]], "operation_id": op_id}
    self._send_msg("clone", message)
    return op_id
def update_usage_state(self, session, oka_content):
    """Flag the referenced OKA package as IN_USE when it is still NOT_IN_USE."""
    oka_id = oka_content["_id"]
    query = self._get_project_filter(session)
    query[BaseTopic.id_field(self.topic, oka_id)] = oka_id

    oka_package = self.db.get_one(self.okapkg_topic, query)
    if oka_package["_admin"]["usageState"] != "NOT_IN_USE":
        return
    self.db.set_one(
        self.okapkg_topic,
        {"_id": oka_id},
        update_dict={"_admin.usageState": "IN_USE"},
    )
def move_ksu(self, session, _id, indata=None, kwargs=None, content=None):
    """Merge ``indata`` into the KSU, register a "move" operation and notify it.

    :param content: current KSU content; read with ``self.show`` when falsy
    :return: id of the registered operation
    :raises EngineException: when content is edited together with SET_PROJECT,
        or when a ValidationError is raised underneath
    """
    indata = self._remove_envelop(indata)

    # Override descriptor with query string kwargs
    if kwargs:
        self._update_input_with_kwargs(indata, kwargs)
    try:
        if indata and session.get("set_project"):
            raise EngineException(
                "Cannot edit content and set to project (query string SET_PROJECT) at same time",
                HTTPStatus.UNPROCESSABLE_ENTITY,
            )
        # TODO self._check_edition(session, indata, _id, force)
        if not content:
            content = self.show(session, _id)
        indata = self._validate_input_edit(
            input=indata, content=content, force=session["force"]
        )
        operation_params = indata
        deep_update_rfc7396(content, indata)

        # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
        _id = content.get("_id") or _id
        op_id = self.format_on_operation(content, "move", operation_params)
        # NOTE(review): the flattened source does not show whether the two
        # state assignments belong to this `if`; they are kept with it here —
        # confirm against the original file.
        if content.get("_admin"):
            content["_admin"]["modified"] = time()
            content["operatingState"] = "PROCESSING"
            content["resourceState"] = "IN_PROGRESS"

        self.db.replace(self.topic, _id, content)
        message = {"ksus_list": [content["_id"]], "operation_id": op_id}
        self._send_msg("move", message)
        return op_id
    except ValidationError as e:
        raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
def check_conflict_on_edit(self, session, final_content, edit_content, _id):
    """Check the new name is unique when the edit renames the KSU.

    :return: ``final_content``, untouched
    """
    new_name = edit_content["name"]
    renamed = final_content["name"] != new_name
    if renamed:
        self.check_unique_name(session, new_name)
    return final_content
@staticmethod
def format_on_edit(final_content, edit_content):
    """Register an "update" operation and mark the KSU as being processed.

    :return: id of the registered operation
    """
    op_id = ACMTopic.format_on_operation(final_content, "update", edit_content)
    final_content.update(
        {"operatingState": "PROCESSING", "resourceState": "IN_PROGRESS"}
    )
    if final_content.get("_admin"):
        final_content["_admin"]["modified"] = time()
    return op_id
def edit(self, session, _id, indata, kwargs):
    """Edit one KSU, or several when ``_id`` is the literal "update".

    In the bulk case every entry of ``indata["ksus"]`` carries its own
    ``_id``, which is removed from the entry before editing. A single "edit"
    message is sent with all the ids and the operation id of the last edit.

    :raises EngineException: when the bulk request carries no KSU
    """
    _id_list = []
    if _id == "update":
        ksu_requests = (indata or {}).get("ksus")
        if not ksu_requests:
            # Without this guard op_id below would be unbound (UnboundLocalError).
            raise EngineException(
                "Cannot edit ksu. No KSU has been provided",
                HTTPStatus.UNPROCESSABLE_ENTITY,
            )
        for content in ksu_requests:
            ksu_id = content.pop("_id")
            _id_list.append(ksu_id)
            op_id = self.edit_ksu(session, ksu_id, content, kwargs)
    else:
        _id_list.append(_id)
        op_id = self.edit_ksu(session, _id, indata, kwargs)

    self._send_msg("edit", {"ksus_list": _id_list, "operation_id": op_id})
def edit_ksu(self, session, _id, indata, kwargs):
    """Validate and store the edition of a single KSU.

    :return: id of the "update" operation returned by ``self.format_on_edit``
    :raises EngineException: when content is edited together with SET_PROJECT,
        or when a ValidationError is raised underneath
    """
    indata = self._remove_envelop(indata)

    # Override descriptor with query string kwargs
    if kwargs:
        self._update_input_with_kwargs(indata, kwargs)
    try:
        if indata and session.get("set_project"):
            raise EngineException(
                "Cannot edit content and set to project (query string SET_PROJECT) at same time",
                HTTPStatus.UNPROCESSABLE_ENTITY,
            )
        # TODO self._check_edition(session, indata, _id, force)
        stored = self.show(session, _id)

        # Drop whichever oka keys arrive empty before validating.
        for oka in indata["oka"]:
            if not oka["_id"]:
                oka.pop("_id")
            if not oka["sw_catalog_path"]:
                oka.pop("sw_catalog_path")

        indata = self._validate_input_edit(indata, stored, force=session["force"])

        # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
        _id = stored.get("_id") or _id

        stored = self.check_conflict_on_edit(session, stored, indata, _id=_id)
        op_id = self.format_on_edit(stored, indata)
        self.db.replace(self.topic, _id, stored)
        return op_id
    except ValidationError as e:
        raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
def delete_ksu(self, session, _id, indata, dry_run=False, not_send_msg=None):
    """Delete one KSU, or several in bulk, and announce it with one "delete" message.

    When ``_id`` is the literal ``"delete"``, every entry of ``indata["ksus"]``
    is deleted by its own ``_id`` key (which is removed from the entry).
    Otherwise the KSU identified by ``_id`` is deleted. KSUs for which
    ``delete`` reports that no message is needed are left out of the message.

    :param session: session context; ``force`` is copied into the message
    :param _id: KSU identifier, or ``"delete"`` for a bulk deletion
    :param indata: ``{"ksus": [...]}`` for a bulk deletion; unused otherwise
    :param dry_run: accepted for interface compatibility; not used here
    :param not_send_msg: forwarded to ``_send_msg``
    :return: operation id of the last deleted KSU, or None if nothing was deleted
    """
    ids_to_notify = []
    # Stays None if a bulk request carries no KSU at all
    op_id = None
    if _id == "delete":
        for ksu in indata["ksus"]:
            ksu_id = ksu.pop("_id")
            op_id, skip_msg = self.delete(session, ksu_id)
            if not skip_msg:
                ids_to_notify.append(ksu_id)
    else:
        op_id, skip_msg = self.delete(session, _id)
        if not skip_msg:
            ids_to_notify.append(_id)

    if ids_to_notify:
        data = {
            "ksus_list": ids_to_notify,
            "operation_id": op_id,
            "force": session["force"],
        }
        self._send_msg("delete", data, not_send_msg)
    return op_id
def delete(self, session, _id):
    """Mark a KSU as being deleted and release the OKAs no other KSU references.

    The record is flagged IN_DELETION / PROCESSING / IN_PROGRESS and a "delete"
    operation is registered on it. Every OKA the KSU references by ``_id`` and
    that no other listed KSU references is set to usage state NOT_IN_USE.
    If the profile the KSU points to cannot be found, the KSU record is
    removed from the database directly.

    :param session: session context used to build the project filter
    :param _id: identifier of the KSU to delete
    :return: tuple ``(op_id, not_send_msg)``; ``not_send_msg`` is True when the
        profile was not found (no message should be sent to Kafka), else None
    """
    if not self.multiproject:
        filter_q = {}
    else:
        filter_q = self._get_project_filter(session)
    filter_q[self.id_field(self.topic, _id)] = _id
    item_content = self.db.get_one(self.topic, filter_q)
    item_content["state"] = "IN_DELETION"
    item_content["operatingState"] = "PROCESSING"
    item_content["resourceState"] = "IN_PROGRESS"
    op_id = self.format_on_operation(
        item_content,
        "delete",
        None,
    )
    self.db.set_one(self.topic, {"_id": item_content["_id"]}, item_content)

    # Collect every OKA this KSU references by _id, keeping first-seen order.
    # Entries that only carry a sw_catalog_path have no package to release.
    used_oka_ids = []
    for oka in item_content.get("oka") or ():
        oka_id = oka.get("_id")
        if oka_id and oka_id not in used_oka_ids:
            used_oka_ids.append(oka_id)

    if used_oka_ids:
        # OKAs still referenced by some other KSU must stay in use
        still_used = set()
        project_filter = self._get_project_filter(session)
        for ksu in self.db.get_list(self.topic, project_filter) or ():
            # Compare against the stored _id, since the caller may have
            # addressed the KSU through a different identifier field.
            if ksu["_id"] == item_content["_id"]:
                continue
            for oka in ksu["oka"]:
                self.logger.info("OKA: {}".format(oka))
                if oka.get("sw_catalog_path", ""):
                    continue
                still_used.add(oka.get("_id"))

        for oka_id in used_oka_ids:
            if oka_id not in still_used:
                self.db.set_one(
                    self.okapkg_topic,
                    {"_id": oka_id},
                    {"_admin.usageState": "NOT_IN_USE"},
                )

    # Check if the profile exists. If it doesn't, no message should be sent to Kafka
    not_send_msg = None
    profile_id = item_content["profile"]["_id"]
    profile_type = item_content["profile"]["profile_type"]
    profile_collection_map = {
        "app_profiles": "k8sapp",
        "resource_profiles": "k8sresource",
        "infra_controller_profiles": "k8sinfra_controller",
        "infra_config_profiles": "k8sinfra_config",
    }
    profile_collection = profile_collection_map[profile_type]
    profile_content = self.db.get_one(
        profile_collection, {"_id": profile_id}, fail_on_empty=False
    )
    if not profile_content:
        self.db.del_one(self.topic, filter_q)
        not_send_msg = True
    return op_id, not_send_msg
1044class OkaTopic(DescriptorTopic, ACMOperationTopic):
1045 topic = "okas"
1046 topic_msg = "oka"
1047 schema_new = oka_schema
1048 schema_edit = oka_schema
def __init__(self, db, fs, msg, auth):
    """Initialise the parent topics, then bind the "nbi.oka" logger.

    :param db: database handle, forwarded to the parent initialiser
    :param fs: file-storage handle, forwarded to the parent initialiser
    :param msg: message-bus handle, forwarded to the parent initialiser
    :param auth: authentication handle, forwarded to the parent initialiser
    """
    super().__init__(db, fs, msg, auth)
    # Assigned after the parent initialiser has run, so this is the logger that stays
    self.logger = logging.getLogger("nbi.oka")
@staticmethod
def format_on_new(content, project_id=None, make_public=False):
    """Apply the descriptor defaults to a new OKA and set its initial states.

    :param content: new OKA record; modified in place
    :param project_id: forwarded to ``DescriptorTopic.format_on_new``
    :param make_public: forwarded to ``DescriptorTopic.format_on_new``
    :return: None
    """
    DescriptorTopic.format_on_new(
        content, project_id=project_id, make_public=make_public
    )
    # Initial OKA state, set before any package content has been uploaded
    content.update(
        {
            "current_operation": None,
            "state": "PENDING_CONTENT",
            "operatingState": "PROCESSING",
            "resourceState": "IN_PROGRESS",
        }
    )
def check_conflict_on_del(self, session, _id, db_content):
    """Refuse to delete an OKA whose usage state is "IN_USE".

    :param session: session context (not used by this check)
    :param _id: OKA identifier (not used by this check)
    :param db_content: stored OKA record; ``_admin.usageState`` is read
    :return: None
    :raises EngineException: CONFLICT when the usage state is "IN_USE"
    """
    if db_content["_admin"]["usageState"] != "IN_USE":
        return
    raise EngineException(
        "There is a KSU using this package",
        http_code=HTTPStatus.CONFLICT,
    )
def check_conflict_on_edit(self, session, final_content, edit_content, _id):
    """Reject OKA edits that change nothing, and renames that clash.

    If the edit carries a name, only the name is checked. The description is
    checked only when no name is present.

    :param session: session context, forwarded to ``check_unique_name``
    :param final_content: OKA record as currently stored
    :param edit_content: requested changes
    :param _id: OKA identifier (not used by this check)
    :return: ``final_content``, unchanged
    :raises EngineException: CONFLICT when the new name, or the new
        description, equals the stored one
    """
    if "name" in edit_content:
        name = edit_content["name"]
        if final_content["name"] == name:
            raise EngineException(
                f"No update, new name for the OKA is the same: {name}",
                http_code=HTTPStatus.CONFLICT,
            )
        self.check_unique_name(session, name)
        return final_content

    if "description" in edit_content:
        description = edit_content["description"]
        if final_content["description"] == description:
            raise EngineException(
                f"No update, new description for the OKA is the same: {description}",
                http_code=HTTPStatus.CONFLICT,
            )
    return final_content
def edit(self, session, _id, indata=None, kwargs=None, content=None):
    """Validate an OKA edit, register the operation, merge it and store the record.

    :param session: session context; ``set_project`` and ``force`` are read
    :param _id: identifier used to look the OKA up when ``content`` is not given
    :param indata: edit payload
    :param kwargs: query-string overrides applied on top of ``indata``
    :param content: stored record, if the caller already has it
    :return: operation id returned by ``format_on_edit``
    :raises EngineException: UNPROCESSABLE_ENTITY when content and SET_PROJECT
        are combined, or when input validation fails
    """
    indata = self._remove_envelop(indata)

    # Override descriptor with query string kwargs
    if kwargs:
        self._update_input_with_kwargs(indata, kwargs)
    try:
        if indata and session.get("set_project"):
            raise EngineException(
                "Cannot edit content and set to project (query string SET_PROJECT) at same time",
                HTTPStatus.UNPROCESSABLE_ENTITY,
            )
        # TODO self._check_edition(session, indata, _id, force)
        stored = content if content else self.show(session, _id)
        changes = self._validate_input_edit(stored_content := None or indata, stored, force=session["force"]) if False else self._validate_input_edit(indata, stored, force=session["force"])

        # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
        target_id = stored.get("_id") or _id

        stored = self.check_conflict_on_edit(session, stored, changes, _id=target_id)
        op_id = self.format_on_edit(stored, changes)
        # The operation is registered first, then the changes are merged in
        deep_update_rfc7396(stored, changes)

        self.db.replace(self.topic, target_id, stored)
    except ValidationError as e:
        raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
    return op_id
def delete(self, session, _id, dry_run=False, not_send_msg=None):
    """Flag an OKA as being deleted, register the operation and send a "delete" message.

    :param session: session context; ``force`` is copied into the message
    :param _id: identifier of the OKA to delete
    :param dry_run: accepted for interface compatibility; not used here
    :param not_send_msg: forwarded to ``_send_msg``
    :return: operation id returned by ``format_on_operation``
    """
    filter_q = self._get_project_filter(session) if self.multiproject else {}
    filter_q[self.id_field(self.topic, _id)] = _id

    item_content = self.db.get_one(self.topic, filter_q)
    item_content.update({"state": "IN_DELETION", "operatingState": "PROCESSING"})
    # The conflict check runs on the already-flagged copy, before anything is stored
    self.check_conflict_on_del(session, _id, item_content)

    op_id = self.format_on_operation(item_content, "delete", None)
    self.db.set_one(self.topic, {"_id": item_content["_id"]}, item_content)

    message = {"oka_id": _id, "operation_id": op_id, "force": session["force"]}
    self._send_msg("delete", message, not_send_msg=not_send_msg)
    return op_id
def new(self, rollback, session, indata=None, kwargs=None, headers=None):
    """Create the database record of a new OKA from the query-string arguments.

    :param rollback: list that receives an entry describing the created record
    :param session: session context; ``force``, ``project_id`` and ``public`` are read
    :param indata: request body; only its ``userDefinedData`` part, if any, is looked at
    :param kwargs: query-string arguments; these are what gets validated and stored
    :param headers: accepted for interface compatibility; not used here
    :return: tuple ``(_id, op_id)`` of the created record and its "create" operation
    """
    # _remove_envelop
    if indata and "userDefinedData" in indata:
        indata = indata["userDefinedData"]

    content = {"_admin": {"userDefinedData": indata, "revision": 0}}

    self._update_input_with_kwargs(content, kwargs)
    # NOTE(review): the skeleton built above is replaced here by the validated
    # kwargs, so "userDefinedData" does not reach the stored record - confirm
    # this is intended.
    content = BaseTopic._validate_input_new(
        self, input=kwargs, force=session["force"]
    )

    self.check_unique_name(session, content["name"])
    # Snapshot of the validated input, taken before format_on_new adds to it
    operation_params = {key: value for key, value in content.items()}
    self.format_on_new(
        content, session["project_id"], make_public=session["public"]
    )
    op_id = self.format_on_operation(
        content,
        operation_type="create",
        operation_params=operation_params,
    )
    content["git_name"] = self.create_gitname(content, session)
    new_id = self.db.create(self.topic, content)
    rollback.append({"topic": self.topic, "_id": new_id})
    return new_id, op_id
def upload_content(self, session, _id, indata, kwargs, headers):
    """Store an uploaded OKA package as a new revision and trigger its processing.

    The package is written into a folder named ``<_id>:<revision>``, checked
    against the current revision when one exists, and then moved over the
    ``<_id>/`` folder. The record and a ``<topic>_revisions`` copy are stored,
    and a "create" (POST) or "edit" (PUT/PATCH) message is sent.

    :param session: session context, forwarded to ``show``
    :param _id: identifier of the OKA receiving the content
    :param indata: package content; a dict is dumped as YAML, anything else
        must expose a readable ``file`` attribute
    :param kwargs: query-string overrides applied on top of ``indata``
    :param headers: request headers; ``Content-Type`` and ``Content-Filename``
        are optional, ``Method`` is required
    :return: True
    :raises EngineException: propagated unchanged from the helpers called here
    """
    current_desc = self.show(session, _id)

    # A missing Content-Type is treated as "not compressed" instead of
    # failing on a membership test against None.
    content_type = headers.get("Content-Type") or ""
    compressed = None
    if "application/gzip" in content_type or "application/x-gzip" in content_type:
        compressed = "gzip"
    if "application/zip" in content_type:
        compressed = "zip"
    filename = headers.get("Content-Filename")
    if not filename and compressed:
        filename = "package.tar.gz" if compressed == "gzip" else "package.zip"
    elif not filename:
        filename = "package"

    revision = current_desc["_admin"].get("revision", 0) + 1

    file_pkg = None
    fs_rollback = []

    try:
        # Rather than using a temp folder, we will store the package in a folder based on
        # the current revision.
        proposed_revision_path = _id + ":" + str(revision)
        # All the content is uploaded here; if everything is ok it is renamed
        # to the unrevisioned folder further down.
        self.fs.file_delete(proposed_revision_path, ignore_non_exist=True)
        self.fs.mkdir(proposed_revision_path)
        fs_rollback.append(proposed_revision_path)

        storage = self.fs.get_params()
        storage["folder"] = proposed_revision_path
        storage["zipfile"] = filename

        file_path = (proposed_revision_path, filename)
        file_pkg = self.fs.file_open(file_path, "a+b")

        if isinstance(indata, dict):
            indata_text = yaml.safe_dump(indata, indent=4, default_flow_style=False)
            file_pkg.write(indata_text.encode(encoding="utf-8"))
        else:
            # From here on indata is the uploaded file object itself
            indata = indata.file
            while True:
                chunk = indata.read(4096)
                if not chunk:
                    break
                file_pkg.write(chunk)

        # Need to close the file package here so it can be copied from the
        # revision to the current, unrevisioned record
        if file_pkg:
            file_pkg.close()
        file_pkg = None

        # Fetch both the incoming, proposed revision and the original revision so we
        # can call a validate method to compare them
        current_revision_path = _id + "/"
        self.fs.sync(from_path=current_revision_path)
        self.fs.sync(from_path=proposed_revision_path)

        # Is this required?
        if revision > 1:
            try:
                self._validate_descriptor_changes(
                    _id,
                    filename,
                    current_revision_path,
                    proposed_revision_path,
                )
            except Exception:
                shutil.rmtree(
                    self.fs.path + current_revision_path, ignore_errors=True
                )
                shutil.rmtree(
                    self.fs.path + proposed_revision_path, ignore_errors=True
                )
                # Only delete the new revision. We need to keep the original version in place
                # as it has not been changed.
                self.fs.file_delete(proposed_revision_path, ignore_non_exist=True)
                # Bare raise keeps the original traceback intact
                raise

        indata = self._remove_envelop(indata)

        # Override descriptor with query string kwargs
        if kwargs:
            self._update_input_with_kwargs(indata, kwargs)

        current_desc["_admin"]["storage"] = storage
        current_desc["_admin"]["onboardingState"] = "ONBOARDED"
        current_desc["_admin"]["operationalState"] = "ENABLED"
        current_desc["_admin"]["modified"] = time()
        current_desc["_admin"]["revision"] = revision

        deep_update_rfc7396(current_desc, indata)

        # Copy the revision to the active package name by its original id
        shutil.rmtree(self.fs.path + current_revision_path, ignore_errors=True)
        os.rename(
            self.fs.path + proposed_revision_path,
            self.fs.path + current_revision_path,
        )
        self.fs.file_delete(current_revision_path, ignore_non_exist=True)
        self.fs.mkdir(current_revision_path)
        self.fs.reverse_sync(from_path=current_revision_path)

        shutil.rmtree(self.fs.path + _id)

        method = headers["Method"]
        is_create = method == "POST"
        is_update = method in ("PUT", "PATCH")
        if is_create:
            current_desc["state"] = "IN_CREATION"
            # Reuse the id of the last operation recorded on the OKA, if any
            op_id = current_desc.get("operationHistory", [{"op_id": None}])[-1].get(
                "op_id"
            )
        elif is_update:
            op_id = self.format_on_operation(
                current_desc,
                "update",
                {"package": filename},
            )
            current_desc["operatingState"] = "PROCESSING"
            current_desc["resourceState"] = "IN_PROGRESS"

        self.db.replace(self.topic, _id, current_desc)

        # Store a copy of the package as a point in time revision
        revision_desc = dict(current_desc)
        revision_desc["_id"] = _id + ":" + str(revision_desc["_admin"]["revision"])
        self.db.create(self.topic + "_revisions", revision_desc)
        # Everything is stored: nothing left to roll back in the finally clause
        fs_rollback = []

        # A message is only sent for the methods that registered an operation above
        if is_create:
            self._send_msg("create", {"oka_id": _id, "operation_id": op_id})
        elif is_update:
            self._send_msg("edit", {"oka_id": _id, "operation_id": op_id})

        return True

    finally:
        if file_pkg:
            file_pkg.close()
        for file in fs_rollback:
            self.fs.file_delete(file, ignore_non_exist=True)