1 # Copyright 2019 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
22 from juju
.controller
import Controller
23 from juju
.model
import Model
24 from n2vc
.exceptions
import K8sException
25 from n2vc
.k8s_conn
import K8sConnector
26 from n2vc
.kubectl
import Kubectl
27 from .exceptions
import MethodNotImplemented
30 # from juju.bundle import BundleHandler
33 # from .vnf import N2VC
34 class K8sJujuConnector(K8sConnector
):
39 kubectl_command
: str = "/usr/bin/kubectl",
40 juju_command
: str = "/usr/bin/juju",
46 :param kubectl_command: path to kubectl executable
47 :param helm_command: path to helm executable
48 :param fs: file system for kubernetes and helm configuration
53 K8sConnector
.__init
__(
54 self
, db
, log
=log
, on_update_db
=on_update_db
,
58 self
.log
.debug("Initializing K8S Juju connector")
60 self
.authenticated
= False
63 self
.juju_command
= juju_command
66 self
.log
.debug("K8S Juju connector initialized")
73 namespace
: str = "kube-system",
74 reuse_cluster_uuid
: str = None,
77 It prepares a given K8s cluster environment to run Juju bundles.
79 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
81 :param namespace: optional namespace to be used for juju. By default,
82 'kube-system' will be used
83 :param reuse_cluster_uuid: existing cluster uuid for reuse
84 :return: uuid of the K8s cluster and True if connector has installed some
85 software in the cluster
86 (on error, an exception will be raised)
91 Bootstrapping cannot be done, by design, through the API. We need to
98 1. Has the environment already been bootstrapped?
99 - Check the database to see if we have a record for this env
101 2. If this is a new env, create it
102 - Add the k8s cloud to Juju
104 - Record it in the database
106 3. Connect to the Juju controller for this cloud
109 # cluster_uuid = reuse_cluster_uuid
110 # if not cluster_uuid:
111 # cluster_uuid = str(uuid4())
113 ##################################################
114 # TODO: Pull info from db based on the namespace #
115 ##################################################
117 ###################################################
118 # TODO: Make it idempotent, calling add-k8s and #
119 # bootstrap whenever reuse_cluster_uuid is passed #
121 # `init_env` is called to initialize the K8s #
122 # cluster for juju. If this initialization fails, #
123 # it can be called again by LCM with the param #
124 # reuse_cluster_uuid, e.g. to try to fix it. #
125 ###################################################
127 # This is a new cluster, so bootstrap it
129 cluster_uuid
= reuse_cluster_uuid
or str(uuid
.uuid4())
131 # Is a local k8s cluster?
132 localk8s
= self
.is_local_k8s(k8s_creds
)
134 # If the k8s is external, the juju controller needs a loadbalancer
135 loadbalancer
= False if localk8s
else True
137 # Name the new k8s cloud
138 k8s_cloud
= "k8s-{}".format(cluster_uuid
)
140 self
.log
.debug("Adding k8s cloud {}".format(k8s_cloud
))
141 await self
.add_k8s(k8s_cloud
, k8s_creds
)
143 # Bootstrap Juju controller
144 self
.log
.debug("Bootstrapping...")
145 await self
.bootstrap(k8s_cloud
, cluster_uuid
, loadbalancer
)
146 self
.log
.debug("Bootstrap done.")
148 # Get the controller information
150 # Parse ~/.local/share/juju/controllers.yaml
151 # controllers.testing.api-endpoints|ca-cert|uuid
152 self
.log
.debug("Getting controller endpoints")
153 with
open(os
.path
.expanduser("~/.local/share/juju/controllers.yaml")) as f
:
154 controllers
= yaml
.load(f
, Loader
=yaml
.Loader
)
155 controller
= controllers
["controllers"][cluster_uuid
]
156 endpoints
= controller
["api-endpoints"]
157 self
.juju_endpoint
= endpoints
[0]
158 self
.juju_ca_cert
= controller
["ca-cert"]
160 # Parse ~/.local/share/juju/accounts
161 # controllers.testing.user|password
162 self
.log
.debug("Getting accounts")
163 with
open(os
.path
.expanduser("~/.local/share/juju/accounts.yaml")) as f
:
164 controllers
= yaml
.load(f
, Loader
=yaml
.Loader
)
165 controller
= controllers
["controllers"][cluster_uuid
]
167 self
.juju_user
= controller
["user"]
168 self
.juju_secret
= controller
["password"]
170 # raise Exception("EOL")
172 self
.juju_public_key
= None
175 "endpoint": self
.juju_endpoint
,
176 "username": self
.juju_user
,
177 "secret": self
.juju_secret
,
178 "cacert": self
.juju_ca_cert
,
179 "namespace": namespace
,
180 "loadbalancer": loadbalancer
,
183 # Store the cluster configuration so it
184 # can be used for subsequent calls
185 self
.log
.debug("Setting config")
186 await self
.set_config(cluster_uuid
, config
)
188 # Login to the k8s cluster
189 if not self
.authenticated
:
190 await self
.login(cluster_uuid
)
192 # We're creating a new cluster
193 # print("Getting model {}".format(self.get_namespace(cluster_uuid),
194 # cluster_uuid=cluster_uuid))
195 # model = await self.get_model(
196 # self.get_namespace(cluster_uuid),
197 # cluster_uuid=cluster_uuid
200 # Disconnect from the model
201 # if model and model.is_connected():
202 # await model.disconnect()
204 return cluster_uuid
, True
206 """Repo Management"""
209 self
, name
: str, url
: str, _type
: str = "charm",
211 raise MethodNotImplemented()
213 async def repo_list(self
):
214 raise MethodNotImplemented()
216 async def repo_remove(
219 raise MethodNotImplemented()
221 async def synchronize_repos(self
, cluster_uuid
: str, name
: str):
223 Returns None as currently add_repo is not implemented
230 self
, cluster_uuid
: str, force
: bool = False, uninstall_sw
: bool = False
234 Resets the Kubernetes cluster by removing the model that represents it.
236 :param cluster_uuid str: The UUID of the cluster to reset
237 :return: Returns True if successful or raises an exception.
241 if not self
.authenticated
:
242 await self
.login(cluster_uuid
)
244 if self
.controller
.is_connected():
246 namespace
= self
.get_namespace(cluster_uuid
)
247 if await self
.has_model(namespace
):
248 self
.log
.debug("[reset] Destroying model")
249 await self
.controller
.destroy_model(namespace
, destroy_storage
=True)
251 # Disconnect from the controller
252 self
.log
.debug("[reset] Disconnecting controller")
255 # Destroy the controller (via CLI)
256 self
.log
.debug("[reset] Destroying controller")
257 await self
.destroy_controller(cluster_uuid
)
259 self
.log
.debug("[reset] Removing k8s cloud")
260 k8s_cloud
= "k8s-{}".format(cluster_uuid
)
261 await self
.remove_cloud(k8s_cloud
)
263 except Exception as ex
:
264 self
.log
.debug("Caught exception during reset: {}".format(ex
))
275 timeout
: float = 300,
277 db_dict
: dict = None,
278 kdu_name
: str = None,
279 namespace
: str = None,
283 :param cluster_uuid str: The UUID of the cluster to install to
284 :param kdu_model str: The name or path of a bundle to install
285 :param atomic bool: If set, waits until the model is active and resets
286 the cluster on failure.
287 :param timeout int: The time, in seconds, to wait for the install
289 :param params dict: Key-value pairs of instantiation parameters
290 :param kdu_name: Name of the KDU instance to be installed
291 :param namespace: K8s namespace to use for the KDU instance
293 :return: If successful, returns ?
296 if not self
.authenticated
:
297 self
.log
.debug("[install] Logging in to the controller")
298 await self
.login(cluster_uuid
)
301 # Get or create the model, based on the NS
304 kdu_instance
= "{}-{}".format(kdu_name
, db_dict
["filter"]["_id"])
306 kdu_instance
= db_dict
["filter"]["_id"]
308 self
.log
.debug("Checking for model named {}".format(kdu_instance
))
310 # Create the new model
311 self
.log
.debug("Adding model: {}".format(kdu_instance
))
312 model
= await self
.add_model(kdu_instance
, cluster_uuid
=cluster_uuid
)
315 # TODO: Instantiation parameters
318 "Juju bundle that models the KDU, in any of the following ways:
319 - <juju-repo>/<juju-bundle>
320 - <juju-bundle folder under k8s_models folder in the package>
321 - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
323 - <URL_where_to_fetch_juju_bundle>
326 previous_workdir
= os
.getcwd()
327 except FileNotFoundError
:
328 previous_workdir
= "/app/storage"
331 if kdu_model
.startswith("cs:"):
333 elif kdu_model
.startswith("http"):
337 new_workdir
= kdu_model
.strip(kdu_model
.split("/")[-1])
339 os
.chdir(new_workdir
)
341 bundle
= "local:{}".format(kdu_model
)
344 # Raise named exception that the bundle could not be found
347 self
.log
.debug("[install] deploying {}".format(bundle
))
348 await model
.deploy(bundle
)
350 # Get the application
352 # applications = model.applications
353 self
.log
.debug("[install] Applications: {}".format(model
.applications
))
354 for name
in model
.applications
:
355 self
.log
.debug("[install] Waiting for {} to settle".format(name
))
356 application
= model
.applications
[name
]
358 # It's not enough to wait for all units to be active;
359 # the application status needs to be active as well.
360 self
.log
.debug("Waiting for all units to be active...")
361 await model
.block_until(
363 unit
.agent_status
== "idle"
364 and application
.status
in ["active", "unknown"]
365 and unit
.workload_status
in ["active", "unknown"]
366 for unit
in application
.units
370 self
.log
.debug("All units active.")
372 # TODO use asyncio.TimeoutError
373 except concurrent
.futures
._base
.TimeoutError
:
374 os
.chdir(previous_workdir
)
375 self
.log
.debug("[install] Timeout exceeded; resetting cluster")
376 await self
.reset(cluster_uuid
)
379 # Wait for the application to be active
380 if model
.is_connected():
381 self
.log
.debug("[install] Disconnecting model")
382 await model
.disconnect()
384 os
.chdir(previous_workdir
)
387 raise Exception("Unable to install")
389 async def instances_list(self
, cluster_uuid
: str) -> list:
391 returns a list of deployed releases in a cluster
393 :param cluster_uuid: the cluster
402 kdu_model
: str = None,
407 :param cluster_uuid str: The UUID of the cluster to upgrade
408 :param kdu_instance str: The unique name of the KDU instance
409 :param kdu_model str: The name or path of the bundle to upgrade to
410 :param params dict: Key-value pairs of instantiation parameters
412 :return: If successful, reference to the new revision number of the
416 # TODO: Loop through the bundle and upgrade each charm individually
419 The API doesn't have a concept of bundle upgrades, because there are
420 many possible changes: charm revision, disk, number of units, etc.
422 As such, we are only supporting a limited subset of upgrades. We'll
423 upgrade the charm revision but leave storage and scale untouched.
425 Scale changes should happen through OSM constructs, and changes to
426 storage would require a redeployment of the service, at least in this
429 namespace
= self
.get_namespace(cluster_uuid
)
430 model
= await self
.get_model(namespace
, cluster_uuid
=cluster_uuid
)
432 with
open(kdu_model
, "r") as f
:
433 bundle
= yaml
.safe_load(f
)
437 'description': 'Test bundle',
438 'bundle': 'kubernetes',
441 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
444 'password': 'manopw',
445 'root_password': 'osm4u',
448 'series': 'kubernetes'
453 # TODO: This should be returned in an agreed-upon format
454 for name
in bundle
["applications"]:
455 self
.log
.debug(model
.applications
)
456 application
= model
.applications
[name
]
457 self
.log
.debug(application
)
459 path
= bundle
["applications"][name
]["charm"]
462 await application
.upgrade_charm(switch
=path
)
463 except juju
.errors
.JujuError
as ex
:
464 if "already running charm" in str(ex
):
465 # We're already running this version
468 await model
.disconnect()
471 raise MethodNotImplemented()
476 self
, cluster_uuid
: str, kdu_instance
: str, revision
: int = 0,
480 :param cluster_uuid str: The UUID of the cluster to rollback
481 :param kdu_instance str: The unique name of the KDU instance
482 :param revision int: The revision to revert to. If omitted, rolls back
483 the previous upgrade.
485 :return: If successful, returns the revision of active KDU instance,
486 or raises an exception
488 raise MethodNotImplemented()
492 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str) -> bool:
493 """Uninstall a KDU instance
495 :param cluster_uuid str: The UUID of the cluster
496 :param kdu_instance str: The unique name of the KDU instance
498 :return: Returns True if successful, or raises an exception
500 if not self
.authenticated
:
501 self
.log
.debug("[uninstall] Connecting to controller")
502 await self
.login(cluster_uuid
)
504 self
.log
.debug("[uninstall] Destroying model")
506 await self
.controller
.destroy_models(kdu_instance
)
508 self
.log
.debug("[uninstall] Model destroyed and disconnecting")
513 async def exec_primitive(
515 cluster_uuid
: str = None,
516 kdu_instance
: str = None,
517 primitive_name
: str = None,
518 timeout
: float = 300,
520 db_dict
: dict = None,
522 """Exec primitive (Juju action)
524 :param cluster_uuid str: The UUID of the cluster
525 :param kdu_instance str: The unique name of the KDU instance
526 :param primitive_name: Name of action that will be executed
527 :param timeout: Timeout for action execution
528 :param params: Dictionary of all the parameters needed for the action
529 :db_dict: Dictionary for any additional data
531 :return: Returns the output of the action
533 if not self
.authenticated
:
534 self
.log
.debug("[exec_primitive] Connecting to controller")
535 await self
.login(cluster_uuid
)
537 if not params
or "application-name" not in params
:
539 "Missing application-name argument, \
540 argument needed for K8s actions"
544 "[exec_primitive] Getting model "
545 "kdu_instance: {}".format(kdu_instance
)
548 model
= await self
.get_model(kdu_instance
, cluster_uuid
)
550 application_name
= params
["application-name"]
551 application
= model
.applications
[application_name
]
553 actions
= await application
.get_actions()
554 if primitive_name
not in actions
:
555 raise K8sException("Primitive {} not found".format(primitive_name
))
558 for u
in application
.units
:
559 if await u
.is_leader_from_status():
564 raise K8sException("No leader unit found to execute action")
566 self
.log
.debug("[exec_primitive] Running action: {}".format(primitive_name
))
567 action
= await unit
.run_action(primitive_name
, **params
)
569 output
= await model
.get_action_output(action_uuid
=action
.entity_id
)
570 status
= await model
.get_action_status(uuid_or_prefix
=action
.entity_id
)
573 status
[action
.entity_id
] if action
.entity_id
in status
else "failed"
576 if status
!= "completed":
578 "status is not completed: {} output: {}".format(status
, output
)
583 except Exception as e
:
584 error_msg
= "Error executing primitive {}: {}".format(primitive_name
, e
)
585 self
.log
.error(error_msg
)
586 raise K8sException(message
=error_msg
)
590 async def inspect_kdu(self
, kdu_model
: str,) -> dict:
593 Inspects a bundle and returns a dictionary of config parameters and
594 their default values.
596 :param kdu_model str: The name or path of the bundle to inspect.
598 :return: If successful, returns a dictionary of available parameters
599 and their default values.
603 with
open(kdu_model
, "r") as f
:
604 bundle
= yaml
.safe_load(f
)
608 'description': 'Test bundle',
609 'bundle': 'kubernetes',
612 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
615 'password': 'manopw',
616 'root_password': 'osm4u',
619 'series': 'kubernetes'
624 # TODO: This should be returned in an agreed-upon format
625 kdu
= bundle
["applications"]
629 async def help_kdu(self
, kdu_model
: str,) -> str:
632 If available, returns the README of the bundle.
634 :param kdu_model str: The name or path of a bundle
636 :return: If found, returns the contents of the README.
640 files
= ["README", "README.txt", "README.md"]
641 path
= os
.path
.dirname(kdu_model
)
642 for file in os
.listdir(path
):
644 with
open(file, "r") as f
:
650 async def status_kdu(self
, cluster_uuid
: str, kdu_instance
: str,) -> dict:
651 """Get the status of the KDU
653 Get the current status of the KDU instance.
655 :param cluster_uuid str: The UUID of the cluster
656 :param kdu_instance str: The unique id of the KDU instance
658 :return: Returns a dictionary containing namespace, state, resources,
663 model
= await self
.get_model(
664 self
.get_namespace(cluster_uuid
), cluster_uuid
=cluster_uuid
667 # model = await self.get_model_by_uuid(cluster_uuid)
669 model_status
= await model
.get_status()
670 status
= model_status
.applications
672 for name
in model_status
.applications
:
673 application
= model_status
.applications
[name
]
674 status
[name
] = {"status": application
["status"]["status"]}
676 if model
.is_connected():
677 await model
.disconnect()
681 async def get_services(
682 self
, cluster_uuid
: str, kdu_instance
: str, namespace
: str
684 """Return a list of services of a kdu_instance"""
686 config_file
= self
.get_config_file(cluster_uuid
=cluster_uuid
)
687 kubectl
= Kubectl(config_file
=config_file
)
688 return kubectl
.get_services(
689 field_selector
="metadata.namespace={}".format(kdu_instance
)
692 async def get_service(
693 self
, cluster_uuid
: str, service_name
: str, namespace
: str
695 """Return data for a specific service inside a namespace"""
697 config_file
= self
.get_config_file(cluster_uuid
=cluster_uuid
)
698 kubectl
= Kubectl(config_file
=config_file
)
700 return kubectl
.get_services(
701 field_selector
="metadata.name={},metadata.namespace={}".format(
702 service_name
, namespace
707 async def add_k8s(self
, cloud_name
: str, credentials
: str,) -> bool:
708 """Add a k8s cloud to Juju
710 Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
713 :param cloud_name str: The name of the cloud to add.
714 :param credentials dict: A dictionary representing the output of
715 `kubectl config view --raw`.
717 :returns: True if successful, otherwise raises an exception.
720 cmd
= [self
.juju_command
, "add-k8s", "--local", cloud_name
]
723 process
= await asyncio
.create_subprocess_exec(
725 stdout
=asyncio
.subprocess
.PIPE
,
726 stderr
=asyncio
.subprocess
.PIPE
,
727 stdin
=asyncio
.subprocess
.PIPE
,
730 # Feed the process the credentials
731 process
.stdin
.write(credentials
.encode("utf-8"))
732 await process
.stdin
.drain()
733 process
.stdin
.close()
735 _stdout
, stderr
= await process
.communicate()
737 return_code
= process
.returncode
739 self
.log
.debug("add-k8s return code: {}".format(return_code
))
742 raise Exception(stderr
)
746 async def add_model(self
, model_name
: str, cluster_uuid
: str,) -> Model
:
747 """Adds a model to the controller
749 Adds a new model to the Juju controller
751 :param model_name str: The name of the model to add.
752 :returns: The juju.model.Model object of the new model upon success or
755 if not self
.authenticated
:
756 await self
.login(cluster_uuid
)
759 "Adding model '{}' to cluster_uuid '{}'".format(model_name
, cluster_uuid
)
763 if self
.juju_public_key
is not None:
764 model
= await self
.controller
.add_model(
765 model_name
, config
={"authorized-keys": self
.juju_public_key
}
768 model
= await self
.controller
.add_model(model_name
)
769 except Exception as ex
:
771 self
.log
.debug("Caught exception: {}".format(ex
))
777 self
, cloud_name
: str, cluster_uuid
: str, loadbalancer
: bool
779 """Bootstrap a Kubernetes controller
781 Bootstrap a Juju controller inside the Kubernetes cluster
783 :param cloud_name str: The name of the cloud.
784 :param cluster_uuid str: The UUID of the cluster to bootstrap.
785 :param loadbalancer bool: If the controller should use loadbalancer or not.
786 :returns: True upon success or raises an exception.
790 cmd
= [self
.juju_command
, "bootstrap", cloud_name
, cluster_uuid
]
793 For public clusters, specify that the controller service is using a
802 "controller-service-type=loadbalancer",
806 "Bootstrapping controller {} in cloud {}".format(cluster_uuid
, cloud_name
)
809 process
= await asyncio
.create_subprocess_exec(
810 *cmd
, stdout
=asyncio
.subprocess
.PIPE
, stderr
=asyncio
.subprocess
.PIPE
,
813 _stdout
, stderr
= await process
.communicate()
815 return_code
= process
.returncode
819 if b
"already exists" not in stderr
:
820 raise Exception(stderr
)
824 async def destroy_controller(self
, cluster_uuid
: str) -> bool:
825 """Destroy a Kubernetes controller
827 Destroy an existing Kubernetes controller.
829 :param cluster_uuid str: The UUID of the cluster to bootstrap.
830 :returns: True upon success or raises an exception.
834 "destroy-controller",
835 "--destroy-all-models",
841 process
= await asyncio
.create_subprocess_exec(
842 *cmd
, stdout
=asyncio
.subprocess
.PIPE
, stderr
=asyncio
.subprocess
.PIPE
,
845 _stdout
, stderr
= await process
.communicate()
847 return_code
= process
.returncode
851 if "already exists" not in stderr
:
852 raise Exception(stderr
)
854 def get_config_file(self
, cluster_uuid
: str) -> str:
856 Get Cluster Kubeconfig location
858 return "{}/{}/.kube/config".format(self
.fs
.path
, cluster_uuid
)
860 def get_config(self
, cluster_uuid
: str,) -> dict:
861 """Get the cluster configuration
863 Gets the configuration of the cluster
865 :param cluster_uuid str: The UUID of the cluster.
866 :return: A dict upon success, or raises an exception.
868 cluster_config
= "{}/{}.yaml".format(self
.fs
.path
, cluster_uuid
)
869 if os
.path
.exists(cluster_config
):
870 with
open(cluster_config
, "r") as f
:
871 config
= yaml
.safe_load(f
.read())
875 "Unable to locate configuration for cluster {}".format(cluster_uuid
)
878 async def get_model(self
, model_name
: str, cluster_uuid
: str,) -> Model
:
879 """Get a model from the Juju Controller.
881 Note: Model objects returned must call disconnected() before it goes
884 :param model_name str: The name of the model to get
885 :return The juju.model.Model object if found, or None.
887 if not self
.authenticated
:
888 await self
.login(cluster_uuid
)
891 models
= await self
.controller
.list_models()
892 if model_name
in models
:
893 self
.log
.debug("Found model: {}".format(model_name
))
894 model
= await self
.controller
.get_model(model_name
)
897 def get_namespace(self
, cluster_uuid
: str,) -> str:
898 """Get the namespace UUID
899 Gets the namespace's unique name
901 :param cluster_uuid str: The UUID of the cluster
902 :returns: The namespace UUID, or raises an exception
904 config
= self
.get_config(cluster_uuid
)
906 # Make sure the name is in the config
907 if "namespace" not in config
:
908 raise Exception("Namespace not found.")
910 # TODO: We want to make sure this is unique to the cluster, in case
911 # the cluster is being reused.
912 # Consider pre/appending the cluster id to the namespace string
913 return config
["namespace"]
915 async def has_model(self
, model_name
: str) -> bool:
916 """Check if a model exists in the controller
918 Checks to see if a model exists in the connected Juju controller.
920 :param model_name str: The name of the model
921 :return: A boolean indicating if the model exists
923 models
= await self
.controller
.list_models()
925 if model_name
in models
:
929 def is_local_k8s(self
, credentials
: str,) -> bool:
930 """Check if a cluster is local
932 Checks if a cluster is running in the local host
934 :param credentials dict: A dictionary containing the k8s credentials
935 :returns: A boolean if the cluster is running locally
938 creds
= yaml
.safe_load(credentials
)
940 if creds
and os
.getenv("OSMLCM_VCA_APIPROXY"):
941 for cluster
in creds
["clusters"]:
942 if "server" in cluster
["cluster"]:
943 if os
.getenv("OSMLCM_VCA_APIPROXY") in cluster
["cluster"]["server"]:
948 async def login(self
, cluster_uuid
):
949 """Login to the Juju controller."""
951 if self
.authenticated
:
954 self
.connecting
= True
956 # Test: Make sure we have the credentials loaded
957 config
= self
.get_config(cluster_uuid
)
959 self
.juju_endpoint
= config
["endpoint"]
960 self
.juju_user
= config
["username"]
961 self
.juju_secret
= config
["secret"]
962 self
.juju_ca_cert
= config
["cacert"]
963 self
.juju_public_key
= None
965 self
.controller
= Controller()
969 "Connecting to controller... ws://{} as {}/{}".format(
970 self
.juju_endpoint
, self
.juju_user
, self
.juju_secret
,
974 await self
.controller
.connect(
975 endpoint
=self
.juju_endpoint
,
976 username
=self
.juju_user
,
977 password
=self
.juju_secret
,
978 cacert
=self
.juju_ca_cert
,
980 self
.authenticated
= True
981 self
.log
.debug("JujuApi: Logged into controller")
982 except Exception as ex
:
984 self
.log
.debug("Caught exception: {}".format(ex
))
987 self
.log
.fatal("VCA credentials not configured.")
988 self
.authenticated
= False
990 async def logout(self
):
991 """Logout of the Juju controller."""
992 self
.log
.debug("[logout]")
993 if not self
.authenticated
:
996 for model
in self
.models
:
997 self
.log
.debug("Logging out of model {}".format(model
))
998 await self
.models
[model
].disconnect()
1001 self
.log
.debug("Disconnecting controller {}".format(self
.controller
))
1002 await self
.controller
.disconnect()
1003 self
.controller
= None
1005 self
.authenticated
= False
1007 async def remove_cloud(self
, cloud_name
: str,) -> bool:
1008 """Remove a k8s cloud from Juju
1010 Removes a Kubernetes cloud from Juju.
1012 :param cloud_name str: The name of the cloud to add.
1014 :returns: True if successful, otherwise raises an exception.
1017 # Remove the bootstrapped controller
1018 cmd
= [self
.juju_command
, "remove-k8s", "--client", cloud_name
]
1019 process
= await asyncio
.create_subprocess_exec(
1020 *cmd
, stdout
=asyncio
.subprocess
.PIPE
, stderr
=asyncio
.subprocess
.PIPE
,
1023 _stdout
, stderr
= await process
.communicate()
1025 return_code
= process
.returncode
1028 raise Exception(stderr
)
1030 # Remove the cloud from the local config
1031 cmd
= [self
.juju_command
, "remove-cloud", "--client", cloud_name
]
1032 process
= await asyncio
.create_subprocess_exec(
1033 *cmd
, stdout
=asyncio
.subprocess
.PIPE
, stderr
=asyncio
.subprocess
.PIPE
,
1036 _stdout
, stderr
= await process
.communicate()
1038 return_code
= process
.returncode
1041 raise Exception(stderr
)
1045 async def set_config(self
, cluster_uuid
: str, config
: dict,) -> bool:
1046 """Save the cluster configuration
1048 Saves the cluster information to the file store
1050 :param cluster_uuid str: The UUID of the cluster
1051 :param config dict: A dictionary containing the cluster configuration
1052 :returns: Boolean upon success or raises an exception.
1055 cluster_config
= "{}/{}.yaml".format(self
.fs
.path
, cluster_uuid
)
1056 if not os
.path
.exists(cluster_config
):
1057 self
.log
.debug("Writing config to {}".format(cluster_config
))
1058 with
open(cluster_config
, "w") as f
:
1059 f
.write(yaml
.dump(config
, Dumper
=yaml
.Dumper
))