1 # Copyright 2019 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
21 from juju
.controller
import Controller
22 from juju
.model
import Model
23 from n2vc
.exceptions
import K8sException
24 from n2vc
.k8s_conn
import K8sConnector
25 from n2vc
.kubectl
import Kubectl
26 from .exceptions
import MethodNotImplemented
, N2VCNotFound
29 # from juju.bundle import BundleHandler
32 # from .vnf import N2VC
33 class K8sJujuConnector(K8sConnector
):
38 kubectl_command
: str = "/usr/bin/kubectl",
39 juju_command
: str = "/usr/bin/juju",
45 :param kubectl_command: path to kubectl executable
46 :param helm_command: path to helm executable
47 :param fs: file system for kubernetes and helm configuration
52 K8sConnector
.__init
__(
53 self
, db
, log
=log
, on_update_db
=on_update_db
,
57 self
.log
.debug("Initializing K8S Juju connector")
59 self
.juju_command
= juju_command
60 self
.juju_public_key
= None
62 self
.log
.debug("K8S Juju connector initialized")
63 # TODO: Remove these commented lines:
64 # self.authenticated = False
66 # self.juju_secret = ""
73 namespace
: str = "kube-system",
74 reuse_cluster_uuid
: str = None,
77 It prepares a given K8s cluster environment to run Juju bundles.
79 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
81 :param namespace: optional namespace to be used for juju. By default,
82 'kube-system' will be used
83 :param reuse_cluster_uuid: existing cluster uuid for reuse
84 :return: uuid of the K8s cluster and True if connector has installed some
85 software in the cluster
86 (on error, an exception will be raised)
91 Bootstrapping cannot be done, by design, through the API. We need to
98 1. Has the environment already been bootstrapped?
99 - Check the database to see if we have a record for this env
101 2. If this is a new env, create it
102 - Add the k8s cloud to Juju
104 - Record it in the database
106 3. Connect to the Juju controller for this cloud
109 # cluster_uuid = reuse_cluster_uuid
110 # if not cluster_uuid:
111 # cluster_uuid = str(uuid4())
113 ##################################################
114 # TODO: Pull info from db based on the namespace #
115 ##################################################
117 ###################################################
118 # TODO: Make it idempotent, calling add-k8s and #
119 # bootstrap whenever reuse_cluster_uuid is passed #
121 # `init_env` is called to initialize the K8s #
122 # cluster for juju. If this initialization fails, #
123 # it can be called again by LCM with the param #
124 # reuse_cluster_uuid, e.g. to try to fix it. #
125 ###################################################
127 # This is a new cluster, so bootstrap it
129 cluster_uuid
= reuse_cluster_uuid
or str(uuid
.uuid4())
131 # Is a local k8s cluster?
132 localk8s
= self
.is_local_k8s(k8s_creds
)
134 # If the k8s is external, the juju controller needs a loadbalancer
135 loadbalancer
= False if localk8s
else True
137 # Name the new k8s cloud
138 k8s_cloud
= "k8s-{}".format(cluster_uuid
)
140 self
.log
.debug("Adding k8s cloud {}".format(k8s_cloud
))
141 await self
.add_k8s(k8s_cloud
, k8s_creds
)
143 # Bootstrap Juju controller
144 self
.log
.debug("Bootstrapping...")
145 await self
.bootstrap(k8s_cloud
, cluster_uuid
, loadbalancer
)
146 self
.log
.debug("Bootstrap done.")
148 # Get the controller information
150 # Parse ~/.local/share/juju/controllers.yaml
151 # controllers.testing.api-endpoints|ca-cert|uuid
152 self
.log
.debug("Getting controller endpoints")
153 with
open(os
.path
.expanduser("~/.local/share/juju/controllers.yaml")) as f
:
154 controllers
= yaml
.load(f
, Loader
=yaml
.Loader
)
155 controller
= controllers
["controllers"][cluster_uuid
]
156 endpoints
= controller
["api-endpoints"]
157 juju_endpoint
= endpoints
[0]
158 juju_ca_cert
= controller
["ca-cert"]
160 # Parse ~/.local/share/juju/accounts
161 # controllers.testing.user|password
162 self
.log
.debug("Getting accounts")
163 with
open(os
.path
.expanduser("~/.local/share/juju/accounts.yaml")) as f
:
164 controllers
= yaml
.load(f
, Loader
=yaml
.Loader
)
165 controller
= controllers
["controllers"][cluster_uuid
]
167 juju_user
= controller
["user"]
168 juju_secret
= controller
["password"]
171 "endpoint": juju_endpoint
,
172 "username": juju_user
,
173 "secret": juju_secret
,
174 "cacert": juju_ca_cert
,
175 "loadbalancer": loadbalancer
,
178 # Store the cluster configuration so it
179 # can be used for subsequent calls
180 self
.log
.debug("Setting config")
181 await self
.set_config(cluster_uuid
, config
)
184 controller
= await self
.get_controller(cluster_uuid
)
185 await controller
.disconnect()
187 # TODO: Remove these commented lines
188 # raise Exception("EOL")
189 # self.juju_public_key = None
190 # Login to the k8s cluster
191 # if not self.authenticated:
192 # await self.login(cluster_uuid)
194 # We're creating a new cluster
195 # print("Getting model {}".format(self.get_namespace(cluster_uuid),
196 # cluster_uuid=cluster_uuid))
197 # model = await self.get_model(
198 # self.get_namespace(cluster_uuid),
199 # cluster_uuid=cluster_uuid
202 # Disconnect from the model
203 # if model and model.is_connected():
204 # await model.disconnect()
206 return cluster_uuid
, True
208 """Repo Management"""
211 self
, name
: str, url
: str, _type
: str = "charm",
213 raise MethodNotImplemented()
215 async def repo_list(self
):
216 raise MethodNotImplemented()
218 async def repo_remove(
221 raise MethodNotImplemented()
223 async def synchronize_repos(self
, cluster_uuid
: str, name
: str):
225 Returns None as currently add_repo is not implemented
232 self
, cluster_uuid
: str, force
: bool = False, uninstall_sw
: bool = False
236 Resets the Kubernetes cluster by removing the model that represents it.
238 :param cluster_uuid str: The UUID of the cluster to reset
239 :return: Returns True if successful or raises an exception.
244 # Remove k8scluster from database
245 self
.log
.debug("[reset] Removing k8scluster from juju database")
246 juju_db
= self
.db
.get_one("admin", {"_id": "juju"})
248 for k
in juju_db
["k8sclusters"]:
249 if k
["_id"] == cluster_uuid
:
250 juju_db
["k8sclusters"].remove(k
)
253 q_filter
={"_id": "juju"},
254 update_dict
={"k8sclusters": juju_db
["k8sclusters"]},
258 # Destroy the controller (via CLI)
259 self
.log
.debug("[reset] Destroying controller")
260 await self
.destroy_controller(cluster_uuid
)
262 self
.log
.debug("[reset] Removing k8s cloud")
263 k8s_cloud
= "k8s-{}".format(cluster_uuid
)
264 await self
.remove_cloud(k8s_cloud
)
266 except Exception as ex
:
267 self
.log
.debug("Caught exception during reset: {}".format(ex
))
269 # TODO: Remove these commented lines
270 # if not self.authenticated:
271 # await self.login(cluster_uuid)
273 # if self.controller.is_connected():
274 # # Destroy the model
275 # namespace = self.get_namespace(cluster_uuid)
276 # if await self.has_model(namespace):
277 # self.log.debug("[reset] Destroying model")
278 # await self.controller.destroy_model(namespace, destroy_storage=True)
280 # # Disconnect from the controller
281 # self.log.debug("[reset] Disconnecting controller")
282 # await self.logout()
291 timeout
: float = 300,
293 db_dict
: dict = None,
294 kdu_name
: str = None,
295 namespace
: str = None,
299 :param cluster_uuid str: The UUID of the cluster to install to
300 :param kdu_model str: The name or path of a bundle to install
301 :param atomic bool: If set, waits until the model is active and resets
302 the cluster on failure.
303 :param timeout int: The time, in seconds, to wait for the install
305 :param params dict: Key-value pairs of instantiation parameters
306 :param kdu_name: Name of the KDU instance to be installed
307 :param namespace: K8s namespace to use for the KDU instance
309 :return: If successful, returns ?
312 controller
= await self
.get_controller(cluster_uuid
)
315 # Get or create the model, based on the NS
318 kdu_instance
= "{}-{}".format(kdu_name
, db_dict
["filter"]["_id"])
320 kdu_instance
= db_dict
["filter"]["_id"]
322 self
.log
.debug("Checking for model named {}".format(kdu_instance
))
324 # Create the new model
325 self
.log
.debug("Adding model: {}".format(kdu_instance
))
326 model
= await self
.add_model(
327 kdu_instance
, cluster_uuid
=cluster_uuid
, controller
=controller
331 # TODO: Instantiation parameters
334 "Juju bundle that models the KDU, in any of the following ways:
335 - <juju-repo>/<juju-bundle>
336 - <juju-bundle folder under k8s_models folder in the package>
337 - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
339 - <URL_where_to_fetch_juju_bundle>
342 previous_workdir
= os
.getcwd()
343 except FileNotFoundError
:
344 previous_workdir
= "/app/storage"
347 if kdu_model
.startswith("cs:"):
349 elif kdu_model
.startswith("http"):
353 new_workdir
= kdu_model
.strip(kdu_model
.split("/")[-1])
355 os
.chdir(new_workdir
)
357 bundle
= "local:{}".format(kdu_model
)
360 # Raise named exception that the bundle could not be found
363 self
.log
.debug("[install] deploying {}".format(bundle
))
364 await model
.deploy(bundle
)
366 # Get the application
368 # applications = model.applications
369 self
.log
.debug("[install] Applications: {}".format(model
.applications
))
370 for name
in model
.applications
:
371 self
.log
.debug("[install] Waiting for {} to settle".format(name
))
372 application
= model
.applications
[name
]
374 # It's not enough to wait for all units to be active;
375 # the application status needs to be active as well.
376 self
.log
.debug("Waiting for all units to be active...")
377 await model
.block_until(
379 unit
.agent_status
== "idle"
380 and application
.status
in ["active", "unknown"]
381 and unit
.workload_status
in ["active", "unknown"]
382 for unit
in application
.units
386 self
.log
.debug("All units active.")
388 # TODO use asyncio.TimeoutError
389 except concurrent
.futures
._base
.TimeoutError
:
390 os
.chdir(previous_workdir
)
391 self
.log
.debug("[install] Timeout exceeded; resetting cluster")
392 await self
.reset(cluster_uuid
)
395 # Wait for the application to be active
396 if model
.is_connected():
397 self
.log
.debug("[install] Disconnecting model")
398 await model
.disconnect()
399 await controller
.disconnect()
400 os
.chdir(previous_workdir
)
403 raise Exception("Unable to install")
405 async def instances_list(self
, cluster_uuid
: str) -> list:
407 returns a list of deployed releases in a cluster
409 :param cluster_uuid: the cluster
418 kdu_model
: str = None,
423 :param cluster_uuid str: The UUID of the cluster to upgrade
424 :param kdu_instance str: The unique name of the KDU instance
425 :param kdu_model str: The name or path of the bundle to upgrade to
426 :param params dict: Key-value pairs of instantiation parameters
428 :return: If successful, reference to the new revision number of the
432 # TODO: Loop through the bundle and upgrade each charm individually
435 The API doesn't have a concept of bundle upgrades, because there are
436 many possible changes: charm revision, disk, number of units, etc.
438 As such, we are only supporting a limited subset of upgrades. We'll
439 upgrade the charm revision but leave storage and scale untouched.
441 Scale changes should happen through OSM constructs, and changes to
442 storage would require a redeployment of the service, at least in this
445 raise MethodNotImplemented()
446 # TODO: Remove these commented lines
448 # model = await self.get_model(namespace, cluster_uuid=cluster_uuid)
451 # namespace = self.get_namespace(cluster_uuid)
452 # controller = await self.get_controller(cluster_uuid)
455 # if namespace not in await controller.list_models():
456 # raise N2VCNotFound(message="Model {} does not exist".format(namespace))
458 # model = await controller.get_model(namespace)
459 # with open(kdu_model, "r") as f:
460 # bundle = yaml.safe_load(f)
464 # 'description': 'Test bundle',
465 # 'bundle': 'kubernetes',
468 # 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
471 # 'password': 'manopw',
472 # 'root_password': 'osm4u',
475 # 'series': 'kubernetes'
480 # # TODO: This should be returned in an agreed-upon format
481 # for name in bundle["applications"]:
482 # self.log.debug(model.applications)
483 # application = model.applications[name]
484 # self.log.debug(application)
486 # path = bundle["applications"][name]["charm"]
489 # await application.upgrade_charm(switch=path)
490 # except juju.errors.JujuError as ex:
491 # if "already running charm" in str(ex):
492 # # We're already running this version
496 # await model.disconnect()
497 # await controller.disconnect()
503 self
, cluster_uuid
: str, kdu_instance
: str, revision
: int = 0,
507 :param cluster_uuid str: The UUID of the cluster to rollback
508 :param kdu_instance str: The unique name of the KDU instance
509 :param revision int: The revision to revert to. If omitted, rolls back
510 the previous upgrade.
512 :return: If successful, returns the revision of active KDU instance,
513 or raises an exception
515 raise MethodNotImplemented()
519 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str) -> bool:
520 """Uninstall a KDU instance
522 :param cluster_uuid str: The UUID of the cluster
523 :param kdu_instance str: The unique name of the KDU instance
525 :return: Returns True if successful, or raises an exception
528 controller
= await self
.get_controller(cluster_uuid
)
530 self
.log
.debug("[uninstall] Destroying model")
532 await controller
.destroy_models(kdu_instance
)
534 self
.log
.debug("[uninstall] Model destroyed and disconnecting")
535 await controller
.disconnect()
538 # TODO: Remove these commented lines
539 # if not self.authenticated:
540 # self.log.debug("[uninstall] Connecting to controller")
541 # await self.login(cluster_uuid)
543 async def exec_primitive(
545 cluster_uuid
: str = None,
546 kdu_instance
: str = None,
547 primitive_name
: str = None,
548 timeout
: float = 300,
550 db_dict
: dict = None,
552 """Exec primitive (Juju action)
554 :param cluster_uuid str: The UUID of the cluster
555 :param kdu_instance str: The unique name of the KDU instance
556 :param primitive_name: Name of action that will be executed
557 :param timeout: Timeout for action execution
558 :param params: Dictionary of all the parameters needed for the action
559 :db_dict: Dictionary for any additional data
561 :return: Returns the output of the action
564 controller
= await self
.get_controller(cluster_uuid
)
566 if not params
or "application-name" not in params
:
568 "Missing application-name argument, \
569 argument needed for K8s actions"
573 "[exec_primitive] Getting model "
574 "kdu_instance: {}".format(kdu_instance
)
577 model
= await self
.get_model(kdu_instance
, controller
=controller
)
579 application_name
= params
["application-name"]
580 application
= model
.applications
[application_name
]
582 actions
= await application
.get_actions()
583 if primitive_name
not in actions
:
584 raise K8sException("Primitive {} not found".format(primitive_name
))
587 for u
in application
.units
:
588 if await u
.is_leader_from_status():
593 raise K8sException("No leader unit found to execute action")
595 self
.log
.debug("[exec_primitive] Running action: {}".format(primitive_name
))
596 action
= await unit
.run_action(primitive_name
, **params
)
598 output
= await model
.get_action_output(action_uuid
=action
.entity_id
)
599 status
= await model
.get_action_status(uuid_or_prefix
=action
.entity_id
)
602 status
[action
.entity_id
] if action
.entity_id
in status
else "failed"
605 if status
!= "completed":
607 "status: {}, output: {}".format(status
, output
)
612 except Exception as e
:
613 error_msg
= "Error executing primitive {}: {}".format(primitive_name
, e
)
614 self
.log
.error(error_msg
)
615 raise K8sException(message
=error_msg
)
617 await controller
.disconnect()
618 # TODO: Remove these commented lines:
619 # if not self.authenticated:
620 # self.log.debug("[exec_primitive] Connecting to controller")
621 # await self.login(cluster_uuid)
625 async def inspect_kdu(self
, kdu_model
: str,) -> dict:
628 Inspects a bundle and returns a dictionary of config parameters and
629 their default values.
631 :param kdu_model str: The name or path of the bundle to inspect.
633 :return: If successful, returns a dictionary of available parameters
634 and their default values.
638 with
open(kdu_model
, "r") as f
:
639 bundle
= yaml
.safe_load(f
)
643 'description': 'Test bundle',
644 'bundle': 'kubernetes',
647 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
650 'password': 'manopw',
651 'root_password': 'osm4u',
654 'series': 'kubernetes'
659 # TODO: This should be returned in an agreed-upon format
660 kdu
= bundle
["applications"]
664 async def help_kdu(self
, kdu_model
: str,) -> str:
667 If available, returns the README of the bundle.
669 :param kdu_model str: The name or path of a bundle
671 :return: If found, returns the contents of the README.
675 files
= ["README", "README.txt", "README.md"]
676 path
= os
.path
.dirname(kdu_model
)
677 for file in os
.listdir(path
):
679 with
open(file, "r") as f
:
685 async def status_kdu(self
, cluster_uuid
: str, kdu_instance
: str,) -> dict:
686 """Get the status of the KDU
688 Get the current status of the KDU instance.
690 :param cluster_uuid str: The UUID of the cluster
691 :param kdu_instance str: The unique id of the KDU instance
693 :return: Returns a dictionary containing namespace, state, resources,
697 controller
= await self
.get_controller(cluster_uuid
)
698 model
= await self
.get_model(kdu_instance
, controller
=controller
)
700 model_status
= await model
.get_status()
701 status
= model_status
.applications
703 for name
in model_status
.applications
:
704 application
= model_status
.applications
[name
]
705 status
[name
] = {"status": application
["status"]["status"]}
707 await model
.disconnect()
708 await controller
.disconnect()
712 async def get_services(
713 self
, cluster_uuid
: str, kdu_instance
: str, namespace
: str
715 """Return a list of services of a kdu_instance"""
717 credentials
= self
.get_credentials(cluster_uuid
=cluster_uuid
)
719 config_path
= "/tmp/{}".format(cluster_uuid
)
720 config_file
= "{}/config".format(config_path
)
722 if not os
.path
.exists(config_path
):
723 os
.makedirs(config_path
)
724 with
open(config_file
, "w") as f
:
727 kubectl
= Kubectl(config_file
=config_file
)
728 return kubectl
.get_services(
729 field_selector
="metadata.namespace={}".format(kdu_instance
)
732 async def get_service(
733 self
, cluster_uuid
: str, service_name
: str, namespace
: str
735 """Return data for a specific service inside a namespace"""
737 credentials
= self
.get_credentials(cluster_uuid
=cluster_uuid
)
739 config_path
= "/tmp/{}".format(cluster_uuid
)
740 config_file
= "{}/config".format(config_path
)
742 if not os
.path
.exists(config_path
):
743 os
.makedirs(config_path
)
744 with
open(config_file
, "w") as f
:
747 kubectl
= Kubectl(config_file
=config_file
)
749 return kubectl
.get_services(
750 field_selector
="metadata.name={},metadata.namespace={}".format(
751 service_name
, namespace
756 async def add_k8s(self
, cloud_name
: str, credentials
: str,) -> bool:
757 """Add a k8s cloud to Juju
759 Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
762 :param cloud_name str: The name of the cloud to add.
763 :param credentials dict: A dictionary representing the output of
764 `kubectl config view --raw`.
766 :returns: True if successful, otherwise raises an exception.
769 cmd
= [self
.juju_command
, "add-k8s", "--local", cloud_name
]
772 process
= await asyncio
.create_subprocess_exec(
774 stdout
=asyncio
.subprocess
.PIPE
,
775 stderr
=asyncio
.subprocess
.PIPE
,
776 stdin
=asyncio
.subprocess
.PIPE
,
779 # Feed the process the credentials
780 process
.stdin
.write(credentials
.encode("utf-8"))
781 await process
.stdin
.drain()
782 process
.stdin
.close()
784 _stdout
, stderr
= await process
.communicate()
786 return_code
= process
.returncode
788 self
.log
.debug("add-k8s return code: {}".format(return_code
))
791 raise Exception(stderr
)
796 self
, model_name
: str, cluster_uuid
: str, controller
: Controller
798 """Adds a model to the controller
800 Adds a new model to the Juju controller
802 :param model_name str: The name of the model to add.
803 :param cluster_uuid str: ID of the cluster.
804 :param controller: Controller object in which the model will be added
805 :returns: The juju.model.Model object of the new model upon success or
810 "Adding model '{}' to cluster_uuid '{}'".format(model_name
, cluster_uuid
)
814 if self
.juju_public_key
is not None:
815 model
= await controller
.add_model(
816 model_name
, config
={"authorized-keys": self
.juju_public_key
}
819 model
= await controller
.add_model(model_name
)
820 except Exception as ex
:
822 self
.log
.debug("Caught exception: {}".format(ex
))
828 self
, cloud_name
: str, cluster_uuid
: str, loadbalancer
: bool
830 """Bootstrap a Kubernetes controller
832 Bootstrap a Juju controller inside the Kubernetes cluster
834 :param cloud_name str: The name of the cloud.
835 :param cluster_uuid str: The UUID of the cluster to bootstrap.
836 :param loadbalancer bool: If the controller should use loadbalancer or not.
837 :returns: True upon success or raises an exception.
841 cmd
= [self
.juju_command
, "bootstrap", cloud_name
, cluster_uuid
]
844 For public clusters, specify that the controller service is using a
853 "controller-service-type=loadbalancer",
857 "Bootstrapping controller {} in cloud {}".format(cluster_uuid
, cloud_name
)
860 process
= await asyncio
.create_subprocess_exec(
861 *cmd
, stdout
=asyncio
.subprocess
.PIPE
, stderr
=asyncio
.subprocess
.PIPE
,
864 _stdout
, stderr
= await process
.communicate()
866 return_code
= process
.returncode
870 if b
"already exists" not in stderr
:
871 raise Exception(stderr
)
875 async def destroy_controller(self
, cluster_uuid
: str) -> bool:
876 """Destroy a Kubernetes controller
878 Destroy an existing Kubernetes controller.
880 :param cluster_uuid str: The UUID of the cluster to bootstrap.
881 :returns: True upon success or raises an exception.
885 "destroy-controller",
886 "--destroy-all-models",
892 process
= await asyncio
.create_subprocess_exec(
893 *cmd
, stdout
=asyncio
.subprocess
.PIPE
, stderr
=asyncio
.subprocess
.PIPE
,
896 _stdout
, stderr
= await process
.communicate()
898 return_code
= process
.returncode
902 if "already exists" not in stderr
:
903 raise Exception(stderr
)
905 def get_credentials(self
, cluster_uuid
: str) -> str:
907 Get Cluster Kubeconfig
909 k8scluster
= self
.db
.get_one(
910 "k8sclusters", q_filter
={"_id": cluster_uuid
}, fail_on_empty
=False
913 self
.db
.encrypt_decrypt_fields(
914 k8scluster
.get("credentials"),
916 ["password", "secret"],
917 schema_version
=k8scluster
["schema_version"],
918 salt
=k8scluster
["_id"],
921 return yaml
.safe_dump(k8scluster
.get("credentials"))
923 def get_config(self
, cluster_uuid
: str,) -> dict:
924 """Get the cluster configuration
926 Gets the configuration of the cluster
928 :param cluster_uuid str: The UUID of the cluster.
929 :return: A dict upon success, or raises an exception.
932 juju_db
= self
.db
.get_one("admin", {"_id": "juju"})
934 for k
in juju_db
["k8sclusters"]:
935 if k
["_id"] == cluster_uuid
:
937 self
.db
.encrypt_decrypt_fields(
940 ["secret", "cacert"],
941 schema_version
="1.1",
947 "Unable to locate configuration for cluster {}".format(cluster_uuid
)
951 async def get_model(self
, model_name
: str, controller
: Controller
) -> Model
:
952 """Get a model from the Juju Controller.
954 Note: Model objects returned must call disconnected() before it goes
957 :param model_name str: The name of the model to get
958 :param controller Controller: Controller object
959 :return The juju.model.Model object if found, or None.
962 models
= await controller
.list_models()
963 if model_name
not in models
:
964 raise N2VCNotFound("Model {} not found".format(model_name
))
965 self
.log
.debug("Found model: {}".format(model_name
))
966 return await controller
.get_model(model_name
)
968 def get_namespace(self
, cluster_uuid
: str,) -> str:
969 """Get the namespace UUID
970 Gets the namespace's unique name
972 :param cluster_uuid str: The UUID of the cluster
973 :returns: The namespace UUID, or raises an exception
975 config
= self
.get_config(cluster_uuid
)
977 # Make sure the name is in the config
978 if "namespace" not in config
:
979 raise Exception("Namespace not found.")
981 # TODO: We want to make sure this is unique to the cluster, in case
982 # the cluster is being reused.
983 # Consider pre/appending the cluster id to the namespace string
984 return config
["namespace"]
986 # TODO: Remove these lines of code
987 # async def has_model(self, model_name: str) -> bool:
988 # """Check if a model exists in the controller
990 # Checks to see if a model exists in the connected Juju controller.
992 # :param model_name str: The name of the model
993 # :return: A boolean indicating if the model exists
995 # models = await self.controller.list_models()
997 # if model_name in models:
1001 def is_local_k8s(self
, credentials
: str,) -> bool:
1002 """Check if a cluster is local
1004 Checks if a cluster is running in the local host
1006 :param credentials dict: A dictionary containing the k8s credentials
1007 :returns: A boolean if the cluster is running locally
1010 creds
= yaml
.safe_load(credentials
)
1012 if creds
and os
.getenv("OSMLCM_VCA_APIPROXY"):
1013 for cluster
in creds
["clusters"]:
1014 if "server" in cluster
["cluster"]:
1015 if os
.getenv("OSMLCM_VCA_APIPROXY") in cluster
["cluster"]["server"]:
1020 async def get_controller(self
, cluster_uuid
):
1021 """Login to the Juju controller."""
1023 config
= self
.get_config(cluster_uuid
)
1025 juju_endpoint
= config
["endpoint"]
1026 juju_user
= config
["username"]
1027 juju_secret
= config
["secret"]
1028 juju_ca_cert
= config
["cacert"]
1030 controller
= Controller()
1034 "Connecting to controller... ws://{} as {}".format(
1035 juju_endpoint
, juju_user
,
1039 await controller
.connect(
1040 endpoint
=juju_endpoint
,
1042 password
=juju_secret
,
1043 cacert
=juju_ca_cert
,
1045 self
.log
.debug("JujuApi: Logged into controller")
1047 except Exception as ex
:
1049 self
.log
.debug("Caught exception: {}".format(ex
))
1051 self
.log
.fatal("VCA credentials not configured.")
1053 # TODO: Remove these commented lines
1054 # self.authenticated = False
1055 # if self.authenticated:
1058 # self.connecting = True
1059 # juju_public_key = None
1060 # self.authenticated = True
1061 # Test: Make sure we have the credentials loaded
1062 # async def logout(self):
1063 # """Logout of the Juju controller."""
1064 # self.log.debug("[logout]")
1065 # if not self.authenticated:
1068 # for model in self.models:
1069 # self.log.debug("Logging out of model {}".format(model))
1070 # await self.models[model].disconnect()
1072 # if self.controller:
1073 # self.log.debug("Disconnecting controller {}".format(self.controller))
1074 # await self.controller.disconnect()
1075 # self.controller = None
1077 # self.authenticated = False
1079 async def remove_cloud(self
, cloud_name
: str,) -> bool:
1080 """Remove a k8s cloud from Juju
1082 Removes a Kubernetes cloud from Juju.
1084 :param cloud_name str: The name of the cloud to add.
1086 :returns: True if successful, otherwise raises an exception.
1089 # Remove the bootstrapped controller
1090 cmd
= [self
.juju_command
, "remove-k8s", "--client", cloud_name
]
1091 process
= await asyncio
.create_subprocess_exec(
1092 *cmd
, stdout
=asyncio
.subprocess
.PIPE
, stderr
=asyncio
.subprocess
.PIPE
,
1095 _stdout
, stderr
= await process
.communicate()
1097 return_code
= process
.returncode
1100 raise Exception(stderr
)
1102 # Remove the cloud from the local config
1103 cmd
= [self
.juju_command
, "remove-cloud", "--client", cloud_name
]
1104 process
= await asyncio
.create_subprocess_exec(
1105 *cmd
, stdout
=asyncio
.subprocess
.PIPE
, stderr
=asyncio
.subprocess
.PIPE
,
1108 _stdout
, stderr
= await process
.communicate()
1110 return_code
= process
.returncode
1113 raise Exception(stderr
)
1117 async def set_config(self
, cluster_uuid
: str, config
: dict,) -> bool:
1118 """Save the cluster configuration
1120 Saves the cluster information to the Mongo database
1122 :param cluster_uuid str: The UUID of the cluster
1123 :param config dict: A dictionary containing the cluster configuration
1126 juju_db
= self
.db
.get_one("admin", {"_id": "juju"})
1128 k8sclusters
= juju_db
["k8sclusters"] if "k8sclusters" in juju_db
else []
1129 self
.db
.encrypt_decrypt_fields(
1132 ["secret", "cacert"],
1133 schema_version
="1.1",
1136 k8sclusters
.append({"_id": cluster_uuid
, "config": config
})
1139 q_filter
={"_id": "juju"},
1140 update_dict
={"k8sclusters": k8sclusters
},