1 # Copyright 2019 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
21 from n2vc
.exceptions
import K8sException
, N2VCBadArgumentsException
22 from n2vc
.k8s_conn
import K8sConnector
23 from n2vc
.kubectl
import Kubectl
24 from .exceptions
import MethodNotImplemented
25 from n2vc
.utils
import base64_to_cacert
26 from n2vc
.libjuju
import Libjuju
29 # from juju.bundle import BundleHandler
32 # from .vnf import N2VC
33 class K8sJujuConnector(K8sConnector
):
38 kubectl_command
: str = "/usr/bin/kubectl",
39 juju_command
: str = "/usr/bin/juju",
43 vca_config
: dict = None,
46 :param fs: file system for kubernetes and helm configuration
47 :param db: Database object
48 :param kubectl_command: path to kubectl executable
49 :param helm_command: path to helm executable
51 :param: loop: Asyncio loop
55 K8sConnector
.__init
__(
59 on_update_db
=on_update_db
,
63 self
.loop
= loop
or asyncio
.get_event_loop()
64 self
.log
.debug("Initializing K8S Juju connector")
66 required_vca_config
= [
72 if not vca_config
or not all(k
in vca_config
for k
in required_vca_config
):
73 raise N2VCBadArgumentsException(
74 message
="Missing arguments in vca_config: {}".format(vca_config
),
75 bad_args
=required_vca_config
,
77 port
= vca_config
["port"] if "port" in vca_config
else 17070
78 url
= "{}:{}".format(vca_config
["host"], port
)
79 enable_os_upgrade
= vca_config
.get("enable_os_upgrade", True)
80 apt_mirror
= vca_config
.get("apt_mirror", None)
81 username
= vca_config
["user"]
82 secret
= vca_config
["secret"]
83 ca_cert
= base64_to_cacert(vca_config
["ca_cert"])
85 self
.libjuju
= Libjuju(
87 api_proxy
=None, # Not needed for k8s charms
88 enable_os_upgrade
=enable_os_upgrade
,
89 apt_mirror
=apt_mirror
,
97 self
.log
.debug("K8S Juju connector initialized")
98 # TODO: Remove these commented lines:
99 # self.authenticated = False
101 # self.juju_secret = ""
108 namespace
: str = "kube-system",
109 reuse_cluster_uuid
: str = None,
112 It prepares a given K8s cluster environment to run Juju bundles.
114 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
116 :param namespace: optional namespace to be used for juju. By default,
117 'kube-system' will be used
118 :param reuse_cluster_uuid: existing cluster uuid for reuse
119 :return: uuid of the K8s cluster and True if connector has installed some
120 software in the cluster
121 (on error, an exception will be raised)
126 # Bootstrapping cannot be done, by design, through the API. We need to
133 # 1. Has the environment already been bootstrapped?
134 # - Check the database to see if we have a record for this env
136 # 2. If this is a new env, create it
137 # - Add the k8s cloud to Juju
139 # - Record it in the database
141 # 3. Connect to the Juju controller for this cloud
144 # cluster_uuid = reuse_cluster_uuid
145 # if not cluster_uuid:
146 # cluster_uuid = str(uuid4())
148 ##################################################
149 # TODO: Pull info from db based on the namespace #
150 ##################################################
152 ###################################################
153 # TODO: Make it idempotent, calling add-k8s and #
154 # bootstrap whenever reuse_cluster_uuid is passed #
156 # `init_env` is called to initialize the K8s #
157 # cluster for juju. If this initialization fails, #
158 # it can be called again by LCM with the param #
159 # reuse_cluster_uuid, e.g. to try to fix it. #
160 ###################################################
162 # This is a new cluster, so bootstrap it
164 cluster_uuid
= reuse_cluster_uuid
or str(uuid
.uuid4())
166 # Is a local k8s cluster?
167 # localk8s = self.is_local_k8s(k8s_creds)
169 # If the k8s is external, the juju controller needs a loadbalancer
170 # loadbalancer = False if localk8s else True
172 # Name the new k8s cloud
173 # k8s_cloud = "k8s-{}".format(cluster_uuid)
175 # self.log.debug("Adding k8s cloud {}".format(k8s_cloud))
176 # await self.add_k8s(k8s_cloud, k8s_creds)
178 # Bootstrap Juju controller
179 # self.log.debug("Bootstrapping...")
180 # await self.bootstrap(k8s_cloud, cluster_uuid, loadbalancer)
181 # self.log.debug("Bootstrap done.")
183 # Get the controller information
185 # Parse ~/.local/share/juju/controllers.yaml
186 # controllers.testing.api-endpoints|ca-cert|uuid
187 # self.log.debug("Getting controller endpoints")
188 # with open(os.path.expanduser("~/.local/share/juju/controllers.yaml")) as f:
189 # controllers = yaml.load(f, Loader=yaml.Loader)
190 # controller = controllers["controllers"][cluster_uuid]
191 # endpoints = controller["api-endpoints"]
192 # juju_endpoint = endpoints[0]
193 # juju_ca_cert = controller["ca-cert"]
195 # Parse ~/.local/share/juju/accounts
196 # controllers.testing.user|password
197 # self.log.debug("Getting accounts")
198 # with open(os.path.expanduser("~/.local/share/juju/accounts.yaml")) as f:
199 # controllers = yaml.load(f, Loader=yaml.Loader)
200 # controller = controllers["controllers"][cluster_uuid]
202 # juju_user = controller["user"]
203 # juju_secret = controller["password"]
206 # "endpoint": juju_endpoint,
207 # "username": juju_user,
208 # "secret": juju_secret,
209 # "cacert": juju_ca_cert,
210 # "loadbalancer": loadbalancer,
213 # Store the cluster configuration so it
214 # can be used for subsequent calls
216 kubecfg
= tempfile
.NamedTemporaryFile()
217 with
open(kubecfg
.name
, "w") as kubecfg_file
:
218 kubecfg_file
.write(k8s_creds
)
219 kubectl
= Kubectl(config_file
=kubecfg
.name
)
220 configuration
= kubectl
.get_configuration()
221 default_storage_class
= kubectl
.get_default_storage_class()
222 await self
.libjuju
.add_k8s(
224 configuration
=configuration
,
225 storage_class
=default_storage_class
,
226 credential_name
=self
._get
_credential
_name
(cluster_uuid
),
228 # self.log.debug("Setting config")
229 # await self.set_config(cluster_uuid, config)
232 # controller = await self.get_controller(cluster_uuid)
233 # await controller.disconnect()
235 # TODO: Remove these commented lines
236 # raise Exception("EOL")
237 # self.juju_public_key = None
238 # Login to the k8s cluster
239 # if not self.authenticated:
240 # await self.login(cluster_uuid)
242 # We're creating a new cluster
243 # print("Getting model {}".format(self.get_namespace(cluster_uuid),
244 # cluster_uuid=cluster_uuid))
245 # model = await self.get_model(
246 # self.get_namespace(cluster_uuid),
247 # cluster_uuid=cluster_uuid
250 # Disconnect from the model
251 # if model and model.is_connected():
252 # await model.disconnect()
254 return cluster_uuid
, True
256 """Repo Management"""
262 _type
: str = "charm",
264 raise MethodNotImplemented()
266 async def repo_list(self
):
267 raise MethodNotImplemented()
269 async def repo_remove(
273 raise MethodNotImplemented()
275 async def synchronize_repos(self
, cluster_uuid
: str, name
: str):
277 Returns None as currently add_repo is not implemented
284 self
, cluster_uuid
: str, force
: bool = False, uninstall_sw
: bool = False
288 Resets the Kubernetes cluster by removing the model that represents it.
290 :param cluster_uuid str: The UUID of the cluster to reset
291 :return: Returns True if successful or raises an exception.
296 # Remove k8scluster from database
297 # self.log.debug("[reset] Removing k8scluster from juju database")
298 # juju_db = self.db.get_one("admin", {"_id": "juju"})
300 # for k in juju_db["k8sclusters"]:
301 # if k["_id"] == cluster_uuid:
302 # juju_db["k8sclusters"].remove(k)
305 # q_filter={"_id": "juju"},
306 # update_dict={"k8sclusters": juju_db["k8sclusters"]},
310 # Destroy the controller (via CLI)
311 # self.log.debug("[reset] Destroying controller")
312 # await self.destroy_controller(cluster_uuid)
314 self
.log
.debug("[reset] Removing k8s cloud")
315 # k8s_cloud = "k8s-{}".format(cluster_uuid)
316 # await self.remove_cloud(k8s_cloud)
317 await self
.libjuju
.remove_cloud(cluster_uuid
)
319 except Exception as e
:
320 self
.log
.debug("Caught exception during reset: {}".format(e
))
323 # TODO: Remove these commented lines
324 # if not self.authenticated:
325 # await self.login(cluster_uuid)
327 # if self.controller.is_connected():
328 # # Destroy the model
329 # namespace = self.get_namespace(cluster_uuid)
330 # if await self.has_model(namespace):
331 # self.log.debug("[reset] Destroying model")
332 # await self.controller.destroy_model(namespace, destroy_storage=True)
334 # # Disconnect from the controller
335 # self.log.debug("[reset] Disconnecting controller")
336 # await self.logout()
345 timeout
: float = 1800,
347 db_dict
: dict = None,
348 kdu_name
: str = None,
349 namespace
: str = None,
353 :param cluster_uuid str: The UUID of the cluster to install to
354 :param kdu_model str: The name or path of a bundle to install
355 :param atomic bool: If set, waits until the model is active and resets
356 the cluster on failure.
357 :param timeout int: The time, in seconds, to wait for the install
359 :param params dict: Key-value pairs of instantiation parameters
360 :param kdu_name: Name of the KDU instance to be installed
361 :param namespace: K8s namespace to use for the KDU instance
363 :return: If successful, returns ?
367 # controller = await self.get_controller(cluster_uuid)
370 # Get or create the model, based on the NS
374 raise K8sException("db_dict must be set")
376 raise K8sException("bundle must be set")
378 if bundle
.startswith("cs:"):
380 elif bundle
.startswith("http"):
384 new_workdir
= kdu_model
.strip(kdu_model
.split("/")[-1])
385 os
.chdir(new_workdir
)
386 bundle
= "local:{}".format(kdu_model
)
389 kdu_instance
= "{}-{}".format(kdu_name
, db_dict
["filter"]["_id"])
391 kdu_instance
= db_dict
["filter"]["_id"]
393 self
.log
.debug("Checking for model named {}".format(kdu_instance
))
395 # Create the new model
396 self
.log
.debug("Adding model: {}".format(kdu_instance
))
397 await self
.libjuju
.add_model(
398 model_name
=kdu_instance
,
399 cloud_name
=cluster_uuid
,
400 credential_name
=self
._get
_credential
_name
(cluster_uuid
),
404 # TODO: Instantiation parameters
407 "Juju bundle that models the KDU, in any of the following ways:
408 - <juju-repo>/<juju-bundle>
409 - <juju-bundle folder under k8s_models folder in the package>
410 - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
412 - <URL_where_to_fetch_juju_bundle>
415 previous_workdir
= os
.getcwd()
416 except FileNotFoundError
:
417 previous_workdir
= "/app/storage"
419 self
.log
.debug("[install] deploying {}".format(bundle
))
420 await self
.libjuju
.deploy(
421 bundle
, model_name
=kdu_instance
, wait
=atomic
, timeout
=timeout
424 # Get the application
426 # # applications = model.applications
427 # self.log.debug("[install] Applications: {}".format(model.applications))
428 # for name in model.applications:
429 # self.log.debug("[install] Waiting for {} to settle".format(name))
430 # application = model.applications[name]
432 # # It's not enough to wait for all units to be active;
433 # # the application status needs to be active as well.
434 # self.log.debug("Waiting for all units to be active...")
435 # await model.block_until(
437 # unit.agent_status == "idle"
438 # and application.status in ["active", "unknown"]
439 # and unit.workload_status in ["active", "unknown"]
440 # for unit in application.units
444 # self.log.debug("All units active.")
446 # # TODO use asyncio.TimeoutError
447 # except concurrent.futures._base.TimeoutError:
448 # os.chdir(previous_workdir)
449 # self.log.debug("[install] Timeout exceeded; resetting cluster")
450 # await self.reset(cluster_uuid)
453 # Wait for the application to be active
454 # if model.is_connected():
455 # self.log.debug("[install] Disconnecting model")
456 # await model.disconnect()
457 # await controller.disconnect()
458 os
.chdir(previous_workdir
)
462 async def instances_list(self
, cluster_uuid
: str) -> list:
464 returns a list of deployed releases in a cluster
466 :param cluster_uuid: the cluster
475 kdu_model
: str = None,
480 :param cluster_uuid str: The UUID of the cluster to upgrade
481 :param kdu_instance str: The unique name of the KDU instance
482 :param kdu_model str: The name or path of the bundle to upgrade to
483 :param params dict: Key-value pairs of instantiation parameters
485 :return: If successful, reference to the new revision number of the
489 # TODO: Loop through the bundle and upgrade each charm individually
492 The API doesn't have a concept of bundle upgrades, because there are
493 many possible changes: charm revision, disk, number of units, etc.
495 As such, we are only supporting a limited subset of upgrades. We'll
496 upgrade the charm revision but leave storage and scale untouched.
498 Scale changes should happen through OSM constructs, and changes to
499 storage would require a redeployment of the service, at least in this
502 raise MethodNotImplemented()
503 # TODO: Remove these commented lines
505 # model = await self.get_model(namespace, cluster_uuid=cluster_uuid)
508 # namespace = self.get_namespace(cluster_uuid)
509 # controller = await self.get_controller(cluster_uuid)
512 # if namespace not in await controller.list_models():
513 # raise N2VCNotFound(message="Model {} does not exist".format(namespace))
515 # model = await controller.get_model(namespace)
516 # with open(kdu_model, "r") as f:
517 # bundle = yaml.safe_load(f)
521 # 'description': 'Test bundle',
522 # 'bundle': 'kubernetes',
525 # 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
528 # 'password': 'manopw',
529 # 'root_password': 'osm4u',
532 # 'series': 'kubernetes'
537 # # TODO: This should be returned in an agreed-upon format
538 # for name in bundle["applications"]:
539 # self.log.debug(model.applications)
540 # application = model.applications[name]
541 # self.log.debug(application)
543 # path = bundle["applications"][name]["charm"]
546 # await application.upgrade_charm(switch=path)
547 # except juju.errors.JujuError as ex:
548 # if "already running charm" in str(ex):
549 # # We're already running this version
553 # await model.disconnect()
554 # await controller.disconnect()
567 :param cluster_uuid str: The UUID of the cluster to rollback
568 :param kdu_instance str: The unique name of the KDU instance
569 :param revision int: The revision to revert to. If omitted, rolls back
570 the previous upgrade.
572 :return: If successful, returns the revision of active KDU instance,
573 or raises an exception
575 raise MethodNotImplemented()
579 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str) -> bool:
580 """Uninstall a KDU instance
582 :param cluster_uuid str: The UUID of the cluster
583 :param kdu_instance str: The unique name of the KDU instance
585 :return: Returns True if successful, or raises an exception
588 # controller = await self.get_controller(cluster_uuid)
590 self
.log
.debug("[uninstall] Destroying model")
592 await self
.libjuju
.destroy_model(kdu_instance
, total_timeout
=3600)
594 # self.log.debug("[uninstall] Model destroyed and disconnecting")
595 # await controller.disconnect()
598 # TODO: Remove these commented lines
599 # if not self.authenticated:
600 # self.log.debug("[uninstall] Connecting to controller")
601 # await self.login(cluster_uuid)
603 async def exec_primitive(
605 cluster_uuid
: str = None,
606 kdu_instance
: str = None,
607 primitive_name
: str = None,
608 timeout
: float = 300,
610 db_dict
: dict = None,
612 """Exec primitive (Juju action)
614 :param cluster_uuid str: The UUID of the cluster
615 :param kdu_instance str: The unique name of the KDU instance
616 :param primitive_name: Name of action that will be executed
617 :param timeout: Timeout for action execution
618 :param params: Dictionary of all the parameters needed for the action
619 :db_dict: Dictionary for any additional data
621 :return: Returns the output of the action
624 # controller = await self.get_controller(cluster_uuid)
626 if not params
or "application-name" not in params
:
628 "Missing application-name argument, \
629 argument needed for K8s actions"
633 "[exec_primitive] Getting model "
634 "kdu_instance: {}".format(kdu_instance
)
636 application_name
= params
["application-name"]
637 actions
= await self
.libjuju
.get_actions(application_name
, kdu_instance
)
638 if primitive_name
not in actions
:
639 raise K8sException("Primitive {} not found".format(primitive_name
))
640 output
, status
= await self
.libjuju
.execute_action(
641 application_name
, kdu_instance
, primitive_name
, **params
643 # model = await self.get_model(kdu_instance, controller=controller)
645 # application_name = params["application-name"]
646 # application = model.applications[application_name]
648 # actions = await application.get_actions()
649 # if primitive_name not in actions:
650 # raise K8sException("Primitive {} not found".format(primitive_name))
653 # for u in application.units:
654 # if await u.is_leader_from_status():
659 # raise K8sException("No leader unit found to execute action")
661 # self.log.debug("[exec_primitive] Running action: {}".format(primitive_name))
662 # action = await unit.run_action(primitive_name, **params)
664 # output = await model.get_action_output(action_uuid=action.entity_id)
665 # status = await model.get_action_status(uuid_or_prefix=action.entity_id)
668 # status[action.entity_id] if action.entity_id in status else "failed"
671 if status
!= "completed":
673 "status is not completed: {} output: {}".format(status
, output
)
678 except Exception as e
:
679 error_msg
= "Error executing primitive {}: {}".format(primitive_name
, e
)
680 self
.log
.error(error_msg
)
681 raise K8sException(message
=error_msg
)
683 # await controller.disconnect()
684 # TODO: Remove these commented lines:
685 # if not self.authenticated:
686 # self.log.debug("[exec_primitive] Connecting to controller")
687 # await self.login(cluster_uuid)
691 async def inspect_kdu(
697 Inspects a bundle and returns a dictionary of config parameters and
698 their default values.
700 :param kdu_model str: The name or path of the bundle to inspect.
702 :return: If successful, returns a dictionary of available parameters
703 and their default values.
707 if not os
.path
.exists(kdu_model
):
708 raise K8sException("file {} not found".format(kdu_model
))
710 with
open(kdu_model
, "r") as f
:
711 bundle
= yaml
.safe_load(f
.read())
715 'description': 'Test bundle',
716 'bundle': 'kubernetes',
719 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
722 'password': 'manopw',
723 'root_password': 'osm4u',
726 'series': 'kubernetes'
731 # TODO: This should be returned in an agreed-upon format
732 kdu
= bundle
["applications"]
742 If available, returns the README of the bundle.
744 :param kdu_model str: The name or path of a bundle
746 :return: If found, returns the contents of the README.
750 files
= ["README", "README.txt", "README.md"]
751 path
= os
.path
.dirname(kdu_model
)
752 for file in os
.listdir(path
):
754 with
open(file, "r") as f
:
760 async def status_kdu(
765 """Get the status of the KDU
767 Get the current status of the KDU instance.
769 :param cluster_uuid str: The UUID of the cluster
770 :param kdu_instance str: The unique id of the KDU instance
772 :return: Returns a dictionary containing namespace, state, resources,
776 # controller = await self.get_controller(cluster_uuid)
777 # model = await self.get_model(kdu_instance, controller=controller)
779 # model_status = await model.get_status()
780 # status = model_status.applications
781 model_status
= await self
.libjuju
.get_model_status(kdu_instance
)
782 for name
in model_status
.applications
:
783 application
= model_status
.applications
[name
]
784 status
[name
] = {"status": application
["status"]["status"]}
786 # await model.disconnect()
787 # await controller.disconnect()
791 async def get_services(
792 self
, cluster_uuid
: str, kdu_instance
: str, namespace
: str
794 """Return a list of services of a kdu_instance"""
796 credentials
= self
.get_credentials(cluster_uuid
=cluster_uuid
)
798 # config_path = "/tmp/{}".format(cluster_uuid)
799 # config_file = "{}/config".format(config_path)
801 # if not os.path.exists(config_path):
802 # os.makedirs(config_path)
803 # with open(config_file, "w") as f:
804 # f.write(credentials)
806 kubecfg
= tempfile
.NamedTemporaryFile()
807 with
open(kubecfg
.name
, "w") as kubecfg_file
:
808 kubecfg_file
.write(credentials
)
809 kubectl
= Kubectl(config_file
=kubecfg
.name
)
811 return kubectl
.get_services(
812 field_selector
="metadata.namespace={}".format(kdu_instance
)
815 async def get_service(
816 self
, cluster_uuid
: str, service_name
: str, namespace
: str
818 """Return data for a specific service inside a namespace"""
820 credentials
= self
.get_credentials(cluster_uuid
=cluster_uuid
)
822 # config_path = "/tmp/{}".format(cluster_uuid)
823 # config_file = "{}/config".format(config_path)
825 # if not os.path.exists(config_path):
826 # os.makedirs(config_path)
827 # with open(config_file, "w") as f:
828 # f.write(credentials)
830 kubecfg
= tempfile
.NamedTemporaryFile()
831 with
open(kubecfg
.name
, "w") as kubecfg_file
:
832 kubecfg_file
.write(credentials
)
833 kubectl
= Kubectl(config_file
=kubecfg
.name
)
835 return kubectl
.get_services(
836 field_selector
="metadata.name={},metadata.namespace={}".format(
837 service_name
, namespace
842 # async def add_k8s(self, cloud_name: str, credentials: str,) -> bool:
843 # """Add a k8s cloud to Juju
845 # Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
848 # :param cloud_name str: The name of the cloud to add.
849 # :param credentials dict: A dictionary representing the output of
850 # `kubectl config view --raw`.
852 # :returns: True if successful, otherwise raises an exception.
855 # cmd = [self.juju_command, "add-k8s", "--local", cloud_name]
856 # self.log.debug(cmd)
858 # process = await asyncio.create_subprocess_exec(
860 # stdout=asyncio.subprocess.PIPE,
861 # stderr=asyncio.subprocess.PIPE,
862 # stdin=asyncio.subprocess.PIPE,
865 # # Feed the process the credentials
866 # process.stdin.write(credentials.encode("utf-8"))
867 # await process.stdin.drain()
868 # process.stdin.close()
870 # _stdout, stderr = await process.communicate()
872 # return_code = process.returncode
874 # self.log.debug("add-k8s return code: {}".format(return_code))
876 # if return_code > 0:
877 # raise Exception(stderr)
881 # async def add_model(
882 # self, model_name: str, cluster_uuid: str, controller: Controller
884 # """Adds a model to the controller
886 # Adds a new model to the Juju controller
888 # :param model_name str: The name of the model to add.
889 # :param cluster_uuid str: ID of the cluster.
890 # :param controller: Controller object in which the model will be added
891 # :returns: The juju.model.Model object of the new model upon success or
892 # raises an exception.
896 # "Adding model '{}' to cluster_uuid '{}'".format(model_name, cluster_uuid)
900 # if self.juju_public_key is not None:
901 # model = await controller.add_model(
902 # model_name, config={"authorized-keys": self.juju_public_key}
905 # model = await controller.add_model(model_name)
906 # except Exception as ex:
908 # self.log.debug("Caught exception: {}".format(ex))
913 # async def bootstrap(
914 # self, cloud_name: str, cluster_uuid: str, loadbalancer: bool
916 # """Bootstrap a Kubernetes controller
918 # Bootstrap a Juju controller inside the Kubernetes cluster
920 # :param cloud_name str: The name of the cloud.
921 # :param cluster_uuid str: The UUID of the cluster to bootstrap.
922 # :param loadbalancer bool: If the controller should use loadbalancer or not.
923 # :returns: True upon success or raises an exception.
926 # if not loadbalancer:
927 # cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid]
930 # For public clusters, specify that the controller service is using a
939 # "controller-service-type=loadbalancer",
943 # "Bootstrapping controller {} in cloud {}".format(cluster_uuid, cloud_name)
946 # process = await asyncio.create_subprocess_exec(
947 # *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
950 # _stdout, stderr = await process.communicate()
952 # return_code = process.returncode
954 # if return_code > 0:
956 # if b"already exists" not in stderr:
957 # raise Exception(stderr)
961 # async def destroy_controller(self, cluster_uuid: str) -> bool:
962 # """Destroy a Kubernetes controller
964 # Destroy an existing Kubernetes controller.
966 # :param cluster_uuid str: The UUID of the cluster to bootstrap.
967 # :returns: True upon success or raises an exception.
971 # "destroy-controller",
972 # "--destroy-all-models",
973 # "--destroy-storage",
978 # process = await asyncio.create_subprocess_exec(
979 # *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
982 # _stdout, stderr = await process.communicate()
984 # return_code = process.returncode
986 # if return_code > 0:
988 # if "already exists" not in stderr:
989 # raise Exception(stderr)
991 def get_credentials(self
, cluster_uuid
: str) -> str:
993 Get Cluster Kubeconfig
995 k8scluster
= self
.db
.get_one(
996 "k8sclusters", q_filter
={"_id": cluster_uuid
}, fail_on_empty
=False
999 self
.db
.encrypt_decrypt_fields(
1000 k8scluster
.get("credentials"),
1002 ["password", "secret"],
1003 schema_version
=k8scluster
["schema_version"],
1004 salt
=k8scluster
["_id"],
1007 return yaml
.safe_dump(k8scluster
.get("credentials"))
1009 def _get_credential_name(self
, cluster_uuid
: str) -> str:
1011 Get credential name for a k8s cloud
1013 We cannot use the cluster_uuid for the credential name directly,
1014 because it cannot start with a number, it must start with a letter.
1015 Therefore, the k8s cloud credential name will be "cred-" followed
1016 by the cluster uuid.
1018 :param: cluster_uuid: Cluster UUID of the kubernetes cloud (=cloud_name)
1020 :return: Name to use for the credential name.
1022 return "cred-{}".format(cluster_uuid
)
1024 # def get_config(self, cluster_uuid: str,) -> dict:
1025 # """Get the cluster configuration
1027 # Gets the configuration of the cluster
1029 # :param cluster_uuid str: The UUID of the cluster.
1030 # :return: A dict upon success, or raises an exception.
1033 # juju_db = self.db.get_one("admin", {"_id": "juju"})
1035 # for k in juju_db["k8sclusters"]:
1036 # if k["_id"] == cluster_uuid:
1037 # config = k["config"]
1038 # self.db.encrypt_decrypt_fields(
1041 # ["secret", "cacert"],
1042 # schema_version="1.1",
1048 # "Unable to locate configuration for cluster {}".format(cluster_uuid)
1052 # async def get_model(self, model_name: str, controller: Controller) -> Model:
1053 # """Get a model from the Juju Controller.
1055 # Note: Model objects returned must call disconnected() before it goes
1058 # :param model_name str: The name of the model to get
1059 # :param controller Controller: Controller object
1060 # :return The juju.model.Model object if found, or None.
1063 # models = await controller.list_models()
1064 # if model_name not in models:
1065 # raise N2VCNotFound("Model {} not found".format(model_name))
1066 # self.log.debug("Found model: {}".format(model_name))
1067 # return await controller.get_model(model_name)
1073 """Get the namespace UUID
1074 Gets the namespace's unique name
1076 :param cluster_uuid str: The UUID of the cluster
1077 :returns: The namespace UUID, or raises an exception
1079 # config = self.get_config(cluster_uuid)
1081 # Make sure the name is in the config
1082 # if "namespace" not in config:
1083 # raise Exception("Namespace not found.")
1085 # TODO: We want to make sure this is unique to the cluster, in case
1086 # the cluster is being reused.
1087 # Consider pre/appending the cluster id to the namespace string
1090 # TODO: Remove these lines of code
1091 # async def has_model(self, model_name: str) -> bool:
1092 # """Check if a model exists in the controller
1094 # Checks to see if a model exists in the connected Juju controller.
1096 # :param model_name str: The name of the model
1097 # :return: A boolean indicating if the model exists
1099 # models = await self.controller.list_models()
1101 # if model_name in models:
1105 # def is_local_k8s(self, credentials: str,) -> bool:
1106 # """Check if a cluster is local
1108 # Checks if a cluster is running in the local host
1110 # :param credentials dict: A dictionary containing the k8s credentials
1111 # :returns: A boolean if the cluster is running locally
1114 # creds = yaml.safe_load(credentials)
1116 # if creds and os.getenv("OSMLCM_VCA_APIPROXY"):
1117 # for cluster in creds["clusters"]:
1118 # if "server" in cluster["cluster"]:
1119 # if os.getenv("OSMLCM_VCA_APIPROXY") in cluster["cluster"]["server"]:
1124 # async def get_controller(self, cluster_uuid):
1125 # """Login to the Juju controller."""
1127 # config = self.get_config(cluster_uuid)
1129 # juju_endpoint = config["endpoint"]
1130 # juju_user = config["username"]
1131 # juju_secret = config["secret"]
1132 # juju_ca_cert = config["cacert"]
1134 # controller = Controller()
1138 # "Connecting to controller... ws://{} as {}".format(
1139 # juju_endpoint, juju_user,
1143 # await controller.connect(
1144 # endpoint=juju_endpoint,
1145 # username=juju_user,
1146 # password=juju_secret,
1147 # cacert=juju_ca_cert,
1149 # self.log.debug("JujuApi: Logged into controller")
1151 # except Exception as ex:
1152 # self.log.debug(ex)
1153 # self.log.debug("Caught exception: {}".format(ex))
1155 # self.log.fatal("VCA credentials not configured.")
1157 # TODO: Remove these commented lines
1158 # self.authenticated = False
1159 # if self.authenticated:
1162 # self.connecting = True
1163 # juju_public_key = None
1164 # self.authenticated = True
1165 # Test: Make sure we have the credentials loaded
1166 # async def logout(self):
1167 # """Logout of the Juju controller."""
1168 # self.log.debug("[logout]")
1169 # if not self.authenticated:
1172 # for model in self.models:
1173 # self.log.debug("Logging out of model {}".format(model))
1174 # await self.models[model].disconnect()
1176 # if self.controller:
1177 # self.log.debug("Disconnecting controller {}".format(self.controller))
1178 # await self.controller.disconnect()
1179 # self.controller = None
1181 # self.authenticated = False
1183 # async def remove_cloud(self, cloud_name: str,) -> bool:
1184 # """Remove a k8s cloud from Juju
1186 # Removes a Kubernetes cloud from Juju.
1188 # :param cloud_name str: The name of the cloud to add.
1190 # :returns: True if successful, otherwise raises an exception.
1193 # # Remove the bootstrapped controller
1194 # cmd = [self.juju_command, "remove-k8s", "--client", cloud_name]
1195 # process = await asyncio.create_subprocess_exec(
1196 # *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1199 # _stdout, stderr = await process.communicate()
1201 # return_code = process.returncode
1203 # if return_code > 0:
1204 # raise Exception(stderr)
1206 # # Remove the cloud from the local config
1207 # cmd = [self.juju_command, "remove-cloud", "--client", cloud_name]
1208 # process = await asyncio.create_subprocess_exec(
1209 # *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1212 # _stdout, stderr = await process.communicate()
1214 # return_code = process.returncode
1216 # if return_code > 0:
1217 # raise Exception(stderr)
1221 # async def set_config(self, cluster_uuid: str, config: dict,) -> bool:
1222 # """Save the cluster configuration
1224 # Saves the cluster information to the Mongo database
1226 # :param cluster_uuid str: The UUID of the cluster
1227 # :param config dict: A dictionary containing the cluster configuration
1230 # juju_db = self.db.get_one("admin", {"_id": "juju"})
1232 # k8sclusters = juju_db["k8sclusters"] if "k8sclusters" in juju_db else []
1233 # self.db.encrypt_decrypt_fields(
1236 # ["secret", "cacert"],
1237 # schema_version="1.1",
1238 # salt=cluster_uuid,
1240 # k8sclusters.append({"_id": cluster_uuid, "config": config})
1243 # q_filter={"_id": "juju"},
1244 # update_dict={"k8sclusters": k8sclusters},