1 # Copyright 2019 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
22 from juju
.controller
import Controller
23 from juju
.model
import Model
24 from n2vc
.exceptions
import K8sException
25 from n2vc
.k8s_conn
import K8sConnector
27 from .exceptions
import MethodNotImplemented
30 # from juju.bundle import BundleHandler
33 # from .vnf import N2VC
34 class K8sJujuConnector(K8sConnector
):
39 kubectl_command
: str = "/usr/bin/kubectl",
40 juju_command
: str = "/usr/bin/juju",
46 :param kubectl_command: path to kubectl executable
47 :param helm_command: path to helm executable
48 :param fs: file system for kubernetes and helm configuration
53 K8sConnector
.__init
__(
54 self
, db
, log
=log
, on_update_db
=on_update_db
,
58 self
.log
.debug("Initializing K8S Juju connector")
60 self
.authenticated
= False
63 self
.juju_command
= juju_command
66 self
.log
.debug("K8S Juju connector initialized")
73 namespace
: str = "kube-system",
74 reuse_cluster_uuid
: str = None,
77 It prepares a given K8s cluster environment to run Juju bundles.
79 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
81 :param namespace: optional namespace to be used for juju. By default,
82 'kube-system' will be used
83 :param reuse_cluster_uuid: existing cluster uuid for reuse
84 :return: uuid of the K8s cluster and True if connector has installed some
85 software in the cluster
86 (on error, an exception will be raised)
91 Bootstrapping cannot be done, by design, through the API. We need to
98 1. Has the environment already been bootstrapped?
99 - Check the database to see if we have a record for this env
101 2. If this is a new env, create it
102 - Add the k8s cloud to Juju
104 - Record it in the database
106 3. Connect to the Juju controller for this cloud
109 # cluster_uuid = reuse_cluster_uuid
110 # if not cluster_uuid:
111 # cluster_uuid = str(uuid4())
113 ##################################################
114 # TODO: Pull info from db based on the namespace #
115 ##################################################
117 ###################################################
118 # TODO: Make it idempotent, calling add-k8s and #
119 # bootstrap whenever reuse_cluster_uuid is passed #
121 # `init_env` is called to initialize the K8s #
122 # cluster for juju. If this initialization fails, #
123 # it can be called again by LCM with the param #
124 # reuse_cluster_uuid, e.g. to try to fix it. #
125 ###################################################
127 if not reuse_cluster_uuid
:
128 # This is a new cluster, so bootstrap it
130 cluster_uuid
= str(uuid
.uuid4())
132 # Is a local k8s cluster?
133 localk8s
= self
.is_local_k8s(k8s_creds
)
135 # If the k8s is external, the juju controller needs a loadbalancer
136 loadbalancer
= False if localk8s
else True
138 # Name the new k8s cloud
139 k8s_cloud
= "k8s-{}".format(cluster_uuid
)
141 self
.log
.debug("Adding k8s cloud {}".format(k8s_cloud
))
142 await self
.add_k8s(k8s_cloud
, k8s_creds
)
144 # Bootstrap Juju controller
145 self
.log
.debug("Bootstrapping...")
146 await self
.bootstrap(k8s_cloud
, cluster_uuid
, loadbalancer
)
147 self
.log
.debug("Bootstrap done.")
149 # Get the controller information
151 # Parse ~/.local/share/juju/controllers.yaml
152 # controllers.testing.api-endpoints|ca-cert|uuid
153 self
.log
.debug("Getting controller endpoints")
154 with
open(os
.path
.expanduser("~/.local/share/juju/controllers.yaml")) as f
:
155 controllers
= yaml
.load(f
, Loader
=yaml
.Loader
)
156 controller
= controllers
["controllers"][cluster_uuid
]
157 endpoints
= controller
["api-endpoints"]
158 self
.juju_endpoint
= endpoints
[0]
159 self
.juju_ca_cert
= controller
["ca-cert"]
161 # Parse ~/.local/share/juju/accounts
162 # controllers.testing.user|password
163 self
.log
.debug("Getting accounts")
164 with
open(os
.path
.expanduser("~/.local/share/juju/accounts.yaml")) as f
:
165 controllers
= yaml
.load(f
, Loader
=yaml
.Loader
)
166 controller
= controllers
["controllers"][cluster_uuid
]
168 self
.juju_user
= controller
["user"]
169 self
.juju_secret
= controller
["password"]
171 # raise Exception("EOL")
173 self
.juju_public_key
= None
176 "endpoint": self
.juju_endpoint
,
177 "username": self
.juju_user
,
178 "secret": self
.juju_secret
,
179 "cacert": self
.juju_ca_cert
,
180 "namespace": namespace
,
181 "loadbalancer": loadbalancer
,
184 # Store the cluster configuration so it
185 # can be used for subsequent calls
186 self
.log
.debug("Setting config")
187 await self
.set_config(cluster_uuid
, config
)
190 # This is an existing cluster, so get its config
191 cluster_uuid
= reuse_cluster_uuid
193 config
= self
.get_config(cluster_uuid
)
195 self
.juju_endpoint
= config
["endpoint"]
196 self
.juju_user
= config
["username"]
197 self
.juju_secret
= config
["secret"]
198 self
.juju_ca_cert
= config
["cacert"]
199 self
.juju_public_key
= None
201 # Login to the k8s cluster
202 if not self
.authenticated
:
203 await self
.login(cluster_uuid
)
205 # We're creating a new cluster
206 # print("Getting model {}".format(self.get_namespace(cluster_uuid),
207 # cluster_uuid=cluster_uuid))
208 # model = await self.get_model(
209 # self.get_namespace(cluster_uuid),
210 # cluster_uuid=cluster_uuid
213 # Disconnect from the model
214 # if model and model.is_connected():
215 # await model.disconnect()
217 return cluster_uuid
, True
219 """Repo Management"""
222 self
, name
: str, url
: str, _type
: str = "charm",
224 raise MethodNotImplemented()
226 async def repo_list(self
):
227 raise MethodNotImplemented()
229 async def repo_remove(
232 raise MethodNotImplemented()
234 async def synchronize_repos(self
, cluster_uuid
: str, name
: str):
236 Returns None as currently add_repo is not implemented
243 self
, cluster_uuid
: str, force
: bool = False, uninstall_sw
: bool = False
247 Resets the Kubernetes cluster by removing the model that represents it.
249 :param cluster_uuid str: The UUID of the cluster to reset
250 :return: Returns True if successful or raises an exception.
254 if not self
.authenticated
:
255 await self
.login(cluster_uuid
)
257 if self
.controller
.is_connected():
259 namespace
= self
.get_namespace(cluster_uuid
)
260 if await self
.has_model(namespace
):
261 self
.log
.debug("[reset] Destroying model")
262 await self
.controller
.destroy_model(namespace
, destroy_storage
=True)
264 # Disconnect from the controller
265 self
.log
.debug("[reset] Disconnecting controller")
268 # Destroy the controller (via CLI)
269 self
.log
.debug("[reset] Destroying controller")
270 await self
.destroy_controller(cluster_uuid
)
272 self
.log
.debug("[reset] Removing k8s cloud")
273 k8s_cloud
= "k8s-{}".format(cluster_uuid
)
274 await self
.remove_cloud(k8s_cloud
)
276 except Exception as ex
:
277 self
.log
.debug("Caught exception during reset: {}".format(ex
))
288 timeout
: float = 300,
290 db_dict
: dict = None,
291 kdu_name
: str = None,
292 namespace
: str = None,
296 :param cluster_uuid str: The UUID of the cluster to install to
297 :param kdu_model str: The name or path of a bundle to install
298 :param atomic bool: If set, waits until the model is active and resets
299 the cluster on failure.
300 :param timeout int: The time, in seconds, to wait for the install
302 :param params dict: Key-value pairs of instantiation parameters
303 :param kdu_name: Name of the KDU instance to be installed
304 :param namespace: K8s namespace to use for the KDU instance
306 :return: If successful, returns ?
309 if not self
.authenticated
:
310 self
.log
.debug("[install] Logging in to the controller")
311 await self
.login(cluster_uuid
)
314 # Get or create the model, based on the NS
317 kdu_instance
= "{}-{}".format(kdu_name
, db_dict
["filter"]["_id"])
319 kdu_instance
= db_dict
["filter"]["_id"]
321 self
.log
.debug("Checking for model named {}".format(kdu_instance
))
323 # Create the new model
324 self
.log
.debug("Adding model: {}".format(kdu_instance
))
325 model
= await self
.add_model(kdu_instance
, cluster_uuid
=cluster_uuid
)
328 # TODO: Instantiation parameters
331 "Juju bundle that models the KDU, in any of the following ways:
332 - <juju-repo>/<juju-bundle>
333 - <juju-bundle folder under k8s_models folder in the package>
334 - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
336 - <URL_where_to_fetch_juju_bundle>
339 previous_workdir
= os
.getcwd()
340 except FileNotFoundError
:
341 previous_workdir
= "/app/storage"
344 if kdu_model
.startswith("cs:"):
346 elif kdu_model
.startswith("http"):
350 new_workdir
= kdu_model
.strip(kdu_model
.split("/")[-1])
352 os
.chdir(new_workdir
)
354 bundle
= "local:{}".format(kdu_model
)
357 # Raise named exception that the bundle could not be found
360 self
.log
.debug("[install] deploying {}".format(bundle
))
361 await model
.deploy(bundle
)
363 # Get the application
365 # applications = model.applications
366 self
.log
.debug("[install] Applications: {}".format(model
.applications
))
367 for name
in model
.applications
:
368 self
.log
.debug("[install] Waiting for {} to settle".format(name
))
369 application
= model
.applications
[name
]
371 # It's not enough to wait for all units to be active;
372 # the application status needs to be active as well.
373 self
.log
.debug("Waiting for all units to be active...")
374 await model
.block_until(
376 unit
.agent_status
== "idle"
377 and application
.status
in ["active", "unknown"]
378 and unit
.workload_status
in ["active", "unknown"]
379 for unit
in application
.units
383 self
.log
.debug("All units active.")
385 # TODO use asyncio.TimeoutError
386 except concurrent
.futures
._base
.TimeoutError
:
387 os
.chdir(previous_workdir
)
388 self
.log
.debug("[install] Timeout exceeded; resetting cluster")
389 await self
.reset(cluster_uuid
)
392 # Wait for the application to be active
393 if model
.is_connected():
394 self
.log
.debug("[install] Disconnecting model")
395 await model
.disconnect()
397 os
.chdir(previous_workdir
)
400 raise Exception("Unable to install")
402 async def instances_list(self
, cluster_uuid
: str) -> list:
404 returns a list of deployed releases in a cluster
406 :param cluster_uuid: the cluster
415 kdu_model
: str = None,
420 :param cluster_uuid str: The UUID of the cluster to upgrade
421 :param kdu_instance str: The unique name of the KDU instance
422 :param kdu_model str: The name or path of the bundle to upgrade to
423 :param params dict: Key-value pairs of instantiation parameters
425 :return: If successful, reference to the new revision number of the
429 # TODO: Loop through the bundle and upgrade each charm individually
432 The API doesn't have a concept of bundle upgrades, because there are
433 many possible changes: charm revision, disk, number of units, etc.
435 As such, we are only supporting a limited subset of upgrades. We'll
436 upgrade the charm revision but leave storage and scale untouched.
438 Scale changes should happen through OSM constructs, and changes to
439 storage would require a redeployment of the service, at least in this
442 namespace
= self
.get_namespace(cluster_uuid
)
443 model
= await self
.get_model(namespace
, cluster_uuid
=cluster_uuid
)
445 with
open(kdu_model
, "r") as f
:
446 bundle
= yaml
.safe_load(f
)
450 'description': 'Test bundle',
451 'bundle': 'kubernetes',
454 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
457 'password': 'manopw',
458 'root_password': 'osm4u',
461 'series': 'kubernetes'
466 # TODO: This should be returned in an agreed-upon format
467 for name
in bundle
["applications"]:
468 self
.log
.debug(model
.applications
)
469 application
= model
.applications
[name
]
470 self
.log
.debug(application
)
472 path
= bundle
["applications"][name
]["charm"]
475 await application
.upgrade_charm(switch
=path
)
476 except juju
.errors
.JujuError
as ex
:
477 if "already running charm" in str(ex
):
478 # We're already running this version
481 await model
.disconnect()
484 raise MethodNotImplemented()
489 self
, cluster_uuid
: str, kdu_instance
: str, revision
: int = 0,
493 :param cluster_uuid str: The UUID of the cluster to rollback
494 :param kdu_instance str: The unique name of the KDU instance
495 :param revision int: The revision to revert to. If omitted, rolls back
496 the previous upgrade.
498 :return: If successful, returns the revision of active KDU instance,
499 or raises an exception
501 raise MethodNotImplemented()
505 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str) -> bool:
506 """Uninstall a KDU instance
508 :param cluster_uuid str: The UUID of the cluster
509 :param kdu_instance str: The unique name of the KDU instance
511 :return: Returns True if successful, or raises an exception
513 if not self
.authenticated
:
514 self
.log
.debug("[uninstall] Connecting to controller")
515 await self
.login(cluster_uuid
)
517 self
.log
.debug("[uninstall] Destroying model")
519 await self
.controller
.destroy_models(kdu_instance
)
521 self
.log
.debug("[uninstall] Model destroyed and disconnecting")
526 async def exec_primitive(
528 cluster_uuid
: str = None,
529 kdu_instance
: str = None,
530 primitive_name
: str = None,
531 timeout
: float = 300,
533 db_dict
: dict = None,
535 """Exec primitive (Juju action)
537 :param cluster_uuid str: The UUID of the cluster
538 :param kdu_instance str: The unique name of the KDU instance
539 :param primitive_name: Name of action that will be executed
540 :param timeout: Timeout for action execution
541 :param params: Dictionary of all the parameters needed for the action
542 :db_dict: Dictionary for any additional data
544 :return: Returns the output of the action
546 if not self
.authenticated
:
547 self
.log
.debug("[exec_primitive] Connecting to controller")
548 await self
.login(cluster_uuid
)
550 if not params
or "application-name" not in params
:
552 "Missing application-name argument, \
553 argument needed for K8s actions"
557 "[exec_primitive] Getting model "
558 "kdu_instance: {}".format(kdu_instance
)
561 model
= await self
.get_model(kdu_instance
, cluster_uuid
)
563 application_name
= params
["application-name"]
564 application
= model
.applications
[application_name
]
566 actions
= await application
.get_actions()
567 if primitive_name
not in actions
:
568 raise K8sException("Primitive {} not found".format(primitive_name
))
571 for u
in application
.units
:
572 if await u
.is_leader_from_status():
577 raise K8sException("No leader unit found to execute action")
579 self
.log
.debug("[exec_primitive] Running action: {}".format(primitive_name
))
580 action
= await unit
.run_action(primitive_name
, **params
)
582 output
= await model
.get_action_output(action_uuid
=action
.entity_id
)
583 status
= await model
.get_action_status(uuid_or_prefix
=action
.entity_id
)
586 status
[action
.entity_id
] if action
.entity_id
in status
else "failed"
589 if status
!= "completed":
591 "status is not completed: {} output: {}".format(status
, output
)
596 except Exception as e
:
597 error_msg
= "Error executing primitive {}: {}".format(primitive_name
, e
)
598 self
.log
.error(error_msg
)
599 raise K8sException(message
=error_msg
)
603 async def inspect_kdu(self
, kdu_model
: str,) -> dict:
606 Inspects a bundle and returns a dictionary of config parameters and
607 their default values.
609 :param kdu_model str: The name or path of the bundle to inspect.
611 :return: If successful, returns a dictionary of available parameters
612 and their default values.
616 with
open(kdu_model
, "r") as f
:
617 bundle
= yaml
.safe_load(f
)
621 'description': 'Test bundle',
622 'bundle': 'kubernetes',
625 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
628 'password': 'manopw',
629 'root_password': 'osm4u',
632 'series': 'kubernetes'
637 # TODO: This should be returned in an agreed-upon format
638 kdu
= bundle
["applications"]
642 async def help_kdu(self
, kdu_model
: str,) -> str:
645 If available, returns the README of the bundle.
647 :param kdu_model str: The name or path of a bundle
649 :return: If found, returns the contents of the README.
653 files
= ["README", "README.txt", "README.md"]
654 path
= os
.path
.dirname(kdu_model
)
655 for file in os
.listdir(path
):
657 with
open(file, "r") as f
:
663 async def status_kdu(self
, cluster_uuid
: str, kdu_instance
: str,) -> dict:
664 """Get the status of the KDU
666 Get the current status of the KDU instance.
668 :param cluster_uuid str: The UUID of the cluster
669 :param kdu_instance str: The unique id of the KDU instance
671 :return: Returns a dictionary containing namespace, state, resources,
676 model
= await self
.get_model(
677 self
.get_namespace(cluster_uuid
), cluster_uuid
=cluster_uuid
680 # model = await self.get_model_by_uuid(cluster_uuid)
682 model_status
= await model
.get_status()
683 status
= model_status
.applications
685 for name
in model_status
.applications
:
686 application
= model_status
.applications
[name
]
687 status
[name
] = {"status": application
["status"]["status"]}
689 if model
.is_connected():
690 await model
.disconnect()
694 async def get_services(self
,
697 namespace
: str = None) -> list:
699 Returns empty list as currently add_repo is not implemented
701 raise MethodNotImplemented
703 async def get_service(self
,
706 namespace
: str = None) -> object:
708 Returns empty list as currently add_repo is not implemented
710 raise MethodNotImplemented
713 async def add_k8s(self
, cloud_name
: str, credentials
: str,) -> bool:
714 """Add a k8s cloud to Juju
716 Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
719 :param cloud_name str: The name of the cloud to add.
720 :param credentials dict: A dictionary representing the output of
721 `kubectl config view --raw`.
723 :returns: True if successful, otherwise raises an exception.
726 cmd
= [self
.juju_command
, "add-k8s", "--local", cloud_name
]
729 process
= await asyncio
.create_subprocess_exec(
731 stdout
=asyncio
.subprocess
.PIPE
,
732 stderr
=asyncio
.subprocess
.PIPE
,
733 stdin
=asyncio
.subprocess
.PIPE
,
736 # Feed the process the credentials
737 process
.stdin
.write(credentials
.encode("utf-8"))
738 await process
.stdin
.drain()
739 process
.stdin
.close()
741 _stdout
, stderr
= await process
.communicate()
743 return_code
= process
.returncode
745 self
.log
.debug("add-k8s return code: {}".format(return_code
))
748 raise Exception(stderr
)
752 async def add_model(self
, model_name
: str, cluster_uuid
: str,) -> Model
:
753 """Adds a model to the controller
755 Adds a new model to the Juju controller
757 :param model_name str: The name of the model to add.
758 :returns: The juju.model.Model object of the new model upon success or
761 if not self
.authenticated
:
762 await self
.login(cluster_uuid
)
765 "Adding model '{}' to cluster_uuid '{}'".format(model_name
, cluster_uuid
)
768 if self
.juju_public_key
is not None:
769 model
= await self
.controller
.add_model(
770 model_name
, config
={"authorized-keys": self
.juju_public_key
}
773 model
= await self
.controller
.add_model(model_name
)
774 except Exception as ex
:
776 self
.log
.debug("Caught exception: {}".format(ex
))
782 self
, cloud_name
: str, cluster_uuid
: str, loadbalancer
: bool
784 """Bootstrap a Kubernetes controller
786 Bootstrap a Juju controller inside the Kubernetes cluster
788 :param cloud_name str: The name of the cloud.
789 :param cluster_uuid str: The UUID of the cluster to bootstrap.
790 :param loadbalancer bool: If the controller should use loadbalancer or not.
791 :returns: True upon success or raises an exception.
795 cmd
= [self
.juju_command
, "bootstrap", cloud_name
, cluster_uuid
]
798 For public clusters, specify that the controller service is using a
807 "controller-service-type=loadbalancer",
811 "Bootstrapping controller {} in cloud {}".format(cluster_uuid
, cloud_name
)
814 process
= await asyncio
.create_subprocess_exec(
815 *cmd
, stdout
=asyncio
.subprocess
.PIPE
, stderr
=asyncio
.subprocess
.PIPE
,
818 _stdout
, stderr
= await process
.communicate()
820 return_code
= process
.returncode
824 if b
"already exists" not in stderr
:
825 raise Exception(stderr
)
829 async def destroy_controller(self
, cluster_uuid
: str) -> bool:
830 """Destroy a Kubernetes controller
832 Destroy an existing Kubernetes controller.
834 :param cluster_uuid str: The UUID of the cluster to bootstrap.
835 :returns: True upon success or raises an exception.
839 "destroy-controller",
840 "--destroy-all-models",
846 process
= await asyncio
.create_subprocess_exec(
847 *cmd
, stdout
=asyncio
.subprocess
.PIPE
, stderr
=asyncio
.subprocess
.PIPE
,
850 _stdout
, stderr
= await process
.communicate()
852 return_code
= process
.returncode
856 if "already exists" not in stderr
:
857 raise Exception(stderr
)
859 def get_config(self
, cluster_uuid
: str,) -> dict:
860 """Get the cluster configuration
862 Gets the configuration of the cluster
864 :param cluster_uuid str: The UUID of the cluster.
865 :return: A dict upon success, or raises an exception.
867 cluster_config
= "{}/{}.yaml".format(self
.fs
.path
, cluster_uuid
)
868 if os
.path
.exists(cluster_config
):
869 with
open(cluster_config
, "r") as f
:
870 config
= yaml
.safe_load(f
.read())
874 "Unable to locate configuration for cluster {}".format(cluster_uuid
)
877 async def get_model(self
, model_name
: str, cluster_uuid
: str,) -> Model
:
878 """Get a model from the Juju Controller.
880 Note: Model objects returned must call disconnected() before it goes
883 :param model_name str: The name of the model to get
884 :return The juju.model.Model object if found, or None.
886 if not self
.authenticated
:
887 await self
.login(cluster_uuid
)
890 models
= await self
.controller
.list_models()
891 if model_name
in models
:
892 self
.log
.debug("Found model: {}".format(model_name
))
893 model
= await self
.controller
.get_model(model_name
)
896 def get_namespace(self
, cluster_uuid
: str,) -> str:
897 """Get the namespace UUID
898 Gets the namespace's unique name
900 :param cluster_uuid str: The UUID of the cluster
901 :returns: The namespace UUID, or raises an exception
903 config
= self
.get_config(cluster_uuid
)
905 # Make sure the name is in the config
906 if "namespace" not in config
:
907 raise Exception("Namespace not found.")
909 # TODO: We want to make sure this is unique to the cluster, in case
910 # the cluster is being reused.
911 # Consider pre/appending the cluster id to the namespace string
912 return config
["namespace"]
914 async def has_model(self
, model_name
: str) -> bool:
915 """Check if a model exists in the controller
917 Checks to see if a model exists in the connected Juju controller.
919 :param model_name str: The name of the model
920 :return: A boolean indicating if the model exists
922 models
= await self
.controller
.list_models()
924 if model_name
in models
:
928 def is_local_k8s(self
, credentials
: str,) -> bool:
929 """Check if a cluster is local
931 Checks if a cluster is running in the local host
933 :param credentials dict: A dictionary containing the k8s credentials
934 :returns: A boolean if the cluster is running locally
936 creds
= yaml
.safe_load(credentials
)
937 if os
.getenv("OSMLCM_VCA_APIPROXY"):
938 host_ip
= os
.getenv("OSMLCM_VCA_APIPROXY")
940 if creds
and host_ip
:
941 for cluster
in creds
["clusters"]:
942 if "server" in cluster
["cluster"]:
943 if host_ip
in cluster
["cluster"]["server"]:
948 async def login(self
, cluster_uuid
):
949 """Login to the Juju controller."""
951 if self
.authenticated
:
954 self
.connecting
= True
956 # Test: Make sure we have the credentials loaded
957 config
= self
.get_config(cluster_uuid
)
959 self
.juju_endpoint
= config
["endpoint"]
960 self
.juju_user
= config
["username"]
961 self
.juju_secret
= config
["secret"]
962 self
.juju_ca_cert
= config
["cacert"]
963 self
.juju_public_key
= None
965 self
.controller
= Controller()
969 "Connecting to controller... ws://{} as {}/{}".format(
970 self
.juju_endpoint
, self
.juju_user
, self
.juju_secret
,
974 await self
.controller
.connect(
975 endpoint
=self
.juju_endpoint
,
976 username
=self
.juju_user
,
977 password
=self
.juju_secret
,
978 cacert
=self
.juju_ca_cert
,
980 self
.authenticated
= True
981 self
.log
.debug("JujuApi: Logged into controller")
982 except Exception as ex
:
984 self
.log
.debug("Caught exception: {}".format(ex
))
987 self
.log
.fatal("VCA credentials not configured.")
988 self
.authenticated
= False
990 async def logout(self
):
991 """Logout of the Juju controller."""
992 self
.log
.debug("[logout]")
993 if not self
.authenticated
:
996 for model
in self
.models
:
997 self
.log
.debug("Logging out of model {}".format(model
))
998 await self
.models
[model
].disconnect()
1001 self
.log
.debug("Disconnecting controller {}".format(self
.controller
))
1002 await self
.controller
.disconnect()
1003 self
.controller
= None
1005 self
.authenticated
= False
1007 async def remove_cloud(self
, cloud_name
: str,) -> bool:
1008 """Remove a k8s cloud from Juju
1010 Removes a Kubernetes cloud from Juju.
1012 :param cloud_name str: The name of the cloud to add.
1014 :returns: True if successful, otherwise raises an exception.
1017 # Remove the bootstrapped controller
1018 cmd
= [self
.juju_command
, "remove-k8s", "--client", cloud_name
]
1019 process
= await asyncio
.create_subprocess_exec(
1020 *cmd
, stdout
=asyncio
.subprocess
.PIPE
, stderr
=asyncio
.subprocess
.PIPE
,
1023 _stdout
, stderr
= await process
.communicate()
1025 return_code
= process
.returncode
1028 raise Exception(stderr
)
1030 # Remove the cloud from the local config
1031 cmd
= [self
.juju_command
, "remove-cloud", "--client", cloud_name
]
1032 process
= await asyncio
.create_subprocess_exec(
1033 *cmd
, stdout
=asyncio
.subprocess
.PIPE
, stderr
=asyncio
.subprocess
.PIPE
,
1036 _stdout
, stderr
= await process
.communicate()
1038 return_code
= process
.returncode
1041 raise Exception(stderr
)
1045 async def set_config(self
, cluster_uuid
: str, config
: dict,) -> bool:
1046 """Save the cluster configuration
1048 Saves the cluster information to the file store
1050 :param cluster_uuid str: The UUID of the cluster
1051 :param config dict: A dictionary containing the cluster configuration
1052 :returns: Boolean upon success or raises an exception.
1055 cluster_config
= "{}/{}.yaml".format(self
.fs
.path
, cluster_uuid
)
1056 if not os
.path
.exists(cluster_config
):
1057 self
.log
.debug("Writing config to {}".format(cluster_config
))
1058 with
open(cluster_config
, "w") as f
:
1059 f
.write(yaml
.dump(config
, Dumper
=yaml
.Dumper
))