Pin black version in tox.ini to 23.12.1
[osm/N2VC.git] / n2vc / k8s_juju_conn.py
1 # Copyright 2019 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 from typing import Union
17 import os
18 import uuid
19 import yaml
20 import tempfile
21 import binascii
22
23 from n2vc.config import EnvironConfig
24 from n2vc.definitions import RelationEndpoint
25 from n2vc.exceptions import K8sException
26 from n2vc.k8s_conn import K8sConnector
27 from n2vc.kubectl import Kubectl
28 from .exceptions import MethodNotImplemented
29 from n2vc.libjuju import Libjuju
30 from n2vc.utils import obj_to_dict, obj_to_yaml
31 from n2vc.store import MotorStore
32 from n2vc.vca.cloud import Cloud
33 from n2vc.vca.connection import get_connection
34
35
36 RBAC_LABEL_KEY_NAME = "rbac-id"
37 RBAC_STACK_PREFIX = "juju-credential"
38
39
40 def generate_rbac_id():
41 return binascii.hexlify(os.urandom(4)).decode()
42
43
44 class K8sJujuConnector(K8sConnector):
45 libjuju = None
46
47 def __init__(
48 self,
49 fs: object,
50 db: object,
51 kubectl_command: str = "/usr/bin/kubectl",
52 juju_command: str = "/usr/bin/juju",
53 log: object = None,
54 loop: object = None,
55 on_update_db=None,
56 ):
57 """
58 :param fs: file system for kubernetes and helm configuration
59 :param db: Database object
60 :param kubectl_command: path to kubectl executable
61 :param helm_command: path to helm executable
62 :param log: logger
63 :param: loop: Asyncio loop
64 """
65
66 # parent class
67 K8sConnector.__init__(self, db, log=log, on_update_db=on_update_db)
68
69 self.fs = fs
70 self.loop = loop or asyncio.get_event_loop()
71 self.log.debug("Initializing K8S Juju connector")
72
73 db_uri = EnvironConfig(prefixes=["OSMLCM_", "OSMMON_"]).get("database_uri")
74 self._store = MotorStore(db_uri)
75 self.loading_libjuju = asyncio.Lock(loop=self.loop)
76 self.uninstall_locks = {}
77
78 self.log.debug("K8S Juju connector initialized")
79 # TODO: Remove these commented lines:
80 # self.authenticated = False
81 # self.models = {}
82 # self.juju_secret = ""
83
84 """Initialization"""
85
86 async def init_env(
87 self,
88 k8s_creds: str,
89 namespace: str = "kube-system",
90 reuse_cluster_uuid: str = None,
91 **kwargs,
92 ) -> (str, bool):
93 """
94 It prepares a given K8s cluster environment to run Juju bundles.
95
96 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
97 '.kube/config'
98 :param namespace: optional namespace to be used for juju. By default,
99 'kube-system' will be used
100 :param reuse_cluster_uuid: existing cluster uuid for reuse
101 :param: kwargs: Additional parameters
102 vca_id (str): VCA ID
103
104 :return: uuid of the K8s cluster and True if connector has installed some
105 software in the cluster
106 (on error, an exception will be raised)
107 """
108 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
109
110 cluster_uuid = reuse_cluster_uuid or str(uuid.uuid4())
111 kubectl = self._get_kubectl(k8s_creds)
112
113 # CREATING RESOURCES IN K8S
114 rbac_id = generate_rbac_id()
115 metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
116 labels = {RBAC_STACK_PREFIX: rbac_id}
117
118 # Create cleanup dictionary to clean up created resources
119 # if it fails in the middle of the process
120 cleanup_data = []
121 try:
122 self.log.debug("Initializing K8s cluster for juju")
123 kubectl.create_cluster_role(name=metadata_name, labels=labels)
124 self.log.debug("Cluster role created")
125 cleanup_data.append(
126 {"delete": kubectl.delete_cluster_role, "args": (metadata_name,)}
127 )
128
129 kubectl.create_service_account(name=metadata_name, labels=labels)
130 self.log.debug("Service account created")
131 cleanup_data.append(
132 {"delete": kubectl.delete_service_account, "args": (metadata_name,)}
133 )
134
135 kubectl.create_cluster_role_binding(name=metadata_name, labels=labels)
136 self.log.debug("Role binding created")
137 cleanup_data.append(
138 {
139 "delete": kubectl.delete_cluster_role_binding,
140 "args": (metadata_name,),
141 }
142 )
143 token, client_cert_data = await kubectl.get_secret_data(metadata_name)
144
145 default_storage_class = kubectl.get_default_storage_class()
146 self.log.debug("Default storage class: {}".format(default_storage_class))
147 await libjuju.add_k8s(
148 name=cluster_uuid,
149 rbac_id=rbac_id,
150 token=token,
151 client_cert_data=client_cert_data,
152 configuration=kubectl.configuration,
153 storage_class=default_storage_class,
154 credential_name=self._get_credential_name(cluster_uuid),
155 )
156 self.log.debug("K8s cluster added to juju controller")
157 return cluster_uuid, True
158 except Exception as e:
159 self.log.error("Error initializing k8scluster: {}".format(e), exc_info=True)
160 if len(cleanup_data) > 0:
161 self.log.debug("Cleaning up created resources in k8s cluster...")
162 for item in cleanup_data:
163 delete_function = item["delete"]
164 delete_args = item["args"]
165 delete_function(*delete_args)
166 self.log.debug("Cleanup finished")
167 raise e
168
169 """Repo Management"""
170
171 async def repo_add(
172 self,
173 name: str,
174 url: str,
175 _type: str = "charm",
176 cert: str = None,
177 user: str = None,
178 password: str = None,
179 ):
180 raise MethodNotImplemented()
181
182 async def repo_list(self):
183 raise MethodNotImplemented()
184
185 async def repo_remove(self, name: str):
186 raise MethodNotImplemented()
187
188 async def synchronize_repos(self, cluster_uuid: str, name: str):
189 """
190 Returns None as currently add_repo is not implemented
191 """
192 return None
193
194 """Reset"""
195
196 async def reset(
197 self,
198 cluster_uuid: str,
199 force: bool = False,
200 uninstall_sw: bool = False,
201 **kwargs,
202 ) -> bool:
203 """Reset a cluster
204
205 Resets the Kubernetes cluster by removing the model that represents it.
206
207 :param cluster_uuid str: The UUID of the cluster to reset
208 :param force: Force reset
209 :param uninstall_sw: Boolean to uninstall sw
210 :param: kwargs: Additional parameters
211 vca_id (str): VCA ID
212
213 :return: Returns True if successful or raises an exception.
214 """
215
216 try:
217 self.log.debug("[reset] Removing k8s cloud")
218 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
219
220 cloud = Cloud(cluster_uuid, self._get_credential_name(cluster_uuid))
221
222 cloud_creds = await libjuju.get_cloud_credentials(cloud)
223
224 await libjuju.remove_cloud(cluster_uuid)
225
226 credentials = self.get_credentials(cluster_uuid=cluster_uuid)
227
228 kubectl = self._get_kubectl(credentials)
229
230 delete_functions = [
231 kubectl.delete_cluster_role_binding,
232 kubectl.delete_service_account,
233 kubectl.delete_cluster_role,
234 ]
235
236 credential_attrs = cloud_creds[0].result["attrs"]
237 if RBAC_LABEL_KEY_NAME in credential_attrs:
238 rbac_id = credential_attrs[RBAC_LABEL_KEY_NAME]
239 metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
240 for delete_func in delete_functions:
241 try:
242 delete_func(metadata_name)
243 except Exception as e:
244 self.log.warning("Cannot remove resource in K8s {}".format(e))
245
246 except Exception as e:
247 self.log.debug("Caught exception during reset: {}".format(e))
248 raise e
249 return True
250
251 """Deployment"""
252
253 async def install(
254 self,
255 cluster_uuid: str,
256 kdu_model: str,
257 kdu_instance: str,
258 atomic: bool = True,
259 timeout: float = 1800,
260 params: dict = None,
261 db_dict: dict = None,
262 kdu_name: str = None,
263 namespace: str = None,
264 **kwargs,
265 ) -> bool:
266 """Install a bundle
267
268 :param cluster_uuid str: The UUID of the cluster to install to
269 :param kdu_model str: The name or path of a bundle to install
270 :param kdu_instance: Kdu instance name
271 :param atomic bool: If set, waits until the model is active and resets
272 the cluster on failure.
273 :param timeout int: The time, in seconds, to wait for the install
274 to finish
275 :param params dict: Key-value pairs of instantiation parameters
276 :param kdu_name: Name of the KDU instance to be installed
277 :param namespace: K8s namespace to use for the KDU instance
278 :param kwargs: Additional parameters
279 vca_id (str): VCA ID
280
281 :return: If successful, returns ?
282 """
283 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
284 bundle = kdu_model
285
286 if not db_dict:
287 raise K8sException("db_dict must be set")
288 if not bundle:
289 raise K8sException("bundle must be set")
290
291 if bundle.startswith("cs:"):
292 # For Juju Bundles provided by the Charm Store
293 pass
294 elif bundle.startswith("ch:"):
295 # For Juju Bundles provided by the Charm Hub (this only works for juju version >= 2.9)
296 pass
297 elif bundle.startswith("http"):
298 # Download the file
299 pass
300 else:
301 new_workdir = kdu_model.strip(kdu_model.split("/")[-1])
302 os.chdir(new_workdir)
303 bundle = "local:{}".format(kdu_model)
304
305 # default namespace to kdu_instance
306 if not namespace:
307 namespace = kdu_instance
308
309 self.log.debug("Checking for model named {}".format(namespace))
310
311 # Create the new model
312 self.log.debug("Adding model: {}".format(namespace))
313 cloud = Cloud(cluster_uuid, self._get_credential_name(cluster_uuid))
314 await libjuju.add_model(namespace, cloud)
315
316 # if model:
317 # TODO: Instantiation parameters
318
319 """
320 "Juju bundle that models the KDU, in any of the following ways:
321 - <juju-repo>/<juju-bundle>
322 - <juju-bundle folder under k8s_models folder in the package>
323 - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
324 in the package>
325 - <URL_where_to_fetch_juju_bundle>
326 """
327 try:
328 previous_workdir = os.getcwd()
329 except FileNotFoundError:
330 previous_workdir = "/app/storage"
331
332 self.log.debug("[install] deploying {}".format(bundle))
333 await libjuju.deploy(bundle, model_name=namespace, wait=atomic, timeout=timeout)
334 os.chdir(previous_workdir)
335
336 # update information in the database (first, the VCA status, and then, the namespace)
337 if self.on_update_db:
338 await self.on_update_db(
339 cluster_uuid,
340 kdu_instance,
341 filter=db_dict["filter"],
342 vca_id=kwargs.get("vca_id"),
343 )
344
345 self.db.set_one(
346 table="nsrs",
347 q_filter={"_admin.deployed.K8s.kdu-instance": kdu_instance},
348 update_dict={"_admin.deployed.K8s.$.namespace": namespace},
349 )
350
351 return True
352
353 async def scale(
354 self,
355 kdu_instance: str,
356 scale: int,
357 resource_name: str,
358 total_timeout: float = 1800,
359 namespace: str = None,
360 **kwargs,
361 ) -> bool:
362 """Scale an application in a model
363
364 :param: kdu_instance str: KDU instance name
365 :param: scale int: Scale to which to set the application
366 :param: resource_name str: The application name in the Juju Bundle
367 :param: timeout float: The time, in seconds, to wait for the install
368 to finish
369 :param namespace str: The namespace (model) where the Bundle was deployed
370 :param kwargs: Additional parameters
371 vca_id (str): VCA ID
372
373 :return: If successful, returns True
374 """
375
376 model_name = self._obtain_namespace(
377 kdu_instance=kdu_instance, namespace=namespace
378 )
379 try:
380 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
381 await libjuju.scale_application(
382 model_name=model_name,
383 application_name=resource_name,
384 scale=scale,
385 total_timeout=total_timeout,
386 )
387 except Exception as e:
388 error_msg = "Error scaling application {} of the model {} of the kdu instance {}: {}".format(
389 resource_name, model_name, kdu_instance, e
390 )
391 self.log.error(error_msg)
392 raise K8sException(message=error_msg)
393 return True
394
395 async def get_scale_count(
396 self, resource_name: str, kdu_instance: str, namespace: str = None, **kwargs
397 ) -> int:
398 """Get an application scale count
399
400 :param: resource_name str: The application name in the Juju Bundle
401 :param: kdu_instance str: KDU instance name
402 :param namespace str: The namespace (model) where the Bundle was deployed
403 :param kwargs: Additional parameters
404 vca_id (str): VCA ID
405 :return: Return application instance count
406 """
407
408 model_name = self._obtain_namespace(
409 kdu_instance=kdu_instance, namespace=namespace
410 )
411 try:
412 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
413 status = await libjuju.get_model_status(model_name=model_name)
414 return len(status.applications[resource_name].units)
415 except Exception as e:
416 error_msg = (
417 f"Error getting scale count from application {resource_name} of the model {model_name} of "
418 f"the kdu instance {kdu_instance}: {e}"
419 )
420 self.log.error(error_msg)
421 raise K8sException(message=error_msg)
422
423 async def instances_list(self, cluster_uuid: str) -> list:
424 """
425 returns a list of deployed releases in a cluster
426
427 :param cluster_uuid: the cluster
428 :return:
429 """
430 return []
431
432 async def upgrade(
433 self,
434 cluster_uuid: str,
435 kdu_instance: str,
436 kdu_model: str = None,
437 params: dict = None,
438 ) -> str:
439 """Upgrade a model
440
441 :param cluster_uuid str: The UUID of the cluster to upgrade
442 :param kdu_instance str: The unique name of the KDU instance
443 :param kdu_model str: The name or path of the bundle to upgrade to
444 :param params dict: Key-value pairs of instantiation parameters
445
446 :return: If successful, reference to the new revision number of the
447 KDU instance.
448 """
449
450 # TODO: Loop through the bundle and upgrade each charm individually
451
452 """
453 The API doesn't have a concept of bundle upgrades, because there are
454 many possible changes: charm revision, disk, number of units, etc.
455
456 As such, we are only supporting a limited subset of upgrades. We'll
457 upgrade the charm revision but leave storage and scale untouched.
458
459 Scale changes should happen through OSM constructs, and changes to
460 storage would require a redeployment of the service, at least in this
461 initial release.
462 """
463 raise MethodNotImplemented()
464
465 """Rollback"""
466
467 async def rollback(
468 self, cluster_uuid: str, kdu_instance: str, revision: int = 0
469 ) -> str:
470 """Rollback a model
471
472 :param cluster_uuid str: The UUID of the cluster to rollback
473 :param kdu_instance str: The unique name of the KDU instance
474 :param revision int: The revision to revert to. If omitted, rolls back
475 the previous upgrade.
476
477 :return: If successful, returns the revision of active KDU instance,
478 or raises an exception
479 """
480 raise MethodNotImplemented()
481
482 """Deletion"""
483
484 async def uninstall(
485 self, cluster_uuid: str, kdu_instance: str, namespace: str = None, **kwargs
486 ) -> bool:
487 """Uninstall a KDU instance
488
489 :param cluster_uuid str: The UUID of the cluster
490 :param kdu_instance str: The unique name of the KDU instance
491 :param namespace str: The namespace (model) where the Bundle was deployed
492 :param kwargs: Additional parameters
493 vca_id (str): VCA ID
494
495 :return: Returns True if successful, or raises an exception
496 """
497 model_name = self._obtain_namespace(
498 kdu_instance=kdu_instance, namespace=namespace
499 )
500
501 self.log.debug(f"[uninstall] Destroying model: {model_name}")
502
503 will_not_delete = False
504 if model_name not in self.uninstall_locks:
505 self.uninstall_locks[model_name] = asyncio.Lock(loop=self.loop)
506 delete_lock = self.uninstall_locks[model_name]
507
508 while delete_lock.locked():
509 will_not_delete = True
510 await asyncio.sleep(0.1)
511
512 if will_not_delete:
513 self.log.info("Model {} deleted by another worker.".format(model_name))
514 return True
515
516 try:
517 async with delete_lock:
518 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
519
520 await libjuju.destroy_model(model_name, total_timeout=3600)
521 finally:
522 self.uninstall_locks.pop(model_name)
523
524 self.log.debug(f"[uninstall] Model {model_name} destroyed")
525 return True
526
527 async def upgrade_charm(
528 self,
529 ee_id: str = None,
530 path: str = None,
531 charm_id: str = None,
532 charm_type: str = None,
533 timeout: float = None,
534 ) -> str:
535 """This method upgrade charms in VNFs
536
537 Args:
538 ee_id: Execution environment id
539 path: Local path to the charm
540 charm_id: charm-id
541 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
542 timeout: (Float) Timeout for the ns update operation
543
544 Returns:
545 The output of the update operation if status equals to "completed"
546 """
547 raise K8sException(
548 "KDUs deployed with Juju Bundle do not support charm upgrade"
549 )
550
551 async def exec_primitive(
552 self,
553 cluster_uuid: str = None,
554 kdu_instance: str = None,
555 primitive_name: str = None,
556 timeout: float = 300,
557 params: dict = None,
558 db_dict: dict = None,
559 namespace: str = None,
560 **kwargs,
561 ) -> str:
562 """Exec primitive (Juju action)
563
564 :param cluster_uuid str: The UUID of the cluster
565 :param kdu_instance str: The unique name of the KDU instance
566 :param primitive_name: Name of action that will be executed
567 :param timeout: Timeout for action execution
568 :param params: Dictionary of all the parameters needed for the action
569 :param db_dict: Dictionary for any additional data
570 :param namespace str: The namespace (model) where the Bundle was deployed
571 :param kwargs: Additional parameters
572 vca_id (str): VCA ID
573
574 :return: Returns the output of the action
575 """
576 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
577
578 namespace = self._obtain_namespace(
579 kdu_instance=kdu_instance, namespace=namespace
580 )
581
582 if not params or "application-name" not in params:
583 raise K8sException(
584 "Missing application-name argument, \
585 argument needed for K8s actions"
586 )
587 try:
588 self.log.debug(
589 "[exec_primitive] Getting model "
590 "{} for the kdu_instance: {}".format(namespace, kdu_instance)
591 )
592 application_name = params["application-name"]
593 actions = await libjuju.get_actions(
594 application_name=application_name, model_name=namespace
595 )
596 if primitive_name not in actions:
597 raise K8sException("Primitive {} not found".format(primitive_name))
598 output, status = await libjuju.execute_action(
599 application_name=application_name,
600 model_name=namespace,
601 action_name=primitive_name,
602 **params,
603 )
604
605 if status != "completed":
606 raise K8sException(
607 "status is not completed: {} output: {}".format(status, output)
608 )
609 if self.on_update_db:
610 await self.on_update_db(
611 cluster_uuid=cluster_uuid,
612 kdu_instance=kdu_instance,
613 filter=db_dict["filter"],
614 )
615
616 return output
617
618 except Exception as e:
619 error_msg = "Error executing primitive {}: {}".format(primitive_name, e)
620 self.log.error(error_msg)
621 raise K8sException(message=error_msg)
622
623 """Introspection"""
624
625 async def inspect_kdu(self, kdu_model: str) -> dict:
626 """Inspect a KDU
627
628 Inspects a bundle and returns a dictionary of config parameters and
629 their default values.
630
631 :param kdu_model str: The name or path of the bundle to inspect.
632
633 :return: If successful, returns a dictionary of available parameters
634 and their default values.
635 """
636
637 kdu = {}
638 if not os.path.exists(kdu_model):
639 raise K8sException("file {} not found".format(kdu_model))
640
641 with open(kdu_model, "r") as f:
642 bundle = yaml.safe_load(f.read())
643
644 """
645 {
646 'description': 'Test bundle',
647 'bundle': 'kubernetes',
648 'applications': {
649 'mariadb-k8s': {
650 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
651 'scale': 1,
652 'options': {
653 'password': 'manopw',
654 'root_password': 'osm4u',
655 'user': 'mano'
656 },
657 'series': 'kubernetes'
658 }
659 }
660 }
661 """
662 # TODO: This should be returned in an agreed-upon format
663 kdu = bundle["applications"]
664
665 return kdu
666
667 async def help_kdu(self, kdu_model: str) -> str:
668 """View the README
669
670 If available, returns the README of the bundle.
671
672 :param kdu_model str: The name or path of a bundle
673 f
674 :return: If found, returns the contents of the README.
675 """
676 readme = None
677
678 files = ["README", "README.txt", "README.md"]
679 path = os.path.dirname(kdu_model)
680 for file in os.listdir(path):
681 if file in files:
682 with open(file, "r") as f:
683 readme = f.read()
684 break
685
686 return readme
687
688 async def status_kdu(
689 self,
690 cluster_uuid: str,
691 kdu_instance: str,
692 complete_status: bool = False,
693 yaml_format: bool = False,
694 namespace: str = None,
695 **kwargs,
696 ) -> Union[str, dict]:
697 """Get the status of the KDU
698
699 Get the current status of the KDU instance.
700
701 :param cluster_uuid str: The UUID of the cluster
702 :param kdu_instance str: The unique id of the KDU instance
703 :param complete_status: To get the complete_status of the KDU
704 :param yaml_format: To get the status in proper format for NSR record
705 :param namespace str: The namespace (model) where the Bundle was deployed
706 :param: kwargs: Additional parameters
707 vca_id (str): VCA ID
708
709 :return: Returns a dictionary containing namespace, state, resources,
710 and deployment_time and returns complete_status if complete_status is True
711 """
712 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
713 status = {}
714
715 model_name = self._obtain_namespace(
716 kdu_instance=kdu_instance, namespace=namespace
717 )
718 model_status = await libjuju.get_model_status(model_name=model_name)
719
720 if not complete_status:
721 for name in model_status.applications:
722 application = model_status.applications[name]
723 status[name] = {"status": application["status"]["status"]}
724 else:
725 if yaml_format:
726 return obj_to_yaml(model_status)
727 else:
728 return obj_to_dict(model_status)
729
730 return status
731
732 async def add_relation(
733 self, provider: RelationEndpoint, requirer: RelationEndpoint
734 ):
735 """
736 Add relation between two charmed endpoints
737
738 :param: provider: Provider relation endpoint
739 :param: requirer: Requirer relation endpoint
740 """
741 self.log.debug(f"adding new relation between {provider} and {requirer}")
742 cross_model_relation = (
743 provider.model_name != requirer.model_name
744 or provider.vca_id != requirer.vca_id
745 )
746 try:
747 if cross_model_relation:
748 # Cross-model relation
749 provider_libjuju = await self._get_libjuju(provider.vca_id)
750 requirer_libjuju = await self._get_libjuju(requirer.vca_id)
751 offer = await provider_libjuju.offer(provider)
752 if offer:
753 saas_name = await requirer_libjuju.consume(
754 requirer.model_name, offer, provider_libjuju
755 )
756 await requirer_libjuju.add_relation(
757 requirer.model_name, requirer.endpoint, saas_name
758 )
759 else:
760 # Standard relation
761 vca_id = provider.vca_id
762 model = provider.model_name
763 libjuju = await self._get_libjuju(vca_id)
764 # add juju relations between two applications
765 await libjuju.add_relation(
766 model_name=model,
767 endpoint_1=provider.endpoint,
768 endpoint_2=requirer.endpoint,
769 )
770 except Exception as e:
771 message = f"Error adding relation between {provider} and {requirer}: {e}"
772 self.log.error(message)
773 raise Exception(message=message)
774
775 async def update_vca_status(
776 self, vcastatus: dict, kdu_instance: str, namespace: str = None, **kwargs
777 ):
778 """
779 Add all configs, actions, executed actions of all applications in a model to vcastatus dict
780
781 :param vcastatus dict: dict containing vcastatus
782 :param kdu_instance str: The unique id of the KDU instance
783 :param namespace str: The namespace (model) where the Bundle was deployed
784 :param: kwargs: Additional parameters
785 vca_id (str): VCA ID
786
787 :return: None
788 """
789
790 model_name = self._obtain_namespace(
791 kdu_instance=kdu_instance, namespace=namespace
792 )
793
794 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
795 try:
796 for vca_model_name in vcastatus:
797 # Adding executed actions
798 vcastatus[vca_model_name][
799 "executedActions"
800 ] = await libjuju.get_executed_actions(model_name=model_name)
801
802 for application in vcastatus[vca_model_name]["applications"]:
803 # Adding application actions
804 vcastatus[vca_model_name]["applications"][application][
805 "actions"
806 ] = {}
807 # Adding application configs
808 vcastatus[vca_model_name]["applications"][application][
809 "configs"
810 ] = await libjuju.get_application_configs(
811 model_name=model_name, application_name=application
812 )
813
814 except Exception as e:
815 self.log.debug("Error in updating vca status: {}".format(str(e)))
816
817 async def get_services(
818 self, cluster_uuid: str, kdu_instance: str, namespace: str
819 ) -> list:
820 """Return a list of services of a kdu_instance"""
821
822 namespace = self._obtain_namespace(
823 kdu_instance=kdu_instance, namespace=namespace
824 )
825
826 credentials = self.get_credentials(cluster_uuid=cluster_uuid)
827 kubectl = self._get_kubectl(credentials)
828 return kubectl.get_services(
829 field_selector="metadata.namespace={}".format(namespace)
830 )
831
832 async def get_service(
833 self, cluster_uuid: str, service_name: str, namespace: str
834 ) -> object:
835 """Return data for a specific service inside a namespace"""
836
837 credentials = self.get_credentials(cluster_uuid=cluster_uuid)
838 kubectl = self._get_kubectl(credentials)
839 return kubectl.get_services(
840 field_selector="metadata.name={},metadata.namespace={}".format(
841 service_name, namespace
842 )
843 )[0]
844
845 def get_credentials(self, cluster_uuid: str) -> str:
846 """
847 Get Cluster Kubeconfig
848 """
849 k8scluster = self.db.get_one(
850 "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
851 )
852
853 self.db.encrypt_decrypt_fields(
854 k8scluster.get("credentials"),
855 "decrypt",
856 ["password", "secret"],
857 schema_version=k8scluster["schema_version"],
858 salt=k8scluster["_id"],
859 )
860
861 return yaml.safe_dump(k8scluster.get("credentials"))
862
863 def _get_credential_name(self, cluster_uuid: str) -> str:
864 """
865 Get credential name for a k8s cloud
866
867 We cannot use the cluster_uuid for the credential name directly,
868 because it cannot start with a number, it must start with a letter.
869 Therefore, the k8s cloud credential name will be "cred-" followed
870 by the cluster uuid.
871
872 :param: cluster_uuid: Cluster UUID of the kubernetes cloud (=cloud_name)
873
874 :return: Name to use for the credential name.
875 """
876 return "cred-{}".format(cluster_uuid)
877
878 def get_namespace(self, cluster_uuid: str) -> str:
879 """Get the namespace UUID
880 Gets the namespace's unique name
881
882 :param cluster_uuid str: The UUID of the cluster
883 :returns: The namespace UUID, or raises an exception
884 """
885 pass
886
887 @staticmethod
888 def generate_kdu_instance_name(**kwargs):
889 db_dict = kwargs.get("db_dict")
890 kdu_name = kwargs.get("kdu_name", None)
891 if kdu_name:
892 kdu_instance = "{}-{}".format(kdu_name, db_dict["filter"]["_id"])
893 else:
894 kdu_instance = db_dict["filter"]["_id"]
895 return kdu_instance
896
897 async def _get_libjuju(self, vca_id: str = None) -> Libjuju:
898 """
899 Get libjuju object
900
901 :param: vca_id: VCA ID
902 If None, get a libjuju object with a Connection to the default VCA
903 Else, geta libjuju object with a Connection to the specified VCA
904 """
905 if not vca_id:
906 while self.loading_libjuju.locked():
907 await asyncio.sleep(0.1)
908 if not self.libjuju:
909 async with self.loading_libjuju:
910 vca_connection = await get_connection(self._store)
911 self.libjuju = Libjuju(vca_connection, loop=self.loop, log=self.log)
912 return self.libjuju
913 else:
914 vca_connection = await get_connection(self._store, vca_id)
915 return Libjuju(vca_connection, loop=self.loop, log=self.log, n2vc=self)
916
917 def _get_kubectl(self, credentials: str) -> Kubectl:
918 """
919 Get Kubectl object
920
921 :param: kubeconfig_credentials: Kubeconfig credentials
922 """
923 kubecfg = tempfile.NamedTemporaryFile()
924 with open(kubecfg.name, "w") as kubecfg_file:
925 kubecfg_file.write(credentials)
926 return Kubectl(config_file=kubecfg.name)
927
928 def _obtain_namespace(self, kdu_instance: str, namespace: str = None) -> str:
929 """
930 Obtain the namespace/model name to use in the instantiation of a Juju Bundle in K8s. The default namespace is
931 the kdu_instance name. However, if the user passes the namespace where he wants to deploy the bundle,
932 that namespace will be used.
933
934 :param kdu_instance: the default KDU instance name
935 :param namespace: the namespace passed by the User
936 """
937
938 # deault the namespace/model name to the kdu_instance name TODO -> this should be the real return... But
939 # once the namespace is not passed in most methods, I had to do this in another way. But I think this should
940 # be the procedure in the future return namespace if namespace else kdu_instance
941
942 # TODO -> has referred above, this should be avoided in the future, this is temporary, in order to avoid
943 # compatibility issues
944 return (
945 namespace
946 if namespace
947 else self._obtain_namespace_from_db(kdu_instance=kdu_instance)
948 )
949
950 def _obtain_namespace_from_db(self, kdu_instance: str) -> str:
951 db_nsrs = self.db.get_one(
952 table="nsrs", q_filter={"_admin.deployed.K8s.kdu-instance": kdu_instance}
953 )
954 for k8s in db_nsrs["_admin"]["deployed"]["K8s"]:
955 if k8s.get("kdu-instance") == kdu_instance:
956 return k8s.get("namespace")
957 return ""