blob: eb335da4c470f139c55e52a270828ceb4bbfacd7 [file] [log] [blame]
shrinithi28d887f2025-01-08 05:27:19 +00001# -*- coding: utf-8 -*-
2
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12# implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16from pyrage import x25519
17from uuid import uuid4
18
19from http import HTTPStatus
20from time import time
21
22# from osm_common.dbbase import deep_update_rfc7396, DbException
23from osm_common.msgbase import MsgException
24from osm_common.dbbase import DbException
25from osm_common.fsbase import FsException
26from osm_nbi.base_topic import BaseTopic, EngineException
27from osm_nbi.validation import ValidationError
28
yshahf8f07632025-01-17 04:46:03 +000029import logging
30
shrinithi28d887f2025-01-08 05:27:19 +000031# import random
32# import string
33# from yaml import safe_load, YAMLError
34
35
36class ACMOperationTopic:
37 def __init__(self, db, fs, msg, auth):
38 self.multiproject = None # Declare the attribute here
yshahf8f07632025-01-17 04:46:03 +000039 self.db = db
40 self.fs = fs
41 self.msg = msg
42 self.logger = logging.getLogger("nbi.base")
43 self.auth = auth
shrinithi28d887f2025-01-08 05:27:19 +000044
45 @staticmethod
46 def format_on_operation(content, operation_type, operation_params=None):
47 op_id = str(uuid4())
48 now = time()
49 if "operationHistory" not in content:
50 content["operationHistory"] = []
51
52 operation = {}
53 operation["operationType"] = operation_type
54 operation["op_id"] = op_id
55 operation["result"] = None
56 operation["creationDate"] = now
57 operation["endDate"] = None
58 operation["workflowState"] = operation["resourceState"] = operation[
59 "operationState"
60 ] = operation["gitOperationInfo"] = None
61 operation["operationParams"] = operation_params
62
63 content["operationHistory"].append(operation)
64 return op_id
65
yshahf8f07632025-01-17 04:46:03 +000066 def check_dependency(self, check, operation_type=None):
67 topic_to_db_mapping = {
68 "cluster": "clusters",
69 "ksu": "ksus",
70 "infra_controller_profiles": "k8sinfra_controller",
71 "infra_config_profiles": "k8sinfra_config",
72 "resource_profiles": "k8sresource",
73 "app_profiles": "k8sapp",
74 "oka": "okas",
75 }
76 for topic, _id in check.items():
77 filter_q = {
78 "_id": _id,
79 }
80 if topic == "okas":
81 for oka_element in check[topic]:
82 self.check_dependency({"oka": oka_element})
83 if topic not in ("okas"):
84 element_content = self.db.get_one(topic_to_db_mapping[topic], filter_q)
85 element_name = element_content.get("name")
86 state = element_content["state"]
87 if (
88 operation_type == "delete"
89 and state == "FAILED_CREATION"
90 and element_content["operatingState"] == "IDLE"
91 ):
92 self.logger.info(f"Delete operation is allowed in {state} state")
93 return
94 elif element_content["state"] != "CREATED":
95 raise EngineException(
96 f"State of the {element_name} {topic} is {state}",
97 HTTPStatus.UNPROCESSABLE_ENTITY,
98 )
99 elif (
100 state == "CREATED"
101 and element_content["operatingState"] == "PROCESSING"
102 ):
103 raise EngineException(
104 f"operatingState of the {element_name} {topic} is not IDLE",
105 HTTPStatus.UNPROCESSABLE_ENTITY,
106 )
107
shrinithi28d887f2025-01-08 05:27:19 +0000108
109class ACMTopic(BaseTopic, ACMOperationTopic):
110 def __init__(self, db, fs, msg, auth):
111 super().__init__(db, fs, msg, auth)
112 # ACMOperationTopic.__init__(db, fs, msg, auth)
113
114 def new_profile(self, rollback, session, indata=None, kwargs=None, headers=None):
115 step = "name unique check"
116 try:
117 self.check_unique_name(session, indata["name"])
118
119 step = "validating input parameters"
120 profile_request = self._remove_envelop(indata)
121 self._update_input_with_kwargs(profile_request, kwargs)
122 profile_request = self._validate_input_new(
123 profile_request, session["force"]
124 )
125 operation_params = profile_request
126
127 step = "filling profile details from input data"
128 profile_create = self._create_profile(profile_request, session)
129
130 step = "creating profile at database"
131 self.format_on_new(
132 profile_create, session["project_id"], make_public=session["public"]
133 )
134 profile_create["current_operation"] = None
135 op_id = ACMOperationTopic.format_on_operation(
136 profile_create,
137 "create",
138 operation_params,
139 )
140
141 _id = self.db.create(self.topic, profile_create)
142 pubkey, privkey = self._generate_age_key()
143 profile_create["age_pubkey"] = self.db.encrypt(
144 pubkey, schema_version="1.11", salt=_id
145 )
146 profile_create["age_privkey"] = self.db.encrypt(
147 privkey, schema_version="1.11", salt=_id
148 )
149 rollback.append({"topic": self.topic, "_id": _id})
150 self.db.set_one(self.topic, {"_id": _id}, profile_create)
151 if op_id:
152 profile_create["op_id"] = op_id
153 self._send_msg("profile_create", {"profile_id": _id, "operation_id": op_id})
154
155 return _id, None
156 except (
157 ValidationError,
158 EngineException,
159 DbException,
160 MsgException,
161 FsException,
162 ) as e:
163 raise type(e)("{} while '{}'".format(e, step), http_code=e.http_code)
164
165 def _create_profile(self, profile_request, session):
166 profile_desc = {
167 "name": profile_request["name"],
168 "description": profile_request["description"],
169 "default": False,
170 "git_name": self.create_gitname(profile_request, session),
171 "state": "IN_CREATION",
172 "operatingState": "IN_PROGRESS",
173 "resourceState": "IN_PROGRESS.REQUEST_RECEIVED",
174 }
175 return profile_desc
176
177 def default_profile(
178 self, rollback, session, indata=None, kwargs=None, headers=None
179 ):
180 step = "validating input parameters"
181 try:
182 profile_request = self._remove_envelop(indata)
183 self._update_input_with_kwargs(profile_request, kwargs)
184 operation_params = profile_request
185
186 step = "filling profile details from input data"
187 profile_create = self._create_default_profile(profile_request, session)
188
189 step = "creating profile at database"
190 self.format_on_new(
191 profile_create, session["project_id"], make_public=session["public"]
192 )
193 profile_create["current_operation"] = None
194 ACMOperationTopic.format_on_operation(
195 profile_create,
196 "create",
197 operation_params,
198 )
199 _id = self.db.create(self.topic, profile_create)
200 rollback.append({"topic": self.topic, "_id": _id})
201 return _id
202 except (
203 ValidationError,
204 EngineException,
205 DbException,
206 MsgException,
207 FsException,
208 ) as e:
209 raise type(e)("{} while '{}'".format(e, step), http_code=e.http_code)
210
211 def _create_default_profile(self, profile_request, session):
212 profile_desc = {
213 "name": profile_request["name"],
214 "description": f"{self.topic} profile for cluster {profile_request['name']}",
215 "default": True,
216 "git_name": self.create_gitname(profile_request, session),
217 "state": "IN_CREATION",
218 "operatingState": "IN_PROGRESS",
219 "resourceState": "IN_PROGRESS.REQUEST_RECEIVED",
220 }
221 return profile_desc
222
223 def detach(self, session, _id, profile_type):
224 # To detach the profiles from every cluster
225 filter_q = {}
226 existing_clusters = self.db.get_list("clusters", filter_q)
227 existing_clusters_profiles = [
228 profile["_id"]
229 for profile in existing_clusters
230 if profile.get("profile_type", _id)
231 ]
232 update_dict = None
233 for profile in existing_clusters_profiles:
234 filter_q = {"_id": profile}
235 data = self.db.get_one("clusters", filter_q)
236 if profile_type in data:
237 profile_ids = data[profile_type]
238 if _id in profile_ids:
239 profile_ids.remove(_id)
240 update_dict = {profile_type: profile_ids}
241 self.db.set_one("clusters", filter_q, update_dict)
242
243 def _generate_age_key(self):
244 ident = x25519.Identity.generate()
245 # gets the public key
246 pubkey = str(ident.to_public())
247 # gets the private key
248 privkey = str(ident)
249 # return both public and private key
250 return pubkey, privkey
251
252 def common_delete(self, _id, db_content):
253 if "state" in db_content:
254 db_content["state"] = "IN_DELETION"
255 db_content["operatingState"] = "PROCESSING"
256 # self.db.set_one(self.topic, {"_id": _id}, db_content)
257
258 db_content["current_operation"] = None
259 op_id = ACMOperationTopic.format_on_operation(
260 db_content,
261 "delete",
262 None,
263 )
264 self.db.set_one(self.topic, {"_id": _id}, db_content)
265 return op_id
266
267 def add_to_old_collection(self, content, session):
268 item = {}
269 item["name"] = content["name"]
270 item["credentials"] = {}
271 # item["k8s_version"] = content["k8s_version"]
272 if "k8s_version" in content:
273 item["k8s_version"] = content["k8s_version"]
274 else:
275 item["k8s_version"] = None
276 vim_account_details = self.db.get_one(
277 "vim_accounts", {"name": content["vim_account"]}
278 )
279 item["vim_account"] = vim_account_details["_id"]
280 item["nets"] = {"k8s_net1": None}
281 item["deployment_methods"] = {"juju-bundle": False, "helm-chart-v3": True}
282 # item["description"] = content["description"]
283 if "description" in content:
284 item["description"] = content["description"]
285 else:
286 item["description"] = None
287 item["namespace"] = "kube-system"
288 item["osm_acm"] = True
289 item["schema_version"] = "1.11"
290 self.format_on_new(item, session["project_id"], make_public=session["public"])
291 _id = self.db.create("k8sclusters", item)
292 self.logger.info(f"_id is : {_id}")
293 item_1 = self.db.get_one("k8sclusters", {"name": item["name"]})
294 item_1["_admin"]["operationalState"] = "PROCESSING"
295
296 # Create operation data
297 now = time()
298 operation_data = {
299 "lcmOperationType": "create", # Assuming 'create' operation here
300 "operationState": "PROCESSING",
301 "startTime": now,
302 "statusEnteredTime": now,
303 "detailed-status": "",
304 "operationParams": None, # Add parameters as needed
305 }
306 # create operation
307 item_1["_admin"]["operations"] = [operation_data]
308 item_1["_admin"]["current_operation"] = None
309 self.logger.info(f"content is : {item_1}")
310 self.db.set_one("k8sclusters", {"_id": item_1["_id"]}, item_1)
311 return
312
shrinithi28d887f2025-01-08 05:27:19 +0000313 def cluster_unique_name_check(self, session, name):
shrinithi194ced92025-01-29 06:01:31 +0000314 # First check using the method you have for unique name validation
shrinithi28d887f2025-01-08 05:27:19 +0000315 self.check_unique_name(session, name)
316 _filter = {"name": name}
shrinithi194ced92025-01-29 06:01:31 +0000317 topics = [
318 "k8sclusters",
319 "k8sapp",
320 "k8sinfra_config",
321 "k8sinfra_controller",
322 "k8sresource",
323 ]
324
325 # Loop through each topic to check if the name already exists in any of them
326 for item in topics:
327 if self.db.get_one(item, _filter, fail_on_empty=False, fail_on_more=False):
328 raise EngineException(
329 f"name '{name}' already exists in topic '{item}'",
330 HTTPStatus.CONFLICT,
331 )
shrinithi28d887f2025-01-08 05:27:19 +0000332
333 def list_both(self, session, filter_q=None, api_req=False):
334 """List all clusters from both new and old APIs"""
335 if not filter_q:
336 filter_q = {}
337 if self.multiproject:
338 filter_q.update(self._get_project_filter(session))
339 cluster_list1 = self.db.get_list(self.topic, filter_q)
340 cluster_list2 = self.db.get_list("k8sclusters", filter_q)
341 list1_names = {item["name"] for item in cluster_list1}
342 for item in cluster_list2:
343 if item["name"] not in list1_names:
344 # Complete the information for clusters from old API
345 item["state"] = "N/A"
346 old_state = item.get("_admin", {}).get("operationalState", "Unknown")
347 item["bootstrap"] = "NO"
348 item["operatingState"] = "N/A"
349 item["resourceState"] = old_state
350 item["created"] = "NO"
351 cluster_list1.append(item)
352 if api_req:
353 cluster_list1 = [self.sol005_projection(inst) for inst in cluster_list1]
354 return cluster_list1
355
356
357class ProfileTopic(ACMTopic):
358 profile_topic_map = {
359 "k8sapp": "app_profiles",
360 "k8sresource": "resource_profiles",
361 "k8sinfra_controller": "infra_controller_profiles",
362 "k8sinfra_config": "infra_config_profiles",
363 }
364
365 def __init__(self, db, fs, msg, auth):
366 super().__init__(db, fs, msg, auth)
367
yshah00a620a2025-01-16 12:06:40 +0000368 def edit_extra_before(self, session, _id, indata=None, kwargs=None, content=None):
369 check = self.db.get_one(self.topic, {"_id": _id})
370 if check["default"] is True:
371 raise EngineException(
372 "Cannot edit default profiles",
373 HTTPStatus.UNPROCESSABLE_ENTITY,
374 )
375 if "name" in indata and check["name"] != indata["name"]:
376 self.check_unique_name(session, indata["name"])
377 return True
378
shrinithi28d887f2025-01-08 05:27:19 +0000379 def delete_extra_before(self, session, _id, db_content, not_send_msg=None):
380 op_id = self.common_delete(_id, db_content)
yshah781ce732025-02-28 08:56:13 +0000381 return {"profile_id": _id, "operation_id": op_id, "force": session["force"]}
shrinithi28d887f2025-01-08 05:27:19 +0000382
383 def delete_profile(self, session, _id, dry_run=False, not_send_msg=None):
384 item_content = self.db.get_one(self.topic, {"_id": _id})
385 if item_content.get("default", False):
386 raise EngineException(
387 "Cannot delete item because it is marked as default",
388 http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
389 )
390 # Before deleting, detach the profile from the associated clusters.
391 profile_type = self.profile_topic_map[self.topic]
392 self.detach(session, _id, profile_type)
393 # To delete the infra controller profile
394 super().delete(session, _id, not_send_msg=not_send_msg)
395 return _id