Code Coverage

Cobertura Coverage Report > n2vc >

k8s_juju_conn.py

Trend

Classes0%
 
Lines0%
 
Conditionals100%
 

File Coverage summary

NameClassesLinesConditionals
k8s_juju_conn.py
0%
0/1
0%
0/407
100%
0/0

Coverage Breakdown by Class

NameLinesConditionals
k8s_juju_conn.py
0%
0/407
N/A

Source

n2vc/k8s_juju_conn.py
1 # Copyright 2019 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #     Unless required by applicable law or agreed to in writing, software
10 #     distributed under the License is distributed on an "AS IS" BASIS,
11 #     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 #     See the License for the specific language governing permissions and
13 #     limitations under the License.
14
15 0 import asyncio
16 0 import concurrent
17 0 import os
18 0 import uuid
19 0 import yaml
20
21 0 from juju.controller import Controller
22 0 from juju.model import Model
23 0 from n2vc.exceptions import K8sException, JujuError
24 0 from n2vc.k8s_conn import K8sConnector
25 0 from n2vc.kubectl import Kubectl
26 0 from .exceptions import MethodNotImplemented, N2VCNotFound
27 0 from n2vc.utils import obj_to_dict, obj_to_yaml
28
29
30 # from juju.bundle import BundleHandler
31 # import re
32 # import ssl
33 # from .vnf import N2VC
34 0 class K8sJujuConnector(K8sConnector):
35 0     def __init__(
36         self,
37         fs: object,
38         db: object,
39         kubectl_command: str = "/usr/bin/kubectl",
40         juju_command: str = "/usr/bin/juju",
41         log: object = None,
42         on_update_db=None,
43     ):
44         """
45
46         :param kubectl_command: path to kubectl executable
47         :param helm_command: path to helm executable
48         :param fs: file system for kubernetes and helm configuration
49         :param log: logger
50         """
51
52         # parent class
53 0         K8sConnector.__init__(
54             self, db, log=log, on_update_db=on_update_db,
55         )
56
57 0         self.fs = fs
58 0         self.log.debug("Initializing K8S Juju connector")
59
60 0         self.juju_command = juju_command
61 0         self.juju_public_key = None
62
63 0         self.log.debug("K8S Juju connector initialized")
64         # TODO: Remove these commented lines:
65         # self.authenticated = False
66         # self.models = {}
67         # self.juju_secret = ""
68
69     """Initialization"""
70
71 0     async def init_env(
72         self,
73         k8s_creds: str,
74         namespace: str = "kube-system",
75         reuse_cluster_uuid: str = None,
76     ) -> (str, bool):
77         """
78         It prepares a given K8s cluster environment to run Juju bundles.
79
80         :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
81             '.kube/config'
82         :param namespace: optional namespace to be used for juju. By default,
83             'kube-system' will be used
84         :param reuse_cluster_uuid: existing cluster uuid for reuse
85         :return: uuid of the K8s cluster and True if connector has installed some
86             software in the cluster
87             (on error, an exception will be raised)
88         """
89
90         """Bootstrapping
91
92         Bootstrapping cannot be done, by design, through the API. We need to
93         use the CLI tools.
94         """
95
96         """
97         WIP: Workflow
98
99         1. Has the environment already been bootstrapped?
100         - Check the database to see if we have a record for this env
101
102         2. If this is a new env, create it
103         - Add the k8s cloud to Juju
104         - Bootstrap
105         - Record it in the database
106
107         3. Connect to the Juju controller for this cloud
108
109         """
110         # cluster_uuid = reuse_cluster_uuid
111         # if not cluster_uuid:
112         #     cluster_uuid = str(uuid4())
113
114         ##################################################
115         # TODO: Pull info from db based on the namespace #
116         ##################################################
117
118         ###################################################
119         # TODO: Make it idempotent, calling add-k8s and   #
120         # bootstrap whenever reuse_cluster_uuid is passed #
121         # as parameter                                    #
122         # `init_env` is called to initialize the K8s      #
123         # cluster for juju. If this initialization fails, #
124         # it can be called again by LCM with the param    #
125         # reuse_cluster_uuid, e.g. to try to fix it.       #
126         ###################################################
127
128         # This is a new cluster, so bootstrap it
129
130 0         cluster_uuid = reuse_cluster_uuid or str(uuid.uuid4())
131
132         # Is a local k8s cluster?
133 0         localk8s = self.is_local_k8s(k8s_creds)
134
135         # If the k8s is external, the juju controller needs a loadbalancer
136 0         loadbalancer = False if localk8s else True
137
138         # Name the new k8s cloud
139 0         k8s_cloud = "k8s-{}".format(cluster_uuid)
140
141 0         self.log.debug("Adding k8s cloud {}".format(k8s_cloud))
142 0         await self.add_k8s(k8s_cloud, k8s_creds)
143
144         # Bootstrap Juju controller
145 0         self.log.debug("Bootstrapping...")
146 0         await self.bootstrap(k8s_cloud, cluster_uuid, loadbalancer)
147 0         self.log.debug("Bootstrap done.")
148
149         # Get the controller information
150
151         # Parse ~/.local/share/juju/controllers.yaml
152         # controllers.testing.api-endpoints|ca-cert|uuid
153 0         self.log.debug("Getting controller endpoints")
154 0         with open(os.path.expanduser("~/.local/share/juju/controllers.yaml")) as f:
155 0             controllers = yaml.load(f, Loader=yaml.Loader)
156 0             controller = controllers["controllers"][cluster_uuid]
157 0             endpoints = controller["api-endpoints"]
158 0             juju_endpoint = endpoints[0]
159 0             juju_ca_cert = controller["ca-cert"]
160
161         # Parse ~/.local/share/juju/accounts
162         # controllers.testing.user|password
163 0         self.log.debug("Getting accounts")
164 0         with open(os.path.expanduser("~/.local/share/juju/accounts.yaml")) as f:
165 0             controllers = yaml.load(f, Loader=yaml.Loader)
166 0             controller = controllers["controllers"][cluster_uuid]
167
168 0             juju_user = controller["user"]
169 0             juju_secret = controller["password"]
170
171 0         config = {
172             "endpoint": juju_endpoint,
173             "username": juju_user,
174             "secret": juju_secret,
175             "cacert": juju_ca_cert,
176             "loadbalancer": loadbalancer,
177         }
178
179         # Store the cluster configuration so it
180         # can be used for subsequent calls
181 0         self.log.debug("Setting config")
182 0         await self.set_config(cluster_uuid, config)
183
184         # Test connection
185 0         controller = await self.get_controller(cluster_uuid)
186 0         await controller.disconnect()
187
188         # TODO: Remove these commented lines
189         # raise Exception("EOL")
190         # self.juju_public_key = None
191         # Login to the k8s cluster
192         # if not self.authenticated:
193         #     await self.login(cluster_uuid)
194
195         # We're creating a new cluster
196         # print("Getting model {}".format(self.get_namespace(cluster_uuid),
197         #    cluster_uuid=cluster_uuid))
198         # model = await self.get_model(
199         #    self.get_namespace(cluster_uuid),
200         #    cluster_uuid=cluster_uuid
201         # )
202
203         # Disconnect from the model
204         # if model and model.is_connected():
205         #    await model.disconnect()
206
207 0         return cluster_uuid, True
208
209     """Repo Management"""
210
211 0     async def repo_add(
212         self, name: str, url: str, _type: str = "charm",
213     ):
214 0         raise MethodNotImplemented()
215
216 0     async def repo_list(self):
217 0         raise MethodNotImplemented()
218
219 0     async def repo_remove(
220         self, name: str,
221     ):
222 0         raise MethodNotImplemented()
223
224 0     async def synchronize_repos(self, cluster_uuid: str, name: str):
225         """
226         Returns None as currently add_repo is not implemented
227         """
228 0         return None
229
230     """Reset"""
231
232 0     async def reset(
233         self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
234     ) -> bool:
235         """Reset a cluster
236
237         Resets the Kubernetes cluster by removing the model that represents it.
238
239         :param cluster_uuid str: The UUID of the cluster to reset
240         :return: Returns True if successful or raises an exception.
241         """
242
243 0         try:
244
245             # Remove k8scluster from database
246 0             self.log.debug("[reset] Removing k8scluster from juju database")
247 0             juju_db = self.db.get_one("admin", {"_id": "juju"})
248
249 0             for k in juju_db["k8sclusters"]:
250 0                 if k["_id"] == cluster_uuid:
251 0                     juju_db["k8sclusters"].remove(k)
252 0                     self.db.set_one(
253                         table="admin",
254                         q_filter={"_id": "juju"},
255                         update_dict={"k8sclusters": juju_db["k8sclusters"]},
256                     )
257 0                     break
258
259             # Destroy the controller (via CLI)
260 0             self.log.debug("[reset] Destroying controller")
261 0             await self.destroy_controller(cluster_uuid)
262
263 0             self.log.debug("[reset] Removing k8s cloud")
264 0             k8s_cloud = "k8s-{}".format(cluster_uuid)
265 0             await self.remove_cloud(k8s_cloud)
266
267 0         except Exception as ex:
268 0             self.log.debug("Caught exception during reset: {}".format(ex))
269 0         return True
270         # TODO: Remove these commented lines
271         #     if not self.authenticated:
272         #         await self.login(cluster_uuid)
273
274         #     if self.controller.is_connected():
275         #         # Destroy the model
276         #         namespace = self.get_namespace(cluster_uuid)
277         #         if await self.has_model(namespace):
278         #             self.log.debug("[reset] Destroying model")
279         #             await self.controller.destroy_model(namespace, destroy_storage=True)
280
281         #         # Disconnect from the controller
282         #         self.log.debug("[reset] Disconnecting controller")
283         #         await self.logout()
284
285     """Deployment"""
286
287 0     async def install(
288         self,
289         cluster_uuid: str,
290         kdu_model: str,
291         atomic: bool = True,
292         timeout: float = 300,
293         params: dict = None,
294         db_dict: dict = None,
295         kdu_name: str = None,
296         namespace: str = None,
297     ) -> bool:
298         """Install a bundle
299
300         :param cluster_uuid str: The UUID of the cluster to install to
301         :param kdu_model str: The name or path of a bundle to install
302         :param atomic bool: If set, waits until the model is active and resets
303                             the cluster on failure.
304         :param timeout int: The time, in seconds, to wait for the install
305                             to finish
306         :param params dict: Key-value pairs of instantiation parameters
307         :param kdu_name: Name of the KDU instance to be installed
308         :param namespace: K8s namespace to use for the KDU instance
309
310         :return: If successful, returns ?
311         """
312
313 0         controller = await self.get_controller(cluster_uuid)
314
315         ##
316         # Get or create the model, based on the NS
317         # uuid.
318 0         if kdu_name:
319 0             kdu_instance = "{}-{}".format(kdu_name, db_dict["filter"]["_id"])
320         else:
321 0             kdu_instance = db_dict["filter"]["_id"]
322
323 0         self.log.debug("Checking for model named {}".format(kdu_instance))
324 0         try:
325 0             if self.on_update_db:
326 0                 await self.on_update_db(cluster_uuid, kdu_instance, filter=db_dict["filter"])
327 0         except Exception as e:
328 0             self.log.debug("Error in updating vca status {}".format(str(e)))
329
330         # Create the new model
331 0         self.log.debug("Adding model: {}".format(kdu_instance))
332 0         model = await self.add_model(
333             kdu_instance, cluster_uuid=cluster_uuid, controller=controller
334         )
335
336 0         if model:
337             # TODO: Instantiation parameters
338
339             """
340             "Juju bundle that models the KDU, in any of the following ways:
341                 - <juju-repo>/<juju-bundle>
342                 - <juju-bundle folder under k8s_models folder in the package>
343                 - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
344                     in the package>
345                 - <URL_where_to_fetch_juju_bundle>
346             """
347 0             try:
348 0                 previous_workdir = os.getcwd()
349 0             except FileNotFoundError:
350 0                 previous_workdir = "/app/storage"
351
352 0             bundle = kdu_model
353 0             if kdu_model.startswith("cs:"):
354 0                 bundle = kdu_model
355 0             elif kdu_model.startswith("http"):
356                 # Download the file
357 0                 pass
358             else:
359 0                 new_workdir = kdu_model.strip(kdu_model.split("/")[-1])
360
361 0                 os.chdir(new_workdir)
362
363 0                 bundle = "local:{}".format(kdu_model)
364
365 0             if not bundle:
366                 # Raise named exception that the bundle could not be found
367 0                 raise Exception()
368
369 0             self.log.debug("[install] deploying {}".format(bundle))
370 0             await model.deploy(bundle)
371
372             # Get the application
373 0             if atomic:
374                 # applications = model.applications
375 0                 self.log.debug("[install] Applications: {}".format(model.applications))
376 0                 for name in model.applications:
377 0                     self.log.debug("[install] Waiting for {} to settle".format(name))
378 0                     application = model.applications[name]
379 0                     try:
380                         # It's not enough to wait for all units to be active;
381                         # the application status needs to be active as well.
382 0                         self.log.debug("Waiting for all units to be active...")
383 0                         await model.block_until(
384                             lambda: all(
385                                 unit.agent_status == "idle"
386                                 and application.status in ["active", "unknown"]
387                                 and unit.workload_status in ["active", "unknown"]
388                                 for unit in application.units
389                             ),
390                             timeout=timeout,
391                         )
392 0                         self.log.debug("All units active.")
393
394                     # TODO use asyncio.TimeoutError
395 0                     except concurrent.futures._base.TimeoutError:
396 0                         os.chdir(previous_workdir)
397 0                         self.log.debug("[install] Timeout exceeded; resetting cluster")
398 0                         await self.reset(cluster_uuid)
399 0                         return False
400
401             # Wait for the application to be active
402 0             if model.is_connected():
403 0                 self.log.debug("[install] Disconnecting model")
404 0                 await model.disconnect()
405 0             await controller.disconnect()
406 0             os.chdir(previous_workdir)
407 0             if self.on_update_db:
408 0                 await self.on_update_db(cluster_uuid, kdu_instance, filter=db_dict["filter"])
409 0             return kdu_instance
410 0         raise Exception("Unable to install")
411
412 0     async def instances_list(self, cluster_uuid: str) -> list:
413         """
414         returns a list of deployed releases in a cluster
415
416         :param cluster_uuid: the cluster
417         :return:
418         """
419 0         return []
420
421 0     async def upgrade(
422         self,
423         cluster_uuid: str,
424         kdu_instance: str,
425         kdu_model: str = None,
426         params: dict = None,
427     ) -> str:
428         """Upgrade a model
429
430         :param cluster_uuid str: The UUID of the cluster to upgrade
431         :param kdu_instance str: The unique name of the KDU instance
432         :param kdu_model str: The name or path of the bundle to upgrade to
433         :param params dict: Key-value pairs of instantiation parameters
434
435         :return: If successful, reference to the new revision number of the
436                  KDU instance.
437         """
438
439         # TODO: Loop through the bundle and upgrade each charm individually
440
441         """
442         The API doesn't have a concept of bundle upgrades, because there are
443         many possible changes: charm revision, disk, number of units, etc.
444
445         As such, we are only supporting a limited subset of upgrades. We'll
446         upgrade the charm revision but leave storage and scale untouched.
447
448         Scale changes should happen through OSM constructs, and changes to
449         storage would require a redeployment of the service, at least in this
450         initial release.
451         """
452 0         raise MethodNotImplemented()
453         # TODO: Remove these commented lines
454
455         # model = await self.get_model(namespace, cluster_uuid=cluster_uuid)
456
457         # model = None
458         # namespace = self.get_namespace(cluster_uuid)
459         # controller = await self.get_controller(cluster_uuid)
460
461         # try:
462         #     if namespace not in await controller.list_models():
463         #         raise N2VCNotFound(message="Model {} does not exist".format(namespace))
464
465         #     model = await controller.get_model(namespace)
466         #     with open(kdu_model, "r") as f:
467         #         bundle = yaml.safe_load(f)
468
469         #         """
470         #         {
471         #             'description': 'Test bundle',
472         #             'bundle': 'kubernetes',
473         #             'applications': {
474         #                 'mariadb-k8s': {
475         #                     'charm': 'cs:~charmed-osm/mariadb-k8s-20',
476         #                     'scale': 1,
477         #                     'options': {
478         #                         'password': 'manopw',
479         #                         'root_password': 'osm4u',
480         #                         'user': 'mano'
481         #                     },
482         #                     'series': 'kubernetes'
483         #                 }
484         #             }
485         #         }
486         #         """
487         #         # TODO: This should be returned in an agreed-upon format
488         #         for name in bundle["applications"]:
489         #             self.log.debug(model.applications)
490         #             application = model.applications[name]
491         #             self.log.debug(application)
492
493         #             path = bundle["applications"][name]["charm"]
494
495         #             try:
496         #                 await application.upgrade_charm(switch=path)
497         #             except juju.errors.JujuError as ex:
498         #                 if "already running charm" in str(ex):
499         #                     # We're already running this version
500         #                     pass
501         # finally:
502         #     if model:
503         #         await model.disconnect()
504         #     await controller.disconnect()
505         # return True
506
507     """Rollback"""
508
509 0     async def rollback(
510         self, cluster_uuid: str, kdu_instance: str, revision: int = 0,
511     ) -> str:
512         """Rollback a model
513
514         :param cluster_uuid str: The UUID of the cluster to rollback
515         :param kdu_instance str: The unique name of the KDU instance
516         :param revision int: The revision to revert to. If omitted, rolls back
517                              the previous upgrade.
518
519         :return: If successful, returns the revision of active KDU instance,
520                  or raises an exception
521         """
522 0         raise MethodNotImplemented()
523
524     """Deletion"""
525
526 0     async def uninstall(self, cluster_uuid: str, kdu_instance: str) -> bool:
527         """Uninstall a KDU instance
528
529         :param cluster_uuid str: The UUID of the cluster
530         :param kdu_instance str: The unique name of the KDU instance
531
532         :return: Returns True if successful, or raises an exception
533         """
534
535 0         controller = await self.get_controller(cluster_uuid)
536
537 0         self.log.debug("[uninstall] Destroying model")
538
539 0         await controller.destroy_models(kdu_instance)
540
541 0         self.log.debug("[uninstall] Model destroyed and disconnecting")
542 0         await controller.disconnect()
543
544 0         return True
545         # TODO: Remove these commented lines
546         # if not self.authenticated:
547         #     self.log.debug("[uninstall] Connecting to controller")
548         #     await self.login(cluster_uuid)
549
550 0     async def exec_primitive(
551         self,
552         cluster_uuid: str = None,
553         kdu_instance: str = None,
554         primitive_name: str = None,
555         timeout: float = 300,
556         params: dict = None,
557         db_dict: dict = None,
558     ) -> str:
559         """Exec primitive (Juju action)
560
561         :param cluster_uuid str: The UUID of the cluster
562         :param kdu_instance str: The unique name of the KDU instance
563         :param primitive_name: Name of action that will be executed
564         :param timeout: Timeout for action execution
565         :param params: Dictionary of all the parameters needed for the action
566         :db_dict: Dictionary for any additional data
567
568         :return: Returns the output of the action
569         """
570
571 0         controller = await self.get_controller(cluster_uuid)
572
573 0         if not params or "application-name" not in params:
574 0             raise K8sException(
575                 "Missing application-name argument, \
576                                 argument needed for K8s actions"
577             )
578 0         try:
579 0             self.log.debug(
580                 "[exec_primitive] Getting model "
581                 "kdu_instance: {}".format(kdu_instance)
582             )
583
584 0             model = await self.get_model(kdu_instance, controller=controller)
585
586 0             application_name = params["application-name"]
587 0             application = model.applications[application_name]
588
589 0             actions = await application.get_actions()
590 0             if primitive_name not in actions:
591 0                 raise K8sException("Primitive {} not found".format(primitive_name))
592
593 0             unit = None
594 0             for u in application.units:
595 0                 if await u.is_leader_from_status():
596 0                     unit = u
597 0                     break
598
599 0             if unit is None:
600 0                 raise K8sException("No leader unit found to execute action")
601
602 0             self.log.debug("[exec_primitive] Running action: {}".format(primitive_name))
603 0             action = await unit.run_action(primitive_name, **params)
604
605 0             output = await model.get_action_output(action_uuid=action.entity_id)
606 0             status = await model.get_action_status(uuid_or_prefix=action.entity_id)
607
608 0             status = (
609                 status[action.entity_id] if action.entity_id in status else "failed"
610             )
611
612 0             if status != "completed":
613 0                 raise K8sException(
614                     "status: {}, output: {}".format(status, output)
615                 )
616 0             if self.on_update_db:
617 0                 await self.on_update_db(cluster_uuid, kdu_instance, filter=db_dict["filter"])
618
619 0             return output
620
621 0         except Exception as e:
622 0             error_msg = "Error executing primitive {}: {}".format(primitive_name, e)
623 0             self.log.error(error_msg)
624 0             raise K8sException(message=error_msg)
625         finally:
626 0             await controller.disconnect()
627         # TODO: Remove these commented lines:
628         # if not self.authenticated:
629         #     self.log.debug("[exec_primitive] Connecting to controller")
630         #     await self.login(cluster_uuid)
631
632     """Introspection"""
633
634 0     async def inspect_kdu(self, kdu_model: str,) -> dict:
635         """Inspect a KDU
636
637         Inspects a bundle and returns a dictionary of config parameters and
638         their default values.
639
640         :param kdu_model str: The name or path of the bundle to inspect.
641
642         :return: If successful, returns a dictionary of available parameters
643                  and their default values.
644         """
645
646 0         kdu = {}
647 0         with open(kdu_model, "r") as f:
648 0             bundle = yaml.safe_load(f)
649
650             """
651             {
652                 'description': 'Test bundle',
653                 'bundle': 'kubernetes',
654                 'applications': {
655                     'mariadb-k8s': {
656                         'charm': 'cs:~charmed-osm/mariadb-k8s-20',
657                         'scale': 1,
658                         'options': {
659                             'password': 'manopw',
660                             'root_password': 'osm4u',
661                             'user': 'mano'
662                         },
663                         'series': 'kubernetes'
664                     }
665                 }
666             }
667             """
668             # TODO: This should be returned in an agreed-upon format
669 0             kdu = bundle["applications"]
670
671 0         return kdu
672
673 0     async def help_kdu(self, kdu_model: str,) -> str:
674         """View the README
675
676         If available, returns the README of the bundle.
677
678         :param kdu_model str: The name or path of a bundle
679
680         :return: If found, returns the contents of the README.
681         """
682 0         readme = None
683
684 0         files = ["README", "README.txt", "README.md"]
685 0         path = os.path.dirname(kdu_model)
686 0         for file in os.listdir(path):
687 0             if file in files:
688 0                 with open(file, "r") as f:
689 0                     readme = f.read()
690 0                     break
691
692 0         return readme
693
694 0     async def status_kdu(
695         self,
696         cluster_uuid: str,
697         kdu_instance: str,
698         complete_status: bool = False,
699         yaml_format: bool = False
700     ) -> dict:
701         """Get the status of the KDU
702
703         Get the current status of the KDU instance.
704
705         :param cluster_uuid str: The UUID of the cluster
706         :param kdu_instance str: The unique id of the KDU instance
707         :param complete_status: To get the complete_status of the KDU
708         :param yaml_format: To get the status in proper format for NSR record
709
710         :return: Returns a dictionary containing namespace, state, resources,
711                  and deployment_time and returns complete_status if complete_status is True
712         """
713 0         status = {}
714 0         controller = await self.get_controller(cluster_uuid)
715 0         try:
716 0             model = await self.get_model(kdu_instance, controller=controller)
717 0             model_status = await model.get_status()
718 0             status = model_status.applications
719
720 0             if not complete_status:
721 0                 for name in model_status.applications:
722 0                     application = model_status.applications[name]
723 0                     status[name] = {"status": application["status"]["status"]}
724             else:
725 0                 if yaml_format:
726 0                     return obj_to_yaml(model_status)
727                 else:
728 0                     return obj_to_dict(model_status)
729 0         except Exception as e:
730 0             self.log.debug("Error in getting model_status for kdu_instance: {}. Error: {}"
731                            .format(kdu_instance, str(e)))
732         finally:
733 0             if model:
734 0                 await model.disconnect()
735 0             await controller.disconnect()
736 0         return status
737
738 0     async def get_application_actions(
739             self,
740             application_name: str,
741             model_name: str,
742             cluster_uuid: str,
743             kdu_instance: str
744     ) -> dict:
745         """
746         Get available actions for an application
747
748         :param application_name str: Application name
749         :model_name str: Model name
750         :param cluster_uuid str: The UUID of the cluster
751         :param kdu_instance str: The unique id of the KDU instance
752
753         :return: Returns a dictionary which has action list of the Application
754         """
755 0         model = None
756 0         application_actions = {}
757 0         controller = await self.get_controller(cluster_uuid)
758 0         try:
759 0             model = await self.get_model(kdu_instance, controller=controller)
760 0             application = model.applications[application_name]
761 0             application_actions = await application.get_actions()
762 0         except Exception as e:
763 0             raise JujuError("Error in getting actions for application: {} in model: {}. Error: {}"
764                             .format(application_name, model_name, str(e)))
765         finally:
766 0             if model:
767 0                 await model.disconnect()
768 0             await controller.disconnect()
769
770 0         return application_actions
771
772 0     async def get_application_configs(
773             self,
774             application_name: str,
775             model_name: str,
776             cluster_uuid: str,
777             kdu_instance: str
778     ) -> dict:
779         """
780         Get available configs for an application.
781
782         :param application_name str: Application name
783         :model_name str: Model name
784         :param cluster_uuid str: The UUID of the cluster
785         :param kdu_instance str: The unique id of the KDU instance
786
787         :return: Returns a dictionary which has config list of the Application
788         """
789 0         model = None
790 0         application_configs = {}
791 0         controller = await self.get_controller(cluster_uuid)
792 0         try:
793 0             model = await self.get_model(kdu_instance, controller=controller)
794 0             application = model.applications[application_name]
795 0             application_configs = await application.get_config()
796 0         except Exception as e:
797 0             raise JujuError("Error in getting configs for application: {} in model: {}. Error: {}"
798                             .format(application_name, model_name, str(e)))
799         finally:
800 0             if model:
801 0                 await model.disconnect()
802 0             await controller.disconnect()
803 0         return application_configs
804
805 0     async def get_executed_actions(
806             self,
807             model_name: str,
808             cluster_uuid: str,
809             kdu_instance: str
810     ) -> list:
811         """
812         Get executed/history of actions for a model.
813         :model_name str: Model name
814         :param cluster_uuid str: The UUID of the cluster
815         :param kdu_instance str: The unique id of the KDU instance
816
817         :return: List of executed actions for a model.
818         """
819 0         model = None
820 0         executed_actions = []
821 0         controller = await self.get_controller(cluster_uuid)
822 0         try:
823 0             model = await self.get_model(kdu_instance, controller=controller)
824             # Get all unique action names
825 0             actions = {}
826 0             for application in model.applications:
827 0                 application_actions = await self.get_application_actions(application, model,
828                                                                          cluster_uuid, kdu_instance)
829 0                 actions.update(application_actions)
830             # Get status of all actions
831 0             for application_action in actions:
832 0                 application_action_status_list = \
833                     await model.get_action_status(name=application_action)
834 0                 for action_id, action_status in application_action_status_list.items():
835 0                     executed_action = {"id": action_id,
836                                        "action": application_action,
837                                        "status": action_status}
838                     # Get action output by id
839 0                     action_status = await model.get_action_output(executed_action["id"])
840 0                     for k, v in action_status.items():
841 0                         executed_action[k] = v
842 0                     executed_actions.append(executed_action)
843 0         except Exception as e:
844 0             raise JujuError("Error in getting executed actions for model: {}. Error: {}"
845                             .format(model_name, str(e)))
846         finally:
847 0             if model:
848 0                 await model.disconnect()
849 0             await controller.disconnect()
850 0         return executed_actions
851
852 0     async def update_vca_status(self, vcastatus: dict, cluster_uuid: str, kdu_instance: str):
853         """
854         Add all configs, actions, executed actions of all applications in a model to vcastatus dict
855
856         :param vcastatus dict: dict containing vcastatus
857         :param cluster_uuid str: The UUID of the cluster
858         :param kdu_instance str: The unique id of the KDU instance
859         :return: None
860         """
861 0         try:
862 0             for model_name in vcastatus:
863                 # Adding executed actions
864 0                 vcastatus[model_name]["executedActions"] = \
865                     await self.get_executed_actions(model_name, cluster_uuid, kdu_instance)
866
867 0                 for application in vcastatus[model_name]["applications"]:
868                     # Adding application actions
869 0                     vcastatus[model_name]["applications"][application]["actions"] = \
870                         await self.get_application_actions(application, model_name,
871                                                            cluster_uuid, kdu_instance)
872                     # Adding application configs
873 0                     vcastatus[model_name]["applications"][application]["configs"] = \
874                         await self.get_application_configs(application, model_name,
875                                                            cluster_uuid, kdu_instance)
876 0         except Exception as e:
877 0             self.log.debug("Error in updating vca status: {}".format(str(e)))
878
879 0     async def get_services(
880         self, cluster_uuid: str, kdu_instance: str, namespace: str
881     ) -> list:
882         """Return a list of services of a kdu_instance"""
883
884 0         credentials = self.get_credentials(cluster_uuid=cluster_uuid)
885
886 0         config_path = "/tmp/{}".format(cluster_uuid)
887 0         config_file = "{}/config".format(config_path)
888
889 0         if not os.path.exists(config_path):
890 0             os.makedirs(config_path)
891 0         with open(config_file, "w") as f:
892 0             f.write(credentials)
893
894 0         kubectl = Kubectl(config_file=config_file)
895 0         return kubectl.get_services(
896             field_selector="metadata.namespace={}".format(kdu_instance)
897         )
898
899 0     async def get_service(
900         self, cluster_uuid: str, service_name: str, namespace: str
901     ) -> object:
902         """Return data for a specific service inside a namespace"""
903
904 0         credentials = self.get_credentials(cluster_uuid=cluster_uuid)
905
906 0         config_path = "/tmp/{}".format(cluster_uuid)
907 0         config_file = "{}/config".format(config_path)
908
909 0         if not os.path.exists(config_path):
910 0             os.makedirs(config_path)
911 0         with open(config_file, "w") as f:
912 0             f.write(credentials)
913
914 0         kubectl = Kubectl(config_file=config_file)
915
916 0         return kubectl.get_services(
917             field_selector="metadata.name={},metadata.namespace={}".format(
918                 service_name, namespace
919             )
920         )[0]
921
922     # Private methods
923 0     async def add_k8s(self, cloud_name: str, credentials: str,) -> bool:
924         """Add a k8s cloud to Juju
925
926         Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
927         Juju Controller.
928
929         :param cloud_name str: The name of the cloud to add.
930         :param credentials dict: A dictionary representing the output of
931             `kubectl config view --raw`.
932
933         :returns: True if successful, otherwise raises an exception.
934         """
935
936 0         cmd = [self.juju_command, "add-k8s", "--local", cloud_name]
937 0         self.log.debug(cmd)
938
939 0         process = await asyncio.create_subprocess_exec(
940             *cmd,
941             stdout=asyncio.subprocess.PIPE,
942             stderr=asyncio.subprocess.PIPE,
943             stdin=asyncio.subprocess.PIPE,
944         )
945
946         # Feed the process the credentials
947 0         process.stdin.write(credentials.encode("utf-8"))
948 0         await process.stdin.drain()
949 0         process.stdin.close()
950
951 0         _stdout, stderr = await process.communicate()
952
953 0         return_code = process.returncode
954
955 0         self.log.debug("add-k8s return code: {}".format(return_code))
956
957 0         if return_code > 0:
958 0             raise Exception(stderr)
959
960 0         return True
961
962 0     async def add_model(
963         self, model_name: str, cluster_uuid: str, controller: Controller
964     ) -> Model:
965         """Adds a model to the controller
966
967         Adds a new model to the Juju controller
968
969         :param model_name str: The name of the model to add.
970         :param cluster_uuid str: ID of the cluster.
971         :param controller: Controller object in which the model will be added
972         :returns: The juju.model.Model object of the new model upon success or
973                   raises an exception.
974         """
975
976 0         self.log.debug(
977             "Adding model '{}' to cluster_uuid '{}'".format(model_name, cluster_uuid)
978         )
979 0         model = None
980 0         try:
981 0             if self.juju_public_key is not None:
982 0                 model = await controller.add_model(
983                     model_name, config={"authorized-keys": self.juju_public_key}
984                 )
985             else:
986 0                 model = await controller.add_model(model_name)
987 0         except Exception as ex:
988 0             self.log.debug(ex)
989 0             self.log.debug("Caught exception: {}".format(ex))
990 0             pass
991
992 0         return model
993
994 0     async def bootstrap(
995         self, cloud_name: str, cluster_uuid: str, loadbalancer: bool
996     ) -> bool:
997         """Bootstrap a Kubernetes controller
998
999         Bootstrap a Juju controller inside the Kubernetes cluster
1000
1001         :param cloud_name str: The name of the cloud.
1002         :param cluster_uuid str: The UUID of the cluster to bootstrap.
1003         :param loadbalancer bool: If the controller should use loadbalancer or not.
1004         :returns: True upon success or raises an exception.
1005         """
1006
1007 0         if not loadbalancer:
1008 0             cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid]
1009         else:
1010             """
1011             For public clusters, specify that the controller service is using a
1012             LoadBalancer.
1013             """
1014 0             cmd = [
1015                 self.juju_command,
1016                 "bootstrap",
1017                 cloud_name,
1018                 cluster_uuid,
1019                 "--config",
1020                 "controller-service-type=loadbalancer",
1021             ]
1022
1023 0         self.log.debug(
1024             "Bootstrapping controller {} in cloud {}".format(cluster_uuid, cloud_name)
1025         )
1026
1027 0         process = await asyncio.create_subprocess_exec(
1028             *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1029         )
1030
1031 0         _stdout, stderr = await process.communicate()
1032
1033 0         return_code = process.returncode
1034
1035 0         if return_code > 0:
1036             #
1037 0             if b"already exists" not in stderr:
1038 0                 raise Exception(stderr)
1039
1040 0         return True
1041
1042 0     async def destroy_controller(self, cluster_uuid: str) -> bool:
1043         """Destroy a Kubernetes controller
1044
1045         Destroy an existing Kubernetes controller.
1046
1047         :param cluster_uuid str: The UUID of the cluster to bootstrap.
1048         :returns: True upon success or raises an exception.
1049         """
1050 0         cmd = [
1051             self.juju_command,
1052             "destroy-controller",
1053             "--destroy-all-models",
1054             "--destroy-storage",
1055             "-y",
1056             cluster_uuid,
1057         ]
1058
1059 0         process = await asyncio.create_subprocess_exec(
1060             *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1061         )
1062
1063 0         _stdout, stderr = await process.communicate()
1064
1065 0         return_code = process.returncode
1066
1067 0         if return_code > 0:
1068             #
1069 0             if "already exists" not in stderr:
1070 0                 raise Exception(stderr)
1071
1072 0     def get_credentials(self, cluster_uuid: str) -> str:
1073         """
1074         Get Cluster Kubeconfig
1075         """
1076 0         k8scluster = self.db.get_one(
1077             "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
1078         )
1079
1080 0         self.db.encrypt_decrypt_fields(
1081             k8scluster.get("credentials"),
1082             "decrypt",
1083             ["password", "secret"],
1084             schema_version=k8scluster["schema_version"],
1085             salt=k8scluster["_id"],
1086         )
1087
1088 0         return yaml.safe_dump(k8scluster.get("credentials"))
1089
1090 0     def get_config(self, cluster_uuid: str,) -> dict:
1091         """Get the cluster configuration
1092
1093         Gets the configuration of the cluster
1094
1095         :param cluster_uuid str: The UUID of the cluster.
1096         :return: A dict upon success, or raises an exception.
1097         """
1098
1099 0         juju_db = self.db.get_one("admin", {"_id": "juju"})
1100 0         config = None
1101 0         for k in juju_db["k8sclusters"]:
1102 0             if k["_id"] == cluster_uuid:
1103 0                 config = k["config"]
1104 0                 self.db.encrypt_decrypt_fields(
1105                     config,
1106                     "decrypt",
1107                     ["secret", "cacert"],
1108                     schema_version="1.1",
1109                     salt=k["_id"],
1110                 )
1111 0                 break
1112 0         if not config:
1113 0             raise Exception(
1114                 "Unable to locate configuration for cluster {}".format(cluster_uuid)
1115             )
1116 0         return config
1117
1118 0     async def get_model(self, model_name: str, controller: Controller) -> Model:
1119         """Get a model from the Juju Controller.
1120
1121         Note: Model objects returned must call disconnected() before it goes
1122         out of scope.
1123
1124         :param model_name str: The name of the model to get
1125         :param controller Controller: Controller object
1126         :return The juju.model.Model object if found, or None.
1127         """
1128
1129 0         models = await controller.list_models()
1130 0         if model_name not in models:
1131 0             raise N2VCNotFound("Model {} not found".format(model_name))
1132 0         self.log.debug("Found model: {}".format(model_name))
1133 0         return await controller.get_model(model_name)
1134
1135 0     def get_namespace(self, cluster_uuid: str,) -> str:
1136         """Get the namespace UUID
1137         Gets the namespace's unique name
1138
1139         :param cluster_uuid str: The UUID of the cluster
1140         :returns: The namespace UUID, or raises an exception
1141         """
1142 0         config = self.get_config(cluster_uuid)
1143
1144         # Make sure the name is in the config
1145 0         if "namespace" not in config:
1146 0             raise Exception("Namespace not found.")
1147
1148         # TODO: We want to make sure this is unique to the cluster, in case
1149         # the cluster is being reused.
1150         # Consider pre/appending the cluster id to the namespace string
1151 0         return config["namespace"]
1152
1153     # TODO: Remove these lines of code
1154     # async def has_model(self, model_name: str) -> bool:
1155     #     """Check if a model exists in the controller
1156
1157     #     Checks to see if a model exists in the connected Juju controller.
1158
1159     #     :param model_name str: The name of the model
1160     #     :return: A boolean indicating if the model exists
1161     #     """
1162     #     models = await self.controller.list_models()
1163
1164     #     if model_name in models:
1165     #         return True
1166     #     return False
1167
1168 0     def is_local_k8s(self, credentials: str,) -> bool:
1169         """Check if a cluster is local
1170
1171         Checks if a cluster is running in the local host
1172
1173         :param credentials dict: A dictionary containing the k8s credentials
1174         :returns: A boolean if the cluster is running locally
1175         """
1176
1177 0         creds = yaml.safe_load(credentials)
1178
1179 0         if creds and os.getenv("OSMLCM_VCA_APIPROXY"):
1180 0             for cluster in creds["clusters"]:
1181 0                 if "server" in cluster["cluster"]:
1182 0                     if os.getenv("OSMLCM_VCA_APIPROXY") in cluster["cluster"]["server"]:
1183 0                         return True
1184
1185 0         return False
1186
1187 0     async def get_controller(self, cluster_uuid):
1188         """Login to the Juju controller."""
1189
1190 0         config = self.get_config(cluster_uuid)
1191
1192 0         juju_endpoint = config["endpoint"]
1193 0         juju_user = config["username"]
1194 0         juju_secret = config["secret"]
1195 0         juju_ca_cert = config["cacert"]
1196
1197 0         controller = Controller()
1198
1199 0         if juju_secret:
1200 0             self.log.debug(
1201                 "Connecting to controller... ws://{} as {}".format(
1202                     juju_endpoint, juju_user,
1203                 )
1204             )
1205 0             try:
1206 0                 await controller.connect(
1207                     endpoint=juju_endpoint,
1208                     username=juju_user,
1209                     password=juju_secret,
1210                     cacert=juju_ca_cert,
1211                 )
1212 0                 self.log.debug("JujuApi: Logged into controller")
1213 0                 return controller
1214 0             except Exception as ex:
1215 0                 self.log.debug(ex)
1216 0                 self.log.debug("Caught exception: {}".format(ex))
1217         else:
1218 0             self.log.fatal("VCA credentials not configured.")
1219
1220     # TODO: Remove these commented lines
1221     #         self.authenticated = False
1222     # if self.authenticated:
1223     #         return
1224
1225     #     self.connecting = True
1226     #     juju_public_key = None
1227     #     self.authenticated = True
1228     #     Test: Make sure we have the credentials loaded
1229     # async def logout(self):
1230     #     """Logout of the Juju controller."""
1231     #     self.log.debug("[logout]")
1232     #     if not self.authenticated:
1233     #         return False
1234
1235     #     for model in self.models:
1236     #         self.log.debug("Logging out of model {}".format(model))
1237     #         await self.models[model].disconnect()
1238
1239     #     if self.controller:
1240     #         self.log.debug("Disconnecting controller {}".format(self.controller))
1241     #         await self.controller.disconnect()
1242     #         self.controller = None
1243
1244     #     self.authenticated = False
1245
1246 0     async def remove_cloud(self, cloud_name: str,) -> bool:
1247         """Remove a k8s cloud from Juju
1248
1249         Removes a Kubernetes cloud from Juju.
1250
1251         :param cloud_name str: The name of the cloud to add.
1252
1253         :returns: True if successful, otherwise raises an exception.
1254         """
1255
1256         # Remove the bootstrapped controller
1257 0         cmd = [self.juju_command, "remove-k8s", "--client", cloud_name]
1258 0         process = await asyncio.create_subprocess_exec(
1259             *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1260         )
1261
1262 0         _stdout, stderr = await process.communicate()
1263
1264 0         return_code = process.returncode
1265
1266 0         if return_code > 0:
1267 0             raise Exception(stderr)
1268
1269         # Remove the cloud from the local config
1270 0         cmd = [self.juju_command, "remove-cloud", "--client", cloud_name]
1271 0         process = await asyncio.create_subprocess_exec(
1272             *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1273         )
1274
1275 0         _stdout, stderr = await process.communicate()
1276
1277 0         return_code = process.returncode
1278
1279 0         if return_code > 0:
1280 0             raise Exception(stderr)
1281
1282 0         return True
1283
1284 0     async def set_config(self, cluster_uuid: str, config: dict,) -> bool:
1285         """Save the cluster configuration
1286
1287         Saves the cluster information to the Mongo database
1288
1289         :param cluster_uuid str: The UUID of the cluster
1290         :param config dict: A dictionary containing the cluster configuration
1291         """
1292
1293 0         juju_db = self.db.get_one("admin", {"_id": "juju"})
1294
1295 0         k8sclusters = juju_db["k8sclusters"] if "k8sclusters" in juju_db else []
1296 0         self.db.encrypt_decrypt_fields(
1297             config,
1298             "encrypt",
1299             ["secret", "cacert"],
1300             schema_version="1.1",
1301             salt=cluster_uuid,
1302         )
1303 0         k8sclusters.append({"_id": cluster_uuid, "config": config})
1304 0         self.db.set_one(
1305             table="admin",
1306             q_filter={"_id": "juju"},
1307             update_dict={"k8sclusters": k8sclusters},
1308         )