1 # Copyright 2019 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
23 from n2vc
.exceptions
import K8sException
, N2VCBadArgumentsException
24 from n2vc
.k8s_conn
import K8sConnector
25 from n2vc
.kubectl
import Kubectl
, CORE_CLIENT
, RBAC_CLIENT
26 from .exceptions
import MethodNotImplemented
27 from n2vc
.utils
import base64_to_cacert
28 from n2vc
.libjuju
import Libjuju
30 from kubernetes
.client
.models
import (
40 from typing
import Dict
# Keys under which a Kubernetes service-account secret stores its bearer
# token and cluster CA certificate (standard k8s secret keys; presumably
# read when collecting cluster credentials — usage not visible in this chunk).
SERVICE_ACCOUNT_TOKEN_KEY = "token"
SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"

# Label/attribute key that carries the random rbac id, so the RBAC
# resources created for a cluster can be located again on reset.
RBAC_LABEL_KEY_NAME = "rbac-id"

# NOTE(review): presumably the namespace used for admin-scoped k8s
# operations — no usage visible in this chunk; confirm before relying on it.
ADMIN_NAMESPACE = "kube-system"

# Name prefix (combined with a random rbac id) for the cluster role,
# service account and cluster role binding created when registering a cluster.
RBAC_STACK_PREFIX = "juju-credential"
49 # from juju.bundle import BundleHandler
52 # from .vnf import N2VC
def generate_rbac_id():
    """Generate a short random identifier for RBAC resources.

    :return: an 8-character lowercase hexadecimal string derived from
        4 random bytes.
    """
    random_bytes = os.urandom(4)
    return random_bytes.hex()
59 class K8sJujuConnector(K8sConnector
):
64 kubectl_command
: str = "/usr/bin/kubectl",
65 juju_command
: str = "/usr/bin/juju",
69 vca_config
: dict = None,
72 :param fs: file system for kubernetes and helm configuration
73 :param db: Database object
74 :param kubectl_command: path to kubectl executable
75 :param helm_command: path to helm executable
77 :param: loop: Asyncio loop
81 K8sConnector
.__init
__(
85 on_update_db
=on_update_db
,
89 self
.loop
= loop
or asyncio
.get_event_loop()
90 self
.log
.debug("Initializing K8S Juju connector")
92 required_vca_config
= [
98 if not vca_config
or not all(k
in vca_config
for k
in required_vca_config
):
99 raise N2VCBadArgumentsException(
100 message
="Missing arguments in vca_config: {}".format(vca_config
),
101 bad_args
=required_vca_config
,
103 port
= vca_config
["port"] if "port" in vca_config
else 17070
104 url
= "{}:{}".format(vca_config
["host"], port
)
105 enable_os_upgrade
= vca_config
.get("enable_os_upgrade", True)
106 apt_mirror
= vca_config
.get("apt_mirror", None)
107 username
= vca_config
["user"]
108 secret
= vca_config
["secret"]
109 ca_cert
= base64_to_cacert(vca_config
["ca_cert"])
111 self
.libjuju
= Libjuju(
113 api_proxy
=None, # Not needed for k8s charms
114 enable_os_upgrade
=enable_os_upgrade
,
115 apt_mirror
=apt_mirror
,
123 self
.log
.debug("K8S Juju connector initialized")
124 # TODO: Remove these commented lines:
125 # self.authenticated = False
127 # self.juju_secret = ""
134 namespace
: str = "kube-system",
135 reuse_cluster_uuid
: str = None,
138 It prepares a given K8s cluster environment to run Juju bundles.
140 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
142 :param namespace: optional namespace to be used for juju. By default,
143 'kube-system' will be used
144 :param reuse_cluster_uuid: existing cluster uuid for reuse
145 :return: uuid of the K8s cluster and True if connector has installed some
146 software in the cluster
147 (on error, an exception will be raised)
152 # Bootstrapping cannot be done, by design, through the API. We need to
159 # 1. Has the environment already been bootstrapped?
160 # - Check the database to see if we have a record for this env
162 # 2. If this is a new env, create it
163 # - Add the k8s cloud to Juju
165 # - Record it in the database
167 # 3. Connect to the Juju controller for this cloud
170 # cluster_uuid = reuse_cluster_uuid
171 # if not cluster_uuid:
172 # cluster_uuid = str(uuid4())
174 ##################################################
175 # TODO: Pull info from db based on the namespace #
176 ##################################################
178 ###################################################
179 # TODO: Make it idempotent, calling add-k8s and #
180 # bootstrap whenever reuse_cluster_uuid is passed #
182 # `init_env` is called to initialize the K8s #
183 # cluster for juju. If this initialization fails, #
184 # it can be called again by LCM with the param #
185 # reuse_cluster_uuid, e.g. to try to fix it. #
186 ###################################################
188 # This is a new cluster, so bootstrap it
190 cluster_uuid
= reuse_cluster_uuid
or str(uuid
.uuid4())
192 # Is a local k8s cluster?
193 # localk8s = self.is_local_k8s(k8s_creds)
195 # If the k8s is external, the juju controller needs a loadbalancer
196 # loadbalancer = False if localk8s else True
198 # Name the new k8s cloud
199 # k8s_cloud = "k8s-{}".format(cluster_uuid)
201 # self.log.debug("Adding k8s cloud {}".format(k8s_cloud))
202 # await self.add_k8s(k8s_cloud, k8s_creds)
204 # Bootstrap Juju controller
205 # self.log.debug("Bootstrapping...")
206 # await self.bootstrap(k8s_cloud, cluster_uuid, loadbalancer)
207 # self.log.debug("Bootstrap done.")
209 # Get the controller information
211 # Parse ~/.local/share/juju/controllers.yaml
212 # controllers.testing.api-endpoints|ca-cert|uuid
213 # self.log.debug("Getting controller endpoints")
214 # with open(os.path.expanduser("~/.local/share/juju/controllers.yaml")) as f:
215 # controllers = yaml.load(f, Loader=yaml.Loader)
216 # controller = controllers["controllers"][cluster_uuid]
217 # endpoints = controller["api-endpoints"]
218 # juju_endpoint = endpoints[0]
219 # juju_ca_cert = controller["ca-cert"]
221 # Parse ~/.local/share/juju/accounts
222 # controllers.testing.user|password
223 # self.log.debug("Getting accounts")
224 # with open(os.path.expanduser("~/.local/share/juju/accounts.yaml")) as f:
225 # controllers = yaml.load(f, Loader=yaml.Loader)
226 # controller = controllers["controllers"][cluster_uuid]
228 # juju_user = controller["user"]
229 # juju_secret = controller["password"]
232 # "endpoint": juju_endpoint,
233 # "username": juju_user,
234 # "secret": juju_secret,
235 # "cacert": juju_ca_cert,
236 # "loadbalancer": loadbalancer,
239 # Store the cluster configuration so it
240 # can be used for subsequent calls
241 kubecfg
= tempfile
.NamedTemporaryFile()
242 with
open(kubecfg
.name
, "w") as kubecfg_file
:
243 kubecfg_file
.write(k8s_creds
)
244 kubectl
= Kubectl(config_file
=kubecfg
.name
)
246 # CREATING RESOURCES IN K8S
247 rbac_id
= generate_rbac_id()
248 metadata_name
= "{}-{}".format(RBAC_STACK_PREFIX
, rbac_id
)
249 labels
= {RBAC_STACK_PREFIX
: rbac_id
}
251 # Create cleanup dictionary to clean up created resources
252 # if it fails in the middle of the process
255 self
._create
_cluster
_role
(
262 "delete": self
._delete
_cluster
_role
,
263 "args": (kubectl
, metadata_name
),
267 self
._create
_service
_account
(
274 "delete": self
._delete
_service
_account
,
275 "args": (kubectl
, metadata_name
),
279 self
._create
_cluster
_role
_binding
(
286 "delete": self
._delete
_service
_account
,
287 "args": (kubectl
, metadata_name
),
290 token
, client_cert_data
= await self
._get
_secret
_data
(
295 default_storage_class
= kubectl
.get_default_storage_class()
296 await self
.libjuju
.add_k8s(
300 client_cert_data
=client_cert_data
,
301 configuration
=kubectl
.configuration
,
302 storage_class
=default_storage_class
,
303 credential_name
=self
._get
_credential
_name
(cluster_uuid
),
305 # self.log.debug("Setting config")
306 # await self.set_config(cluster_uuid, config)
309 # controller = await self.get_controller(cluster_uuid)
310 # await controller.disconnect()
312 # TODO: Remove these commented lines
313 # raise Exception("EOL")
314 # self.juju_public_key = None
315 # Login to the k8s cluster
316 # if not self.authenticated:
317 # await self.login(cluster_uuid)
319 # We're creating a new cluster
320 # print("Getting model {}".format(self.get_namespace(cluster_uuid),
321 # cluster_uuid=cluster_uuid))
322 # model = await self.get_model(
323 # self.get_namespace(cluster_uuid),
324 # cluster_uuid=cluster_uuid
327 # Disconnect from the model
328 # if model and model.is_connected():
329 # await model.disconnect()
331 return cluster_uuid
, True
332 except Exception as e
:
333 self
.log
.error("Error initializing k8scluster: {}".format(e
))
334 if len(cleanup_data
) > 0:
335 self
.log
.debug("Cleaning up created resources in k8s cluster...")
336 for item
in cleanup_data
:
337 delete_function
= item
["delete"]
338 delete_args
= item
["args"]
339 delete_function(*delete_args
)
340 self
.log
.debug("Cleanup finished")
343 """Repo Management"""
349 _type
: str = "charm",
351 raise MethodNotImplemented()
    async def repo_list(self):
        """List configured repositories.

        Repository management is not applicable to the Juju connector.

        :raises MethodNotImplemented: always
        """
        raise MethodNotImplemented()
356 async def repo_remove(
360 raise MethodNotImplemented()
362 async def synchronize_repos(self
, cluster_uuid
: str, name
: str):
364 Returns None as currently add_repo is not implemented
371 self
, cluster_uuid
: str, force
: bool = False, uninstall_sw
: bool = False
375 Resets the Kubernetes cluster by removing the model that represents it.
377 :param cluster_uuid str: The UUID of the cluster to reset
378 :return: Returns True if successful or raises an exception.
382 # Remove k8scluster from database
383 # self.log.debug("[reset] Removing k8scluster from juju database")
384 # juju_db = self.db.get_one("admin", {"_id": "juju"})
386 # for k in juju_db["k8sclusters"]:
387 # if k["_id"] == cluster_uuid:
388 # juju_db["k8sclusters"].remove(k)
391 # q_filter={"_id": "juju"},
392 # update_dict={"k8sclusters": juju_db["k8sclusters"]},
396 # Destroy the controller (via CLI)
397 # self.log.debug("[reset] Destroying controller")
398 # await self.destroy_controller(cluster_uuid)
399 self
.log
.debug("[reset] Removing k8s cloud")
400 # k8s_cloud = "k8s-{}".format(cluster_uuid)
401 # await self.remove_cloud(k8s_cloud)
403 cloud_creds
= await self
.libjuju
.get_cloud_credentials(
405 self
._get
_credential
_name
(cluster_uuid
),
408 await self
.libjuju
.remove_cloud(cluster_uuid
)
410 kubecfg
= self
.get_credentials(cluster_uuid
=cluster_uuid
)
412 kubecfg_file
= tempfile
.NamedTemporaryFile()
413 with
open(kubecfg_file
.name
, "w") as f
:
415 kubectl
= Kubectl(config_file
=kubecfg_file
.name
)
418 self
._delete
_cluster
_role
_binding
,
419 self
._delete
_service
_account
,
420 self
._delete
_cluster
_role
,
423 credential_attrs
= cloud_creds
[0].result
["attrs"]
424 if RBAC_LABEL_KEY_NAME
in credential_attrs
:
425 rbac_id
= credential_attrs
[RBAC_LABEL_KEY_NAME
]
426 metadata_name
= "{}-{}".format(RBAC_STACK_PREFIX
, rbac_id
)
427 delete_args
= (kubectl
, metadata_name
)
428 for delete_func
in delete_functions
:
430 delete_func(*delete_args
)
431 except Exception as e
:
432 self
.log
.warning("Cannot remove resource in K8s {}".format(e
))
434 except Exception as e
:
435 self
.log
.debug("Caught exception during reset: {}".format(e
))
438 # TODO: Remove these commented lines
439 # if not self.authenticated:
440 # await self.login(cluster_uuid)
442 # if self.controller.is_connected():
443 # # Destroy the model
444 # namespace = self.get_namespace(cluster_uuid)
445 # if await self.has_model(namespace):
446 # self.log.debug("[reset] Destroying model")
447 # await self.controller.destroy_model(namespace, destroy_storage=True)
449 # # Disconnect from the controller
450 # self.log.debug("[reset] Disconnecting controller")
451 # await self.logout()
460 timeout
: float = 1800,
462 db_dict
: dict = None,
463 kdu_name
: str = None,
464 namespace
: str = None,
468 :param cluster_uuid str: The UUID of the cluster to install to
469 :param kdu_model str: The name or path of a bundle to install
470 :param atomic bool: If set, waits until the model is active and resets
471 the cluster on failure.
472 :param timeout int: The time, in seconds, to wait for the install
474 :param params dict: Key-value pairs of instantiation parameters
475 :param kdu_name: Name of the KDU instance to be installed
476 :param namespace: K8s namespace to use for the KDU instance
478 :return: If successful, returns ?
482 # controller = await self.get_controller(cluster_uuid)
485 # Get or create the model, based on the NS
489 raise K8sException("db_dict must be set")
491 raise K8sException("bundle must be set")
493 if bundle
.startswith("cs:"):
495 elif bundle
.startswith("http"):
499 new_workdir
= kdu_model
.strip(kdu_model
.split("/")[-1])
500 os
.chdir(new_workdir
)
501 bundle
= "local:{}".format(kdu_model
)
504 kdu_instance
= "{}-{}".format(kdu_name
, db_dict
["filter"]["_id"])
506 kdu_instance
= db_dict
["filter"]["_id"]
508 self
.log
.debug("Checking for model named {}".format(kdu_instance
))
510 # Create the new model
511 self
.log
.debug("Adding model: {}".format(kdu_instance
))
512 await self
.libjuju
.add_model(
513 model_name
=kdu_instance
,
514 cloud_name
=cluster_uuid
,
515 credential_name
=self
._get
_credential
_name
(cluster_uuid
),
519 # TODO: Instantiation parameters
522 "Juju bundle that models the KDU, in any of the following ways:
523 - <juju-repo>/<juju-bundle>
524 - <juju-bundle folder under k8s_models folder in the package>
525 - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
527 - <URL_where_to_fetch_juju_bundle>
530 previous_workdir
= os
.getcwd()
531 except FileNotFoundError
:
532 previous_workdir
= "/app/storage"
534 self
.log
.debug("[install] deploying {}".format(bundle
))
535 await self
.libjuju
.deploy(
536 bundle
, model_name
=kdu_instance
, wait
=atomic
, timeout
=timeout
539 # Get the application
541 # # applications = model.applications
542 # self.log.debug("[install] Applications: {}".format(model.applications))
543 # for name in model.applications:
544 # self.log.debug("[install] Waiting for {} to settle".format(name))
545 # application = model.applications[name]
547 # # It's not enough to wait for all units to be active;
548 # # the application status needs to be active as well.
549 # self.log.debug("Waiting for all units to be active...")
550 # await model.block_until(
552 # unit.agent_status == "idle"
553 # and application.status in ["active", "unknown"]
554 # and unit.workload_status in ["active", "unknown"]
555 # for unit in application.units
559 # self.log.debug("All units active.")
561 # # TODO use asyncio.TimeoutError
562 # except concurrent.futures._base.TimeoutError:
563 # os.chdir(previous_workdir)
564 # self.log.debug("[install] Timeout exceeded; resetting cluster")
565 # await self.reset(cluster_uuid)
568 # Wait for the application to be active
569 # if model.is_connected():
570 # self.log.debug("[install] Disconnecting model")
571 # await model.disconnect()
572 # await controller.disconnect()
573 os
.chdir(previous_workdir
)
577 async def instances_list(self
, cluster_uuid
: str) -> list:
579 returns a list of deployed releases in a cluster
581 :param cluster_uuid: the cluster
590 kdu_model
: str = None,
595 :param cluster_uuid str: The UUID of the cluster to upgrade
596 :param kdu_instance str: The unique name of the KDU instance
597 :param kdu_model str: The name or path of the bundle to upgrade to
598 :param params dict: Key-value pairs of instantiation parameters
600 :return: If successful, reference to the new revision number of the
604 # TODO: Loop through the bundle and upgrade each charm individually
607 The API doesn't have a concept of bundle upgrades, because there are
608 many possible changes: charm revision, disk, number of units, etc.
610 As such, we are only supporting a limited subset of upgrades. We'll
611 upgrade the charm revision but leave storage and scale untouched.
613 Scale changes should happen through OSM constructs, and changes to
614 storage would require a redeployment of the service, at least in this
617 raise MethodNotImplemented()
618 # TODO: Remove these commented lines
620 # model = await self.get_model(namespace, cluster_uuid=cluster_uuid)
623 # namespace = self.get_namespace(cluster_uuid)
624 # controller = await self.get_controller(cluster_uuid)
627 # if namespace not in await controller.list_models():
628 # raise N2VCNotFound(message="Model {} does not exist".format(namespace))
630 # model = await controller.get_model(namespace)
631 # with open(kdu_model, "r") as f:
632 # bundle = yaml.safe_load(f)
636 # 'description': 'Test bundle',
637 # 'bundle': 'kubernetes',
640 # 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
643 # 'password': 'manopw',
644 # 'root_password': 'osm4u',
647 # 'series': 'kubernetes'
652 # # TODO: This should be returned in an agreed-upon format
653 # for name in bundle["applications"]:
654 # self.log.debug(model.applications)
655 # application = model.applications[name]
656 # self.log.debug(application)
658 # path = bundle["applications"][name]["charm"]
661 # await application.upgrade_charm(switch=path)
662 # except juju.errors.JujuError as ex:
663 # if "already running charm" in str(ex):
664 # # We're already running this version
668 # await model.disconnect()
669 # await controller.disconnect()
682 :param cluster_uuid str: The UUID of the cluster to rollback
683 :param kdu_instance str: The unique name of the KDU instance
684 :param revision int: The revision to revert to. If omitted, rolls back
685 the previous upgrade.
687 :return: If successful, returns the revision of active KDU instance,
688 or raises an exception
690 raise MethodNotImplemented()
694 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str) -> bool:
695 """Uninstall a KDU instance
697 :param cluster_uuid str: The UUID of the cluster
698 :param kdu_instance str: The unique name of the KDU instance
700 :return: Returns True if successful, or raises an exception
703 # controller = await self.get_controller(cluster_uuid)
705 self
.log
.debug("[uninstall] Destroying model")
707 await self
.libjuju
.destroy_model(kdu_instance
, total_timeout
=3600)
709 # self.log.debug("[uninstall] Model destroyed and disconnecting")
710 # await controller.disconnect()
713 # TODO: Remove these commented lines
714 # if not self.authenticated:
715 # self.log.debug("[uninstall] Connecting to controller")
716 # await self.login(cluster_uuid)
718 async def exec_primitive(
720 cluster_uuid
: str = None,
721 kdu_instance
: str = None,
722 primitive_name
: str = None,
723 timeout
: float = 300,
725 db_dict
: dict = None,
727 """Exec primitive (Juju action)
729 :param cluster_uuid str: The UUID of the cluster
730 :param kdu_instance str: The unique name of the KDU instance
731 :param primitive_name: Name of action that will be executed
732 :param timeout: Timeout for action execution
733 :param params: Dictionary of all the parameters needed for the action
734 :db_dict: Dictionary for any additional data
736 :return: Returns the output of the action
739 # controller = await self.get_controller(cluster_uuid)
741 if not params
or "application-name" not in params
:
743 "Missing application-name argument, \
744 argument needed for K8s actions"
748 "[exec_primitive] Getting model "
749 "kdu_instance: {}".format(kdu_instance
)
751 application_name
= params
["application-name"]
752 actions
= await self
.libjuju
.get_actions(application_name
, kdu_instance
)
753 if primitive_name
not in actions
:
754 raise K8sException("Primitive {} not found".format(primitive_name
))
755 output
, status
= await self
.libjuju
.execute_action(
756 application_name
, kdu_instance
, primitive_name
, **params
758 # model = await self.get_model(kdu_instance, controller=controller)
760 # application_name = params["application-name"]
761 # application = model.applications[application_name]
763 # actions = await application.get_actions()
764 # if primitive_name not in actions:
765 # raise K8sException("Primitive {} not found".format(primitive_name))
768 # for u in application.units:
769 # if await u.is_leader_from_status():
774 # raise K8sException("No leader unit found to execute action")
776 # self.log.debug("[exec_primitive] Running action: {}".format(primitive_name))
777 # action = await unit.run_action(primitive_name, **params)
779 # output = await model.get_action_output(action_uuid=action.entity_id)
780 # status = await model.get_action_status(uuid_or_prefix=action.entity_id)
783 # status[action.entity_id] if action.entity_id in status else "failed"
786 if status
!= "completed":
788 "status is not completed: {} output: {}".format(status
, output
)
793 except Exception as e
:
794 error_msg
= "Error executing primitive {}: {}".format(primitive_name
, e
)
795 self
.log
.error(error_msg
)
796 raise K8sException(message
=error_msg
)
798 # await controller.disconnect()
799 # TODO: Remove these commented lines:
800 # if not self.authenticated:
801 # self.log.debug("[exec_primitive] Connecting to controller")
802 # await self.login(cluster_uuid)
806 async def inspect_kdu(
812 Inspects a bundle and returns a dictionary of config parameters and
813 their default values.
815 :param kdu_model str: The name or path of the bundle to inspect.
817 :return: If successful, returns a dictionary of available parameters
818 and their default values.
822 if not os
.path
.exists(kdu_model
):
823 raise K8sException("file {} not found".format(kdu_model
))
825 with
open(kdu_model
, "r") as f
:
826 bundle
= yaml
.safe_load(f
.read())
830 'description': 'Test bundle',
831 'bundle': 'kubernetes',
834 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
837 'password': 'manopw',
838 'root_password': 'osm4u',
841 'series': 'kubernetes'
846 # TODO: This should be returned in an agreed-upon format
847 kdu
= bundle
["applications"]
857 If available, returns the README of the bundle.
859 :param kdu_model str: The name or path of a bundle
861 :return: If found, returns the contents of the README.
865 files
= ["README", "README.txt", "README.md"]
866 path
= os
.path
.dirname(kdu_model
)
867 for file in os
.listdir(path
):
869 with
open(file, "r") as f
:
875 async def status_kdu(
880 """Get the status of the KDU
882 Get the current status of the KDU instance.
884 :param cluster_uuid str: The UUID of the cluster
885 :param kdu_instance str: The unique id of the KDU instance
887 :return: Returns a dictionary containing namespace, state, resources,
891 # controller = await self.get_controller(cluster_uuid)
892 # model = await self.get_model(kdu_instance, controller=controller)
894 # model_status = await model.get_status()
895 # status = model_status.applications
896 model_status
= await self
.libjuju
.get_model_status(kdu_instance
)
897 for name
in model_status
.applications
:
898 application
= model_status
.applications
[name
]
899 status
[name
] = {"status": application
["status"]["status"]}
901 # await model.disconnect()
902 # await controller.disconnect()
906 async def get_services(
907 self
, cluster_uuid
: str, kdu_instance
: str, namespace
: str
909 """Return a list of services of a kdu_instance"""
911 credentials
= self
.get_credentials(cluster_uuid
=cluster_uuid
)
913 # config_path = "/tmp/{}".format(cluster_uuid)
914 # config_file = "{}/config".format(config_path)
916 # if not os.path.exists(config_path):
917 # os.makedirs(config_path)
918 # with open(config_file, "w") as f:
919 # f.write(credentials)
921 kubecfg
= tempfile
.NamedTemporaryFile()
922 with
open(kubecfg
.name
, "w") as kubecfg_file
:
923 kubecfg_file
.write(credentials
)
924 kubectl
= Kubectl(config_file
=kubecfg
.name
)
926 return kubectl
.get_services(
927 field_selector
="metadata.namespace={}".format(kdu_instance
)
930 async def get_service(
931 self
, cluster_uuid
: str, service_name
: str, namespace
: str
933 """Return data for a specific service inside a namespace"""
935 credentials
= self
.get_credentials(cluster_uuid
=cluster_uuid
)
937 # config_path = "/tmp/{}".format(cluster_uuid)
938 # config_file = "{}/config".format(config_path)
940 # if not os.path.exists(config_path):
941 # os.makedirs(config_path)
942 # with open(config_file, "w") as f:
943 # f.write(credentials)
945 kubecfg
= tempfile
.NamedTemporaryFile()
946 with
open(kubecfg
.name
, "w") as kubecfg_file
:
947 kubecfg_file
.write(credentials
)
948 kubectl
= Kubectl(config_file
=kubecfg
.name
)
950 return kubectl
.get_services(
951 field_selector
="metadata.name={},metadata.namespace={}".format(
952 service_name
, namespace
957 # async def add_k8s(self, cloud_name: str, credentials: str,) -> bool:
958 # """Add a k8s cloud to Juju
960 # Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
963 # :param cloud_name str: The name of the cloud to add.
964 # :param credentials dict: A dictionary representing the output of
965 # `kubectl config view --raw`.
967 # :returns: True if successful, otherwise raises an exception.
970 # cmd = [self.juju_command, "add-k8s", "--local", cloud_name]
971 # self.log.debug(cmd)
973 # process = await asyncio.create_subprocess_exec(
975 # stdout=asyncio.subprocess.PIPE,
976 # stderr=asyncio.subprocess.PIPE,
977 # stdin=asyncio.subprocess.PIPE,
980 # # Feed the process the credentials
981 # process.stdin.write(credentials.encode("utf-8"))
982 # await process.stdin.drain()
983 # process.stdin.close()
985 # _stdout, stderr = await process.communicate()
987 # return_code = process.returncode
989 # self.log.debug("add-k8s return code: {}".format(return_code))
991 # if return_code > 0:
992 # raise Exception(stderr)
996 # async def add_model(
997 # self, model_name: str, cluster_uuid: str, controller: Controller
999 # """Adds a model to the controller
1001 # Adds a new model to the Juju controller
1003 # :param model_name str: The name of the model to add.
1004 # :param cluster_uuid str: ID of the cluster.
1005 # :param controller: Controller object in which the model will be added
1006 # :returns: The juju.model.Model object of the new model upon success or
1007 # raises an exception.
1011 # "Adding model '{}' to cluster_uuid '{}'".format(model_name, cluster_uuid)
1015 # if self.juju_public_key is not None:
1016 # model = await controller.add_model(
1017 # model_name, config={"authorized-keys": self.juju_public_key}
1020 # model = await controller.add_model(model_name)
1021 # except Exception as ex:
1022 # self.log.debug(ex)
1023 # self.log.debug("Caught exception: {}".format(ex))
1028 # async def bootstrap(
1029 # self, cloud_name: str, cluster_uuid: str, loadbalancer: bool
1031 # """Bootstrap a Kubernetes controller
1033 # Bootstrap a Juju controller inside the Kubernetes cluster
1035 # :param cloud_name str: The name of the cloud.
1036 # :param cluster_uuid str: The UUID of the cluster to bootstrap.
1037 # :param loadbalancer bool: If the controller should use loadbalancer or not.
1038 # :returns: True upon success or raises an exception.
1041 # if not loadbalancer:
1042 # cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid]
1045 # For public clusters, specify that the controller service is using a
1049 # self.juju_command,
1054 # "controller-service-type=loadbalancer",
1058 # "Bootstrapping controller {} in cloud {}".format(cluster_uuid, cloud_name)
1061 # process = await asyncio.create_subprocess_exec(
1062 # *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1065 # _stdout, stderr = await process.communicate()
1067 # return_code = process.returncode
1069 # if return_code > 0:
1071 # if b"already exists" not in stderr:
1072 # raise Exception(stderr)
1076 # async def destroy_controller(self, cluster_uuid: str) -> bool:
1077 # """Destroy a Kubernetes controller
1079 # Destroy an existing Kubernetes controller.
1081 # :param cluster_uuid str: The UUID of the cluster to bootstrap.
1082 # :returns: True upon success or raises an exception.
1085 # self.juju_command,
1086 # "destroy-controller",
1087 # "--destroy-all-models",
1088 # "--destroy-storage",
1093 # process = await asyncio.create_subprocess_exec(
1094 # *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1097 # _stdout, stderr = await process.communicate()
1099 # return_code = process.returncode
1101 # if return_code > 0:
1103 # if "already exists" not in stderr:
1104 # raise Exception(stderr)
1106 def get_credentials(self
, cluster_uuid
: str) -> str:
1108 Get Cluster Kubeconfig
1110 k8scluster
= self
.db
.get_one(
1111 "k8sclusters", q_filter
={"_id": cluster_uuid
}, fail_on_empty
=False
1114 self
.db
.encrypt_decrypt_fields(
1115 k8scluster
.get("credentials"),
1117 ["password", "secret"],
1118 schema_version
=k8scluster
["schema_version"],
1119 salt
=k8scluster
["_id"],
1122 return yaml
.safe_dump(k8scluster
.get("credentials"))
1124 def _get_credential_name(self
, cluster_uuid
: str) -> str:
1126 Get credential name for a k8s cloud
1128 We cannot use the cluster_uuid for the credential name directly,
1129 because it cannot start with a number, it must start with a letter.
1130 Therefore, the k8s cloud credential name will be "cred-" followed
1131 by the cluster uuid.
1133 :param: cluster_uuid: Cluster UUID of the kubernetes cloud (=cloud_name)
1135 :return: Name to use for the credential name.
1137 return "cred-{}".format(cluster_uuid
)
1139 # def get_config(self, cluster_uuid: str,) -> dict:
1140 # """Get the cluster configuration
1142 # Gets the configuration of the cluster
1144 # :param cluster_uuid str: The UUID of the cluster.
1145 # :return: A dict upon success, or raises an exception.
1148 # juju_db = self.db.get_one("admin", {"_id": "juju"})
1150 # for k in juju_db["k8sclusters"]:
1151 # if k["_id"] == cluster_uuid:
1152 # config = k["config"]
1153 # self.db.encrypt_decrypt_fields(
1156 # ["secret", "cacert"],
1157 # schema_version="1.1",
1163 # "Unable to locate configuration for cluster {}".format(cluster_uuid)
1167 # async def get_model(self, model_name: str, controller: Controller) -> Model:
1168 # """Get a model from the Juju Controller.
1170 # Note: Model objects returned must call disconnected() before it goes
1173 # :param model_name str: The name of the model to get
1174 # :param controller Controller: Controller object
1175 # :return The juju.model.Model object if found, or None.
1178 # models = await controller.list_models()
1179 # if model_name not in models:
1180 # raise N2VCNotFound("Model {} not found".format(model_name))
1181 # self.log.debug("Found model: {}".format(model_name))
1182 # return await controller.get_model(model_name)
1188 """Get the namespace UUID
1189 Gets the namespace's unique name
1191 :param cluster_uuid str: The UUID of the cluster
1192 :returns: The namespace UUID, or raises an exception
1194 # config = self.get_config(cluster_uuid)
1196 # Make sure the name is in the config
1197 # if "namespace" not in config:
1198 # raise Exception("Namespace not found.")
1200 # TODO: We want to make sure this is unique to the cluster, in case
1201 # the cluster is being reused.
1202 # Consider pre/appending the cluster id to the namespace string
1205 # TODO: Remove these lines of code
1206 # async def has_model(self, model_name: str) -> bool:
1207 # """Check if a model exists in the controller
1209 # Checks to see if a model exists in the connected Juju controller.
1211 # :param model_name str: The name of the model
1212 # :return: A boolean indicating if the model exists
1214 # models = await self.controller.list_models()
1216 # if model_name in models:
1220 # def is_local_k8s(self, credentials: str,) -> bool:
1221 # """Check if a cluster is local
1223 # Checks if a cluster is running in the local host
1225 # :param credentials dict: A dictionary containing the k8s credentials
1226 # :returns: A boolean if the cluster is running locally
1229 # creds = yaml.safe_load(credentials)
1231 # if creds and os.getenv("OSMLCM_VCA_APIPROXY"):
1232 # for cluster in creds["clusters"]:
1233 # if "server" in cluster["cluster"]:
1234 # if os.getenv("OSMLCM_VCA_APIPROXY") in cluster["cluster"]["server"]:
1239 # async def get_controller(self, cluster_uuid):
1240 # """Login to the Juju controller."""
1242 # config = self.get_config(cluster_uuid)
1244 # juju_endpoint = config["endpoint"]
1245 # juju_user = config["username"]
1246 # juju_secret = config["secret"]
1247 # juju_ca_cert = config["cacert"]
1249 # controller = Controller()
1253 # "Connecting to controller... ws://{} as {}".format(
1254 # juju_endpoint, juju_user,
1258 # await controller.connect(
1259 # endpoint=juju_endpoint,
1260 # username=juju_user,
1261 # password=juju_secret,
1262 # cacert=juju_ca_cert,
1264 # self.log.debug("JujuApi: Logged into controller")
1266 # except Exception as ex:
1267 # self.log.debug(ex)
1268 # self.log.debug("Caught exception: {}".format(ex))
1270 # self.log.fatal("VCA credentials not configured.")
1272 # TODO: Remove these commented lines
1273 # self.authenticated = False
1274 # if self.authenticated:
1277 # self.connecting = True
1278 # juju_public_key = None
1279 # self.authenticated = True
1280 # Test: Make sure we have the credentials loaded
1281 # async def logout(self):
1282 # """Logout of the Juju controller."""
1283 # self.log.debug("[logout]")
1284 # if not self.authenticated:
1287 # for model in self.models:
1288 # self.log.debug("Logging out of model {}".format(model))
1289 # await self.models[model].disconnect()
1291 # if self.controller:
1292 # self.log.debug("Disconnecting controller {}".format(self.controller))
1293 # await self.controller.disconnect()
1294 # self.controller = None
1296 # self.authenticated = False
1298 # async def remove_cloud(self, cloud_name: str,) -> bool:
1299 # """Remove a k8s cloud from Juju
1301 # Removes a Kubernetes cloud from Juju.
1303 # :param cloud_name str: The name of the cloud to add.
1305 # :returns: True if successful, otherwise raises an exception.
1308 # # Remove the bootstrapped controller
1309 # cmd = [self.juju_command, "remove-k8s", "--client", cloud_name]
1310 # process = await asyncio.create_subprocess_exec(
1311 # *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1314 # _stdout, stderr = await process.communicate()
1316 # return_code = process.returncode
1318 # if return_code > 0:
1319 # raise Exception(stderr)
1321 # # Remove the cloud from the local config
1322 # cmd = [self.juju_command, "remove-cloud", "--client", cloud_name]
1323 # process = await asyncio.create_subprocess_exec(
1324 # *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1327 # _stdout, stderr = await process.communicate()
1329 # return_code = process.returncode
1331 # if return_code > 0:
1332 # raise Exception(stderr)
1336 # async def set_config(self, cluster_uuid: str, config: dict,) -> bool:
1337 # """Save the cluster configuration
1339 # Saves the cluster information to the Mongo database
1341 # :param cluster_uuid str: The UUID of the cluster
1342 # :param config dict: A dictionary containing the cluster configuration
1345 # juju_db = self.db.get_one("admin", {"_id": "juju"})
1347 # k8sclusters = juju_db["k8sclusters"] if "k8sclusters" in juju_db else []
1348 # self.db.encrypt_decrypt_fields(
1351 # ["secret", "cacert"],
1352 # schema_version="1.1",
1353 # salt=cluster_uuid,
1355 # k8sclusters.append({"_id": cluster_uuid, "config": config})
1358 # q_filter={"_id": "juju"},
1359 # update_dict={"k8sclusters": k8sclusters},
1362 # Private methods to create/delete needed resources in the
1363 # Kubernetes cluster to create the K8s cloud in Juju
1365 def _create_cluster_role(
1369 labels
: Dict
[str, str],
1371 cluster_roles
= kubectl
.clients
[RBAC_CLIENT
].list_cluster_role(
1372 field_selector
="metadata.name={}".format(name
)
1375 if len(cluster_roles
.items
) > 0:
1377 "Cluster role with metadata.name={} already exists".format(name
)
1380 metadata
= V1ObjectMeta(name
=name
, labels
=labels
, namespace
=ADMIN_NAMESPACE
)
1382 cluster_role
= V1ClusterRole(
1385 V1PolicyRule(api_groups
=["*"], resources
=["*"], verbs
=["*"]),
1386 V1PolicyRule(non_resource_ur_ls
=["*"], verbs
=["*"]),
1390 kubectl
.clients
[RBAC_CLIENT
].create_cluster_role(cluster_role
)
    def _delete_cluster_role(self, kubectl: Kubectl, name: str):
        """Delete the ClusterRole whose metadata.name equals *name*."""
        kubectl.clients[RBAC_CLIENT].delete_cluster_role(name)
1395 def _create_service_account(
1399 labels
: Dict
[str, str],
1401 service_accounts
= kubectl
.clients
[CORE_CLIENT
].list_namespaced_service_account(
1402 ADMIN_NAMESPACE
, field_selector
="metadata.name={}".format(name
)
1404 if len(service_accounts
.items
) > 0:
1406 "Service account with metadata.name={} already exists".format(name
)
1409 metadata
= V1ObjectMeta(name
=name
, labels
=labels
, namespace
=ADMIN_NAMESPACE
)
1410 service_account
= V1ServiceAccount(metadata
=metadata
)
1412 kubectl
.clients
[CORE_CLIENT
].create_namespaced_service_account(
1413 ADMIN_NAMESPACE
, service_account
    def _delete_service_account(self, kubectl: Kubectl, name: str):
        """Delete the ServiceAccount *name* from the admin namespace."""
        kubectl.clients[CORE_CLIENT].delete_namespaced_service_account(
            name, ADMIN_NAMESPACE
        )
1421 def _create_cluster_role_binding(
1425 labels
: Dict
[str, str],
1427 role_bindings
= kubectl
.clients
[RBAC_CLIENT
].list_cluster_role_binding(
1428 field_selector
="metadata.name={}".format(name
)
1430 if len(role_bindings
.items
) > 0:
1431 raise Exception("Generated rbac id already exists")
1433 role_binding
= V1ClusterRoleBinding(
1434 metadata
=V1ObjectMeta(name
=name
, labels
=labels
),
1435 role_ref
=V1RoleRef(kind
="ClusterRole", name
=name
, api_group
=""),
1437 V1Subject(kind
="ServiceAccount", name
=name
, namespace
=ADMIN_NAMESPACE
)
1440 kubectl
.clients
[RBAC_CLIENT
].create_cluster_role_binding(role_binding
)
    def _delete_cluster_role_binding(self, kubectl: Kubectl, name: str):
        """Delete the ClusterRoleBinding whose metadata.name equals *name*."""
        kubectl.clients[RBAC_CLIENT].delete_cluster_role_binding(name)
1445 async def _get_secret_data(self
, kubectl
: Kubectl
, name
: str) -> (str, str):
1446 v1_core
= kubectl
.clients
[CORE_CLIENT
]
1452 service_accounts
= v1_core
.list_namespaced_service_account(
1453 ADMIN_NAMESPACE
, field_selector
="metadata.name={}".format(name
)
1455 if len(service_accounts
.items
) == 0:
1457 "Service account not found with metadata.name={}".format(name
)
1459 service_account
= service_accounts
.items
[0]
1460 if service_account
.secrets
and len(service_account
.secrets
) > 0:
1461 secret_name
= service_account
.secrets
[0].name
1462 if secret_name
is not None or not retries_limit
:
1466 "Failed getting the secret from service account {}".format(name
)
1468 secret
= v1_core
.list_namespaced_secret(
1470 field_selector
="metadata.name={}".format(secret_name
),
1473 token
= secret
.data
[SERVICE_ACCOUNT_TOKEN_KEY
]
1474 client_certificate_data
= secret
.data
[SERVICE_ACCOUNT_ROOT_CA_KEY
]
1477 base64
.b64decode(token
).decode("utf-8"),
1478 base64
.b64decode(client_certificate_data
).decode("utf-8"),