1 # Copyright 2019 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
22 from juju
.controller
import Controller
23 from juju
.model
import Model
24 from n2vc
.exceptions
import K8sException
25 from n2vc
.k8s_conn
import K8sConnector
26 from n2vc
.kubectl
import Kubectl
27 from .exceptions
import MethodNotImplemented
30 # from juju.bundle import BundleHandler
33 # from .vnf import N2VC
34 class K8sJujuConnector(K8sConnector
):
39 kubectl_command
: str = "/usr/bin/kubectl",
40 juju_command
: str = "/usr/bin/juju",
46 :param kubectl_command: path to kubectl executable
47 :param helm_command: path to helm executable
48 :param fs: file system for kubernetes and helm configuration
53 K8sConnector
.__init
__(
54 self
, db
, log
=log
, on_update_db
=on_update_db
,
58 self
.log
.debug("Initializing K8S Juju connector")
60 self
.authenticated
= False
63 self
.juju_command
= juju_command
66 self
.log
.debug("K8S Juju connector initialized")
73 namespace
: str = "kube-system",
74 reuse_cluster_uuid
: str = None,
77 It prepares a given K8s cluster environment to run Juju bundles.
79 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
81 :param namespace: optional namespace to be used for juju. By default,
82 'kube-system' will be used
83 :param reuse_cluster_uuid: existing cluster uuid for reuse
84 :return: uuid of the K8s cluster and True if connector has installed some
85 software in the cluster
86 (on error, an exception will be raised)
91 Bootstrapping cannot be done, by design, through the API. We need to
98 1. Has the environment already been bootstrapped?
99 - Check the database to see if we have a record for this env
101 2. If this is a new env, create it
102 - Add the k8s cloud to Juju
104 - Record it in the database
106 3. Connect to the Juju controller for this cloud
109 # cluster_uuid = reuse_cluster_uuid
110 # if not cluster_uuid:
111 # cluster_uuid = str(uuid4())
113 ##################################################
114 # TODO: Pull info from db based on the namespace #
115 ##################################################
117 ###################################################
118 # TODO: Make it idempotent, calling add-k8s and #
119 # bootstrap whenever reuse_cluster_uuid is passed #
121 # `init_env` is called to initialize the K8s #
122 # cluster for juju. If this initialization fails, #
123 # it can be called again by LCM with the param #
124 # reuse_cluster_uuid, e.g. to try to fix it. #
125 ###################################################
127 # This is a new cluster, so bootstrap it
129 cluster_uuid
= reuse_cluster_uuid
or str(uuid
.uuid4())
131 # Is a local k8s cluster?
132 localk8s
= self
.is_local_k8s(k8s_creds
)
134 # If the k8s is external, the juju controller needs a loadbalancer
135 loadbalancer
= False if localk8s
else True
137 # Name the new k8s cloud
138 k8s_cloud
= "k8s-{}".format(cluster_uuid
)
140 self
.log
.debug("Adding k8s cloud {}".format(k8s_cloud
))
141 await self
.add_k8s(k8s_cloud
, k8s_creds
)
143 # Bootstrap Juju controller
144 self
.log
.debug("Bootstrapping...")
145 await self
.bootstrap(k8s_cloud
, cluster_uuid
, loadbalancer
)
146 self
.log
.debug("Bootstrap done.")
148 # Get the controller information
150 # Parse ~/.local/share/juju/controllers.yaml
151 # controllers.testing.api-endpoints|ca-cert|uuid
152 self
.log
.debug("Getting controller endpoints")
153 with
open(os
.path
.expanduser("~/.local/share/juju/controllers.yaml")) as f
:
154 controllers
= yaml
.load(f
, Loader
=yaml
.Loader
)
155 controller
= controllers
["controllers"][cluster_uuid
]
156 endpoints
= controller
["api-endpoints"]
157 self
.juju_endpoint
= endpoints
[0]
158 self
.juju_ca_cert
= controller
["ca-cert"]
160 # Parse ~/.local/share/juju/accounts
161 # controllers.testing.user|password
162 self
.log
.debug("Getting accounts")
163 with
open(os
.path
.expanduser("~/.local/share/juju/accounts.yaml")) as f
:
164 controllers
= yaml
.load(f
, Loader
=yaml
.Loader
)
165 controller
= controllers
["controllers"][cluster_uuid
]
167 self
.juju_user
= controller
["user"]
168 self
.juju_secret
= controller
["password"]
170 # raise Exception("EOL")
172 self
.juju_public_key
= None
175 "endpoint": self
.juju_endpoint
,
176 "username": self
.juju_user
,
177 "secret": self
.juju_secret
,
178 "cacert": self
.juju_ca_cert
,
179 "namespace": namespace
,
180 "loadbalancer": loadbalancer
,
183 # Store the cluster configuration so it
184 # can be used for subsequent calls
185 self
.log
.debug("Setting config")
186 await self
.set_config(cluster_uuid
, config
)
188 # Login to the k8s cluster
189 if not self
.authenticated
:
190 await self
.login(cluster_uuid
)
192 # We're creating a new cluster
193 # print("Getting model {}".format(self.get_namespace(cluster_uuid),
194 # cluster_uuid=cluster_uuid))
195 # model = await self.get_model(
196 # self.get_namespace(cluster_uuid),
197 # cluster_uuid=cluster_uuid
200 # Disconnect from the model
201 # if model and model.is_connected():
202 # await model.disconnect()
204 return cluster_uuid
, True
206 """Repo Management"""
209 self
, name
: str, url
: str, _type
: str = "charm",
211 raise MethodNotImplemented()
213 async def repo_list(self
):
214 raise MethodNotImplemented()
216 async def repo_remove(
219 raise MethodNotImplemented()
221 async def synchronize_repos(self
, cluster_uuid
: str, name
: str):
223 Returns None as currently add_repo is not implemented
230 self
, cluster_uuid
: str, force
: bool = False, uninstall_sw
: bool = False
234 Resets the Kubernetes cluster by removing the model that represents it.
236 :param cluster_uuid str: The UUID of the cluster to reset
237 :return: Returns True if successful or raises an exception.
241 if not self
.authenticated
:
242 await self
.login(cluster_uuid
)
244 if self
.controller
.is_connected():
246 namespace
= self
.get_namespace(cluster_uuid
)
247 if await self
.has_model(namespace
):
248 self
.log
.debug("[reset] Destroying model")
249 await self
.controller
.destroy_model(namespace
, destroy_storage
=True)
251 # Disconnect from the controller
252 self
.log
.debug("[reset] Disconnecting controller")
255 # Destroy the controller (via CLI)
256 self
.log
.debug("[reset] Destroying controller")
257 await self
.destroy_controller(cluster_uuid
)
259 self
.log
.debug("[reset] Removing k8s cloud")
260 k8s_cloud
= "k8s-{}".format(cluster_uuid
)
261 await self
.remove_cloud(k8s_cloud
)
263 except Exception as ex
:
264 self
.log
.debug("Caught exception during reset: {}".format(ex
))
275 timeout
: float = 300,
277 db_dict
: dict = None,
278 kdu_name
: str = None,
279 namespace
: str = None,
283 :param cluster_uuid str: The UUID of the cluster to install to
284 :param kdu_model str: The name or path of a bundle to install
285 :param atomic bool: If set, waits until the model is active and resets
286 the cluster on failure.
287 :param timeout int: The time, in seconds, to wait for the install
289 :param params dict: Key-value pairs of instantiation parameters
290 :param kdu_name: Name of the KDU instance to be installed
291 :param namespace: K8s namespace to use for the KDU instance
293 :return: If successful, returns ?
296 if not self
.authenticated
:
297 self
.log
.debug("[install] Logging in to the controller")
298 await self
.login(cluster_uuid
)
301 # Get or create the model, based on the NS
304 kdu_instance
= "{}-{}".format(kdu_name
, db_dict
["filter"]["_id"])
306 kdu_instance
= db_dict
["filter"]["_id"]
308 self
.log
.debug("Checking for model named {}".format(kdu_instance
))
310 # Create the new model
311 self
.log
.debug("Adding model: {}".format(kdu_instance
))
312 model
= await self
.add_model(kdu_instance
, cluster_uuid
=cluster_uuid
)
315 # TODO: Instantiation parameters
318 "Juju bundle that models the KDU, in any of the following ways:
319 - <juju-repo>/<juju-bundle>
320 - <juju-bundle folder under k8s_models folder in the package>
321 - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
323 - <URL_where_to_fetch_juju_bundle>
326 previous_workdir
= os
.getcwd()
327 except FileNotFoundError
:
328 previous_workdir
= "/app/storage"
331 if kdu_model
.startswith("cs:"):
333 elif kdu_model
.startswith("http"):
337 new_workdir
= kdu_model
.strip(kdu_model
.split("/")[-1])
339 os
.chdir(new_workdir
)
341 bundle
= "local:{}".format(kdu_model
)
344 # Raise named exception that the bundle could not be found
347 self
.log
.debug("[install] deploying {}".format(bundle
))
348 await model
.deploy(bundle
)
350 # Get the application
352 # applications = model.applications
353 self
.log
.debug("[install] Applications: {}".format(model
.applications
))
354 for name
in model
.applications
:
355 self
.log
.debug("[install] Waiting for {} to settle".format(name
))
356 application
= model
.applications
[name
]
358 # It's not enough to wait for all units to be active;
359 # the application status needs to be active as well.
360 self
.log
.debug("Waiting for all units to be active...")
361 await model
.block_until(
363 unit
.agent_status
== "idle"
364 and application
.status
in ["active", "unknown"]
365 and unit
.workload_status
in ["active", "unknown"]
366 for unit
in application
.units
370 self
.log
.debug("All units active.")
372 # TODO use asyncio.TimeoutError
373 except concurrent
.futures
._base
.TimeoutError
:
374 os
.chdir(previous_workdir
)
375 self
.log
.debug("[install] Timeout exceeded; resetting cluster")
376 await self
.reset(cluster_uuid
)
379 # Wait for the application to be active
380 if model
.is_connected():
381 self
.log
.debug("[install] Disconnecting model")
382 await model
.disconnect()
384 os
.chdir(previous_workdir
)
387 raise Exception("Unable to install")
389 async def instances_list(self
, cluster_uuid
: str) -> list:
391 returns a list of deployed releases in a cluster
393 :param cluster_uuid: the cluster
402 kdu_model
: str = None,
407 :param cluster_uuid str: The UUID of the cluster to upgrade
408 :param kdu_instance str: The unique name of the KDU instance
409 :param kdu_model str: The name or path of the bundle to upgrade to
410 :param params dict: Key-value pairs of instantiation parameters
412 :return: If successful, reference to the new revision number of the
416 # TODO: Loop through the bundle and upgrade each charm individually
419 The API doesn't have a concept of bundle upgrades, because there are
420 many possible changes: charm revision, disk, number of units, etc.
422 As such, we are only supporting a limited subset of upgrades. We'll
423 upgrade the charm revision but leave storage and scale untouched.
425 Scale changes should happen through OSM constructs, and changes to
426 storage would require a redeployment of the service, at least in this
429 namespace
= self
.get_namespace(cluster_uuid
)
430 model
= await self
.get_model(namespace
, cluster_uuid
=cluster_uuid
)
432 with
open(kdu_model
, "r") as f
:
433 bundle
= yaml
.safe_load(f
)
437 'description': 'Test bundle',
438 'bundle': 'kubernetes',
441 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
444 'password': 'manopw',
445 'root_password': 'osm4u',
448 'series': 'kubernetes'
453 # TODO: This should be returned in an agreed-upon format
454 for name
in bundle
["applications"]:
455 self
.log
.debug(model
.applications
)
456 application
= model
.applications
[name
]
457 self
.log
.debug(application
)
459 path
= bundle
["applications"][name
]["charm"]
462 await application
.upgrade_charm(switch
=path
)
463 except juju
.errors
.JujuError
as ex
:
464 if "already running charm" in str(ex
):
465 # We're already running this version
468 await model
.disconnect()
471 raise MethodNotImplemented()
476 self
, cluster_uuid
: str, kdu_instance
: str, revision
: int = 0,
480 :param cluster_uuid str: The UUID of the cluster to rollback
481 :param kdu_instance str: The unique name of the KDU instance
482 :param revision int: The revision to revert to. If omitted, rolls back
483 the previous upgrade.
485 :return: If successful, returns the revision of active KDU instance,
486 or raises an exception
488 raise MethodNotImplemented()
492 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str) -> bool:
493 """Uninstall a KDU instance
495 :param cluster_uuid str: The UUID of the cluster
496 :param kdu_instance str: The unique name of the KDU instance
498 :return: Returns True if successful, or raises an exception
500 if not self
.authenticated
:
501 self
.log
.debug("[uninstall] Connecting to controller")
502 await self
.login(cluster_uuid
)
504 self
.log
.debug("[uninstall] Destroying model")
506 await self
.controller
.destroy_models(kdu_instance
)
508 self
.log
.debug("[uninstall] Model destroyed and disconnecting")
513 async def exec_primitive(
515 cluster_uuid
: str = None,
516 kdu_instance
: str = None,
517 primitive_name
: str = None,
518 timeout
: float = 300,
520 db_dict
: dict = None,
522 """Exec primitive (Juju action)
524 :param cluster_uuid str: The UUID of the cluster
525 :param kdu_instance str: The unique name of the KDU instance
526 :param primitive_name: Name of action that will be executed
527 :param timeout: Timeout for action execution
528 :param params: Dictionary of all the parameters needed for the action
529 :db_dict: Dictionary for any additional data
531 :return: Returns the output of the action
533 if not self
.authenticated
:
534 self
.log
.debug("[exec_primitive] Connecting to controller")
535 await self
.login(cluster_uuid
)
537 if not params
or "application-name" not in params
:
539 "Missing application-name argument, \
540 argument needed for K8s actions"
544 "[exec_primitive] Getting model "
545 "kdu_instance: {}".format(kdu_instance
)
548 model
= await self
.get_model(kdu_instance
, cluster_uuid
)
550 application_name
= params
["application-name"]
551 application
= model
.applications
[application_name
]
553 actions
= await application
.get_actions()
554 if primitive_name
not in actions
:
555 raise K8sException("Primitive {} not found".format(primitive_name
))
558 for u
in application
.units
:
559 if await u
.is_leader_from_status():
564 raise K8sException("No leader unit found to execute action")
566 self
.log
.debug("[exec_primitive] Running action: {}".format(primitive_name
))
567 action
= await unit
.run_action(primitive_name
, **params
)
569 output
= await model
.get_action_output(action_uuid
=action
.entity_id
)
570 status
= await model
.get_action_status(uuid_or_prefix
=action
.entity_id
)
573 status
[action
.entity_id
] if action
.entity_id
in status
else "failed"
576 if status
!= "completed":
578 "status is not completed: {} output: {}".format(status
, output
)
583 except Exception as e
:
584 error_msg
= "Error executing primitive {}: {}".format(primitive_name
, e
)
585 self
.log
.error(error_msg
)
586 raise K8sException(message
=error_msg
)
590 async def inspect_kdu(self
, kdu_model
: str,) -> dict:
593 Inspects a bundle and returns a dictionary of config parameters and
594 their default values.
596 :param kdu_model str: The name or path of the bundle to inspect.
598 :return: If successful, returns a dictionary of available parameters
599 and their default values.
603 with
open(kdu_model
, "r") as f
:
604 bundle
= yaml
.safe_load(f
)
608 'description': 'Test bundle',
609 'bundle': 'kubernetes',
612 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
615 'password': 'manopw',
616 'root_password': 'osm4u',
619 'series': 'kubernetes'
624 # TODO: This should be returned in an agreed-upon format
625 kdu
= bundle
["applications"]
629 async def help_kdu(self
, kdu_model
: str,) -> str:
632 If available, returns the README of the bundle.
634 :param kdu_model str: The name or path of a bundle
636 :return: If found, returns the contents of the README.
640 files
= ["README", "README.txt", "README.md"]
641 path
= os
.path
.dirname(kdu_model
)
642 for file in os
.listdir(path
):
644 with
open(file, "r") as f
:
650 async def status_kdu(self
, cluster_uuid
: str, kdu_instance
: str,) -> dict:
651 """Get the status of the KDU
653 Get the current status of the KDU instance.
655 :param cluster_uuid str: The UUID of the cluster
656 :param kdu_instance str: The unique id of the KDU instance
658 :return: Returns a dictionary containing namespace, state, resources,
663 model
= await self
.get_model(
664 self
.get_namespace(cluster_uuid
), cluster_uuid
=cluster_uuid
667 # model = await self.get_model_by_uuid(cluster_uuid)
669 model_status
= await model
.get_status()
670 status
= model_status
.applications
672 for name
in model_status
.applications
:
673 application
= model_status
.applications
[name
]
674 status
[name
] = {"status": application
["status"]["status"]}
676 if model
.is_connected():
677 await model
.disconnect()
681 async def get_services(
682 self
, cluster_uuid
: str, kdu_instance
: str, namespace
: str
684 """Return a list of services of a kdu_instance"""
686 config_file
= self
.get_config_file(cluster_uuid
=cluster_uuid
)
687 kubectl
= Kubectl(config_file
=config_file
)
688 return kubectl
.get_services(
689 field_selector
="metadata.namespace={}".format(kdu_instance
)
692 async def get_service(
693 self
, cluster_uuid
: str, service_name
: str, namespace
: str
695 """Return data for a specific service inside a namespace"""
697 config_file
= self
.get_config_file(cluster_uuid
=cluster_uuid
)
698 kubectl
= Kubectl(config_file
=config_file
)
700 return kubectl
.get_services(
701 field_selector
="metadata.name={},metadata.namespace={}".format(
702 service_name
, namespace
707 async def add_k8s(self
, cloud_name
: str, credentials
: str,) -> bool:
708 """Add a k8s cloud to Juju
710 Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
713 :param cloud_name str: The name of the cloud to add.
714 :param credentials dict: A dictionary representing the output of
715 `kubectl config view --raw`.
717 :returns: True if successful, otherwise raises an exception.
720 cmd
= [self
.juju_command
, "add-k8s", "--local", cloud_name
]
723 process
= await asyncio
.create_subprocess_exec(
725 stdout
=asyncio
.subprocess
.PIPE
,
726 stderr
=asyncio
.subprocess
.PIPE
,
727 stdin
=asyncio
.subprocess
.PIPE
,
730 # Feed the process the credentials
731 process
.stdin
.write(credentials
.encode("utf-8"))
732 await process
.stdin
.drain()
733 process
.stdin
.close()
735 _stdout
, stderr
= await process
.communicate()
737 return_code
= process
.returncode
739 self
.log
.debug("add-k8s return code: {}".format(return_code
))
742 raise Exception(stderr
)
746 async def add_model(self
, model_name
: str, cluster_uuid
: str,) -> Model
:
747 """Adds a model to the controller
749 Adds a new model to the Juju controller
751 :param model_name str: The name of the model to add.
752 :returns: The juju.model.Model object of the new model upon success or
755 if not self
.authenticated
:
756 await self
.login(cluster_uuid
)
759 "Adding model '{}' to cluster_uuid '{}'".format(model_name
, cluster_uuid
)
762 if self
.juju_public_key
is not None:
763 model
= await self
.controller
.add_model(
764 model_name
, config
={"authorized-keys": self
.juju_public_key
}
767 model
= await self
.controller
.add_model(model_name
)
768 except Exception as ex
:
770 self
.log
.debug("Caught exception: {}".format(ex
))
776 self
, cloud_name
: str, cluster_uuid
: str, loadbalancer
: bool
778 """Bootstrap a Kubernetes controller
780 Bootstrap a Juju controller inside the Kubernetes cluster
782 :param cloud_name str: The name of the cloud.
783 :param cluster_uuid str: The UUID of the cluster to bootstrap.
784 :param loadbalancer bool: If the controller should use loadbalancer or not.
785 :returns: True upon success or raises an exception.
789 cmd
= [self
.juju_command
, "bootstrap", cloud_name
, cluster_uuid
]
792 For public clusters, specify that the controller service is using a
801 "controller-service-type=loadbalancer",
805 "Bootstrapping controller {} in cloud {}".format(cluster_uuid
, cloud_name
)
808 process
= await asyncio
.create_subprocess_exec(
809 *cmd
, stdout
=asyncio
.subprocess
.PIPE
, stderr
=asyncio
.subprocess
.PIPE
,
812 _stdout
, stderr
= await process
.communicate()
814 return_code
= process
.returncode
818 if b
"already exists" not in stderr
:
819 raise Exception(stderr
)
823 async def destroy_controller(self
, cluster_uuid
: str) -> bool:
824 """Destroy a Kubernetes controller
826 Destroy an existing Kubernetes controller.
828 :param cluster_uuid str: The UUID of the cluster to bootstrap.
829 :returns: True upon success or raises an exception.
833 "destroy-controller",
834 "--destroy-all-models",
840 process
= await asyncio
.create_subprocess_exec(
841 *cmd
, stdout
=asyncio
.subprocess
.PIPE
, stderr
=asyncio
.subprocess
.PIPE
,
844 _stdout
, stderr
= await process
.communicate()
846 return_code
= process
.returncode
850 if "already exists" not in stderr
:
851 raise Exception(stderr
)
853 def get_config_file(self
, cluster_uuid
: str) -> str:
855 Get Cluster Kubeconfig location
857 return "{}/{}/.kube/config".format(self
.fs
.path
, cluster_uuid
)
859 def get_config(self
, cluster_uuid
: str,) -> dict:
860 """Get the cluster configuration
862 Gets the configuration of the cluster
864 :param cluster_uuid str: The UUID of the cluster.
865 :return: A dict upon success, or raises an exception.
867 cluster_config
= "{}/{}.yaml".format(self
.fs
.path
, cluster_uuid
)
868 if os
.path
.exists(cluster_config
):
869 with
open(cluster_config
, "r") as f
:
870 config
= yaml
.safe_load(f
.read())
874 "Unable to locate configuration for cluster {}".format(cluster_uuid
)
877 async def get_model(self
, model_name
: str, cluster_uuid
: str,) -> Model
:
878 """Get a model from the Juju Controller.
880 Note: Model objects returned must call disconnected() before it goes
883 :param model_name str: The name of the model to get
884 :return The juju.model.Model object if found, or None.
886 if not self
.authenticated
:
887 await self
.login(cluster_uuid
)
890 models
= await self
.controller
.list_models()
891 if model_name
in models
:
892 self
.log
.debug("Found model: {}".format(model_name
))
893 model
= await self
.controller
.get_model(model_name
)
896 def get_namespace(self
, cluster_uuid
: str,) -> str:
897 """Get the namespace UUID
898 Gets the namespace's unique name
900 :param cluster_uuid str: The UUID of the cluster
901 :returns: The namespace UUID, or raises an exception
903 config
= self
.get_config(cluster_uuid
)
905 # Make sure the name is in the config
906 if "namespace" not in config
:
907 raise Exception("Namespace not found.")
909 # TODO: We want to make sure this is unique to the cluster, in case
910 # the cluster is being reused.
911 # Consider pre/appending the cluster id to the namespace string
912 return config
["namespace"]
914 async def has_model(self
, model_name
: str) -> bool:
915 """Check if a model exists in the controller
917 Checks to see if a model exists in the connected Juju controller.
919 :param model_name str: The name of the model
920 :return: A boolean indicating if the model exists
922 models
= await self
.controller
.list_models()
924 if model_name
in models
:
928 def is_local_k8s(self
, credentials
: str,) -> bool:
929 """Check if a cluster is local
931 Checks if a cluster is running in the local host
933 :param credentials dict: A dictionary containing the k8s credentials
934 :returns: A boolean if the cluster is running locally
937 creds
= yaml
.safe_load(credentials
)
939 if creds
and os
.getenv("OSMLCM_VCA_APIPROXY"):
940 for cluster
in creds
["clusters"]:
941 if "server" in cluster
["cluster"]:
942 if os
.getenv("OSMLCM_VCA_APIPROXY") in cluster
["cluster"]["server"]:
947 async def login(self
, cluster_uuid
):
948 """Login to the Juju controller."""
950 if self
.authenticated
:
953 self
.connecting
= True
955 # Test: Make sure we have the credentials loaded
956 config
= self
.get_config(cluster_uuid
)
958 self
.juju_endpoint
= config
["endpoint"]
959 self
.juju_user
= config
["username"]
960 self
.juju_secret
= config
["secret"]
961 self
.juju_ca_cert
= config
["cacert"]
962 self
.juju_public_key
= None
964 self
.controller
= Controller()
968 "Connecting to controller... ws://{} as {}/{}".format(
969 self
.juju_endpoint
, self
.juju_user
, self
.juju_secret
,
973 await self
.controller
.connect(
974 endpoint
=self
.juju_endpoint
,
975 username
=self
.juju_user
,
976 password
=self
.juju_secret
,
977 cacert
=self
.juju_ca_cert
,
979 self
.authenticated
= True
980 self
.log
.debug("JujuApi: Logged into controller")
981 except Exception as ex
:
983 self
.log
.debug("Caught exception: {}".format(ex
))
986 self
.log
.fatal("VCA credentials not configured.")
987 self
.authenticated
= False
989 async def logout(self
):
990 """Logout of the Juju controller."""
991 self
.log
.debug("[logout]")
992 if not self
.authenticated
:
995 for model
in self
.models
:
996 self
.log
.debug("Logging out of model {}".format(model
))
997 await self
.models
[model
].disconnect()
1000 self
.log
.debug("Disconnecting controller {}".format(self
.controller
))
1001 await self
.controller
.disconnect()
1002 self
.controller
= None
1004 self
.authenticated
= False
1006 async def remove_cloud(self
, cloud_name
: str,) -> bool:
1007 """Remove a k8s cloud from Juju
1009 Removes a Kubernetes cloud from Juju.
1011 :param cloud_name str: The name of the cloud to add.
1013 :returns: True if successful, otherwise raises an exception.
1016 # Remove the bootstrapped controller
1017 cmd
= [self
.juju_command
, "remove-k8s", "--client", cloud_name
]
1018 process
= await asyncio
.create_subprocess_exec(
1019 *cmd
, stdout
=asyncio
.subprocess
.PIPE
, stderr
=asyncio
.subprocess
.PIPE
,
1022 _stdout
, stderr
= await process
.communicate()
1024 return_code
= process
.returncode
1027 raise Exception(stderr
)
1029 # Remove the cloud from the local config
1030 cmd
= [self
.juju_command
, "remove-cloud", "--client", cloud_name
]
1031 process
= await asyncio
.create_subprocess_exec(
1032 *cmd
, stdout
=asyncio
.subprocess
.PIPE
, stderr
=asyncio
.subprocess
.PIPE
,
1035 _stdout
, stderr
= await process
.communicate()
1037 return_code
= process
.returncode
1040 raise Exception(stderr
)
1044 async def set_config(self
, cluster_uuid
: str, config
: dict,) -> bool:
1045 """Save the cluster configuration
1047 Saves the cluster information to the file store
1049 :param cluster_uuid str: The UUID of the cluster
1050 :param config dict: A dictionary containing the cluster configuration
1051 :returns: Boolean upon success or raises an exception.
1054 cluster_config
= "{}/{}.yaml".format(self
.fs
.path
, cluster_uuid
)
1055 if not os
.path
.exists(cluster_config
):
1056 self
.log
.debug("Writing config to {}".format(cluster_config
))
1057 with
open(cluster_config
, "w") as f
:
1058 f
.write(yaml
.dump(config
, Dumper
=yaml
.Dumper
))