808201d23fa92dadcd1ec9d417352328fafbafb4
1 # Copyright 2019 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
22 from juju
.controller
import Controller
23 from juju
.model
import Model
24 from n2vc
.exceptions
import K8sException
25 from n2vc
.k8s_conn
import K8sConnector
26 from n2vc
.kubectl
import Kubectl
27 from .exceptions
import MethodNotImplemented
30 # from juju.bundle import BundleHandler
33 # from .vnf import N2VC
34 class K8sJujuConnector(K8sConnector
):
39 kubectl_command
: str = "/usr/bin/kubectl",
40 juju_command
: str = "/usr/bin/juju",
46 :param kubectl_command: path to kubectl executable
47 :param helm_command: path to helm executable
48 :param fs: file system for kubernetes and helm configuration
53 K8sConnector
.__init
__(
54 self
, db
, log
=log
, on_update_db
=on_update_db
,
58 self
.log
.debug("Initializing K8S Juju connector")
60 self
.authenticated
= False
63 self
.juju_command
= juju_command
66 self
.log
.debug("K8S Juju connector initialized")
73 namespace
: str = "kube-system",
74 reuse_cluster_uuid
: str = None,
77 It prepares a given K8s cluster environment to run Juju bundles.
79 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
81 :param namespace: optional namespace to be used for juju. By default,
82 'kube-system' will be used
83 :param reuse_cluster_uuid: existing cluster uuid for reuse
84 :return: uuid of the K8s cluster and True if connector has installed some
85 software in the cluster
86 (on error, an exception will be raised)
91 Bootstrapping cannot be done, by design, through the API. We need to
98 1. Has the environment already been bootstrapped?
99 - Check the database to see if we have a record for this env
101 2. If this is a new env, create it
102 - Add the k8s cloud to Juju
104 - Record it in the database
106 3. Connect to the Juju controller for this cloud
109 # cluster_uuid = reuse_cluster_uuid
110 # if not cluster_uuid:
111 # cluster_uuid = str(uuid4())
113 ##################################################
114 # TODO: Pull info from db based on the namespace #
115 ##################################################
117 ###################################################
118 # TODO: Make it idempotent, calling add-k8s and #
119 # bootstrap whenever reuse_cluster_uuid is passed #
121 # `init_env` is called to initialize the K8s #
122 # cluster for juju. If this initialization fails, #
123 # it can be called again by LCM with the param #
124 # reuse_cluster_uuid, e.g. to try to fix it. #
125 ###################################################
127 # This is a new cluster, so bootstrap it
129 cluster_uuid
= reuse_cluster_uuid
or str(uuid
.uuid4())
131 # Is a local k8s cluster?
132 localk8s
= self
.is_local_k8s(k8s_creds
)
134 # If the k8s is external, the juju controller needs a loadbalancer
135 loadbalancer
= False if localk8s
else True
137 # Name the new k8s cloud
138 k8s_cloud
= "k8s-{}".format(cluster_uuid
)
140 self
.log
.debug("Adding k8s cloud {}".format(k8s_cloud
))
141 await self
.add_k8s(k8s_cloud
, k8s_creds
)
143 # Bootstrap Juju controller
144 self
.log
.debug("Bootstrapping...")
145 await self
.bootstrap(k8s_cloud
, cluster_uuid
, loadbalancer
)
146 self
.log
.debug("Bootstrap done.")
148 # Get the controller information
150 # Parse ~/.local/share/juju/controllers.yaml
151 # controllers.testing.api-endpoints|ca-cert|uuid
152 self
.log
.debug("Getting controller endpoints")
153 with
open(os
.path
.expanduser("~/.local/share/juju/controllers.yaml")) as f
:
154 controllers
= yaml
.load(f
, Loader
=yaml
.Loader
)
155 controller
= controllers
["controllers"][cluster_uuid
]
156 endpoints
= controller
["api-endpoints"]
157 self
.juju_endpoint
= endpoints
[0]
158 self
.juju_ca_cert
= controller
["ca-cert"]
160 # Parse ~/.local/share/juju/accounts
161 # controllers.testing.user|password
162 self
.log
.debug("Getting accounts")
163 with
open(os
.path
.expanduser("~/.local/share/juju/accounts.yaml")) as f
:
164 controllers
= yaml
.load(f
, Loader
=yaml
.Loader
)
165 controller
= controllers
["controllers"][cluster_uuid
]
167 self
.juju_user
= controller
["user"]
168 self
.juju_secret
= controller
["password"]
170 # raise Exception("EOL")
172 self
.juju_public_key
= None
175 "endpoint": self
.juju_endpoint
,
176 "username": self
.juju_user
,
177 "secret": self
.juju_secret
,
178 "cacert": self
.juju_ca_cert
,
179 "namespace": namespace
,
180 "loadbalancer": loadbalancer
,
183 # Store the cluster configuration so it
184 # can be used for subsequent calls
185 self
.log
.debug("Setting config")
186 await self
.set_config(cluster_uuid
, config
)
188 # Login to the k8s cluster
189 if not self
.authenticated
:
190 await self
.login(cluster_uuid
)
192 # We're creating a new cluster
193 # print("Getting model {}".format(self.get_namespace(cluster_uuid),
194 # cluster_uuid=cluster_uuid))
195 # model = await self.get_model(
196 # self.get_namespace(cluster_uuid),
197 # cluster_uuid=cluster_uuid
200 # Disconnect from the model
201 # if model and model.is_connected():
202 # await model.disconnect()
204 return cluster_uuid
, True
206 """Repo Management"""
209 self
, name
: str, url
: str, _type
: str = "charm",
211 raise MethodNotImplemented()
async def repo_list(self):
    """List the repositories known to the connector.

    This connector does not implement the operation.

    :raises MethodNotImplemented: always.
    """
    raise MethodNotImplemented()
216 async def repo_remove(
219 raise MethodNotImplemented()
async def synchronize_repos(self, cluster_uuid: str, name: str):
    """Synchronize the repositories of a cluster (currently a no-op).

    Returns None as currently add_repo is not implemented.

    :param cluster_uuid str: The UUID of the cluster (unused).
    :param name str: The name of the repository (unused).
    :return: None
    """
    return None
230 self
, cluster_uuid
: str, force
: bool = False, uninstall_sw
: bool = False
234 Resets the Kubernetes cluster by removing the model that represents it.
236 :param cluster_uuid str: The UUID of the cluster to reset
237 :return: Returns True if successful or raises an exception.
241 if not self
.authenticated
:
242 await self
.login(cluster_uuid
)
244 if self
.controller
.is_connected():
246 namespace
= self
.get_namespace(cluster_uuid
)
247 if await self
.has_model(namespace
):
248 self
.log
.debug("[reset] Destroying model")
249 await self
.controller
.destroy_model(namespace
, destroy_storage
=True)
251 # Disconnect from the controller
252 self
.log
.debug("[reset] Disconnecting controller")
255 # Destroy the controller (via CLI)
256 self
.log
.debug("[reset] Destroying controller")
257 await self
.destroy_controller(cluster_uuid
)
259 self
.log
.debug("[reset] Removing k8s cloud")
260 k8s_cloud
= "k8s-{}".format(cluster_uuid
)
261 await self
.remove_cloud(k8s_cloud
)
263 except Exception as ex
:
264 self
.log
.debug("Caught exception during reset: {}".format(ex
))
275 timeout
: float = 300,
277 db_dict
: dict = None,
278 kdu_name
: str = None,
279 namespace
: str = None,
283 :param cluster_uuid str: The UUID of the cluster to install to
284 :param kdu_model str: The name or path of a bundle to install
285 :param atomic bool: If set, waits until the model is active and resets
286 the cluster on failure.
287 :param timeout int: The time, in seconds, to wait for the install
289 :param params dict: Key-value pairs of instantiation parameters
290 :param kdu_name: Name of the KDU instance to be installed
291 :param namespace: K8s namespace to use for the KDU instance
293 :return: If successful, returns ?
296 if not self
.authenticated
:
297 self
.log
.debug("[install] Logging in to the controller")
298 await self
.login(cluster_uuid
)
301 # Get or create the model, based on the NS
304 kdu_instance
= "{}-{}".format(kdu_name
, db_dict
["filter"]["_id"])
306 kdu_instance
= db_dict
["filter"]["_id"]
308 self
.log
.debug("Checking for model named {}".format(kdu_instance
))
310 # Create the new model
311 self
.log
.debug("Adding model: {}".format(kdu_instance
))
312 model
= await self
.add_model(kdu_instance
, cluster_uuid
=cluster_uuid
)
315 # TODO: Instantiation parameters
318 "Juju bundle that models the KDU, in any of the following ways:
319 - <juju-repo>/<juju-bundle>
320 - <juju-bundle folder under k8s_models folder in the package>
321 - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
323 - <URL_where_to_fetch_juju_bundle>
326 previous_workdir
= os
.getcwd()
327 except FileNotFoundError
:
328 previous_workdir
= "/app/storage"
331 if kdu_model
.startswith("cs:"):
333 elif kdu_model
.startswith("http"):
337 new_workdir
= kdu_model
.strip(kdu_model
.split("/")[-1])
339 os
.chdir(new_workdir
)
341 bundle
= "local:{}".format(kdu_model
)
344 # Raise named exception that the bundle could not be found
347 self
.log
.debug("[install] deploying {}".format(bundle
))
348 await model
.deploy(bundle
)
350 # Get the application
352 # applications = model.applications
353 self
.log
.debug("[install] Applications: {}".format(model
.applications
))
354 for name
in model
.applications
:
355 self
.log
.debug("[install] Waiting for {} to settle".format(name
))
356 application
= model
.applications
[name
]
358 # It's not enough to wait for all units to be active;
359 # the application status needs to be active as well.
360 self
.log
.debug("Waiting for all units to be active...")
361 await model
.block_until(
363 unit
.agent_status
== "idle"
364 and application
.status
in ["active", "unknown"]
365 and unit
.workload_status
in ["active", "unknown"]
366 for unit
in application
.units
370 self
.log
.debug("All units active.")
372 # TODO use asyncio.TimeoutError
373 except concurrent
.futures
._base
.TimeoutError
:
374 os
.chdir(previous_workdir
)
375 self
.log
.debug("[install] Timeout exceeded; resetting cluster")
376 await self
.reset(cluster_uuid
)
379 # Wait for the application to be active
380 if model
.is_connected():
381 self
.log
.debug("[install] Disconnecting model")
382 await model
.disconnect()
384 os
.chdir(previous_workdir
)
387 raise Exception("Unable to install")
389 async def instances_list(self
, cluster_uuid
: str) -> list:
391 returns a list of deployed releases in a cluster
393 :param cluster_uuid: the cluster
402 kdu_model
: str = None,
407 :param cluster_uuid str: The UUID of the cluster to upgrade
408 :param kdu_instance str: The unique name of the KDU instance
409 :param kdu_model str: The name or path of the bundle to upgrade to
410 :param params dict: Key-value pairs of instantiation parameters
412 :return: If successful, reference to the new revision number of the
416 # TODO: Loop through the bundle and upgrade each charm individually
419 The API doesn't have a concept of bundle upgrades, because there are
420 many possible changes: charm revision, disk, number of units, etc.
422 As such, we are only supporting a limited subset of upgrades. We'll
423 upgrade the charm revision but leave storage and scale untouched.
425 Scale changes should happen through OSM constructs, and changes to
426 storage would require a redeployment of the service, at least in this
429 namespace
= self
.get_namespace(cluster_uuid
)
430 model
= await self
.get_model(namespace
, cluster_uuid
=cluster_uuid
)
432 with
open(kdu_model
, "r") as f
:
433 bundle
= yaml
.safe_load(f
)
437 'description': 'Test bundle',
438 'bundle': 'kubernetes',
441 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
444 'password': 'manopw',
445 'root_password': 'osm4u',
448 'series': 'kubernetes'
453 # TODO: This should be returned in an agreed-upon format
454 for name
in bundle
["applications"]:
455 self
.log
.debug(model
.applications
)
456 application
= model
.applications
[name
]
457 self
.log
.debug(application
)
459 path
= bundle
["applications"][name
]["charm"]
462 await application
.upgrade_charm(switch
=path
)
463 except juju
.errors
.JujuError
as ex
:
464 if "already running charm" in str(ex
):
465 # We're already running this version
468 await model
.disconnect()
471 raise MethodNotImplemented()
476 self
, cluster_uuid
: str, kdu_instance
: str, revision
: int = 0,
480 :param cluster_uuid str: The UUID of the cluster to rollback
481 :param kdu_instance str: The unique name of the KDU instance
482 :param revision int: The revision to revert to. If omitted, rolls back
483 the previous upgrade.
485 :return: If successful, returns the revision of active KDU instance,
486 or raises an exception
488 raise MethodNotImplemented()
492 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str) -> bool:
493 """Uninstall a KDU instance
495 :param cluster_uuid str: The UUID of the cluster
496 :param kdu_instance str: The unique name of the KDU instance
498 :return: Returns True if successful, or raises an exception
500 if not self
.authenticated
:
501 self
.log
.debug("[uninstall] Connecting to controller")
502 await self
.login(cluster_uuid
)
504 self
.log
.debug("[uninstall] Destroying model")
506 await self
.controller
.destroy_models(kdu_instance
)
508 self
.log
.debug("[uninstall] Model destroyed and disconnecting")
513 async def exec_primitive(
515 cluster_uuid
: str = None,
516 kdu_instance
: str = None,
517 primitive_name
: str = None,
518 timeout
: float = 300,
520 db_dict
: dict = None,
522 """Exec primitive (Juju action)
524 :param cluster_uuid str: The UUID of the cluster
525 :param kdu_instance str: The unique name of the KDU instance
526 :param primitive_name: Name of action that will be executed
527 :param timeout: Timeout for action execution
528 :param params: Dictionary of all the parameters needed for the action
529 :db_dict: Dictionary for any additional data
531 :return: Returns the output of the action
533 if not self
.authenticated
:
534 self
.log
.debug("[exec_primitive] Connecting to controller")
535 await self
.login(cluster_uuid
)
537 if not params
or "application-name" not in params
:
539 "Missing application-name argument, \
540 argument needed for K8s actions"
544 "[exec_primitive] Getting model "
545 "kdu_instance: {}".format(kdu_instance
)
548 model
= await self
.get_model(kdu_instance
, cluster_uuid
)
550 application_name
= params
["application-name"]
551 application
= model
.applications
[application_name
]
553 actions
= await application
.get_actions()
554 if primitive_name
not in actions
:
555 raise K8sException("Primitive {} not found".format(primitive_name
))
558 for u
in application
.units
:
559 if await u
.is_leader_from_status():
564 raise K8sException("No leader unit found to execute action")
566 self
.log
.debug("[exec_primitive] Running action: {}".format(primitive_name
))
567 action
= await unit
.run_action(primitive_name
, **params
)
569 output
= await model
.get_action_output(action_uuid
=action
.entity_id
)
570 status
= await model
.get_action_status(uuid_or_prefix
=action
.entity_id
)
573 status
[action
.entity_id
] if action
.entity_id
in status
else "failed"
576 if status
!= "completed":
578 "status is not completed: {} output: {}".format(status
, output
)
583 except Exception as e
:
584 error_msg
= "Error executing primitive {}: {}".format(primitive_name
, e
)
585 self
.log
.error(error_msg
)
586 raise K8sException(message
=error_msg
)
590 async def inspect_kdu(self
, kdu_model
: str,) -> dict:
593 Inspects a bundle and returns a dictionary of config parameters and
594 their default values.
596 :param kdu_model str: The name or path of the bundle to inspect.
598 :return: If successful, returns a dictionary of available parameters
599 and their default values.
603 with
open(kdu_model
, "r") as f
:
604 bundle
= yaml
.safe_load(f
)
608 'description': 'Test bundle',
609 'bundle': 'kubernetes',
612 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
615 'password': 'manopw',
616 'root_password': 'osm4u',
619 'series': 'kubernetes'
624 # TODO: This should be returned in an agreed-upon format
625 kdu
= bundle
["applications"]
629 async def help_kdu(self
, kdu_model
: str,) -> str:
632 If available, returns the README of the bundle.
634 :param kdu_model str: The name or path of a bundle
636 :return: If found, returns the contents of the README.
640 files
= ["README", "README.txt", "README.md"]
641 path
= os
.path
.dirname(kdu_model
)
642 for file in os
.listdir(path
):
644 with
open(file, "r") as f
:
650 async def status_kdu(self
, cluster_uuid
: str, kdu_instance
: str,) -> dict:
651 """Get the status of the KDU
653 Get the current status of the KDU instance.
655 :param cluster_uuid str: The UUID of the cluster
656 :param kdu_instance str: The unique id of the KDU instance
658 :return: Returns a dictionary containing namespace, state, resources,
663 model
= await self
.get_model(
664 self
.get_namespace(cluster_uuid
), cluster_uuid
=cluster_uuid
667 # model = await self.get_model_by_uuid(cluster_uuid)
669 model_status
= await model
.get_status()
670 status
= model_status
.applications
672 for name
in model_status
.applications
:
673 application
= model_status
.applications
[name
]
674 status
[name
] = {"status": application
["status"]["status"]}
676 if model
.is_connected():
677 await model
.disconnect()
681 async def get_services(
682 self
, cluster_uuid
: str, kdu_instance
: str, namespace
: str
684 """Return a list of services of a kdu_instance"""
686 credentials
= self
.get_credentials(cluster_uuid
=cluster_uuid
)
688 config_path
= "/tmp/{}".format(cluster_uuid
)
689 config_file
= "{}/config".format(config_path
)
691 if not os
.path
.exists(config_path
):
692 os
.makedirs(config_path
)
693 with
open(config_file
, "w") as f
:
696 kubectl
= Kubectl(config_file
=config_file
)
697 return kubectl
.get_services(
698 field_selector
="metadata.namespace={}".format(kdu_instance
)
701 async def get_service(
702 self
, cluster_uuid
: str, service_name
: str, namespace
: str
704 """Return data for a specific service inside a namespace"""
706 credentials
= self
.get_credentials(cluster_uuid
=cluster_uuid
)
708 config_path
= "/tmp/{}".format(cluster_uuid
)
709 config_file
= "{}/config".format(config_path
)
711 if not os
.path
.exists(config_path
):
712 os
.makedirs(config_path
)
713 with
open(config_file
, "w") as f
:
716 kubectl
= Kubectl(config_file
=config_file
)
718 return kubectl
.get_services(
719 field_selector
="metadata.name={},metadata.namespace={}".format(
720 service_name
, namespace
725 async def add_k8s(self
, cloud_name
: str, credentials
: str,) -> bool:
726 """Add a k8s cloud to Juju
728 Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
731 :param cloud_name str: The name of the cloud to add.
732 :param credentials dict: A dictionary representing the output of
733 `kubectl config view --raw`.
735 :returns: True if successful, otherwise raises an exception.
738 cmd
= [self
.juju_command
, "add-k8s", "--local", cloud_name
]
741 process
= await asyncio
.create_subprocess_exec(
743 stdout
=asyncio
.subprocess
.PIPE
,
744 stderr
=asyncio
.subprocess
.PIPE
,
745 stdin
=asyncio
.subprocess
.PIPE
,
748 # Feed the process the credentials
749 process
.stdin
.write(credentials
.encode("utf-8"))
750 await process
.stdin
.drain()
751 process
.stdin
.close()
753 _stdout
, stderr
= await process
.communicate()
755 return_code
= process
.returncode
757 self
.log
.debug("add-k8s return code: {}".format(return_code
))
760 raise Exception(stderr
)
764 async def add_model(self
, model_name
: str, cluster_uuid
: str,) -> Model
:
765 """Adds a model to the controller
767 Adds a new model to the Juju controller
769 :param model_name str: The name of the model to add.
770 :returns: The juju.model.Model object of the new model upon success or
773 if not self
.authenticated
:
774 await self
.login(cluster_uuid
)
777 "Adding model '{}' to cluster_uuid '{}'".format(model_name
, cluster_uuid
)
781 if self
.juju_public_key
is not None:
782 model
= await self
.controller
.add_model(
783 model_name
, config
={"authorized-keys": self
.juju_public_key
}
786 model
= await self
.controller
.add_model(model_name
)
787 except Exception as ex
:
789 self
.log
.debug("Caught exception: {}".format(ex
))
795 self
, cloud_name
: str, cluster_uuid
: str, loadbalancer
: bool
797 """Bootstrap a Kubernetes controller
799 Bootstrap a Juju controller inside the Kubernetes cluster
801 :param cloud_name str: The name of the cloud.
802 :param cluster_uuid str: The UUID of the cluster to bootstrap.
803 :param loadbalancer bool: If the controller should use loadbalancer or not.
804 :returns: True upon success or raises an exception.
808 cmd
= [self
.juju_command
, "bootstrap", cloud_name
, cluster_uuid
]
811 For public clusters, specify that the controller service is using a
820 "controller-service-type=loadbalancer",
824 "Bootstrapping controller {} in cloud {}".format(cluster_uuid
, cloud_name
)
827 process
= await asyncio
.create_subprocess_exec(
828 *cmd
, stdout
=asyncio
.subprocess
.PIPE
, stderr
=asyncio
.subprocess
.PIPE
,
831 _stdout
, stderr
= await process
.communicate()
833 return_code
= process
.returncode
837 if b
"already exists" not in stderr
:
838 raise Exception(stderr
)
842 async def destroy_controller(self
, cluster_uuid
: str) -> bool:
843 """Destroy a Kubernetes controller
845 Destroy an existing Kubernetes controller.
847 :param cluster_uuid str: The UUID of the cluster to bootstrap.
848 :returns: True upon success or raises an exception.
852 "destroy-controller",
853 "--destroy-all-models",
859 process
= await asyncio
.create_subprocess_exec(
860 *cmd
, stdout
=asyncio
.subprocess
.PIPE
, stderr
=asyncio
.subprocess
.PIPE
,
863 _stdout
, stderr
= await process
.communicate()
865 return_code
= process
.returncode
869 if "already exists" not in stderr
:
870 raise Exception(stderr
)
872 def get_credentials(self
, cluster_uuid
: str) -> str:
874 Get Cluster Kubeconfig
876 k8scluster
= self
.db
.get_one(
877 "k8sclusters", q_filter
={"_id": cluster_uuid
}, fail_on_empty
=False
880 self
.db
.encrypt_decrypt_fields(
881 k8scluster
.get("credentials"),
883 ["password", "secret"],
884 schema_version
=k8scluster
["schema_version"],
885 salt
=k8scluster
["_id"],
888 return yaml
.safe_dump(k8scluster
.get("credentials"))
def get_config(self, cluster_uuid: str,) -> dict:
    """Get the cluster configuration

    Gets the configuration of the cluster by loading the YAML file
    ``<self.fs.path>/<cluster_uuid>.yaml``.

    :param cluster_uuid str: The UUID of the cluster.
    :return: A dict upon success, or raises an exception.
    :raises Exception: if no configuration file exists for the cluster.
    """
    cluster_config = "{}/{}.yaml".format(self.fs.path, cluster_uuid)

    # Fail early so the happy path below stays un-nested.
    if not os.path.exists(cluster_config):
        raise Exception(
            "Unable to locate configuration for cluster {}".format(cluster_uuid)
        )

    with open(cluster_config, "r") as f:
        return yaml.safe_load(f.read())
async def get_model(self, model_name: str, cluster_uuid: str,) -> Model:
    """Get a model from the Juju Controller.

    Note: the caller must disconnect the returned Model object once it is
    done with it.

    :param model_name str: The name of the model to get
    :param cluster_uuid str: The UUID of the cluster; used to log in to the
        controller when the connector is not yet authenticated
    :return The juju.model.Model object if found, or None.
    """
    if not self.authenticated:
        await self.login(cluster_uuid)

    models = await self.controller.list_models()
    if model_name not in models:
        return None

    self.log.debug("Found model: {}".format(model_name))
    return await self.controller.get_model(model_name)
def get_namespace(self, cluster_uuid: str,) -> str:
    """Get the namespace UUID

    Gets the namespace's unique name, as recorded under the "namespace"
    key of the stored cluster configuration.

    :param cluster_uuid str: The UUID of the cluster
    :returns: The namespace UUID, or raises an exception
    :raises Exception: if the configuration has no "namespace" entry
    """
    config = self.get_config(cluster_uuid)

    # Make sure the name is in the config
    if "namespace" not in config:
        raise Exception("Namespace not found.")

    # TODO: We want to make sure this is unique to the cluster, in case
    # the cluster is being reused.
    # Consider pre/appending the cluster id to the namespace string
    return config["namespace"]
async def has_model(self, model_name: str) -> bool:
    """Check if a model exists in the controller

    Checks to see if a model exists in the connected Juju controller.
    The controller must already be connected; no login is attempted here.

    :param model_name str: The name of the model
    :return: A boolean indicating if the model exists
    """
    models = await self.controller.list_models()
    return model_name in models
def is_local_k8s(self, credentials: str,) -> bool:
    """Check if a cluster is local

    Checks if a cluster is running in the local host: the value of the
    OSMLCM_VCA_APIPROXY environment variable is searched for in the
    "server" entry of every cluster listed in the credentials.

    :param credentials str: YAML text containing the k8s credentials
    :returns: A boolean if the cluster is running locally
    """
    creds = yaml.safe_load(credentials)

    # Read the environment once instead of once per cluster.
    host_ip = os.getenv("OSMLCM_VCA_APIPROXY")
    if not creds or not host_ip:
        return False

    return any(
        host_ip in cluster["cluster"]["server"]
        for cluster in creds["clusters"]
        if "server" in cluster["cluster"]
    )
978 async def login(self
, cluster_uuid
):
979 """Login to the Juju controller."""
981 if self
.authenticated
:
984 self
.connecting
= True
986 # Test: Make sure we have the credentials loaded
987 config
= self
.get_config(cluster_uuid
)
989 self
.juju_endpoint
= config
["endpoint"]
990 self
.juju_user
= config
["username"]
991 self
.juju_secret
= config
["secret"]
992 self
.juju_ca_cert
= config
["cacert"]
993 self
.juju_public_key
= None
995 self
.controller
= Controller()
999 "Connecting to controller... ws://{} as {}/{}".format(
1000 self
.juju_endpoint
, self
.juju_user
, self
.juju_secret
,
1004 await self
.controller
.connect(
1005 endpoint
=self
.juju_endpoint
,
1006 username
=self
.juju_user
,
1007 password
=self
.juju_secret
,
1008 cacert
=self
.juju_ca_cert
,
1010 self
.authenticated
= True
1011 self
.log
.debug("JujuApi: Logged into controller")
1012 except Exception as ex
:
1014 self
.log
.debug("Caught exception: {}".format(ex
))
1017 self
.log
.fatal("VCA credentials not configured.")
1018 self
.authenticated
= False
async def logout(self):
    """Logout of the Juju controller.

    Disconnects every model held in ``self.models``, then the controller
    itself, and finally marks the connector as unauthenticated. Does
    nothing when the connector is not authenticated.
    """
    self.log.debug("[logout]")
    if not self.authenticated:
        return

    for model in self.models:
        self.log.debug("Logging out of model {}".format(model))
        await self.models[model].disconnect()

    # Guard so that a missing controller cannot raise AttributeError here.
    if self.controller:
        self.log.debug("Disconnecting controller {}".format(self.controller))
        await self.controller.disconnect()
        self.controller = None

    self.authenticated = False
1037 async def remove_cloud(self
, cloud_name
: str,) -> bool:
1038 """Remove a k8s cloud from Juju
1040 Removes a Kubernetes cloud from Juju.
1042 :param cloud_name str: The name of the cloud to add.
1044 :returns: True if successful, otherwise raises an exception.
1047 # Remove the bootstrapped controller
1048 cmd
= [self
.juju_command
, "remove-k8s", "--client", cloud_name
]
1049 process
= await asyncio
.create_subprocess_exec(
1050 *cmd
, stdout
=asyncio
.subprocess
.PIPE
, stderr
=asyncio
.subprocess
.PIPE
,
1053 _stdout
, stderr
= await process
.communicate()
1055 return_code
= process
.returncode
1058 raise Exception(stderr
)
1060 # Remove the cloud from the local config
1061 cmd
= [self
.juju_command
, "remove-cloud", "--client", cloud_name
]
1062 process
= await asyncio
.create_subprocess_exec(
1063 *cmd
, stdout
=asyncio
.subprocess
.PIPE
, stderr
=asyncio
.subprocess
.PIPE
,
1066 _stdout
, stderr
= await process
.communicate()
1068 return_code
= process
.returncode
1071 raise Exception(stderr
)
async def set_config(self, cluster_uuid: str, config: dict,) -> bool:
    """Save the cluster configuration

    Saves the cluster information to the file store as
    ``<self.fs.path>/<cluster_uuid>.yaml``. An existing file is left
    untouched: the configuration is only written when none is stored yet.

    :param cluster_uuid str: The UUID of the cluster
    :param config dict: A dictionary containing the cluster configuration
    :returns: Boolean upon success or raises an exception.
    """
    cluster_config = "{}/{}.yaml".format(self.fs.path, cluster_uuid)

    if not os.path.exists(cluster_config):
        self.log.debug("Writing config to {}".format(cluster_config))
        with open(cluster_config, "w") as f:
            f.write(yaml.dump(config, Dumper=yaml.Dumper))

    return True