# -*- coding: utf-8 -*-

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
15
16from pyrage import x25519
17from uuid import uuid4
18
19from http import HTTPStatus
20from time import time
21
22# from osm_common.dbbase import deep_update_rfc7396, DbException
23from osm_common.msgbase import MsgException
24from osm_common.dbbase import DbException
25from osm_common.fsbase import FsException
26from osm_nbi.base_topic import BaseTopic, EngineException
27from osm_nbi.validation import ValidationError
28
29# import logging
30# import random
31# import string
32# from yaml import safe_load, YAMLError
33
34
class ACMOperationTopic:
    """Mixin that records operation entries in an item's ``operationHistory``."""

    def __init__(self, db, fs, msg, auth):
        """Declare ``multiproject``; the constructor arguments are not used here."""
        self.multiproject = None  # Declare the attribute here

    @staticmethod
    def format_on_operation(content, operation_type, operation_params=None):
        """Append a new operation record to ``content["operationHistory"]``.

        The history list is created when ``content`` does not have one yet.

        :param content: dictionary describing the item; modified in place
        :param operation_type: value stored under ``operationType``
        :param operation_params: value stored under ``operationParams``
        :return: the generated operation id (a uuid4 rendered as a string)
        """
        new_op_id = str(uuid4())
        created_at = time()

        # All state/result fields start as None; only the type, id, creation
        # date and parameters are known at this point.
        record = {
            "operationType": operation_type,
            "op_id": new_op_id,
            "result": None,
            "creationDate": created_at,
            "endDate": None,
            "workflowState": None,
            "resourceState": None,
            "operationState": None,
            "gitOperationInfo": None,
            "operationParams": operation_params,
        }

        content.setdefault("operationHistory", []).append(record)
        return new_op_id
59
60
class ACMTopic(BaseTopic, ACMOperationTopic):
    """Shared behaviour for ACM topics.

    Covers profile creation, detaching profiles from clusters, deletion
    bookkeeping and mirroring clusters into the ``k8sclusters`` collection.
    """

    def __init__(self, db, fs, msg, auth):
        # NOTE(review): only the next class in the MRO is initialised here;
        # whether ACMOperationTopic.__init__ runs depends on BaseTopic - confirm.
        super().__init__(db, fs, msg, auth)

    def new_profile(self, rollback, session, indata=None, kwargs=None, headers=None):
        """Create a non-default profile, attach an age key pair and notify.

        :param rollback: list that receives the entry needed to undo the creation
        :param session: session dictionary; ``force``, ``project_id`` and
            ``public`` are read here
        :param indata: request payload; must contain ``name``
        :param kwargs: extra values merged into the payload
        :param headers: not used by this method
        :return: tuple ``(_id, None)`` with the identifier of the new profile
        :raises ValidationError, EngineException, DbException, MsgException,
            FsException: re-raised as the same type with the failing step
            appended to the message
        """
        step = "name unique check"
        try:
            self.check_unique_name(session, indata["name"])

            step = "validating input parameters"
            profile_request = self._remove_envelop(indata)
            self._update_input_with_kwargs(profile_request, kwargs)
            profile_request = self._validate_input_new(
                profile_request, session["force"]
            )
            operation_params = profile_request

            step = "filling profile details from input data"
            profile_create = self._create_profile(profile_request, session)

            step = "creating profile at database"
            self.format_on_new(
                profile_create, session["project_id"], make_public=session["public"]
            )
            profile_create["current_operation"] = None
            op_id = ACMOperationTopic.format_on_operation(
                profile_create,
                "create",
                operation_params,
            )

            _id = self.db.create(self.topic, profile_create)
            # Register the rollback as soon as the record exists, so a failure
            # during key generation/encryption does not leave an orphan entry.
            rollback.append({"topic": self.topic, "_id": _id})

            pubkey, privkey = self._generate_age_key()
            profile_create["age_pubkey"] = self.db.encrypt(
                pubkey, schema_version="1.11", salt=_id
            )
            profile_create["age_privkey"] = self.db.encrypt(
                privkey, schema_version="1.11", salt=_id
            )
            self.db.set_one(self.topic, {"_id": _id}, profile_create)
            if op_id:
                profile_create["op_id"] = op_id
            self._send_msg("profile_create", {"profile_id": _id, "operation_id": op_id})

            return _id, None
        except (
            ValidationError,
            EngineException,
            DbException,
            MsgException,
            FsException,
        ) as e:
            # Keep the original exception as the cause for easier debugging.
            raise type(e)(
                "{} while '{}'".format(e, step), http_code=e.http_code
            ) from e

    def _create_profile(self, profile_request, session):
        """Build the initial database document for a non-default profile.

        :param profile_request: validated request; ``name`` and ``description``
            are read
        :param session: session dictionary, forwarded to ``create_gitname``
        :return: dictionary with the initial profile content
        """
        profile_desc = {
            "name": profile_request["name"],
            "description": profile_request["description"],
            "default": False,
            "git_name": self.create_gitname(profile_request, session),
            "state": "IN_CREATION",
            "operatingState": "IN_PROGRESS",
            "resourceState": "IN_PROGRESS.REQUEST_RECEIVED",
        }
        return profile_desc

    def default_profile(
        self, rollback, session, indata=None, kwargs=None, headers=None
    ):
        """Create a profile flagged as default and store it in the database.

        Unlike ``new_profile`` this neither validates the input against a
        schema, nor generates keys, nor sends a message.

        :param rollback: list that receives the entry needed to undo the creation
        :param session: session dictionary; ``project_id`` and ``public`` are read
        :param indata: request payload; ``name`` is read
        :param kwargs: extra values merged into the payload
        :param headers: not used by this method
        :return: identifier of the created profile
        :raises ValidationError, EngineException, DbException, MsgException,
            FsException: re-raised as the same type with the failing step
            appended to the message
        """
        step = "validating input parameters"
        try:
            profile_request = self._remove_envelop(indata)
            self._update_input_with_kwargs(profile_request, kwargs)
            operation_params = profile_request

            step = "filling profile details from input data"
            profile_create = self._create_default_profile(profile_request, session)

            step = "creating profile at database"
            self.format_on_new(
                profile_create, session["project_id"], make_public=session["public"]
            )
            profile_create["current_operation"] = None
            ACMOperationTopic.format_on_operation(
                profile_create,
                "create",
                operation_params,
            )
            _id = self.db.create(self.topic, profile_create)
            rollback.append({"topic": self.topic, "_id": _id})
            return _id
        except (
            ValidationError,
            EngineException,
            DbException,
            MsgException,
            FsException,
        ) as e:
            # Keep the original exception as the cause for easier debugging.
            raise type(e)(
                "{} while '{}'".format(e, step), http_code=e.http_code
            ) from e

    def _create_default_profile(self, profile_request, session):
        """Build the initial database document for a default profile.

        The description is generated from the topic and the requested name.

        :param profile_request: request content; ``name`` is read
        :param session: session dictionary, forwarded to ``create_gitname``
        :return: dictionary with the initial profile content
        """
        profile_desc = {
            "name": profile_request["name"],
            "description": f"{self.topic} profile for cluster {profile_request['name']}",
            "default": True,
            "git_name": self.create_gitname(profile_request, session),
            "state": "IN_CREATION",
            "operatingState": "IN_PROGRESS",
            "resourceState": "IN_PROGRESS.REQUEST_RECEIVED",
        }
        return profile_desc

    def detach(self, session, _id, profile_type):
        """Remove profile ``_id`` from every cluster that references it.

        :param session: not used by this method
        :param _id: identifier of the profile to detach
        :param profile_type: key of the cluster document holding the list of
            profile ids of this kind
        """
        # The list query already returns the full documents, so each cluster is
        # inspected directly instead of being fetched again one by one.
        for cluster in self.db.get_list("clusters", {}):
            profile_ids = cluster.get(profile_type)
            if not profile_ids or _id not in profile_ids:
                continue
            profile_ids.remove(_id)
            self.db.set_one(
                "clusters", {"_id": cluster["_id"]}, {profile_type: profile_ids}
            )

    def _generate_age_key(self):
        """Generate a new x25519 identity.

        :return: tuple ``(pubkey, privkey)`` with both keys rendered as strings
        """
        ident = x25519.Identity.generate()
        pubkey = str(ident.to_public())
        privkey = str(ident)
        return pubkey, privkey

    def common_delete(self, _id, db_content):
        """Mark an item as being deleted and register a ``delete`` operation.

        :param _id: identifier of the item in ``self.topic``
        :param db_content: current database content; modified in place
        :return: identifier of the newly registered operation
        """
        if "state" in db_content:
            db_content["state"] = "IN_DELETION"
            db_content["operatingState"] = "PROCESSING"

        db_content["current_operation"] = None
        op_id = ACMOperationTopic.format_on_operation(
            db_content,
            "delete",
            None,
        )
        self.db.set_one(self.topic, {"_id": _id}, db_content)
        return op_id

    def add_to_old_collection(self, content, session):
        """Mirror a cluster into the ``k8sclusters`` collection.

        :param content: cluster content; ``name`` and ``vim_account`` are
            required, ``k8s_version`` and ``description`` are optional
        :param session: session dictionary; ``project_id`` and ``public`` are read
        """
        vim_account_details = self.db.get_one(
            "vim_accounts", {"name": content["vim_account"]}
        )
        item = {
            "name": content["name"],
            "credentials": {},
            # Optional fields are stored as None when the caller omits them.
            "k8s_version": content.get("k8s_version"),
            "vim_account": vim_account_details["_id"],
            "nets": {"k8s_net1": None},
            "deployment_methods": {"juju-bundle": False, "helm-chart-v3": True},
            "description": content.get("description"),
            "namespace": "kube-system",
            "osm_acm": True,
            "schema_version": "1.11",
        }
        self.format_on_new(item, session["project_id"], make_public=session["public"])
        _id = self.db.create("k8sclusters", item)
        self.logger.info(f"_id is : {_id}")

        # Read back by the identifier just returned; a lookup by name is
        # ambiguous when another entry carries the same name.
        item_1 = self.db.get_one("k8sclusters", {"_id": _id})
        item_1["_admin"]["operationalState"] = "PROCESSING"

        # Create operation data
        now = time()
        operation_data = {
            "lcmOperationType": "create",  # Assuming 'create' operation here
            "operationState": "PROCESSING",
            "startTime": now,
            "statusEnteredTime": now,
            "detailed-status": "",
            "operationParams": None,  # Add parameters as needed
        }
        item_1["_admin"]["operations"] = [operation_data]
        item_1["_admin"]["current_operation"] = None
        self.logger.info(f"content is : {item_1}")
        self.db.set_one("k8sclusters", {"_id": item_1["_id"]}, item_1)

    def cluster_unique_name_check(self, session, name):
        """Ensure ``name`` is free in this topic and in ``k8sclusters``.

        :param session: session dictionary, forwarded to ``check_unique_name``
        :param name: cluster name to check
        :raises EngineException: with ``HTTPStatus.CONFLICT`` when a
            ``k8sclusters`` entry with that name exists
        """
        self.check_unique_name(session, name)
        _filter = {"name": name}
        topic = "k8sclusters"
        if self.db.get_one(topic, _filter, fail_on_empty=False, fail_on_more=False):
            raise EngineException(
                "name '{}' already exists".format(name),
                HTTPStatus.CONFLICT,
            )

    def list_both(self, session, filter_q=None, api_req=False):
        """List all clusters from both new and old APIs.

        Entries from ``k8sclusters`` whose name is not present in the new
        collection are appended after being completed with placeholder values.

        :param session: session dictionary, used to build the project filter
        :param filter_q: optional query filter; it is not modified
        :param api_req: when True every entry goes through ``sol005_projection``
        :return: list of cluster dictionaries
        """
        # Work on a copy so the caller's filter is never mutated.
        filter_q = dict(filter_q) if filter_q else {}
        if self.multiproject:
            filter_q.update(self._get_project_filter(session))
        cluster_list1 = self.db.get_list(self.topic, filter_q)
        cluster_list2 = self.db.get_list("k8sclusters", filter_q)
        list1_names = {item["name"] for item in cluster_list1}
        for item in cluster_list2:
            if item["name"] not in list1_names:
                # Complete the information for clusters from old API
                item["state"] = "N/A"
                old_state = item.get("_admin", {}).get("operationalState", "Unknown")
                item["bootstrap"] = "NO"
                item["operatingState"] = "N/A"
                item["resourceState"] = old_state
                item["created"] = "NO"
                cluster_list1.append(item)
        if api_req:
            cluster_list1 = [self.sol005_projection(inst) for inst in cluster_list1]
        return cluster_list1
297
298
class ProfileTopic(ACMTopic):
    """Base class for profile topics; adds edit and delete guards."""

    # Maps a topic name to the key of the cluster document that lists the
    # profiles of that kind (used as ``profile_type`` when detaching).
    profile_topic_map = {
        "k8sapp": "app_profiles",
        "k8sresource": "resource_profiles",
        "k8sinfra_controller": "infra_controller_profiles",
        "k8sinfra_config": "infra_config_profiles",
    }

    def __init__(self, db, fs, msg, auth):
        super().__init__(db, fs, msg, auth)

    def edit_extra_before(self, session, _id, indata=None, kwargs=None, content=None):
        """Check that a profile may be edited.

        :param session: session dictionary, forwarded to ``check_unique_name``
        :param _id: identifier of the profile being edited
        :param indata: requested changes; may be None
        :param kwargs: not used by this method
        :param content: not used by this method
        :return: True when the edition is allowed
        :raises EngineException: with ``HTTPStatus.UNPROCESSABLE_ENTITY`` when
            the profile is marked as default
        """
        check = self.db.get_one(self.topic, {"_id": _id})
        # Same default test as delete_profile, so both guards agree.
        if check.get("default", False):
            raise EngineException(
                "Cannot edit default profiles",
                HTTPStatus.UNPROCESSABLE_ENTITY,
            )
        # indata defaults to None; only check uniqueness on an actual rename.
        if indata and "name" in indata and check["name"] != indata["name"]:
            self.check_unique_name(session, indata["name"])
        return True

    def delete_extra_before(self, session, _id, db_content, not_send_msg=None):
        """Flag the profile as being deleted and register the operation.

        :param session: not used by this method
        :param _id: identifier of the profile
        :param db_content: current database content of the profile
        :param not_send_msg: not used by this method
        :return: dictionary with ``profile_id`` and ``operation_id``
        """
        op_id = self.common_delete(_id, db_content)
        return {"profile_id": _id, "operation_id": op_id}

    def delete_profile(self, session, _id, dry_run=False, not_send_msg=None):
        """Delete a non-default profile after detaching it from all clusters.

        :param session: session dictionary
        :param _id: identifier of the profile to delete
        :param dry_run: when True the profile is not detached and the flag is
            forwarded to the parent ``delete``
        :param not_send_msg: forwarded to the parent ``delete``
        :return: identifier of the profile
        :raises EngineException: with ``HTTPStatus.UNPROCESSABLE_ENTITY`` when
            the profile is marked as default
        """
        item_content = self.db.get_one(self.topic, {"_id": _id})
        if item_content.get("default", False):
            raise EngineException(
                "Cannot delete item because it is marked as default",
                http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
            )
        if not dry_run:
            # Before deleting, detach the profile from the associated clusters.
            profile_type = self.profile_topic_map[self.topic]
            self.detach(session, _id, profile_type)
        # NOTE(review): assumes the parent delete() accepts dry_run - confirm.
        super().delete(session, _id, dry_run=dry_run, not_send_msg=not_send_msg)
        return _id
337 return _id