Coverage for osm_nbi/acm_topic.py: 17%
186 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-10 20:04 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-10 20:04 +0000
1# -*- coding: utf-8 -*-
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12# implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
16from pyrage import x25519
17from uuid import uuid4
19from http import HTTPStatus
20from time import time
22# from osm_common.dbbase import deep_update_rfc7396, DbException
23from osm_common.msgbase import MsgException
24from osm_common.dbbase import DbException
25from osm_common.fsbase import FsException
26from osm_nbi.base_topic import BaseTopic, EngineException
27from osm_nbi.validation import ValidationError
29# import logging
30# import random
31# import string
32# from yaml import safe_load, YAMLError
35class ACMOperationTopic:
36 def __init__(self, db, fs, msg, auth):
37 self.multiproject = None # Declare the attribute here
39 @staticmethod
40 def format_on_operation(content, operation_type, operation_params=None):
41 op_id = str(uuid4())
42 now = time()
43 if "operationHistory" not in content:
44 content["operationHistory"] = []
46 operation = {}
47 operation["operationType"] = operation_type
48 operation["op_id"] = op_id
49 operation["result"] = None
50 operation["creationDate"] = now
51 operation["endDate"] = None
52 operation["workflowState"] = operation["resourceState"] = operation[
53 "operationState"
54 ] = operation["gitOperationInfo"] = None
55 operation["operationParams"] = operation_params
57 content["operationHistory"].append(operation)
58 return op_id
61class ACMTopic(BaseTopic, ACMOperationTopic):
62 def __init__(self, db, fs, msg, auth):
63 super().__init__(db, fs, msg, auth)
64 # ACMOperationTopic.__init__(db, fs, msg, auth)
66 def new_profile(self, rollback, session, indata=None, kwargs=None, headers=None):
67 step = "name unique check"
68 try:
69 self.check_unique_name(session, indata["name"])
71 step = "validating input parameters"
72 profile_request = self._remove_envelop(indata)
73 self._update_input_with_kwargs(profile_request, kwargs)
74 profile_request = self._validate_input_new(
75 profile_request, session["force"]
76 )
77 operation_params = profile_request
79 step = "filling profile details from input data"
80 profile_create = self._create_profile(profile_request, session)
82 step = "creating profile at database"
83 self.format_on_new(
84 profile_create, session["project_id"], make_public=session["public"]
85 )
86 profile_create["current_operation"] = None
87 op_id = ACMOperationTopic.format_on_operation(
88 profile_create,
89 "create",
90 operation_params,
91 )
93 _id = self.db.create(self.topic, profile_create)
94 pubkey, privkey = self._generate_age_key()
95 profile_create["age_pubkey"] = self.db.encrypt(
96 pubkey, schema_version="1.11", salt=_id
97 )
98 profile_create["age_privkey"] = self.db.encrypt(
99 privkey, schema_version="1.11", salt=_id
100 )
101 rollback.append({"topic": self.topic, "_id": _id})
102 self.db.set_one(self.topic, {"_id": _id}, profile_create)
103 if op_id:
104 profile_create["op_id"] = op_id
105 self._send_msg("profile_create", {"profile_id": _id, "operation_id": op_id})
107 return _id, None
108 except (
109 ValidationError,
110 EngineException,
111 DbException,
112 MsgException,
113 FsException,
114 ) as e:
115 raise type(e)("{} while '{}'".format(e, step), http_code=e.http_code)
117 def _create_profile(self, profile_request, session):
118 profile_desc = {
119 "name": profile_request["name"],
120 "description": profile_request["description"],
121 "default": False,
122 "git_name": self.create_gitname(profile_request, session),
123 "state": "IN_CREATION",
124 "operatingState": "IN_PROGRESS",
125 "resourceState": "IN_PROGRESS.REQUEST_RECEIVED",
126 }
127 return profile_desc
129 def default_profile(
130 self, rollback, session, indata=None, kwargs=None, headers=None
131 ):
132 step = "validating input parameters"
133 try:
134 profile_request = self._remove_envelop(indata)
135 self._update_input_with_kwargs(profile_request, kwargs)
136 operation_params = profile_request
138 step = "filling profile details from input data"
139 profile_create = self._create_default_profile(profile_request, session)
141 step = "creating profile at database"
142 self.format_on_new(
143 profile_create, session["project_id"], make_public=session["public"]
144 )
145 profile_create["current_operation"] = None
146 ACMOperationTopic.format_on_operation(
147 profile_create,
148 "create",
149 operation_params,
150 )
151 _id = self.db.create(self.topic, profile_create)
152 rollback.append({"topic": self.topic, "_id": _id})
153 return _id
154 except (
155 ValidationError,
156 EngineException,
157 DbException,
158 MsgException,
159 FsException,
160 ) as e:
161 raise type(e)("{} while '{}'".format(e, step), http_code=e.http_code)
163 def _create_default_profile(self, profile_request, session):
164 profile_desc = {
165 "name": profile_request["name"],
166 "description": f"{self.topic} profile for cluster {profile_request['name']}",
167 "default": True,
168 "git_name": self.create_gitname(profile_request, session),
169 "state": "IN_CREATION",
170 "operatingState": "IN_PROGRESS",
171 "resourceState": "IN_PROGRESS.REQUEST_RECEIVED",
172 }
173 return profile_desc
175 def detach(self, session, _id, profile_type):
176 # To detach the profiles from every cluster
177 filter_q = {}
178 existing_clusters = self.db.get_list("clusters", filter_q)
179 existing_clusters_profiles = [
180 profile["_id"]
181 for profile in existing_clusters
182 if profile.get("profile_type", _id)
183 ]
184 update_dict = None
185 for profile in existing_clusters_profiles:
186 filter_q = {"_id": profile}
187 data = self.db.get_one("clusters", filter_q)
188 if profile_type in data:
189 profile_ids = data[profile_type]
190 if _id in profile_ids:
191 profile_ids.remove(_id)
192 update_dict = {profile_type: profile_ids}
193 self.db.set_one("clusters", filter_q, update_dict)
195 def _generate_age_key(self):
196 ident = x25519.Identity.generate()
197 # gets the public key
198 pubkey = str(ident.to_public())
199 # gets the private key
200 privkey = str(ident)
201 # return both public and private key
202 return pubkey, privkey
204 def common_delete(self, _id, db_content):
205 if "state" in db_content:
206 db_content["state"] = "IN_DELETION"
207 db_content["operatingState"] = "PROCESSING"
208 # self.db.set_one(self.topic, {"_id": _id}, db_content)
210 db_content["current_operation"] = None
211 op_id = ACMOperationTopic.format_on_operation(
212 db_content,
213 "delete",
214 None,
215 )
216 self.db.set_one(self.topic, {"_id": _id}, db_content)
217 return op_id
219 def add_to_old_collection(self, content, session):
220 item = {}
221 item["name"] = content["name"]
222 item["credentials"] = {}
223 # item["k8s_version"] = content["k8s_version"]
224 if "k8s_version" in content:
225 item["k8s_version"] = content["k8s_version"]
226 else:
227 item["k8s_version"] = None
228 vim_account_details = self.db.get_one(
229 "vim_accounts", {"name": content["vim_account"]}
230 )
231 item["vim_account"] = vim_account_details["_id"]
232 item["nets"] = {"k8s_net1": None}
233 item["deployment_methods"] = {"juju-bundle": False, "helm-chart-v3": True}
234 # item["description"] = content["description"]
235 if "description" in content:
236 item["description"] = content["description"]
237 else:
238 item["description"] = None
239 item["namespace"] = "kube-system"
240 item["osm_acm"] = True
241 item["schema_version"] = "1.11"
242 self.format_on_new(item, session["project_id"], make_public=session["public"])
243 _id = self.db.create("k8sclusters", item)
244 self.logger.info(f"_id is : {_id}")
245 item_1 = self.db.get_one("k8sclusters", {"name": item["name"]})
246 item_1["_admin"]["operationalState"] = "PROCESSING"
248 # Create operation data
249 now = time()
250 operation_data = {
251 "lcmOperationType": "create", # Assuming 'create' operation here
252 "operationState": "PROCESSING",
253 "startTime": now,
254 "statusEnteredTime": now,
255 "detailed-status": "",
256 "operationParams": None, # Add parameters as needed
257 }
258 # create operation
259 item_1["_admin"]["operations"] = [operation_data]
260 item_1["_admin"]["current_operation"] = None
261 self.logger.info(f"content is : {item_1}")
262 self.db.set_one("k8sclusters", {"_id": item_1["_id"]}, item_1)
263 return
265 def cluster_unique_name_check(self, session, name):
266 # First check using the method you have for unique name validation
267 self.check_unique_name(session, name)
268 _filter = {"name": name}
269 topics = [
270 "k8sclusters",
271 "k8sapp",
272 "k8sinfra_config",
273 "k8sinfra_controller",
274 "k8sresource",
275 ]
277 # Loop through each topic to check if the name already exists in any of them
278 for item in topics:
279 if self.db.get_one(item, _filter, fail_on_empty=False, fail_on_more=False):
280 raise EngineException(
281 f"name '{name}' already exists in topic '{item}'",
282 HTTPStatus.CONFLICT,
283 )
285 def list_both(self, session, filter_q=None, api_req=False):
286 """List all clusters from both new and old APIs"""
287 if not filter_q:
288 filter_q = {}
289 if self.multiproject:
290 filter_q.update(self._get_project_filter(session))
291 cluster_list1 = self.db.get_list(self.topic, filter_q)
292 cluster_list2 = self.db.get_list("k8sclusters", filter_q)
293 list1_names = {item["name"] for item in cluster_list1}
294 for item in cluster_list2:
295 if item["name"] not in list1_names:
296 # Complete the information for clusters from old API
297 item["state"] = "N/A"
298 old_state = item.get("_admin", {}).get("operationalState", "Unknown")
299 item["bootstrap"] = "NO"
300 item["operatingState"] = "N/A"
301 item["resourceState"] = old_state
302 item["created"] = "NO"
303 cluster_list1.append(item)
304 if api_req:
305 cluster_list1 = [self.sol005_projection(inst) for inst in cluster_list1]
306 return cluster_list1
309class ProfileTopic(ACMTopic):
310 profile_topic_map = {
311 "k8sapp": "app_profiles",
312 "k8sresource": "resource_profiles",
313 "k8sinfra_controller": "infra_controller_profiles",
314 "k8sinfra_config": "infra_config_profiles",
315 }
317 def __init__(self, db, fs, msg, auth):
318 super().__init__(db, fs, msg, auth)
320 def edit_extra_before(self, session, _id, indata=None, kwargs=None, content=None):
321 check = self.db.get_one(self.topic, {"_id": _id})
322 if check["default"] is True:
323 raise EngineException(
324 "Cannot edit default profiles",
325 HTTPStatus.UNPROCESSABLE_ENTITY,
326 )
327 if "name" in indata and check["name"] != indata["name"]:
328 self.check_unique_name(session, indata["name"])
329 return True
331 def delete_extra_before(self, session, _id, db_content, not_send_msg=None):
332 op_id = self.common_delete(_id, db_content)
333 return {"profile_id": _id, "operation_id": op_id, "force": session["force"]}
335 def delete_profile(self, session, _id, dry_run=False, not_send_msg=None):
336 item_content = self.db.get_one(self.topic, {"_id": _id})
337 if item_content.get("default", False):
338 raise EngineException(
339 "Cannot delete item because it is marked as default",
340 http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
341 )
342 # Before deleting, detach the profile from the associated clusters.
343 profile_type = self.profile_topic_map[self.topic]
344 self.detach(session, _id, profile_type)
345 # To delete the infra controller profile
346 super().delete(session, _id, not_send_msg=not_send_msg)
347 return _id