Code Coverage

Cobertura Coverage Report > n2vc >

k8s_juju_conn.py

Trend

File Coverage summary

NameClassesLinesConditionals
k8s_juju_conn.py
100%
1/1
88%
284/322
100%
0/0

Coverage Breakdown by Class

NameLinesConditionals
k8s_juju_conn.py
88%
284/322
N/A

Source

n2vc/k8s_juju_conn.py
1 # Copyright 2019 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #     Unless required by applicable law or agreed to in writing, software
10 #     distributed under the License is distributed on an "AS IS" BASIS,
11 #     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 #     See the License for the specific language governing permissions and
13 #     limitations under the License.
14
15 1 import asyncio
16 1 from typing import Union
17 1 import os
18 1 import uuid
19 1 import yaml
20 1 import tempfile
21 1 import binascii
22
23 1 from n2vc.config import EnvironConfig
24 1 from n2vc.definitions import RelationEndpoint
25 1 from n2vc.exceptions import K8sException
26 1 from n2vc.k8s_conn import K8sConnector
27 1 from n2vc.kubectl import Kubectl
28 1 from .exceptions import MethodNotImplemented
29 1 from n2vc.libjuju import Libjuju
30 1 from n2vc.utils import obj_to_dict, obj_to_yaml
31 1 from n2vc.store import MotorStore
32 1 from n2vc.vca.cloud import Cloud
33 1 from n2vc.vca.connection import get_connection
34
35
36 1 RBAC_LABEL_KEY_NAME = "rbac-id"
37 1 RBAC_STACK_PREFIX = "juju-credential"
38
39
40 1 def generate_rbac_id():
41 1     return binascii.hexlify(os.urandom(4)).decode()
42
43
44 1 class K8sJujuConnector(K8sConnector):
45 1     libjuju = None
46
47 1     def __init__(
48         self,
49         fs: object,
50         db: object,
51         kubectl_command: str = "/usr/bin/kubectl",
52         juju_command: str = "/usr/bin/juju",
53         log: object = None,
54         on_update_db=None,
55     ):
56         """
57         :param fs: file system for kubernetes and helm configuration
58         :param db: Database object
59         :param kubectl_command: path to kubectl executable
60         :param helm_command: path to helm executable
61         :param log: logger
62         """
63
64         # parent class
65 1         K8sConnector.__init__(self, db, log=log, on_update_db=on_update_db)
66
67 1         self.fs = fs
68 1         self.log.debug("Initializing K8S Juju connector")
69
70 1         db_uri = EnvironConfig(prefixes=["OSMLCM_", "OSMMON_"]).get("database_uri")
71 1         self._store = MotorStore(db_uri)
72 1         self.loading_libjuju = asyncio.Lock()
73 1         self.uninstall_locks = {}
74
75 1         self.log.debug("K8S Juju connector initialized")
76         # TODO: Remove these commented lines:
77         # self.authenticated = False
78         # self.models = {}
79         # self.juju_secret = ""
80
81 1     """Initialization"""
82
83 1     async def init_env(
84         self,
85         k8s_creds: str,
86         namespace: str = "kube-system",
87         reuse_cluster_uuid: str = None,
88         **kwargs,
89     ) -> (str, bool):
90         """
91         It prepares a given K8s cluster environment to run Juju bundles.
92
93         :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
94             '.kube/config'
95         :param namespace: optional namespace to be used for juju. By default,
96             'kube-system' will be used
97         :param reuse_cluster_uuid: existing cluster uuid for reuse
98         :param: kwargs: Additional parameters
99             vca_id (str): VCA ID
100
101         :return: uuid of the K8s cluster and True if connector has installed some
102             software in the cluster
103             (on error, an exception will be raised)
104         """
105 1         libjuju = await self._get_libjuju(kwargs.get("vca_id"))
106
107 1         cluster_uuid = reuse_cluster_uuid or str(uuid.uuid4())
108 1         kubectl = self._get_kubectl(k8s_creds)
109
110         # CREATING RESOURCES IN K8S
111 1         rbac_id = generate_rbac_id()
112 1         metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
113 1         labels = {RBAC_STACK_PREFIX: rbac_id}
114
115         # Create cleanup dictionary to clean up created resources
116         # if it fails in the middle of the process
117 1         cleanup_data = []
118 1         try:
119 1             self.log.debug("Initializing K8s cluster for juju")
120 1             kubectl.create_cluster_role(name=metadata_name, labels=labels)
121 1             self.log.debug("Cluster role created")
122 1             cleanup_data.append(
123                 {"delete": kubectl.delete_cluster_role, "args": (metadata_name,)}
124             )
125
126 1             kubectl.create_service_account(name=metadata_name, labels=labels)
127 1             self.log.debug("Service account created")
128 1             cleanup_data.append(
129                 {"delete": kubectl.delete_service_account, "args": (metadata_name,)}
130             )
131
132 1             kubectl.create_cluster_role_binding(name=metadata_name, labels=labels)
133 1             self.log.debug("Role binding created")
134 1             cleanup_data.append(
135                 {
136                     "delete": kubectl.delete_cluster_role_binding,
137                     "args": (metadata_name,),
138                 }
139             )
140 1             token, client_cert_data = await kubectl.get_secret_data(metadata_name)
141
142 1             default_storage_class = kubectl.get_default_storage_class()
143 1             self.log.debug("Default storage class: {}".format(default_storage_class))
144 1             await libjuju.add_k8s(
145                 name=cluster_uuid,
146                 rbac_id=rbac_id,
147                 token=token,
148                 client_cert_data=client_cert_data,
149                 configuration=kubectl.configuration,
150                 storage_class=default_storage_class,
151                 credential_name=self._get_credential_name(cluster_uuid),
152             )
153 1             self.log.debug("K8s cluster added to juju controller")
154 1             return cluster_uuid, True
155 1         except Exception as e:
156 1             self.log.error("Error initializing k8scluster: {}".format(e), exc_info=True)
157 1             if len(cleanup_data) > 0:
158 1                 self.log.debug("Cleaning up created resources in k8s cluster...")
159 1                 for item in cleanup_data:
160 1                     delete_function = item["delete"]
161 1                     delete_args = item["args"]
162 1                     delete_function(*delete_args)
163 1                 self.log.debug("Cleanup finished")
164 1             raise e
165
166 1     """Repo Management"""
167
168 1     async def repo_add(
169         self,
170         name: str,
171         url: str,
172         _type: str = "charm",
173         cert: str = None,
174         user: str = None,
175         password: str = None,
176     ):
177 1         raise MethodNotImplemented()
178
179 1     async def repo_list(self):
180 1         raise MethodNotImplemented()
181
182 1     async def repo_remove(self, name: str):
183 1         raise MethodNotImplemented()
184
185 1     async def synchronize_repos(self, cluster_uuid: str, name: str):
186         """
187         Returns None as currently add_repo is not implemented
188         """
189 1         return None
190
191 1     """Reset"""
192
193 1     async def reset(
194         self,
195         cluster_uuid: str,
196         force: bool = False,
197         uninstall_sw: bool = False,
198         **kwargs,
199     ) -> bool:
200         """Reset a cluster
201
202         Resets the Kubernetes cluster by removing the model that represents it.
203
204         :param cluster_uuid str: The UUID of the cluster to reset
205         :param force: Force reset
206         :param uninstall_sw: Boolean to uninstall sw
207         :param: kwargs: Additional parameters
208             vca_id (str): VCA ID
209
210         :return: Returns True if successful or raises an exception.
211         """
212
213 1         try:
214 1             self.log.debug("[reset] Removing k8s cloud")
215 1             libjuju = await self._get_libjuju(kwargs.get("vca_id"))
216
217 1             cloud = Cloud(cluster_uuid, self._get_credential_name(cluster_uuid))
218
219 1             cloud_creds = await libjuju.get_cloud_credentials(cloud)
220
221 1             await libjuju.remove_cloud(cluster_uuid)
222
223 1             credentials = self.get_credentials(cluster_uuid=cluster_uuid)
224
225 1             kubectl = self._get_kubectl(credentials)
226
227 1             delete_functions = [
228                 kubectl.delete_cluster_role_binding,
229                 kubectl.delete_service_account,
230                 kubectl.delete_cluster_role,
231             ]
232
233 1             credential_attrs = cloud_creds[0].result["attrs"]
234 1             if RBAC_LABEL_KEY_NAME in credential_attrs:
235 1                 rbac_id = credential_attrs[RBAC_LABEL_KEY_NAME]
236 1                 metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
237 1                 for delete_func in delete_functions:
238 1                     try:
239 1                         delete_func(metadata_name)
240 0                     except Exception as e:
241 0                         self.log.warning("Cannot remove resource in K8s {}".format(e))
242
243 1         except Exception as e:
244 1             self.log.debug("Caught exception during reset: {}".format(e))
245 1             raise e
246 1         return True
247
248 1     """Deployment"""
249
250 1     async def install(
251         self,
252         cluster_uuid: str,
253         kdu_model: str,
254         kdu_instance: str,
255         atomic: bool = True,
256         timeout: float = 1800,
257         params: dict = None,
258         db_dict: dict = None,
259         kdu_name: str = None,
260         namespace: str = None,
261         **kwargs,
262     ) -> bool:
263         """Install a bundle
264
265         :param cluster_uuid str: The UUID of the cluster to install to
266         :param kdu_model str: The name or path of a bundle to install
267         :param kdu_instance: Kdu instance name
268         :param atomic bool: If set, waits until the model is active and resets
269                             the cluster on failure.
270         :param timeout int: The time, in seconds, to wait for the install
271                             to finish
272         :param params dict: Key-value pairs of instantiation parameters
273         :param kdu_name: Name of the KDU instance to be installed
274         :param namespace: K8s namespace to use for the KDU instance
275         :param kwargs: Additional parameters
276             vca_id (str): VCA ID
277
278         :return: If successful, returns ?
279         """
280 1         libjuju = await self._get_libjuju(kwargs.get("vca_id"))
281 1         bundle = kdu_model
282
283 1         if not db_dict:
284 1             raise K8sException("db_dict must be set")
285 1         if not bundle:
286 1             raise K8sException("bundle must be set")
287
288 1         if bundle.startswith("cs:"):
289             # For Juju Bundles provided by the Charm Store
290 1             pass
291 1         elif bundle.startswith("ch:"):
292             # For Juju Bundles provided by the Charm Hub (this only works for juju version >= 2.9)
293 0             pass
294 1         elif bundle.startswith("http"):
295             # Download the file
296 1             pass
297         else:
298 1             new_workdir = kdu_model.strip(kdu_model.split("/")[-1])
299 1             os.chdir(new_workdir)
300 1             bundle = "local:{}".format(kdu_model)
301
302         # default namespace to kdu_instance
303 1         if not namespace:
304 1             namespace = kdu_instance
305
306 1         self.log.debug("Checking for model named {}".format(namespace))
307
308         # Create the new model
309 1         self.log.debug("Adding model: {}".format(namespace))
310 1         cloud = Cloud(cluster_uuid, self._get_credential_name(cluster_uuid))
311 1         await libjuju.add_model(namespace, cloud)
312
313         # if model:
314         # TODO: Instantiation parameters
315
316 1         """
317         "Juju bundle that models the KDU, in any of the following ways:
318             - <juju-repo>/<juju-bundle>
319             - <juju-bundle folder under k8s_models folder in the package>
320             - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
321                 in the package>
322             - <URL_where_to_fetch_juju_bundle>
323         """
324 1         try:
325 1             previous_workdir = os.getcwd()
326 1         except FileNotFoundError:
327 1             previous_workdir = "/app/storage"
328
329 1         self.log.debug("[install] deploying {}".format(bundle))
330 1         instantiation_params = params.get("overlay") if params else None
331 1         await libjuju.deploy(
332             bundle,
333             model_name=namespace,
334             wait=atomic,
335             timeout=timeout,
336             instantiation_params=instantiation_params,
337         )
338 1         os.chdir(previous_workdir)
339
340         # update information in the database (first, the VCA status, and then, the namespace)
341 1         if self.on_update_db:
342 0             await self.on_update_db(
343                 cluster_uuid,
344                 kdu_instance,
345                 filter=db_dict["filter"],
346                 vca_id=kwargs.get("vca_id"),
347             )
348
349 1         self.db.set_one(
350             table="nsrs",
351             q_filter={"_admin.deployed.K8s.kdu-instance": kdu_instance},
352             update_dict={"_admin.deployed.K8s.$.namespace": namespace},
353         )
354
355 1         return True
356
357 1     async def scale(
358         self,
359         kdu_instance: str,
360         scale: int,
361         resource_name: str,
362         total_timeout: float = 1800,
363         namespace: str = None,
364         **kwargs,
365     ) -> bool:
366         """Scale an application in a model
367
368         :param: kdu_instance str:        KDU instance name
369         :param: scale int:               Scale to which to set the application
370         :param: resource_name str:       The application name in the Juju Bundle
371         :param: timeout float:           The time, in seconds, to wait for the install
372                                          to finish
373         :param namespace str: The namespace (model) where the Bundle was deployed
374         :param kwargs:                   Additional parameters
375                                             vca_id (str): VCA ID
376
377         :return: If successful, returns True
378         """
379
380 1         model_name = self._obtain_namespace(
381             kdu_instance=kdu_instance, namespace=namespace
382         )
383 1         try:
384 1             libjuju = await self._get_libjuju(kwargs.get("vca_id"))
385 1             await libjuju.scale_application(
386                 model_name=model_name,
387                 application_name=resource_name,
388                 scale=scale,
389                 total_timeout=total_timeout,
390             )
391 1         except Exception as e:
392 1             error_msg = "Error scaling application {} of the model {} of the kdu instance {}: {}".format(
393                 resource_name, model_name, kdu_instance, e
394             )
395 1             self.log.error(error_msg)
396 1             raise K8sException(message=error_msg)
397 1         return True
398
399 1     async def get_scale_count(
400         self, resource_name: str, kdu_instance: str, namespace: str = None, **kwargs
401     ) -> int:
402         """Get an application scale count
403
404         :param: resource_name str:       The application name in the Juju Bundle
405         :param: kdu_instance str:        KDU instance name
406         :param namespace str: The namespace (model) where the Bundle was deployed
407         :param kwargs:                   Additional parameters
408                                             vca_id (str): VCA ID
409         :return: Return application instance count
410         """
411
412 1         model_name = self._obtain_namespace(
413             kdu_instance=kdu_instance, namespace=namespace
414         )
415 1         try:
416 1             libjuju = await self._get_libjuju(kwargs.get("vca_id"))
417 1             status = await libjuju.get_model_status(model_name=model_name)
418 1             return len(status.applications[resource_name].units)
419 0         except Exception as e:
420 0             error_msg = (
421                 f"Error getting scale count from application {resource_name} of the model {model_name} of "
422                 f"the kdu instance {kdu_instance}: {e}"
423             )
424 0             self.log.error(error_msg)
425 0             raise K8sException(message=error_msg)
426
427 1     async def instances_list(self, cluster_uuid: str) -> list:
428         """
429         returns a list of deployed releases in a cluster
430
431         :param cluster_uuid: the cluster
432         :return:
433         """
434 1         return []
435
436 1     async def upgrade(
437         self,
438         cluster_uuid: str,
439         kdu_instance: str,
440         kdu_model: str = None,
441         params: dict = None,
442     ) -> str:
443         """Upgrade a model
444
445         :param cluster_uuid str: The UUID of the cluster to upgrade
446         :param kdu_instance str: The unique name of the KDU instance
447         :param kdu_model str: The name or path of the bundle to upgrade to
448         :param params dict: Key-value pairs of instantiation parameters
449
450         :return: If successful, reference to the new revision number of the
451                  KDU instance.
452         """
453
454         # TODO: Loop through the bundle and upgrade each charm individually
455
456 1         """
457         The API doesn't have a concept of bundle upgrades, because there are
458         many possible changes: charm revision, disk, number of units, etc.
459
460         As such, we are only supporting a limited subset of upgrades. We'll
461         upgrade the charm revision but leave storage and scale untouched.
462
463         Scale changes should happen through OSM constructs, and changes to
464         storage would require a redeployment of the service, at least in this
465         initial release.
466         """
467 1         raise MethodNotImplemented()
468
469 1     """Rollback"""
470
471 1     async def rollback(
472         self, cluster_uuid: str, kdu_instance: str, revision: int = 0
473     ) -> str:
474         """Rollback a model
475
476         :param cluster_uuid str: The UUID of the cluster to rollback
477         :param kdu_instance str: The unique name of the KDU instance
478         :param revision int: The revision to revert to. If omitted, rolls back
479                              the previous upgrade.
480
481         :return: If successful, returns the revision of active KDU instance,
482                  or raises an exception
483         """
484 1         raise MethodNotImplemented()
485
486 1     """Deletion"""
487
488 1     async def uninstall(
489         self, cluster_uuid: str, kdu_instance: str, namespace: str = None, **kwargs
490     ) -> bool:
491         """Uninstall a KDU instance
492
493         :param cluster_uuid str: The UUID of the cluster
494         :param kdu_instance str: The unique name of the KDU instance
495         :param namespace str: The namespace (model) where the Bundle was deployed
496         :param kwargs: Additional parameters
497             vca_id (str): VCA ID
498
499         :return: Returns True if successful, or raises an exception
500         """
501 1         model_name = self._obtain_namespace(
502             kdu_instance=kdu_instance, namespace=namespace
503         )
504
505 1         self.log.debug(f"[uninstall] Destroying model: {model_name}")
506
507 1         will_not_delete = False
508 1         if model_name not in self.uninstall_locks:
509 1             self.uninstall_locks[model_name] = asyncio.Lock()
510 1         delete_lock = self.uninstall_locks[model_name]
511
512 1         while delete_lock.locked():
513 0             will_not_delete = True
514 0             await asyncio.sleep(0.1)
515
516 1         if will_not_delete:
517 0             self.log.info("Model {} deleted by another worker.".format(model_name))
518 0             return True
519
520 1         try:
521 1             async with delete_lock:
522 1                 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
523
524 1                 await libjuju.destroy_model(model_name, total_timeout=3600)
525         finally:
526 1             self.uninstall_locks.pop(model_name)
527
528 1         self.log.debug(f"[uninstall] Model {model_name} destroyed")
529 1         return True
530
531 1     async def upgrade_charm(
532         self,
533         ee_id: str = None,
534         path: str = None,
535         charm_id: str = None,
536         charm_type: str = None,
537         timeout: float = None,
538     ) -> str:
539         """This method upgrade charms in VNFs
540
541         Args:
542             ee_id:  Execution environment id
543             path:   Local path to the charm
544             charm_id:   charm-id
545             charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
546             timeout: (Float)    Timeout for the ns update operation
547
548         Returns:
549             The output of the update operation if status equals to "completed"
550         """
551 0         raise K8sException(
552             "KDUs deployed with Juju Bundle do not support charm upgrade"
553         )
554
555 1     async def exec_primitive(
556         self,
557         cluster_uuid: str = None,
558         kdu_instance: str = None,
559         primitive_name: str = None,
560         timeout: float = 300,
561         params: dict = None,
562         db_dict: dict = None,
563         namespace: str = None,
564         **kwargs,
565     ) -> str:
566         """Exec primitive (Juju action)
567
568         :param cluster_uuid str: The UUID of the cluster
569         :param kdu_instance str: The unique name of the KDU instance
570         :param primitive_name: Name of action that will be executed
571         :param timeout: Timeout for action execution
572         :param params: Dictionary of all the parameters needed for the action
573         :param db_dict: Dictionary for any additional data
574         :param namespace str: The namespace (model) where the Bundle was deployed
575         :param kwargs: Additional parameters
576             vca_id (str): VCA ID
577
578         :return: Returns the output of the action
579         """
580 1         libjuju = await self._get_libjuju(kwargs.get("vca_id"))
581
582 1         namespace = self._obtain_namespace(
583             kdu_instance=kdu_instance, namespace=namespace
584         )
585
586 1         if not params or "application-name" not in params:
587 1             raise K8sException(
588                 "Missing application-name argument, \
589                                 argument needed for K8s actions"
590             )
591 1         try:
592 1             self.log.debug(
593                 "[exec_primitive] Getting model "
594                 "{} for the kdu_instance: {}".format(namespace, kdu_instance)
595             )
596 1             application_name = params["application-name"]
597 1             actions = await libjuju.get_actions(
598                 application_name=application_name, model_name=namespace
599             )
600 1             if primitive_name not in actions:
601 1                 raise K8sException("Primitive {} not found".format(primitive_name))
602 1             output, status = await libjuju.execute_action(
603                 application_name=application_name,
604                 model_name=namespace,
605                 action_name=primitive_name,
606                 **params,
607             )
608
609 1             if status != "completed":
610 1                 raise K8sException(
611                     "status is not completed: {} output: {}".format(status, output)
612                 )
613 1             if self.on_update_db:
614 0                 await self.on_update_db(
615                     cluster_uuid=cluster_uuid,
616                     kdu_instance=kdu_instance,
617                     filter=db_dict["filter"],
618                 )
619
620 1             return output
621
622 1         except Exception as e:
623 1             error_msg = "Error executing primitive {}: {}".format(primitive_name, e)
624 1             self.log.error(error_msg)
625 1             raise K8sException(message=error_msg)
626
627 1     """Introspection"""
628
629 1     async def inspect_kdu(self, kdu_model: str) -> dict:
630         """Inspect a KDU
631
632         Inspects a bundle and returns a dictionary of config parameters and
633         their default values.
634
635         :param kdu_model str: The name or path of the bundle to inspect.
636
637         :return: If successful, returns a dictionary of available parameters
638                  and their default values.
639         """
640
641 1         kdu = {}
642 1         if not os.path.exists(kdu_model):
643 1             raise K8sException("file {} not found".format(kdu_model))
644
645 1         with open(kdu_model, "r") as f:
646 1             bundle = yaml.safe_load(f.read())
647
648 1             """
649             {
650                 'description': 'Test bundle',
651                 'bundle': 'kubernetes',
652                 'applications': {
653                     'mariadb-k8s': {
654                         'charm': 'cs:~charmed-osm/mariadb-k8s-20',
655                         'scale': 1,
656                         'options': {
657                             'password': 'manopw',
658                             'root_password': 'osm4u',
659                             'user': 'mano'
660                         },
661                         'series': 'kubernetes'
662                     }
663                 }
664             }
665             """
666             # TODO: This should be returned in an agreed-upon format
667 1             kdu = bundle["applications"]
668
669 1         return kdu
670
671 1     async def help_kdu(self, kdu_model: str) -> str:
672         """View the README
673
674                 If available, returns the README of the bundle.
675
676                 :param kdu_model str: The name or path of a bundle
677         f
678                 :return: If found, returns the contents of the README.
679         """
680 1         readme = None
681
682 1         files = ["README", "README.txt", "README.md"]
683 1         path = os.path.dirname(kdu_model)
684 1         for file in os.listdir(path):
685 1             if file in files:
686 1                 with open(file, "r") as f:
687 1                     readme = f.read()
688 1                     break
689
690 1         return readme
691
692 1     async def status_kdu(
693         self,
694         cluster_uuid: str,
695         kdu_instance: str,
696         complete_status: bool = False,
697         yaml_format: bool = False,
698         namespace: str = None,
699         **kwargs,
700     ) -> Union[str, dict]:
701         """Get the status of the KDU
702
703         Get the current status of the KDU instance.
704
705         :param cluster_uuid str: The UUID of the cluster
706         :param kdu_instance str: The unique id of the KDU instance
707         :param complete_status: To get the complete_status of the KDU
708         :param yaml_format: To get the status in proper format for NSR record
709         :param namespace str: The namespace (model) where the Bundle was deployed
710         :param: kwargs: Additional parameters
711             vca_id (str): VCA ID
712
713         :return: Returns a dictionary containing namespace, state, resources,
714                  and deployment_time and returns complete_status if complete_status is True
715         """
716 1         libjuju = await self._get_libjuju(kwargs.get("vca_id"))
717 1         status = {}
718
719 1         model_name = self._obtain_namespace(
720             kdu_instance=kdu_instance, namespace=namespace
721         )
722 1         model_status = await libjuju.get_model_status(model_name=model_name)
723
724 1         if not complete_status:
725 1             for name in model_status.applications:
726 1                 application = model_status.applications[name]
727 1                 status[name] = {"status": application["status"]["status"]}
728         else:
729 0             if yaml_format:
730 0                 return obj_to_yaml(model_status)
731             else:
732 0                 return obj_to_dict(model_status)
733
734 1         return status
735
736 1     async def add_relation(
737         self, provider: RelationEndpoint, requirer: RelationEndpoint
738     ):
739         """
740         Add relation between two charmed endpoints
741
742         :param: provider: Provider relation endpoint
743         :param: requirer: Requirer relation endpoint
744         """
745 1         self.log.debug(f"adding new relation between {provider} and {requirer}")
746 1         cross_model_relation = (
747             provider.model_name != requirer.model_name
748             or provider.vca_id != requirer.vca_id
749         )
750 1         try:
751 1             if cross_model_relation:
752                 # Cross-model relation
753 1                 provider_libjuju = await self._get_libjuju(provider.vca_id)
754 1                 requirer_libjuju = await self._get_libjuju(requirer.vca_id)
755 1                 offer = await provider_libjuju.offer(provider)
756 1                 if offer:
757 1                     saas_name = await requirer_libjuju.consume(
758                         requirer.model_name, offer, provider_libjuju
759                     )
760 1                     await requirer_libjuju.add_relation(
761                         requirer.model_name, requirer.endpoint, saas_name
762                     )
763             else:
764                 # Standard relation
765 1                 vca_id = provider.vca_id
766 1                 model = provider.model_name
767 1                 libjuju = await self._get_libjuju(vca_id)
768                 # add juju relations between two applications
769 1                 await libjuju.add_relation(
770                     model_name=model,
771                     endpoint_1=provider.endpoint,
772                     endpoint_2=requirer.endpoint,
773                 )
774 1         except Exception as e:
775 1             message = f"Error adding relation between {provider} and {requirer}: {e}"
776 1             self.log.error(message)
777 1             raise Exception(message=message)
778
779 1     async def update_vca_status(
780         self, vcastatus: dict, kdu_instance: str, namespace: str = None, **kwargs
781     ):
782         """
783         Add all configs, actions, executed actions of all applications in a model to vcastatus dict
784
785         :param vcastatus dict: dict containing vcastatus
786         :param kdu_instance str: The unique id of the KDU instance
787         :param namespace str: The namespace (model) where the Bundle was deployed
788         :param: kwargs: Additional parameters
789             vca_id (str): VCA ID
790
791         :return: None
792         """
793
794 1         model_name = self._obtain_namespace(
795             kdu_instance=kdu_instance, namespace=namespace
796         )
797
798 1         libjuju = await self._get_libjuju(kwargs.get("vca_id"))
799 1         try:
800 1             for vca_model_name in vcastatus:
801                 # Adding executed actions
802 1                 vcastatus[vca_model_name][
803                     "executedActions"
804                 ] = await libjuju.get_executed_actions(model_name=model_name)
805
806 1                 for application in vcastatus[vca_model_name]["applications"]:
807                     # Adding application actions
808 1                     vcastatus[vca_model_name]["applications"][application][
809                         "actions"
810                     ] = {}
811                     # Adding application configs
812 1                     vcastatus[vca_model_name]["applications"][application][
813                         "configs"
814                     ] = await libjuju.get_application_configs(
815                         model_name=model_name, application_name=application
816                     )
817
818 1         except Exception as e:
819 1             self.log.debug("Error in updating vca status: {}".format(str(e)))
820
821 1     async def get_services(
822         self, cluster_uuid: str, kdu_instance: str, namespace: str
823     ) -> list:
824         """Return a list of services of a kdu_instance"""
825
826 1         namespace = self._obtain_namespace(
827             kdu_instance=kdu_instance, namespace=namespace
828         )
829
830 1         credentials = self.get_credentials(cluster_uuid=cluster_uuid)
831 1         kubectl = self._get_kubectl(credentials)
832 1         return kubectl.get_services(
833             field_selector="metadata.namespace={}".format(namespace)
834         )
835
836 1     async def get_service(
837         self, cluster_uuid: str, service_name: str, namespace: str
838     ) -> object:
839         """Return data for a specific service inside a namespace"""
840
841 1         credentials = self.get_credentials(cluster_uuid=cluster_uuid)
842 1         kubectl = self._get_kubectl(credentials)
843 1         return kubectl.get_services(
844             field_selector="metadata.name={},metadata.namespace={}".format(
845                 service_name, namespace
846             )
847         )[0]
848
849 1     def get_credentials(self, cluster_uuid: str) -> str:
850         """
851         Get Cluster Kubeconfig
852         """
853 1         k8scluster = self.db.get_one(
854             "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
855         )
856
857 1         self.db.encrypt_decrypt_fields(
858             k8scluster.get("credentials"),
859             "decrypt",
860             ["password", "secret"],
861             schema_version=k8scluster["schema_version"],
862             salt=k8scluster["_id"],
863         )
864
865 1         return yaml.safe_dump(k8scluster.get("credentials"))
866
867 1     def _get_credential_name(self, cluster_uuid: str) -> str:
868         """
869         Get credential name for a k8s cloud
870
871         We cannot use the cluster_uuid for the credential name directly,
872         because it cannot start with a number, it must start with a letter.
873         Therefore, the k8s cloud credential name will be "cred-" followed
874         by the cluster uuid.
875
876         :param: cluster_uuid:   Cluster UUID of the kubernetes cloud (=cloud_name)
877
878         :return:                Name to use for the credential name.
879         """
880 1         return "cred-{}".format(cluster_uuid)
881
882 1     def get_namespace(self, cluster_uuid: str) -> str:
883         """Get the namespace UUID
884         Gets the namespace's unique name
885
886         :param cluster_uuid str: The UUID of the cluster
887         :returns: The namespace UUID, or raises an exception
888         """
889 1         pass
890
891 1     @staticmethod
892 1     def generate_kdu_instance_name(**kwargs):
893 0         db_dict = kwargs.get("db_dict")
894 0         kdu_name = kwargs.get("kdu_name", None)
895 0         if kdu_name:
896 0             kdu_instance = "{}-{}".format(kdu_name, db_dict["filter"]["_id"])
897         else:
898 0             kdu_instance = db_dict["filter"]["_id"]
899 0         return kdu_instance
900
901 1     async def _get_libjuju(self, vca_id: str = None) -> Libjuju:
902         """
903         Get libjuju object
904
905         :param: vca_id: VCA ID
906                         If None, get a libjuju object with a Connection to the default VCA
907                         Else, geta libjuju object with a Connection to the specified VCA
908         """
909 1         if not vca_id:
910 1             while self.loading_libjuju.locked():
911 0                 await asyncio.sleep(0.1)
912 1             if not self.libjuju:
913 0                 async with self.loading_libjuju:
914 0                     vca_connection = await get_connection(self._store)
915 0                     self.libjuju = Libjuju(vca_connection, log=self.log)
916 1             return self.libjuju
917         else:
918 0             vca_connection = await get_connection(self._store, vca_id)
919 0             return Libjuju(vca_connection, log=self.log, n2vc=self)
920
921 1     def _get_kubectl(self, credentials: str) -> Kubectl:
922         """
923         Get Kubectl object
924
925         :param: kubeconfig_credentials: Kubeconfig credentials
926         """
927 0         kubecfg = tempfile.NamedTemporaryFile()
928 0         with open(kubecfg.name, "w") as kubecfg_file:
929 0             kubecfg_file.write(credentials)
930 0         return Kubectl(config_file=kubecfg.name)
931
932 1     def _obtain_namespace(self, kdu_instance: str, namespace: str = None) -> str:
933         """
934         Obtain the namespace/model name to use in the instantiation of a Juju Bundle in K8s. The default namespace is
935         the kdu_instance name. However, if the user passes the namespace where he wants to deploy the bundle,
936         that namespace will be used.
937
938         :param kdu_instance: the default KDU instance name
939         :param namespace: the namespace passed by the User
940         """
941
942         # deault the namespace/model name to the kdu_instance name TODO -> this should be the real return... But
943         #  once the namespace is not passed in most methods, I had to do this in another way. But I think this should
944         #  be the procedure in the future return namespace if namespace else kdu_instance
945
946         # TODO -> has referred above, this should be avoided in the future, this is temporary, in order to avoid
947         #  compatibility issues
948 1         return (
949             namespace
950             if namespace
951             else self._obtain_namespace_from_db(kdu_instance=kdu_instance)
952         )
953
954 1     def _obtain_namespace_from_db(self, kdu_instance: str) -> str:
955 0         db_nsrs = self.db.get_one(
956             table="nsrs", q_filter={"_admin.deployed.K8s.kdu-instance": kdu_instance}
957         )
958 0         for k8s in db_nsrs["_admin"]["deployed"]["K8s"]:
959 0             if k8s.get("kdu-instance") == kdu_instance:
960 0                 return k8s.get("namespace")
961 0         return ""