blob: 2ce01e085adf2bb4c3481e3167572a22014bffe6 [file] [log] [blame]
# -*- coding: utf-8 -*-

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
15
yshah53cc9eb2024-07-05 13:06:31 +000016import logging
17import yaml
18import tarfile
19import shutil
20import os
rshri2d386cb2024-07-05 14:35:51 +000021from http import HTTPStatus
yshah53cc9eb2024-07-05 13:06:31 +000022from uuid import uuid4
rshri2d386cb2024-07-05 14:35:51 +000023
yshah53cc9eb2024-07-05 13:06:31 +000024from time import time
rshri2d386cb2024-07-05 14:35:51 +000025from osm_nbi.base_topic import BaseTopic, EngineException
26
yshah53cc9eb2024-07-05 13:06:31 +000027from osm_nbi.descriptor_topics import DescriptorTopic
rshri2d386cb2024-07-05 14:35:51 +000028from osm_nbi.validation import (
29 ValidationError,
30 clustercreation_new_schema,
31 infra_controller_profile_create_new_schema,
32 infra_config_profile_create_new_schema,
33 app_profile_create_new_schema,
34 resource_profile_create_new_schema,
35 infra_controller_profile_create_edit_schema,
36 infra_config_profile_create_edit_schema,
37 app_profile_create_edit_schema,
38 resource_profile_create_edit_schema,
39 k8scluster_new_schema,
40 attach_dettach_profile_schema,
yshah53cc9eb2024-07-05 13:06:31 +000041 ksu_schema,
42 oka_schema,
rshri2d386cb2024-07-05 14:35:51 +000043)
yshah53cc9eb2024-07-05 13:06:31 +000044from osm_common.dbbase import deep_update_rfc7396, DbException
rshri2d386cb2024-07-05 14:35:51 +000045from osm_common.msgbase import MsgException
46from osm_common.fsbase import FsException
47
# Module authors, kept as a tuple of "Name <email>" strings.
__author__ = (
    "Shrinithi R <shrinithi.r@tataelxsi.co.in>",
    "Shahithya Y <shahithya.y@tataelxsi.co.in>",
)
rshri2d386cb2024-07-05 14:35:51 +000052
53
class InfraContTopic(BaseTopic):
    """Topic handling K8s infra controller profiles (collection ``k8sinfra_controller``)."""

    topic = "k8sinfra_controller"
    topic_msg = "k8s_infra_controller"
    schema_new = infra_controller_profile_create_new_schema
    schema_edit = infra_controller_profile_create_edit_schema

    def __init__(self, db, fs, msg, auth):
        BaseTopic.__init__(self, db, fs, msg, auth)

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create a new infra controller profile.

        Delegates to ``self.new_profile`` (not defined in this class) and
        returns whatever it returns.
        """
        return self.new_profile(rollback, session, indata, kwargs, headers)

    def default(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create the default infra controller profile while creating a cluster.

        Delegates to ``self.default_profile`` (not defined in this class) and
        returns whatever it returns.
        """
        return self.default_profile(rollback, session, indata, kwargs, headers)

    def delete(self, session, _id, dry_run=False, not_send_msg=None):
        """Delete an infra controller profile unless it is marked as default.

        :param session: session information, forwarded to the inherited helpers
        :param _id: internal id of the profile
        :param dry_run: when True nothing is detached; the flag is forwarded to
            the parent ``delete`` so that it can run its checks only
        :param not_send_msg: forwarded unchanged to the parent ``delete``
        :return: None
        :raises EngineException: (UNPROCESSABLE_ENTITY) if the profile has a
            truthy ``default`` field
        """
        item_content = self.db.get_one(self.topic, {"_id": _id})
        if item_content.get("default", False):
            raise EngineException(
                "Cannot delete item because it is marked as default",
                http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
            )
        # Before deleting, detach the profile from the associated clusters.
        # A dry run must not modify anything, so the detach is skipped.
        if not dry_run:
            self.detach(session, _id, profile_type="infra_controller_profiles")
        # To delete the infra controller profile
        super().delete(session, _id, dry_run=dry_run, not_send_msg=not_send_msg)
        return
83
84
class InfraConfTopic(BaseTopic):
    """Topic handling K8s infra config profiles (collection ``k8sinfra_config``)."""

    topic = "k8sinfra_config"
    topic_msg = "k8s_infra_config"
    schema_new = infra_config_profile_create_new_schema
    schema_edit = infra_config_profile_create_edit_schema

    def __init__(self, db, fs, msg, auth):
        BaseTopic.__init__(self, db, fs, msg, auth)

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create a new infra config profile.

        Delegates to ``self.new_profile`` (not defined in this class) and
        returns whatever it returns.
        """
        return self.new_profile(rollback, session, indata, kwargs, headers)

    def default(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create the default infra config profile while creating a cluster.

        Delegates to ``self.default_profile`` (not defined in this class) and
        returns whatever it returns.
        """
        return self.default_profile(rollback, session, indata, kwargs, headers)

    def delete(self, session, _id, dry_run=False, not_send_msg=None):
        """Delete an infra config profile unless it is marked as default.

        :param session: session information, forwarded to the inherited helpers
        :param _id: internal id of the profile
        :param dry_run: when True nothing is detached; the flag is forwarded to
            the parent ``delete`` so that it can run its checks only
        :param not_send_msg: forwarded unchanged to the parent ``delete``
        :return: None
        :raises EngineException: (UNPROCESSABLE_ENTITY) if the profile has a
            truthy ``default`` field
        """
        item_content = self.db.get_one(self.topic, {"_id": _id})
        if item_content.get("default", False):
            raise EngineException(
                "Cannot delete item because it is marked as default",
                http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
            )
        # Before deleting, detach the profile from the associated clusters.
        # A dry run must not modify anything, so the detach is skipped.
        if not dry_run:
            self.detach(session, _id, profile_type="infra_config_profiles")
        # To delete the infra config profile
        super().delete(session, _id, dry_run=dry_run, not_send_msg=not_send_msg)
        return
114
115
class AppTopic(BaseTopic):
    """Topic handling K8s app profiles (collection ``k8sapp``)."""

    topic = "k8sapp"
    topic_msg = "k8s_app"
    schema_new = app_profile_create_new_schema
    schema_edit = app_profile_create_edit_schema

    def __init__(self, db, fs, msg, auth):
        BaseTopic.__init__(self, db, fs, msg, auth)

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create a new app profile.

        Delegates to ``self.new_profile`` (not defined in this class) and
        returns whatever it returns.
        """
        return self.new_profile(rollback, session, indata, kwargs, headers)

    def default(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create the default app profile while creating a cluster.

        Delegates to ``self.default_profile`` (not defined in this class) and
        returns whatever it returns.
        """
        return self.default_profile(rollback, session, indata, kwargs, headers)

    def delete(self, session, _id, dry_run=False, not_send_msg=None):
        """Delete an app profile unless it is marked as default.

        :param session: session information, forwarded to the inherited helpers
        :param _id: internal id of the profile
        :param dry_run: when True nothing is detached; the flag is forwarded to
            the parent ``delete`` so that it can run its checks only
        :param not_send_msg: forwarded unchanged to the parent ``delete``
        :return: None
        :raises EngineException: (UNPROCESSABLE_ENTITY) if the profile has a
            truthy ``default`` field
        """
        item_content = self.db.get_one(self.topic, {"_id": _id})
        if item_content.get("default", False):
            raise EngineException(
                "Cannot delete item because it is marked as default",
                http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
            )
        # Before deleting, detach the profile from the associated clusters.
        # A dry run must not modify anything, so the detach is skipped.
        if not dry_run:
            self.detach(session, _id, profile_type="app_profiles")
        # To delete the app profile
        super().delete(session, _id, dry_run=dry_run, not_send_msg=not_send_msg)
        return
145
146
class ResourceTopic(BaseTopic):
    """Topic handling K8s resource profiles (collection ``k8sresource``)."""

    topic = "k8sresource"
    topic_msg = "k8s_resource"
    schema_new = resource_profile_create_new_schema
    schema_edit = resource_profile_create_edit_schema

    def __init__(self, db, fs, msg, auth):
        BaseTopic.__init__(self, db, fs, msg, auth)

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create a new resource profile.

        Delegates to ``self.new_profile`` (not defined in this class) and
        returns whatever it returns.
        """
        return self.new_profile(rollback, session, indata, kwargs, headers)

    def default(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create the default resource profile while creating a cluster.

        Delegates to ``self.default_profile`` (not defined in this class) and
        returns whatever it returns.
        """
        return self.default_profile(rollback, session, indata, kwargs, headers)

    def delete(self, session, _id, dry_run=False, not_send_msg=None):
        """Delete a resource profile unless it is marked as default.

        :param session: session information, forwarded to the inherited helpers
        :param _id: internal id of the profile
        :param dry_run: when True nothing is detached; the flag is forwarded to
            the parent ``delete`` so that it can run its checks only
        :param not_send_msg: forwarded unchanged to the parent ``delete``
        :return: None
        :raises EngineException: (UNPROCESSABLE_ENTITY) if the profile has a
            truthy ``default`` field
        """
        item_content = self.db.get_one(self.topic, {"_id": _id})
        if item_content.get("default", False):
            raise EngineException(
                "Cannot delete item because it is marked as default",
                http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
            )
        # Before deleting, detach the profile from the associated clusters.
        # A dry run must not modify anything, so the detach is skipped.
        if not dry_run:
            self.detach(session, _id, profile_type="resource_profiles")
        # To delete the resource profile
        super().delete(session, _id, dry_run=dry_run, not_send_msg=not_send_msg)
        return
177
178
class K8sTopic(BaseTopic):
    """Topic for clusters created on request (stored in ``clusters`` with ``created: "true"``).

    Besides creation it handles attaching/detaching profiles to a cluster and
    generic cluster operations that are forwarded through ``_send_msg``.
    """

    topic = "clusters"
    topic_msg = "cluster"
    schema_new = clustercreation_new_schema
    schema_edit = attach_dettach_profile_schema

    def __init__(self, db, fs, msg, auth):
        BaseTopic.__init__(self, db, fs, msg, auth)
        # One topic object per profile type; used to create/list/show profiles.
        self.infra_contr_topic = InfraContTopic(db, fs, msg, auth)
        self.infra_conf_topic = InfraConfTopic(db, fs, msg, auth)
        self.resource_topic = ResourceTopic(db, fs, msg, auth)
        self.app_topic = AppTopic(db, fs, msg, auth)

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """
        Creates a new k8scluster into database.
        :param rollback: list to append the created items at database in case a rollback must be done
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param indata: params to be used for the k8cluster
        :param kwargs: used to override the indata
        :param headers: http request headers
        :return: tuple (_id of the k8scluster created at database, None)
        :raises: EngineException, ValidationError, DbException, FsException or MsgException.
            The exception is re-raised with the same type and http_code, its message
            extended with the step that was being executed.
        """
        step = "checking quotas"  # first step must be defined outside try
        try:
            self.check_quota(session)
            step = "name unique check"
            self.check_unique_name(session, indata["name"])
            step = "validating input parameters"
            cls_request = self._remove_envelop(indata)
            self._update_input_with_kwargs(cls_request, kwargs)
            cls_request = self._validate_input_new(cls_request, session["force"])
            operation_params = cls_request

            step = "filling cluster details from input data"
            cls_create = self._create_cluster(
                cls_request, rollback, session, indata, kwargs, headers
            )

            step = "creating cluster at database"
            self.format_on_new(
                cls_create, session["project_id"], make_public=session["public"]
            )
            cls_create["current_operation"] = None
            op_id = self.format_on_operation(
                cls_create,
                "create",
                operation_params,
            )
            _id = self.db.create(self.topic, cls_create)
            rollback.append({"topic": self.topic, "_id": _id})
            self.db.set_one("clusters", {"_id": _id}, cls_create)
            self._send_msg("create", {"cluster_id": _id, "operation_id": op_id})

            return _id, None
        except (
            ValidationError,
            EngineException,
            DbException,
            MsgException,
            FsException,
        ) as e:
            # Keep the original exception as the explicit cause for debugging.
            raise type(e)(
                "{} while '{}'".format(e, step), http_code=e.http_code
            ) from e

    def _create_cluster(self, cls_request, rollback, session, indata, kwargs, headers):
        """Build the cluster document to be stored, creating its default profiles.

        ``region_name`` and ``resource_group`` are taken from the validated
        request when the corresponding key is present in ``indata``; otherwise
        they are read from the ``config`` of the referenced VIM account.

        :return: dict with the cluster content (not yet stored in the database)
        """
        # Get the vim_account details
        vim_account_details = self.db.get_one(
            "vim_accounts", {"name": cls_request["vim_account"]}
        )

        # The VIM config is only accessed for values missing from the request.
        if "region_name" in indata:
            region_name = cls_request["region_name"]
        else:
            region_name = vim_account_details["config"]["region_name"]
        if "resource_group" in indata:
            resource_group = cls_request["resource_group"]
        else:
            resource_group = vim_account_details["config"]["resource_group"]

        cls_desc = {
            "name": cls_request["name"],
            "vim_account": self.check_vim(session, cls_request["vim_account"]),
            "k8s_version": cls_request["k8s_version"],
            "node_size": cls_request["node_size"],
            "node_count": cls_request["node_count"],
            "description": cls_request["description"],
            "region_name": region_name,
            "resource_group": resource_group,
            "infra_controller_profiles": [
                self._create_default_profiles(
                    rollback, session, indata, kwargs, headers, self.infra_contr_topic
                )
            ],
            "infra_config_profiles": [
                self._create_default_profiles(
                    rollback, session, indata, kwargs, headers, self.infra_conf_topic
                )
            ],
            "resource_profiles": [
                self._create_default_profiles(
                    rollback, session, indata, kwargs, headers, self.resource_topic
                )
            ],
            "app_profiles": [
                self._create_default_profiles(
                    rollback, session, indata, kwargs, headers, self.app_topic
                )
            ],
            "created": "true",
            "state": "IN_CREATION",
            "operatingState": "PROCESSING",
            "git_name": self.create_gitname(cls_request, session),
            "resourceState": "IN_PROGRESS.REQUEST_RECEIVED",
        }
        return cls_desc

    def check_vim(self, session, name):
        """Return ``name`` if a VIM account with that name is found in the database.

        Returns None implicitly when ``get_one`` returns None. A ValidationError
        raised during the lookup is converted into an EngineException
        (UNPROCESSABLE_ENTITY).
        """
        try:
            vim_account_details = self.db.get_one("vim_accounts", {"name": name})
            if vim_account_details is not None:
                return name
        except ValidationError as e:
            raise EngineException(
                e,
                HTTPStatus.UNPROCESSABLE_ENTITY,
            )

    def _create_default_profiles(
        self, rollback, session, indata, kwargs, headers, topic
    ):
        """Call ``default()`` on the selected profile topic and return its result.

        ``topic`` may be a profile type name or a topic object (see to_select_topic).
        """
        topic = self.to_select_topic(topic)
        default_profiles = topic.default(rollback, session, indata, kwargs, headers)
        return default_profiles

    def to_select_topic(self, topic):
        """Map a profile type name to its topic object.

        Any value that is not one of the four known names is returned unchanged.
        """
        if topic == "infra_controller_profiles":
            return self.infra_contr_topic
        if topic == "infra_config_profiles":
            return self.infra_conf_topic
        if topic == "resource_profiles":
            return self.resource_topic
        if topic == "app_profiles":
            return self.app_topic
        return topic

    def show_one(self, session, _id, profile, filter_q=None, api_req=False):
        """Return the list of profiles of type ``profile`` attached to a cluster.

        :param _id: identifier of the cluster
        :param profile: profile type name, also the cluster field holding the ids
        :param filter_q: ignored on input; it is rebuilt from the session
        :param api_req: forwarded to the profile topic ``show``
        :raises EngineException: (UNPROCESSABLE_ENTITY) on ValidationError
        """
        try:
            filter_q = self._get_project_filter(session)
            filter_q[self.id_field(self.topic, _id)] = _id
            content = self.db.get_one(self.topic, filter_q)
            topic = self.to_select_topic(profile)
            existing_profiles = []
            for profile_id in content[profile]:
                data = topic.show(session, profile_id, filter_q, api_req)
                existing_profiles.append(data)
            return existing_profiles
        except ValidationError as e:
            raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)

    def state_check(self, profile_id, session, topic):
        """Raise EngineException unless the profile ``state`` is ``CREATED``."""
        topic = self.to_select_topic(topic)
        content = topic.show(session, profile_id, filter_q=None, api_req=False)
        if content["state"] != "CREATED":
            raise EngineException(
                f" {profile_id} is not in created state",
                HTTPStatus.UNPROCESSABLE_ENTITY,
            )

    def edit(self, session, _id, item, indata=None, kwargs=None):
        """Attach or detach a profile depending on the validated input.

        :param item: profile type name (cluster field holding the profile ids)
        :raises EngineException: (UNPROCESSABLE_ENTITY) if the input contains
            neither a truthy ``add_profile`` nor a truthy ``remove_profile``
        """
        indata = self._remove_envelop(indata)
        indata = self._validate_input_edit(indata, content=None, force=session["force"])
        if indata.get("add_profile"):
            self.add_profile(session, _id, item, indata)
        elif indata.get("remove_profile"):
            self.remove_profile(session, _id, item, indata)
        else:
            error_msg = "Add / remove operation is only applicable"
            raise EngineException(error_msg, HTTPStatus.UNPROCESSABLE_ENTITY)

    def add_profile(self, session, _id, item, indata=None):
        """Record an ``add`` operation on the cluster and notify it via ``_send_msg``.

        Only the first entry of ``indata["add_profile"]`` is processed.
        :raises EngineException: if the profile is not in ``CREATED`` state or
            is already listed in the cluster field ``item``
        """
        indata = self._remove_envelop(indata)
        operation_params = indata
        profile_id = indata["add_profile"][0]["id"]
        # check state
        self.state_check(profile_id, session, item)
        filter_q = self._get_project_filter(session)
        filter_q[self.id_field(self.topic, _id)] = _id
        content = self.db.get_one(self.topic, filter_q)
        profile_list = content[item]

        if profile_id in profile_list:
            raise EngineException(
                f"{item} {profile_id} already exists", HTTPStatus.UNPROCESSABLE_ENTITY
            )

        content["operatingState"] = "PROCESSING"
        content["current_operation"] = None
        op_id = self.format_on_operation(
            content,
            "add",
            operation_params,
        )
        self.db.set_one("clusters", {"_id": content["_id"]}, content)
        self._send_msg(
            "add",
            {
                "cluster_id": _id,
                "profile_id": profile_id,
                "profile_type": item,
                "operation_id": op_id,
            },
        )

    def _get_default_profiles(self, session, topic):
        """Return the ``_id`` of every listed profile with a truthy ``default`` field."""
        topic = self.to_select_topic(topic)
        existing_profiles = topic.list(session, filter_q=None, api_req=False)
        default_profiles = [
            profile["_id"]
            for profile in existing_profiles
            if profile.get("default", False)
        ]
        return default_profiles

    def remove_profile(self, session, _id, item, indata):
        """Record a ``remove`` operation on the cluster and notify it via ``_send_msg``.

        Only the first entry of ``indata["remove_profile"]`` is processed.
        :raises EngineException: if the profile is a default profile or is not
            listed in the cluster field ``item``
        """
        indata = self._remove_envelop(indata)
        operation_params = indata
        profile_id = indata["remove_profile"][0]["id"]
        filter_q = self._get_project_filter(session)
        filter_q[self.id_field(self.topic, _id)] = _id
        content = self.db.get_one(self.topic, filter_q)
        profile_list = content[item]

        default_profiles = self._get_default_profiles(session, item)

        if profile_id in default_profiles:
            raise EngineException(
                "Cannot remove default profile", HTTPStatus.UNPROCESSABLE_ENTITY
            )
        if profile_id not in profile_list:
            raise EngineException(
                f"{item} {profile_id} does'nt exists", HTTPStatus.UNPROCESSABLE_ENTITY
            )

        content["operatingState"] = "PROCESSING"
        content["current_operation"] = None
        op_id = self.format_on_operation(
            content,
            "remove",
            operation_params,
        )
        self.db.set_one("clusters", {"_id": content["_id"]}, content)
        self._send_msg(
            "remove",
            {
                "cluster_id": _id,
                "profile_id": profile_id,
                "profile_type": item,
                "operation_id": op_id,
            },
        )

    def get_cluster_info(self, session, _id, item):
        """Return the cluster document and send a message named ``item`` with its ``_id``."""
        filter_db = self._get_project_filter(session) if self.multiproject else {}
        # To allow project&user addressing by name AS WELL AS _id
        filter_db[BaseTopic.id_field(self.topic, _id)] = _id
        data = self.db.get_one(self.topic, filter_db)
        self._send_msg(item, {"_id": _id})
        return data

    def update_cluster(self, session, _id, item, indata):
        """Record operation ``item`` on the cluster, persist it and notify via ``_send_msg``.

        :param item: operation name, used both as operation type and message name
        :param indata: stored as the operation parameters
        :return: the value of ``current_operation`` after ``format_on_operation``
        """
        filter_db = self._get_project_filter(session) if self.multiproject else {}
        # To allow project&user addressing by name AS WELL AS _id
        filter_db[BaseTopic.id_field(self.topic, _id)] = _id
        data = self.db.get_one(self.topic, filter_db)
        data["operatingState"] = "PROCESSING"
        data["resourceState"] = "IN_PROGRESS"
        operation_params = indata
        self.format_on_operation(
            data,
            item,
            operation_params,
        )
        self.db.set_one(self.topic, {"_id": _id}, data)
        op_id = data["current_operation"]
        self._send_msg(item, {"cluster_id": _id, "operation_id": op_id})
        return op_id
491
rshri2d386cb2024-07-05 14:35:51 +0000492
class K8saddTopic(BaseTopic):
    """Topic for registering existing clusters (stored in ``clusters`` with ``created: "false"``)."""

    topic = "clusters"
    topic_msg = "cluster"
    schema_new = k8scluster_new_schema

    def __init__(self, db, fs, msg, auth):
        BaseTopic.__init__(self, db, fs, msg, auth)

    def add(self, rollback, session, indata, kwargs=None, headers=None):
        """
        Registers an existing cluster into database and sends a "register" message.
        :param rollback: list to append the created items at database in case a rollback must be done
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param indata: params describing the cluster
        :param kwargs: used to override the indata
        :param headers: http request headers (not used here)
        :return: tuple (_id of the cluster created at database, None)
        :raises: ValidationError, EngineException, DbException, MsgException or FsException,
            re-raised with the same type and http_code and the failing step in the message
        """
        step = "checking quotas"
        try:
            self.check_quota(session)
            step = "name unique check"
            self.check_unique_name(session, indata["name"])
            step = "validating input parameters"
            cls_add_request = self._remove_envelop(indata)
            self._update_input_with_kwargs(cls_add_request, kwargs)
            cls_add_request = self._validate_input_new(
                cls_add_request, session["force"]
            )
            operation_params = cls_add_request

            step = "filling cluster details from input data"
            cls_add = self._add_cluster(cls_add_request, session)

            step = "creating cluster at database"
            self.format_on_new(
                cls_add, session["project_id"], make_public=session["public"]
            )
            cls_add["current_operation"] = None
            op_id = self.format_on_operation(
                cls_add,
                "register",
                operation_params,
            )
            _id = self.db.create(self.topic, cls_add)
            # Register the rollback right after the creation so that a failure
            # in any later step still removes the record.
            rollback.append({"topic": self.topic, "_id": _id})
            self.db.set_one(self.topic, {"_id": _id}, cls_add)
            self._send_msg("register", {"cluster_id": _id, "operation_id": op_id})
            return _id, None
        except (
            ValidationError,
            EngineException,
            DbException,
            MsgException,
            FsException,
        ) as e:
            # Keep the original exception as the explicit cause for debugging.
            raise type(e)(
                "{} while '{}'".format(e, step), http_code=e.http_code
            ) from e

    def _add_cluster(self, cls_add_request, session):
        """Build the cluster document to be stored from the validated request."""
        cls_add = {
            "name": cls_add_request["name"],
            "description": cls_add_request["description"],
            "credentials": cls_add_request["credentials"],
            "vim_account": cls_add_request["vim_account"],
            "k8s_version": cls_add_request["k8s_version"],
            "nets": cls_add_request["nets"],
            "created": "false",
            "state": "IN_CREATION",
            "operatingState": "PROCESSING",
            "git_name": self.create_gitname(cls_add_request, session),
            "resourceState": "IN_PROGRESS.REQUEST_RECEIVED",
        }
        return cls_add

    def remove(self, session, _id, dry_run=False, not_send_msg=None):
        """
        Deregister item by its internal _id
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param _id: server internal id
        :param dry_run: make checking but do not modify anything
        :param not_send_msg: To not send message (False) or store content (list) instead
        :return: None; raise exception if error or not found, conflict, ...
        """

        # To allow addressing projects and users by name AS WELL AS by _id
        filter_q = self._get_project_filter(session) if self.multiproject else {}
        filter_q[self.id_field(self.topic, _id)] = _id
        item_content = self.db.get_one(self.topic, filter_q)

        # All checks run before anything is written, so that a dry run or a
        # rejected request leaves the stored cluster untouched.
        self.check_conflict_on_del(session, _id, item_content)
        if dry_run:
            return None

        if self.multiproject and session["project_id"]:
            # remove reference from project_read if there are more projects referencing it. If it last one,
            # do not remove reference, but delete
            other_projects_referencing = next(
                (
                    p
                    for p in item_content["_admin"]["projects_read"]
                    if p not in session["project_id"] and p != "ANY"
                ),
                None,
            )

            # check if there are projects referencing it (apart from ANY, that means, public)....
            if other_projects_referencing:
                # remove references but not delete
                update_dict_pull = {
                    "_admin.projects_read": session["project_id"],
                    "_admin.projects_write": session["project_id"],
                }
                self.db.set_one(
                    self.topic, filter_q, update_dict=None, pull_list=update_dict_pull
                )
                return None

            can_write = next(
                (
                    p
                    for p in item_content["_admin"]["projects_write"]
                    if p == "ANY" or p in session["project_id"]
                ),
                None,
            )
            if not can_write:
                raise EngineException(
                    "You have not write permission to delete it",
                    http_code=HTTPStatus.UNAUTHORIZED,
                )

        # Record the deregister operation only when it is really requested.
        item_content["operatingState"] = "PROCESSING"
        item_content["current_operation"] = None
        op_id = self.format_on_operation(
            item_content,
            "deregister",
            None,
        )
        self.db.set_one(self.topic, {"_id": _id}, item_content)

        # delete
        self._send_msg(
            "deregister",
            {"cluster_id": _id, "operation_id": op_id},
            not_send_msg=not_send_msg,
        )
        return None
yshah53cc9eb2024-07-05 13:06:31 +0000634
635
class KsusTopic(BaseTopic):
    """Topic handling KSUs (collection ``ksus``) and the usage state of the OKAs they reference."""

    topic = "ksus"
    okapkg_topic = "okas"
    infra_topic = "k8sinfra"
    topic_msg = "ksu"
    schema_new = ksu_schema
    schema_edit = ksu_schema

    def __init__(self, db, fs, msg, auth):
        BaseTopic.__init__(self, db, fs, msg, auth)
        self.logger = logging.getLogger("nbi.ksus")

    @staticmethod
    def format_on_new(content, project_id=None, make_public=False):
        """Apply the base formatting and set the initial KSU states."""
        BaseTopic.format_on_new(content, project_id=project_id, make_public=make_public)
        content["state"] = "IN_CREATION"
        content["operatingState"] = "PROCESSING"
        content["resourceState"] = "IN_PROGRESS"

    @staticmethod
    def _normalize_oka_refs(oka_list):
        """Check and clean, in place, the OKA references of one KSU.

        Every entry must carry exactly one of ``_id`` / ``sw_catalog_path``, and
        all entries must use the same kind as the first one. Empty or missing
        keys are removed from each entry.

        :raises EngineException: (UNPROCESSABLE_ENTITY) when an entry has both
            references, or its reference kind differs from the first entry
        """
        first = oka_list[0]
        if first.get("_id"):
            oka_flag = "_id"
        elif first.get("sw_catalog_path"):
            oka_flag = "sw_catalog_path"
        else:
            oka_flag = ""

        for okas in oka_list:
            has_id = bool(okas.get("_id"))
            has_path = bool(okas.get("sw_catalog_path"))
            if has_id and has_path:
                raise EngineException(
                    "Cannot create ksu with both OKA and SW catalog path",
                    HTTPStatus.UNPROCESSABLE_ENTITY,
                )
            # Drop every empty reference so that only the used key remains.
            if not has_path:
                okas.pop("sw_catalog_path", None)
            if not has_id:
                okas.pop("_id", None)
            if oka_flag not in okas:
                raise EngineException(
                    "Cannot create ksu. Give either OKA or SW catalog path for all oka in a KSU",
                    HTTPStatus.UNPROCESSABLE_ENTITY,
                )

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create every KSU listed in ``indata["ksus"]`` under one shared operation id.

        :param rollback: list to append the created items at database in case a rollback must be done
        :param session: session information; "force", "project_id" and "public" are read here
        :param indata: dict with a "ksus" list; each entry holds an "oka" list
        :param kwargs: used to override each KSU content
        :param headers: http request headers (not used here)
        :return: tuple (list of created _id, operation id)
        :raises EngineException: when the OKA references of a KSU are inconsistent
        """
        _id_list = []
        op_id = str(uuid4())
        for ksus in indata["ksus"]:
            content = ksus
            self._normalize_oka_refs(content["oka"])

            # Override descriptor with query string kwargs
            content = self._remove_envelop(content)
            self._update_input_with_kwargs(content, kwargs)
            content = self._validate_input_new(input=content, force=session["force"])

            # Check for unique name
            self.check_unique_name(session, content["name"])

            self.check_conflict_on_new(session, content)

            # Shallow copy taken before the administrative fields are added.
            operation_params = dict(content)
            self.format_on_new(
                content, project_id=session["project_id"], make_public=session["public"]
            )
            content["current_operation"] = op_id
            op_id = self.format_on_operation(
                content,
                operation_type="create",
                operation_params=operation_params,
            )
            content["git_name"] = self.create_gitname(content, session)

            # Update Oka_package usage state
            for okas in content["oka"]:
                if "_id" in okas:
                    self.update_usage_state(session, okas)

            _id = self.db.create(self.topic, content)
            rollback.append({"topic": self.topic, "_id": _id})

            if not op_id:
                op_id = content["current_operation"]
            _id_list.append(_id)
        data = {"ksus_list": _id_list, "operation_id": op_id}
        self._send_msg("create", data)
        return _id_list, op_id

    def clone(self, rollback, session, _id, indata, kwargs, headers):
        """Record a ``clone`` operation on the KSU and notify it via ``_send_msg``.

        :return: the operation id returned by ``format_on_operation``
        """
        filter_db = self._get_project_filter(session)
        filter_db[BaseTopic.id_field(self.topic, _id)] = _id
        data = self.db.get_one(self.topic, filter_db)

        data["current_operation"] = None
        op_id = self.format_on_operation(
            data,
            "clone",
            indata,
        )
        self.db.set_one(self.topic, {"_id": data["_id"]}, data)
        self._send_msg("clone", {"ksus_list": [data["_id"]], "operation_id": op_id})
        return op_id

    def update_usage_state(self, session, oka_content):
        """Mark the referenced OKA as ``IN_USE`` if it is currently ``NOT_IN_USE``."""
        _id = oka_content["_id"]
        filter_db = self._get_project_filter(session)
        # The lookup is done on the OKA collection, so use its id field.
        filter_db[BaseTopic.id_field(self.okapkg_topic, _id)] = _id

        data = self.db.get_one(self.okapkg_topic, filter_db)
        if data["_admin"]["usageState"] == "NOT_IN_USE":
            usage_state_update = {
                "_admin.usageState": "IN_USE",
            }
            self.db.set_one(
                self.okapkg_topic, {"_id": _id}, update_dict=usage_state_update
            )

    def move_ksu(self, session, _id, indata=None, kwargs=None, content=None):
        """Apply ``indata`` to the KSU, record a ``move`` operation and notify it.

        :param content: current KSU content; fetched with ``show`` when falsy
        :return: the value of ``current_operation`` after ``format_on_operation``
        :raises EngineException: (UNPROCESSABLE_ENTITY) on ValidationError, or when
            both content and the session ``set_project`` are given
        """
        indata = self._remove_envelop(indata)

        # Override descriptor with query string kwargs
        if kwargs:
            self._update_input_with_kwargs(indata, kwargs)
        try:
            if indata and session.get("set_project"):
                raise EngineException(
                    "Cannot edit content and set to project (query string SET_PROJECT) at same time",
                    HTTPStatus.UNPROCESSABLE_ENTITY,
                )
            # TODO self._check_edition(session, indata, _id, force)
            if not content:
                content = self.show(session, _id)
            indata = self._validate_input_edit(
                input=indata, content=content, force=session["force"]
            )
            operation_params = indata
            deep_update_rfc7396(content, indata)

            # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
            _id = content.get("_id") or _id
            content["current_operation"] = None
            self.format_on_operation(
                content,
                "move",
                operation_params,
            )
            if content.get("_admin"):
                content["_admin"]["modified"] = time()
            content["operatingState"] = "PROCESSING"
            content["resourceState"] = "IN_PROGRESS"

            self.db.replace(self.topic, _id, content)

            op_id = content["current_operation"]
            data = {"ksus_list": [content["_id"]], "operation_id": op_id}
            self._send_msg("move", data)
            return op_id
        except ValidationError as e:
            raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)

    def check_conflict_on_edit(self, session, final_content, edit_content, _id):
        """Check name uniqueness when the edit changes the name; return ``final_content``."""
        if final_content["name"] != edit_content["name"]:
            self.check_unique_name(session, edit_content["name"])
        return final_content

    @staticmethod
    def format_on_edit(final_content, edit_content):
        """Record an ``update`` operation on ``final_content`` and return its operation id."""
        BaseTopic.format_on_operation(
            final_content,
            "update",
            edit_content,
        )
        final_content["operatingState"] = "PROCESSING"
        final_content["resourceState"] = "IN_PROGRESS"
        if final_content.get("_admin"):
            final_content["_admin"]["modified"] = time()
        return final_content["current_operation"]

    def edit(self, session, _id, indata, kwargs):
        """Edit one KSU, or every KSU in ``indata["ksus"]`` when ``_id`` is ``"update"``.

        A single "edit" message is sent with the list of edited ids.
        :return: the operation id
        """
        _id_list = []
        op_id = str(uuid4())
        if _id == "update":
            for content in indata["ksus"]:
                # The "_id" key is removed from the entry before it is edited.
                ksu_id = content.pop("_id")
                _id_list.append(ksu_id)
                op_id = self.edit_ksu(session, ksu_id, op_id, content, kwargs)
        else:
            _id_list.append(_id)
            op_id = self.edit_ksu(session, _id, op_id, indata, kwargs)

        data = {"ksus_list": _id_list, "operation_id": op_id}
        self._send_msg("edit", data)
        return op_id

    def edit_ksu(self, session, _id, op_id, indata, kwargs):
        """Validate ``indata`` against the stored KSU and replace it in the database.

        Note that the stored content is the current one with the operation
        recorded; ``indata`` is only kept as the operation parameters.

        :param op_id: operation id to set as ``current_operation``
        :return: the operation id returned by ``format_on_edit``
        :raises EngineException: (UNPROCESSABLE_ENTITY) on ValidationError, or when
            both content and the session ``set_project`` are given
        """
        indata = self._remove_envelop(indata)

        # Override descriptor with query string kwargs
        if kwargs:
            self._update_input_with_kwargs(indata, kwargs)
        try:
            if indata and session.get("set_project"):
                raise EngineException(
                    "Cannot edit content and set to project (query string SET_PROJECT) at same time",
                    HTTPStatus.UNPROCESSABLE_ENTITY,
                )
            # TODO self._check_edition(session, indata, _id, force)
            content = self.show(session, _id)

            # Drop the empty (or missing) reference of every OKA entry.
            for okas in indata["oka"]:
                if not okas.get("_id"):
                    okas.pop("_id", None)
                if not okas.get("sw_catalog_path"):
                    okas.pop("sw_catalog_path", None)

            indata = self._validate_input_edit(indata, content, force=session["force"])

            # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
            _id = content.get("_id") or _id

            content = self.check_conflict_on_edit(session, content, indata, _id=_id)
            content["current_operation"] = op_id
            op_id = self.format_on_edit(content, indata)
            self.db.replace(self.topic, _id, content)
            return op_id
        except ValidationError as e:
            raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)

    def delete_ksu(self, session, _id, indata, dry_run=False, not_send_msg=None):
        """Delete one KSU, or every KSU in ``indata["ksus"]`` when ``_id`` is ``"delete"``.

        A single "delete" message is sent with the list of ids.
        ``dry_run`` and ``not_send_msg`` are accepted but not used.
        :return: the operation id
        """
        _id_list = []
        op_id = str(uuid4())
        if _id == "delete":
            for content in indata["ksus"]:
                # The "_id" key is removed from the entry.
                ksu_id = content.pop("_id")
                _id_list.append(ksu_id)
                op_id = self.delete(session, ksu_id, op_id)
        else:
            _id_list.append(_id)
            op_id = self.delete(session, _id, op_id)

        data = {"ksus_list": _id_list, "operation_id": op_id}
        self._send_msg("delete", data)
        return op_id

    def delete(self, session, _id, op_id):
        """Mark the KSU as being deleted and release the OKAs that only it references.

        The KSU is stored with state ``IN_DELETION`` and a recorded ``delete``
        operation. Every OKA referenced by ``_id`` in this KSU and not referenced
        by any other listed KSU gets ``_admin.usageState`` set to ``NOT_IN_USE``.

        :param op_id: operation id to set as ``current_operation``
        :return: ``op_id``
        """
        filter_q = self._get_project_filter(session) if self.multiproject else {}
        filter_q[self.id_field(self.topic, _id)] = _id
        item_content = self.db.get_one(self.topic, filter_q)
        item_content["state"] = "IN_DELETION"
        item_content["operatingState"] = "PROCESSING"
        item_content["resourceState"] = "IN_PROGRESS"
        item_content["current_operation"] = op_id
        self.format_on_operation(
            item_content,
            "delete",
            None,
        )
        self.db.set_one(self.topic, {"_id": item_content["_id"]}, item_content)

        # Ids of all the OKAs used by this KSU, without duplicates. Entries
        # that reference a sw_catalog_path have no "_id" and are skipped.
        used_oka = list(
            dict.fromkeys(
                okas["_id"] for okas in item_content["oka"] if okas.get("_id")
            )
        )
        if not used_oka:
            return op_id

        project_filter = self._get_project_filter(session)
        all_ksus = self.db.get_list(self.topic, project_filter)

        # Ids of the OKAs still referenced by any other KSU.
        still_referenced = set()
        for ksus in all_ksus or ():
            if ksus["_id"] == item_content["_id"]:
                continue
            for okas in ksus.get("oka") or ():
                oka_id = okas.get("_id")
                if oka_id:
                    still_referenced.add(oka_id)

        for oka_id in used_oka:
            if oka_id not in still_referenced:
                self.db.set_one(
                    self.okapkg_topic,
                    {"_id": oka_id},
                    {"_admin.usageState": "NOT_IN_USE"},
                )
        return op_id
930
931
class OkaTopic(DescriptorTopic):
    """Topic class for the "okas" entries, built on top of DescriptorTopic.

    The same schema (``oka_schema``) is declared for both creation and edition.
    """

    # Name passed as ``self.topic`` to the database calls made by this class
    topic = "okas"
    # NOTE(review): presumably the topic used by _send_msg for notifications; confirm in the base class
    topic_msg = "oka"
    # NOTE(review): presumably consumed by the inherited _validate_input_new / _validate_input_edit
    schema_new = oka_schema
    schema_edit = oka_schema
937
938 def __init__(self, db, fs, msg, auth):
939 super().__init__(db, fs, msg, auth)
940 self.logger = logging.getLogger("nbi.oka")
941
942 @staticmethod
943 def format_on_new(content, project_id=None, make_public=False):
944 DescriptorTopic.format_on_new(
945 content, project_id=project_id, make_public=make_public
946 )
947 content["state"] = "PENDING_CONTENT"
948 content["operatingState"] = "PROCESSING"
949 content["resourceState"] = "IN_PROGRESS"
950
951 def check_conflict_on_del(self, session, _id, db_content):
952 usage_state = db_content["_admin"]["usageState"]
953 if usage_state == "IN_USE":
954 raise EngineException(
955 "There is a KSU using this package",
956 http_code=HTTPStatus.CONFLICT,
957 )
958
959 def check_conflict_on_edit(self, session, final_content, edit_content, _id):
960 if (
961 final_content["name"] == edit_content["name"]
962 and final_content["description"] == edit_content["description"]
963 ):
964 raise EngineException(
965 "No update",
966 http_code=HTTPStatus.CONFLICT,
967 )
968 if final_content["name"] != edit_content["name"]:
969 self.check_unique_name(session, edit_content["name"])
970 return final_content
971
972 def edit(self, session, _id, indata=None, kwargs=None, content=None):
973 indata = self._remove_envelop(indata)
974
975 # Override descriptor with query string kwargs
976 if kwargs:
977 self._update_input_with_kwargs(indata, kwargs)
978 try:
979 if indata and session.get("set_project"):
980 raise EngineException(
981 "Cannot edit content and set to project (query string SET_PROJECT) at same time",
982 HTTPStatus.UNPROCESSABLE_ENTITY,
983 )
984 # TODO self._check_edition(session, indata, _id, force)
985 if not content:
986 content = self.show(session, _id)
987
988 indata = self._validate_input_edit(indata, content, force=session["force"])
989
990 # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
991 _id = content.get("_id") or _id
992
993 content = self.check_conflict_on_edit(session, content, indata, _id=_id)
994 op_id = self.format_on_edit(content, indata)
995 deep_update_rfc7396(content, indata)
996
997 self.db.replace(self.topic, _id, content)
998 return op_id
999 except ValidationError as e:
1000 raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
1001
1002 def delete(self, session, _id, dry_run=False, not_send_msg=None):
1003 if not self.multiproject:
1004 filter_q = {}
1005 else:
1006 filter_q = self._get_project_filter(session)
1007 filter_q[self.id_field(self.topic, _id)] = _id
1008 item_content = self.db.get_one(self.topic, filter_q)
1009 item_content["state"] = "IN_DELETION"
1010 item_content["operatingState"] = "PROCESSING"
1011 self.check_conflict_on_del(session, _id, item_content)
1012 item_content["current_operation"] = None
1013 self.format_on_operation(
1014 item_content,
1015 "delete",
1016 None,
1017 )
1018 op_id = item_content["current_operation"]
1019 self.db.set_one(self.topic, {"_id": item_content["_id"]}, item_content)
1020 self._send_msg(
1021 "delete", {"oka_id": _id, "operation_id": op_id}, not_send_msg=not_send_msg
1022 )
1023
1024 def new(self, rollback, session, indata=None, kwargs=None, headers=None):
1025 # _remove_envelop
1026 if indata:
1027 if "userDefinedData" in indata:
1028 indata = indata["userDefinedData"]
1029
1030 content = {"_admin": {"userDefinedData": indata, "revision": 0}}
1031
1032 self._update_input_with_kwargs(content, kwargs)
1033 content = BaseTopic._validate_input_new(
1034 self, input=kwargs, force=session["force"]
1035 )
1036
1037 self.check_unique_name(session, content["name"])
1038 operation_params = {}
1039 for content_key, content_value in content.items():
1040 operation_params[content_key] = content_value
1041 self.format_on_new(
1042 content, session["project_id"], make_public=session["public"]
1043 )
1044 content["current_operation"] = None
1045 self.format_on_operation(
1046 content,
1047 operation_type="create",
1048 operation_params=operation_params,
1049 )
1050 content["git_name"] = self.create_gitname(content, session)
1051 _id = self.db.create(self.topic, content)
1052 rollback.append({"topic": self.topic, "_id": _id})
1053 return _id, None
1054
1055 def upload_content(self, session, _id, indata, kwargs, headers):
1056 current_desc = self.show(session, _id)
1057
1058 compressed = None
1059 content_type = headers.get("Content-Type")
1060 if (
1061 content_type
1062 and "application/gzip" in content_type
1063 or "application/x-gzip" in content_type
1064 ):
1065 compressed = "gzip"
1066 if content_type and "application/zip" in content_type:
1067 compressed = "zip"
1068 filename = headers.get("Content-Filename")
1069 if not filename and compressed:
1070 filename = "package.tar.gz" if compressed == "gzip" else "package.zip"
1071 elif not filename:
1072 filename = "package"
1073
1074 revision = 1
1075 if "revision" in current_desc["_admin"]:
1076 revision = current_desc["_admin"]["revision"] + 1
1077
1078 file_pkg = None
1079 fs_rollback = []
1080
1081 try:
1082 start = 0
1083 # Rather than using a temp folder, we will store the package in a folder based on
1084 # the current revision.
1085 proposed_revision_path = _id + ":" + str(revision)
1086 # all the content is upload here and if ok, it is rename from id_ to is folder
1087
1088 if start:
1089 if not self.fs.file_exists(proposed_revision_path, "dir"):
1090 raise EngineException(
1091 "invalid Transaction-Id header", HTTPStatus.NOT_FOUND
1092 )
1093 else:
1094 self.fs.file_delete(proposed_revision_path, ignore_non_exist=True)
1095 self.fs.mkdir(proposed_revision_path)
1096 fs_rollback.append(proposed_revision_path)
1097
1098 storage = self.fs.get_params()
1099 storage["folder"] = proposed_revision_path
1100
1101 file_path = (proposed_revision_path, filename)
1102 file_pkg = self.fs.file_open(file_path, "a+b")
1103
1104 filename = indata.filename
1105
1106 if isinstance(indata, dict):
1107 indata_text = yaml.safe_dump(indata, indent=4, default_flow_style=False)
1108 file_pkg.write(indata_text.encode(encoding="utf-8"))
1109 else:
1110 indata_len = 0
1111 indata = indata.file
1112 while True:
1113 indata_text = indata.read(4096)
1114 indata_len += len(indata_text)
1115 if not indata_text:
1116 break
1117 file_pkg.write(indata_text)
1118
1119 # PACKAGE UPLOADED
1120 file_pkg.seek(0, 0)
1121 if compressed == "gzip":
1122 tar = tarfile.open(mode="r", fileobj=file_pkg)
1123 for tarinfo in tar:
1124 tarname = tarinfo.name
1125 tarname_path = tarname.split("/")
1126 self.logger.debug(
1127 "Tarname: {} Tarname Path: {}".format(tarname, tarname_path)
1128 )
1129 storage["zipfile"] = filename
1130 self.fs.file_extract(tar, proposed_revision_path)
1131 else:
1132 content = file_pkg.read()
1133 self.logger.debug("Content: {}".format(content))
1134
1135 # Need to close the file package here so it can be copied from the
1136 # revision to the current, unrevisioned record
1137 if file_pkg:
1138 file_pkg.close()
1139 file_pkg = None
1140
1141 # Fetch both the incoming, proposed revision and the original revision so we
1142 # can call a validate method to compare them
1143 current_revision_path = _id + "/"
1144 self.fs.sync(from_path=current_revision_path)
1145 self.fs.sync(from_path=proposed_revision_path)
1146
1147 if revision > 1:
1148 try:
1149 self._validate_descriptor_changes(
1150 _id,
1151 filename,
1152 current_revision_path,
1153 proposed_revision_path,
1154 )
1155 except Exception as e:
1156 shutil.rmtree(
1157 self.fs.path + current_revision_path, ignore_errors=True
1158 )
1159 shutil.rmtree(
1160 self.fs.path + proposed_revision_path, ignore_errors=True
1161 )
1162 # Only delete the new revision. We need to keep the original version in place
1163 # as it has not been changed.
1164 self.fs.file_delete(proposed_revision_path, ignore_non_exist=True)
1165 raise e
1166
1167 indata = self._remove_envelop(indata)
1168
1169 # Override descriptor with query string kwargs
1170 if kwargs:
1171 self._update_input_with_kwargs(indata, kwargs)
1172
1173 current_desc["_admin"]["storage"] = storage
1174 current_desc["_admin"]["onboardingState"] = "ONBOARDED"
1175 current_desc["_admin"]["operationalState"] = "ENABLED"
1176 current_desc["_admin"]["modified"] = time()
1177 current_desc["_admin"]["revision"] = revision
1178
1179 deep_update_rfc7396(current_desc, indata)
1180
1181 # Copy the revision to the active package name by its original id
1182 shutil.rmtree(self.fs.path + current_revision_path, ignore_errors=True)
1183 os.rename(
1184 self.fs.path + proposed_revision_path,
1185 self.fs.path + current_revision_path,
1186 )
1187 self.fs.file_delete(current_revision_path, ignore_non_exist=True)
1188 self.fs.mkdir(current_revision_path)
1189 self.fs.reverse_sync(from_path=current_revision_path)
1190
1191 shutil.rmtree(self.fs.path + _id)
1192 kwargs = {}
1193 kwargs["package"] = filename
1194 if headers["Method"] == "POST":
1195 current_desc["state"] = "IN_CREATION"
1196 elif headers["Method"] in ("PUT", "PATCH"):
1197 current_desc["current_operation"] = None
1198 self.format_on_operation(
1199 current_desc,
1200 "update",
1201 kwargs,
1202 )
1203 current_desc["operatingState"] = "PROCESSING"
1204 current_desc["resourceState"] = "IN_PROGRESS"
1205
1206 self.db.replace(self.topic, _id, current_desc)
1207
1208 # Store a copy of the package as a point in time revision
1209 revision_desc = dict(current_desc)
1210 revision_desc["_id"] = _id + ":" + str(revision_desc["_admin"]["revision"])
1211 self.db.create(self.topic + "_revisions", revision_desc)
1212 fs_rollback = []
1213
1214 op_id = current_desc["current_operation"]
1215 if headers["Method"] == "POST":
1216 self._send_msg("create", {"oka_id": _id, "operation_id": op_id})
1217 elif headers["Method"] == "PUT" or "PATCH":
1218 self._send_msg("edit", {"oka_id": _id, "operation_id": op_id})
1219
1220 return True
1221
1222 except EngineException:
1223 raise
1224 finally:
1225 if file_pkg:
1226 file_pkg.close()
1227 for file in fs_rollback:
1228 self.fs.file_delete(file, ignore_non_exist=True)