1 # Copyright 2019 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
23 from n2vc
.exceptions
import K8sException
, N2VCBadArgumentsException
24 from n2vc
.k8s_conn
import K8sConnector
25 from n2vc
.kubectl
import Kubectl
, CORE_CLIENT
, RBAC_CLIENT
26 from .exceptions
import MethodNotImplemented
27 from n2vc
.utils
import base64_to_cacert
28 from n2vc
.libjuju
import Libjuju
30 from kubernetes
.client
.models
import (
40 from typing
import Dict
# Keys under which a Kubernetes ServiceAccount secret exposes its bearer token
# and cluster CA certificate (usage not visible in this chunk — presumably read
# when extracting credentials for Juju; confirm against _get_secret_data).
SERVICE_ACCOUNT_TOKEN_KEY = "token"
SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
# Label key attached to the per-cluster RBAC resources; its value is the random
# id generated by generate_rbac_id() and is stored in the cloud credential attrs
# (see reset(), which reads it back to locate the resources to delete).
RBAC_LABEL_KEY_NAME = "rbac-id"
# Namespace used for the admin-level RBAC stack (kube-system).
ADMIN_NAMESPACE = "kube-system"
# Name prefix for the created ServiceAccount / ClusterRole / ClusterRoleBinding:
# resources are named "{RBAC_STACK_PREFIX}-{rbac_id}" (see init_env/reset).
RBAC_STACK_PREFIX = "juju-credential"
49 # from juju.bundle import BundleHandler
52 # from .vnf import N2VC
def generate_rbac_id():
    """Return a random 8-character lowercase-hex id for RBAC resource names."""
    # 4 random bytes -> 8 hex chars; bytes.hex() is equivalent to
    # binascii.hexlify(...).decode().
    return os.urandom(4).hex()
59 class K8sJujuConnector(K8sConnector
):
64 kubectl_command
: str = "/usr/bin/kubectl",
65 juju_command
: str = "/usr/bin/juju",
69 vca_config
: dict = None,
72 :param fs: file system for kubernetes and helm configuration
73 :param db: Database object
74 :param kubectl_command: path to kubectl executable
75 :param helm_command: path to helm executable
77 :param: loop: Asyncio loop
81 K8sConnector
.__init
__(
85 on_update_db
=on_update_db
,
89 self
.loop
= loop
or asyncio
.get_event_loop()
90 self
.log
.debug("Initializing K8S Juju connector")
92 required_vca_config
= [
98 if not vca_config
or not all(k
in vca_config
for k
in required_vca_config
):
99 raise N2VCBadArgumentsException(
100 message
="Missing arguments in vca_config: {}".format(vca_config
),
101 bad_args
=required_vca_config
,
103 port
= vca_config
["port"] if "port" in vca_config
else 17070
104 url
= "{}:{}".format(vca_config
["host"], port
)
105 enable_os_upgrade
= vca_config
.get("enable_os_upgrade", True)
106 apt_mirror
= vca_config
.get("apt_mirror", None)
107 username
= vca_config
["user"]
108 secret
= vca_config
["secret"]
109 ca_cert
= base64_to_cacert(vca_config
["ca_cert"])
111 self
.libjuju
= Libjuju(
113 api_proxy
=None, # Not needed for k8s charms
114 enable_os_upgrade
=enable_os_upgrade
,
115 apt_mirror
=apt_mirror
,
123 self
.log
.debug("K8S Juju connector initialized")
124 # TODO: Remove these commented lines:
125 # self.authenticated = False
127 # self.juju_secret = ""
134 namespace
: str = "kube-system",
135 reuse_cluster_uuid
: str = None,
138 It prepares a given K8s cluster environment to run Juju bundles.
140 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
142 :param namespace: optional namespace to be used for juju. By default,
143 'kube-system' will be used
144 :param reuse_cluster_uuid: existing cluster uuid for reuse
145 :return: uuid of the K8s cluster and True if connector has installed some
146 software in the cluster
147 (on error, an exception will be raised)
152 # Bootstrapping cannot be done, by design, through the API. We need to
159 # 1. Has the environment already been bootstrapped?
160 # - Check the database to see if we have a record for this env
162 # 2. If this is a new env, create it
163 # - Add the k8s cloud to Juju
165 # - Record it in the database
167 # 3. Connect to the Juju controller for this cloud
170 # cluster_uuid = reuse_cluster_uuid
171 # if not cluster_uuid:
172 # cluster_uuid = str(uuid4())
174 ##################################################
175 # TODO: Pull info from db based on the namespace #
176 ##################################################
178 ###################################################
179 # TODO: Make it idempotent, calling add-k8s and #
180 # bootstrap whenever reuse_cluster_uuid is passed #
182 # `init_env` is called to initialize the K8s #
183 # cluster for juju. If this initialization fails, #
184 # it can be called again by LCM with the param #
185 # reuse_cluster_uuid, e.g. to try to fix it. #
186 ###################################################
188 # This is a new cluster, so bootstrap it
190 cluster_uuid
= reuse_cluster_uuid
or str(uuid
.uuid4())
192 # Is a local k8s cluster?
193 # localk8s = self.is_local_k8s(k8s_creds)
195 # If the k8s is external, the juju controller needs a loadbalancer
196 # loadbalancer = False if localk8s else True
198 # Name the new k8s cloud
199 # k8s_cloud = "k8s-{}".format(cluster_uuid)
201 # self.log.debug("Adding k8s cloud {}".format(k8s_cloud))
202 # await self.add_k8s(k8s_cloud, k8s_creds)
204 # Bootstrap Juju controller
205 # self.log.debug("Bootstrapping...")
206 # await self.bootstrap(k8s_cloud, cluster_uuid, loadbalancer)
207 # self.log.debug("Bootstrap done.")
209 # Get the controller information
211 # Parse ~/.local/share/juju/controllers.yaml
212 # controllers.testing.api-endpoints|ca-cert|uuid
213 # self.log.debug("Getting controller endpoints")
214 # with open(os.path.expanduser("~/.local/share/juju/controllers.yaml")) as f:
215 # controllers = yaml.load(f, Loader=yaml.Loader)
216 # controller = controllers["controllers"][cluster_uuid]
217 # endpoints = controller["api-endpoints"]
218 # juju_endpoint = endpoints[0]
219 # juju_ca_cert = controller["ca-cert"]
221 # Parse ~/.local/share/juju/accounts
222 # controllers.testing.user|password
223 # self.log.debug("Getting accounts")
224 # with open(os.path.expanduser("~/.local/share/juju/accounts.yaml")) as f:
225 # controllers = yaml.load(f, Loader=yaml.Loader)
226 # controller = controllers["controllers"][cluster_uuid]
228 # juju_user = controller["user"]
229 # juju_secret = controller["password"]
232 # "endpoint": juju_endpoint,
233 # "username": juju_user,
234 # "secret": juju_secret,
235 # "cacert": juju_ca_cert,
236 # "loadbalancer": loadbalancer,
239 # Store the cluster configuration so it
240 # can be used for subsequent calls
241 kubecfg
= tempfile
.NamedTemporaryFile()
242 with
open(kubecfg
.name
, "w") as kubecfg_file
:
243 kubecfg_file
.write(k8s_creds
)
244 kubectl
= Kubectl(config_file
=kubecfg
.name
)
246 # CREATING RESOURCES IN K8S
247 rbac_id
= generate_rbac_id()
248 metadata_name
= "{}-{}".format(RBAC_STACK_PREFIX
, rbac_id
)
249 labels
= {RBAC_STACK_PREFIX
: rbac_id
}
251 # Create cleanup dictionary to clean up created resources
252 # if it fails in the middle of the process
255 self
._create
_cluster
_role
(
262 "delete": self
._delete
_cluster
_role
,
263 "args": (kubectl
, metadata_name
),
267 self
._create
_service
_account
(
274 "delete": self
._delete
_service
_account
,
275 "args": (kubectl
, metadata_name
),
279 self
._create
_cluster
_role
_binding
(
286 "delete": self
._delete
_service
_account
,
287 "args": (kubectl
, metadata_name
),
290 token
, client_cert_data
= await self
._get
_secret
_data
(
295 default_storage_class
= kubectl
.get_default_storage_class()
296 await self
.libjuju
.add_k8s(
300 client_cert_data
=client_cert_data
,
301 configuration
=kubectl
.configuration
,
302 storage_class
=default_storage_class
,
303 credential_name
=self
._get
_credential
_name
(cluster_uuid
),
305 # self.log.debug("Setting config")
306 # await self.set_config(cluster_uuid, config)
309 # controller = await self.get_controller(cluster_uuid)
310 # await controller.disconnect()
312 # TODO: Remove these commented lines
313 # raise Exception("EOL")
314 # self.juju_public_key = None
315 # Login to the k8s cluster
316 # if not self.authenticated:
317 # await self.login(cluster_uuid)
319 # We're creating a new cluster
320 # print("Getting model {}".format(self.get_namespace(cluster_uuid),
321 # cluster_uuid=cluster_uuid))
322 # model = await self.get_model(
323 # self.get_namespace(cluster_uuid),
324 # cluster_uuid=cluster_uuid
327 # Disconnect from the model
328 # if model and model.is_connected():
329 # await model.disconnect()
331 return cluster_uuid
, True
332 except Exception as e
:
333 self
.log
.error("Error initializing k8scluster: {}".format(e
))
334 if len(cleanup_data
) > 0:
335 self
.log
.debug("Cleaning up created resources in k8s cluster...")
336 for item
in cleanup_data
:
337 delete_function
= item
["delete"]
338 delete_args
= item
["args"]
339 delete_function(*delete_args
)
340 self
.log
.debug("Cleanup finished")
343 """Repo Management"""
349 _type
: str = "charm",
351 raise MethodNotImplemented()
353 async def repo_list(self
):
354 raise MethodNotImplemented()
356 async def repo_remove(
360 raise MethodNotImplemented()
362 async def synchronize_repos(self
, cluster_uuid
: str, name
: str):
364 Returns None as currently add_repo is not implemented
371 self
, cluster_uuid
: str, force
: bool = False, uninstall_sw
: bool = False
375 Resets the Kubernetes cluster by removing the model that represents it.
377 :param cluster_uuid str: The UUID of the cluster to reset
378 :return: Returns True if successful or raises an exception.
382 # Remove k8scluster from database
383 # self.log.debug("[reset] Removing k8scluster from juju database")
384 # juju_db = self.db.get_one("admin", {"_id": "juju"})
386 # for k in juju_db["k8sclusters"]:
387 # if k["_id"] == cluster_uuid:
388 # juju_db["k8sclusters"].remove(k)
391 # q_filter={"_id": "juju"},
392 # update_dict={"k8sclusters": juju_db["k8sclusters"]},
396 # Destroy the controller (via CLI)
397 # self.log.debug("[reset] Destroying controller")
398 # await self.destroy_controller(cluster_uuid)
399 self
.log
.debug("[reset] Removing k8s cloud")
400 # k8s_cloud = "k8s-{}".format(cluster_uuid)
401 # await self.remove_cloud(k8s_cloud)
403 cloud_creds
= await self
.libjuju
.get_cloud_credentials(
405 self
._get
_credential
_name
(cluster_uuid
),
408 await self
.libjuju
.remove_cloud(cluster_uuid
)
410 kubecfg
= self
.get_credentials(cluster_uuid
=cluster_uuid
)
412 kubecfg_file
= tempfile
.NamedTemporaryFile()
413 with
open(kubecfg_file
.name
, "w") as f
:
415 kubectl
= Kubectl(config_file
=kubecfg_file
.name
)
418 self
._delete
_cluster
_role
_binding
,
419 self
._delete
_service
_account
,
420 self
._delete
_cluster
_role
,
423 credential_attrs
= cloud_creds
[0].result
["attrs"]
424 if RBAC_LABEL_KEY_NAME
in credential_attrs
:
425 rbac_id
= credential_attrs
[RBAC_LABEL_KEY_NAME
]
426 metadata_name
= "{}-{}".format(RBAC_STACK_PREFIX
, rbac_id
)
427 delete_args
= (kubectl
, metadata_name
)
428 for delete_func
in delete_functions
:
430 delete_func(*delete_args
)
431 except Exception as e
:
432 self
.log
.warning("Cannot remove resource in K8s {}".format(e
))
434 except Exception as e
:
435 self
.log
.debug("Caught exception during reset: {}".format(e
))
438 # TODO: Remove these commented lines
439 # if not self.authenticated:
440 # await self.login(cluster_uuid)
442 # if self.controller.is_connected():
443 # # Destroy the model
444 # namespace = self.get_namespace(cluster_uuid)
445 # if await self.has_model(namespace):
446 # self.log.debug("[reset] Destroying model")
447 # await self.controller.destroy_model(namespace, destroy_storage=True)
449 # # Disconnect from the controller
450 # self.log.debug("[reset] Disconnecting controller")
451 # await self.logout()
461 timeout
: float = 1800,
463 db_dict
: dict = None,
464 kdu_name
: str = None,
465 namespace
: str = None,
469 :param cluster_uuid str: The UUID of the cluster to install to
470 :param kdu_model str: The name or path of a bundle to install
471 :param kdu_instance: Kdu instance name
472 :param atomic bool: If set, waits until the model is active and resets
473 the cluster on failure.
474 :param timeout int: The time, in seconds, to wait for the install
476 :param params dict: Key-value pairs of instantiation parameters
477 :param kdu_name: Name of the KDU instance to be installed
478 :param namespace: K8s namespace to use for the KDU instance
480 :return: If successful, returns ?
484 # controller = await self.get_controller(cluster_uuid)
487 # Get or create the model, based on the NS
491 raise K8sException("db_dict must be set")
493 raise K8sException("bundle must be set")
495 if bundle
.startswith("cs:"):
497 elif bundle
.startswith("http"):
501 new_workdir
= kdu_model
.strip(kdu_model
.split("/")[-1])
502 os
.chdir(new_workdir
)
503 bundle
= "local:{}".format(kdu_model
)
505 self
.log
.debug("Checking for model named {}".format(kdu_instance
))
507 # Create the new model
508 self
.log
.debug("Adding model: {}".format(kdu_instance
))
509 await self
.libjuju
.add_model(
510 model_name
=kdu_instance
,
511 cloud_name
=cluster_uuid
,
512 credential_name
=self
._get
_credential
_name
(cluster_uuid
),
516 # TODO: Instantiation parameters
519 "Juju bundle that models the KDU, in any of the following ways:
520 - <juju-repo>/<juju-bundle>
521 - <juju-bundle folder under k8s_models folder in the package>
522 - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
524 - <URL_where_to_fetch_juju_bundle>
527 previous_workdir
= os
.getcwd()
528 except FileNotFoundError
:
529 previous_workdir
= "/app/storage"
531 self
.log
.debug("[install] deploying {}".format(bundle
))
532 await self
.libjuju
.deploy(
533 bundle
, model_name
=kdu_instance
, wait
=atomic
, timeout
=timeout
536 # Get the application
538 # # applications = model.applications
539 # self.log.debug("[install] Applications: {}".format(model.applications))
540 # for name in model.applications:
541 # self.log.debug("[install] Waiting for {} to settle".format(name))
542 # application = model.applications[name]
544 # # It's not enough to wait for all units to be active;
545 # # the application status needs to be active as well.
546 # self.log.debug("Waiting for all units to be active...")
547 # await model.block_until(
549 # unit.agent_status == "idle"
550 # and application.status in ["active", "unknown"]
551 # and unit.workload_status in ["active", "unknown"]
552 # for unit in application.units
556 # self.log.debug("All units active.")
558 # # TODO use asyncio.TimeoutError
559 # except concurrent.futures._base.TimeoutError:
560 # os.chdir(previous_workdir)
561 # self.log.debug("[install] Timeout exceeded; resetting cluster")
562 # await self.reset(cluster_uuid)
565 # Wait for the application to be active
566 # if model.is_connected():
567 # self.log.debug("[install] Disconnecting model")
568 # await model.disconnect()
569 # await controller.disconnect()
570 os
.chdir(previous_workdir
)
573 async def instances_list(self
, cluster_uuid
: str) -> list:
575 returns a list of deployed releases in a cluster
577 :param cluster_uuid: the cluster
586 kdu_model
: str = None,
591 :param cluster_uuid str: The UUID of the cluster to upgrade
592 :param kdu_instance str: The unique name of the KDU instance
593 :param kdu_model str: The name or path of the bundle to upgrade to
594 :param params dict: Key-value pairs of instantiation parameters
596 :return: If successful, reference to the new revision number of the
600 # TODO: Loop through the bundle and upgrade each charm individually
603 The API doesn't have a concept of bundle upgrades, because there are
604 many possible changes: charm revision, disk, number of units, etc.
606 As such, we are only supporting a limited subset of upgrades. We'll
607 upgrade the charm revision but leave storage and scale untouched.
609 Scale changes should happen through OSM constructs, and changes to
610 storage would require a redeployment of the service, at least in this
613 raise MethodNotImplemented()
614 # TODO: Remove these commented lines
616 # model = await self.get_model(namespace, cluster_uuid=cluster_uuid)
619 # namespace = self.get_namespace(cluster_uuid)
620 # controller = await self.get_controller(cluster_uuid)
623 # if namespace not in await controller.list_models():
624 # raise N2VCNotFound(message="Model {} does not exist".format(namespace))
626 # model = await controller.get_model(namespace)
627 # with open(kdu_model, "r") as f:
628 # bundle = yaml.safe_load(f)
632 # 'description': 'Test bundle',
633 # 'bundle': 'kubernetes',
636 # 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
639 # 'password': 'manopw',
640 # 'root_password': 'osm4u',
643 # 'series': 'kubernetes'
648 # # TODO: This should be returned in an agreed-upon format
649 # for name in bundle["applications"]:
650 # self.log.debug(model.applications)
651 # application = model.applications[name]
652 # self.log.debug(application)
654 # path = bundle["applications"][name]["charm"]
657 # await application.upgrade_charm(switch=path)
658 # except juju.errors.JujuError as ex:
659 # if "already running charm" in str(ex):
660 # # We're already running this version
664 # await model.disconnect()
665 # await controller.disconnect()
678 :param cluster_uuid str: The UUID of the cluster to rollback
679 :param kdu_instance str: The unique name of the KDU instance
680 :param revision int: The revision to revert to. If omitted, rolls back
681 the previous upgrade.
683 :return: If successful, returns the revision of active KDU instance,
684 or raises an exception
686 raise MethodNotImplemented()
690 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str) -> bool:
691 """Uninstall a KDU instance
693 :param cluster_uuid str: The UUID of the cluster
694 :param kdu_instance str: The unique name of the KDU instance
696 :return: Returns True if successful, or raises an exception
699 # controller = await self.get_controller(cluster_uuid)
701 self
.log
.debug("[uninstall] Destroying model")
703 await self
.libjuju
.destroy_model(kdu_instance
, total_timeout
=3600)
705 # self.log.debug("[uninstall] Model destroyed and disconnecting")
706 # await controller.disconnect()
709 # TODO: Remove these commented lines
710 # if not self.authenticated:
711 # self.log.debug("[uninstall] Connecting to controller")
712 # await self.login(cluster_uuid)
714 async def exec_primitive(
716 cluster_uuid
: str = None,
717 kdu_instance
: str = None,
718 primitive_name
: str = None,
719 timeout
: float = 300,
721 db_dict
: dict = None,
723 """Exec primitive (Juju action)
725 :param cluster_uuid str: The UUID of the cluster
726 :param kdu_instance str: The unique name of the KDU instance
727 :param primitive_name: Name of action that will be executed
728 :param timeout: Timeout for action execution
729 :param params: Dictionary of all the parameters needed for the action
730 :db_dict: Dictionary for any additional data
732 :return: Returns the output of the action
735 # controller = await self.get_controller(cluster_uuid)
737 if not params
or "application-name" not in params
:
739 "Missing application-name argument, \
740 argument needed for K8s actions"
744 "[exec_primitive] Getting model "
745 "kdu_instance: {}".format(kdu_instance
)
747 application_name
= params
["application-name"]
748 actions
= await self
.libjuju
.get_actions(application_name
, kdu_instance
)
749 if primitive_name
not in actions
:
750 raise K8sException("Primitive {} not found".format(primitive_name
))
751 output
, status
= await self
.libjuju
.execute_action(
752 application_name
, kdu_instance
, primitive_name
, **params
754 # model = await self.get_model(kdu_instance, controller=controller)
756 # application_name = params["application-name"]
757 # application = model.applications[application_name]
759 # actions = await application.get_actions()
760 # if primitive_name not in actions:
761 # raise K8sException("Primitive {} not found".format(primitive_name))
764 # for u in application.units:
765 # if await u.is_leader_from_status():
770 # raise K8sException("No leader unit found to execute action")
772 # self.log.debug("[exec_primitive] Running action: {}".format(primitive_name))
773 # action = await unit.run_action(primitive_name, **params)
775 # output = await model.get_action_output(action_uuid=action.entity_id)
776 # status = await model.get_action_status(uuid_or_prefix=action.entity_id)
779 # status[action.entity_id] if action.entity_id in status else "failed"
782 if status
!= "completed":
784 "status is not completed: {} output: {}".format(status
, output
)
789 except Exception as e
:
790 error_msg
= "Error executing primitive {}: {}".format(primitive_name
, e
)
791 self
.log
.error(error_msg
)
792 raise K8sException(message
=error_msg
)
794 # await controller.disconnect()
795 # TODO: Remove these commented lines:
796 # if not self.authenticated:
797 # self.log.debug("[exec_primitive] Connecting to controller")
798 # await self.login(cluster_uuid)
802 async def inspect_kdu(
808 Inspects a bundle and returns a dictionary of config parameters and
809 their default values.
811 :param kdu_model str: The name or path of the bundle to inspect.
813 :return: If successful, returns a dictionary of available parameters
814 and their default values.
818 if not os
.path
.exists(kdu_model
):
819 raise K8sException("file {} not found".format(kdu_model
))
821 with
open(kdu_model
, "r") as f
:
822 bundle
= yaml
.safe_load(f
.read())
826 'description': 'Test bundle',
827 'bundle': 'kubernetes',
830 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
833 'password': 'manopw',
834 'root_password': 'osm4u',
837 'series': 'kubernetes'
842 # TODO: This should be returned in an agreed-upon format
843 kdu
= bundle
["applications"]
853 If available, returns the README of the bundle.
855 :param kdu_model str: The name or path of a bundle
857 :return: If found, returns the contents of the README.
861 files
= ["README", "README.txt", "README.md"]
862 path
= os
.path
.dirname(kdu_model
)
863 for file in os
.listdir(path
):
865 with
open(file, "r") as f
:
871 async def status_kdu(
876 """Get the status of the KDU
878 Get the current status of the KDU instance.
880 :param cluster_uuid str: The UUID of the cluster
881 :param kdu_instance str: The unique id of the KDU instance
883 :return: Returns a dictionary containing namespace, state, resources,
887 # controller = await self.get_controller(cluster_uuid)
888 # model = await self.get_model(kdu_instance, controller=controller)
890 # model_status = await model.get_status()
891 # status = model_status.applications
892 model_status
= await self
.libjuju
.get_model_status(kdu_instance
)
893 for name
in model_status
.applications
:
894 application
= model_status
.applications
[name
]
895 status
[name
] = {"status": application
["status"]["status"]}
897 # await model.disconnect()
898 # await controller.disconnect()
902 async def get_services(
903 self
, cluster_uuid
: str, kdu_instance
: str, namespace
: str
905 """Return a list of services of a kdu_instance"""
907 credentials
= self
.get_credentials(cluster_uuid
=cluster_uuid
)
909 # config_path = "/tmp/{}".format(cluster_uuid)
910 # config_file = "{}/config".format(config_path)
912 # if not os.path.exists(config_path):
913 # os.makedirs(config_path)
914 # with open(config_file, "w") as f:
915 # f.write(credentials)
917 kubecfg
= tempfile
.NamedTemporaryFile()
918 with
open(kubecfg
.name
, "w") as kubecfg_file
:
919 kubecfg_file
.write(credentials
)
920 kubectl
= Kubectl(config_file
=kubecfg
.name
)
922 return kubectl
.get_services(
923 field_selector
="metadata.namespace={}".format(kdu_instance
)
926 async def get_service(
927 self
, cluster_uuid
: str, service_name
: str, namespace
: str
929 """Return data for a specific service inside a namespace"""
931 credentials
= self
.get_credentials(cluster_uuid
=cluster_uuid
)
933 # config_path = "/tmp/{}".format(cluster_uuid)
934 # config_file = "{}/config".format(config_path)
936 # if not os.path.exists(config_path):
937 # os.makedirs(config_path)
938 # with open(config_file, "w") as f:
939 # f.write(credentials)
941 kubecfg
= tempfile
.NamedTemporaryFile()
942 with
open(kubecfg
.name
, "w") as kubecfg_file
:
943 kubecfg_file
.write(credentials
)
944 kubectl
= Kubectl(config_file
=kubecfg
.name
)
946 return kubectl
.get_services(
947 field_selector
="metadata.name={},metadata.namespace={}".format(
948 service_name
, namespace
953 # async def add_k8s(self, cloud_name: str, credentials: str,) -> bool:
954 # """Add a k8s cloud to Juju
956 # Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
959 # :param cloud_name str: The name of the cloud to add.
960 # :param credentials dict: A dictionary representing the output of
961 # `kubectl config view --raw`.
963 # :returns: True if successful, otherwise raises an exception.
966 # cmd = [self.juju_command, "add-k8s", "--local", cloud_name]
967 # self.log.debug(cmd)
969 # process = await asyncio.create_subprocess_exec(
971 # stdout=asyncio.subprocess.PIPE,
972 # stderr=asyncio.subprocess.PIPE,
973 # stdin=asyncio.subprocess.PIPE,
976 # # Feed the process the credentials
977 # process.stdin.write(credentials.encode("utf-8"))
978 # await process.stdin.drain()
979 # process.stdin.close()
981 # _stdout, stderr = await process.communicate()
983 # return_code = process.returncode
985 # self.log.debug("add-k8s return code: {}".format(return_code))
987 # if return_code > 0:
988 # raise Exception(stderr)
992 # async def add_model(
993 # self, model_name: str, cluster_uuid: str, controller: Controller
995 # """Adds a model to the controller
997 # Adds a new model to the Juju controller
999 # :param model_name str: The name of the model to add.
1000 # :param cluster_uuid str: ID of the cluster.
1001 # :param controller: Controller object in which the model will be added
1002 # :returns: The juju.model.Model object of the new model upon success or
1003 # raises an exception.
1007 # "Adding model '{}' to cluster_uuid '{}'".format(model_name, cluster_uuid)
1011 # if self.juju_public_key is not None:
1012 # model = await controller.add_model(
1013 # model_name, config={"authorized-keys": self.juju_public_key}
1016 # model = await controller.add_model(model_name)
1017 # except Exception as ex:
1018 # self.log.debug(ex)
1019 # self.log.debug("Caught exception: {}".format(ex))
1024 # async def bootstrap(
1025 # self, cloud_name: str, cluster_uuid: str, loadbalancer: bool
1027 # """Bootstrap a Kubernetes controller
1029 # Bootstrap a Juju controller inside the Kubernetes cluster
1031 # :param cloud_name str: The name of the cloud.
1032 # :param cluster_uuid str: The UUID of the cluster to bootstrap.
1033 # :param loadbalancer bool: If the controller should use loadbalancer or not.
1034 # :returns: True upon success or raises an exception.
1037 # if not loadbalancer:
1038 # cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid]
1041 # For public clusters, specify that the controller service is using a
1045 # self.juju_command,
1050 # "controller-service-type=loadbalancer",
1054 # "Bootstrapping controller {} in cloud {}".format(cluster_uuid, cloud_name)
1057 # process = await asyncio.create_subprocess_exec(
1058 # *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1061 # _stdout, stderr = await process.communicate()
1063 # return_code = process.returncode
1065 # if return_code > 0:
1067 # if b"already exists" not in stderr:
1068 # raise Exception(stderr)
1072 # async def destroy_controller(self, cluster_uuid: str) -> bool:
1073 # """Destroy a Kubernetes controller
1075 # Destroy an existing Kubernetes controller.
1077 # :param cluster_uuid str: The UUID of the cluster to bootstrap.
1078 # :returns: True upon success or raises an exception.
1081 # self.juju_command,
1082 # "destroy-controller",
1083 # "--destroy-all-models",
1084 # "--destroy-storage",
1089 # process = await asyncio.create_subprocess_exec(
1090 # *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1093 # _stdout, stderr = await process.communicate()
1095 # return_code = process.returncode
1097 # if return_code > 0:
1099 # if "already exists" not in stderr:
1100 # raise Exception(stderr)
1102 def get_credentials(self
, cluster_uuid
: str) -> str:
1104 Get Cluster Kubeconfig
1106 k8scluster
= self
.db
.get_one(
1107 "k8sclusters", q_filter
={"_id": cluster_uuid
}, fail_on_empty
=False
1110 self
.db
.encrypt_decrypt_fields(
1111 k8scluster
.get("credentials"),
1113 ["password", "secret"],
1114 schema_version
=k8scluster
["schema_version"],
1115 salt
=k8scluster
["_id"],
1118 return yaml
.safe_dump(k8scluster
.get("credentials"))
1120 def _get_credential_name(self
, cluster_uuid
: str) -> str:
1122 Get credential name for a k8s cloud
1124 We cannot use the cluster_uuid for the credential name directly,
1125 because it cannot start with a number, it must start with a letter.
1126 Therefore, the k8s cloud credential name will be "cred-" followed
1127 by the cluster uuid.
1129 :param: cluster_uuid: Cluster UUID of the kubernetes cloud (=cloud_name)
1131 :return: Name to use for the credential name.
1133 return "cred-{}".format(cluster_uuid
)
1135 # def get_config(self, cluster_uuid: str,) -> dict:
1136 # """Get the cluster configuration
1138 # Gets the configuration of the cluster
1140 # :param cluster_uuid str: The UUID of the cluster.
1141 # :return: A dict upon success, or raises an exception.
1144 # juju_db = self.db.get_one("admin", {"_id": "juju"})
1146 # for k in juju_db["k8sclusters"]:
1147 # if k["_id"] == cluster_uuid:
1148 # config = k["config"]
1149 # self.db.encrypt_decrypt_fields(
1152 # ["secret", "cacert"],
1153 # schema_version="1.1",
1159 # "Unable to locate configuration for cluster {}".format(cluster_uuid)
1163 # async def get_model(self, model_name: str, controller: Controller) -> Model:
1164 # """Get a model from the Juju Controller.
1166 # Note: Model objects returned must call disconnected() before it goes
1169 # :param model_name str: The name of the model to get
1170 # :param controller Controller: Controller object
1171 # :return The juju.model.Model object if found, or None.
1174 # models = await controller.list_models()
1175 # if model_name not in models:
1176 # raise N2VCNotFound("Model {} not found".format(model_name))
1177 # self.log.debug("Found model: {}".format(model_name))
1178 # return await controller.get_model(model_name)
1184 """Get the namespace UUID
1185 Gets the namespace's unique name
1187 :param cluster_uuid str: The UUID of the cluster
1188 :returns: The namespace UUID, or raises an exception
1190 # config = self.get_config(cluster_uuid)
1192 # Make sure the name is in the config
1193 # if "namespace" not in config:
1194 # raise Exception("Namespace not found.")
1196 # TODO: We want to make sure this is unique to the cluster, in case
1197 # the cluster is being reused.
1198 # Consider pre/appending the cluster id to the namespace string
1201 # TODO: Remove these lines of code
1202 # async def has_model(self, model_name: str) -> bool:
1203 # """Check if a model exists in the controller
1205 # Checks to see if a model exists in the connected Juju controller.
1207 # :param model_name str: The name of the model
1208 # :return: A boolean indicating if the model exists
1210 # models = await self.controller.list_models()
1212 # if model_name in models:
1216 # def is_local_k8s(self, credentials: str,) -> bool:
1217 # """Check if a cluster is local
1219 # Checks if a cluster is running in the local host
1221 # :param credentials dict: A dictionary containing the k8s credentials
1222 # :returns: A boolean if the cluster is running locally
1225 # creds = yaml.safe_load(credentials)
1227 # if creds and os.getenv("OSMLCM_VCA_APIPROXY"):
1228 # for cluster in creds["clusters"]:
1229 # if "server" in cluster["cluster"]:
1230 # if os.getenv("OSMLCM_VCA_APIPROXY") in cluster["cluster"]["server"]:
1235 # async def get_controller(self, cluster_uuid):
1236 # """Login to the Juju controller."""
1238 # config = self.get_config(cluster_uuid)
1240 # juju_endpoint = config["endpoint"]
1241 # juju_user = config["username"]
1242 # juju_secret = config["secret"]
1243 # juju_ca_cert = config["cacert"]
1245 # controller = Controller()
1249 # "Connecting to controller... ws://{} as {}".format(
1250 # juju_endpoint, juju_user,
1254 # await controller.connect(
1255 # endpoint=juju_endpoint,
1256 # username=juju_user,
1257 # password=juju_secret,
1258 # cacert=juju_ca_cert,
1260 # self.log.debug("JujuApi: Logged into controller")
1262 # except Exception as ex:
1263 # self.log.debug(ex)
1264 # self.log.debug("Caught exception: {}".format(ex))
1266 # self.log.fatal("VCA credentials not configured.")
1268 # TODO: Remove these commented lines
1269 # self.authenticated = False
1270 # if self.authenticated:
1273 # self.connecting = True
1274 # juju_public_key = None
1275 # self.authenticated = True
1276 # Test: Make sure we have the credentials loaded
1277 # async def logout(self):
1278 # """Logout of the Juju controller."""
1279 # self.log.debug("[logout]")
1280 # if not self.authenticated:
1283 # for model in self.models:
1284 # self.log.debug("Logging out of model {}".format(model))
1285 # await self.models[model].disconnect()
1287 # if self.controller:
1288 # self.log.debug("Disconnecting controller {}".format(self.controller))
1289 # await self.controller.disconnect()
1290 # self.controller = None
1292 # self.authenticated = False
1294 # async def remove_cloud(self, cloud_name: str,) -> bool:
1295 # """Remove a k8s cloud from Juju
1297 # Removes a Kubernetes cloud from Juju.
1299 # :param cloud_name str: The name of the cloud to add.
1301 # :returns: True if successful, otherwise raises an exception.
1304 # # Remove the bootstrapped controller
1305 # cmd = [self.juju_command, "remove-k8s", "--client", cloud_name]
1306 # process = await asyncio.create_subprocess_exec(
1307 # *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1310 # _stdout, stderr = await process.communicate()
1312 # return_code = process.returncode
1314 # if return_code > 0:
1315 # raise Exception(stderr)
1317 # # Remove the cloud from the local config
1318 # cmd = [self.juju_command, "remove-cloud", "--client", cloud_name]
1319 # process = await asyncio.create_subprocess_exec(
1320 # *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1323 # _stdout, stderr = await process.communicate()
1325 # return_code = process.returncode
1327 # if return_code > 0:
1328 # raise Exception(stderr)
1332 # async def set_config(self, cluster_uuid: str, config: dict,) -> bool:
1333 # """Save the cluster configuration
1335 # Saves the cluster information to the Mongo database
1337 # :param cluster_uuid str: The UUID of the cluster
1338 # :param config dict: A dictionary containing the cluster configuration
1341 # juju_db = self.db.get_one("admin", {"_id": "juju"})
1343 # k8sclusters = juju_db["k8sclusters"] if "k8sclusters" in juju_db else []
1344 # self.db.encrypt_decrypt_fields(
1347 # ["secret", "cacert"],
1348 # schema_version="1.1",
1349 # salt=cluster_uuid,
1351 # k8sclusters.append({"_id": cluster_uuid, "config": config})
1354 # q_filter={"_id": "juju"},
1355 # update_dict={"k8sclusters": k8sclusters},
1358 # Private methods to create/delete needed resources in the
1359 # Kubernetes cluster to create the K8s cloud in Juju
1361 def _create_cluster_role(
1365 labels
: Dict
[str, str],
1367 cluster_roles
= kubectl
.clients
[RBAC_CLIENT
].list_cluster_role(
1368 field_selector
="metadata.name={}".format(name
)
1371 if len(cluster_roles
.items
) > 0:
1373 "Cluster role with metadata.name={} already exists".format(name
)
1376 metadata
= V1ObjectMeta(name
=name
, labels
=labels
, namespace
=ADMIN_NAMESPACE
)
1378 cluster_role
= V1ClusterRole(
1381 V1PolicyRule(api_groups
=["*"], resources
=["*"], verbs
=["*"]),
1382 V1PolicyRule(non_resource_ur_ls
=["*"], verbs
=["*"]),
1386 kubectl
.clients
[RBAC_CLIENT
].create_cluster_role(cluster_role
)
1388 def _delete_cluster_role(self
, kubectl
: Kubectl
, name
: str):
1389 kubectl
.clients
[RBAC_CLIENT
].delete_cluster_role(name
)
1391 def _create_service_account(
1395 labels
: Dict
[str, str],
1397 service_accounts
= kubectl
.clients
[CORE_CLIENT
].list_namespaced_service_account(
1398 ADMIN_NAMESPACE
, field_selector
="metadata.name={}".format(name
)
1400 if len(service_accounts
.items
) > 0:
1402 "Service account with metadata.name={} already exists".format(name
)
1405 metadata
= V1ObjectMeta(name
=name
, labels
=labels
, namespace
=ADMIN_NAMESPACE
)
1406 service_account
= V1ServiceAccount(metadata
=metadata
)
1408 kubectl
.clients
[CORE_CLIENT
].create_namespaced_service_account(
1409 ADMIN_NAMESPACE
, service_account
1412 def _delete_service_account(self
, kubectl
: Kubectl
, name
: str):
1413 kubectl
.clients
[CORE_CLIENT
].delete_namespaced_service_account(
1414 name
, ADMIN_NAMESPACE
1417 def _create_cluster_role_binding(
1421 labels
: Dict
[str, str],
1423 role_bindings
= kubectl
.clients
[RBAC_CLIENT
].list_cluster_role_binding(
1424 field_selector
="metadata.name={}".format(name
)
1426 if len(role_bindings
.items
) > 0:
1427 raise Exception("Generated rbac id already exists")
1429 role_binding
= V1ClusterRoleBinding(
1430 metadata
=V1ObjectMeta(name
=name
, labels
=labels
),
1431 role_ref
=V1RoleRef(kind
="ClusterRole", name
=name
, api_group
=""),
1433 V1Subject(kind
="ServiceAccount", name
=name
, namespace
=ADMIN_NAMESPACE
)
1436 kubectl
.clients
[RBAC_CLIENT
].create_cluster_role_binding(role_binding
)
1438 def _delete_cluster_role_binding(self
, kubectl
: Kubectl
, name
: str):
1439 kubectl
.clients
[RBAC_CLIENT
].delete_cluster_role_binding(name
)
1441 async def _get_secret_data(self
, kubectl
: Kubectl
, name
: str) -> (str, str):
1442 v1_core
= kubectl
.clients
[CORE_CLIENT
]
1448 service_accounts
= v1_core
.list_namespaced_service_account(
1449 ADMIN_NAMESPACE
, field_selector
="metadata.name={}".format(name
)
1451 if len(service_accounts
.items
) == 0:
1453 "Service account not found with metadata.name={}".format(name
)
1455 service_account
= service_accounts
.items
[0]
1456 if service_account
.secrets
and len(service_account
.secrets
) > 0:
1457 secret_name
= service_account
.secrets
[0].name
1458 if secret_name
is not None or not retries_limit
:
1462 "Failed getting the secret from service account {}".format(name
)
1464 secret
= v1_core
.list_namespaced_secret(
1466 field_selector
="metadata.name={}".format(secret_name
),
1469 token
= secret
.data
[SERVICE_ACCOUNT_TOKEN_KEY
]
1470 client_certificate_data
= secret
.data
[SERVICE_ACCOUNT_ROOT_CA_KEY
]
1473 base64
.b64decode(token
).decode("utf-8"),
1474 base64
.b64decode(client_certificate_data
).decode("utf-8"),
1478 def generate_kdu_instance_name(**kwargs
):
1479 db_dict
= kwargs
.get("db_dict")
1480 kdu_name
= kwargs
.get("kdu_name", None)
1482 kdu_instance
= "{}-{}".format(kdu_name
, db_dict
["filter"]["_id"])
1484 kdu_instance
= db_dict
["filter"]["_id"]