1 # Copyright 2019 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
23 from n2vc
.config
import ModelConfig
24 from n2vc
.exceptions
import K8sException
, N2VCBadArgumentsException
25 from n2vc
.k8s_conn
import K8sConnector
26 from n2vc
.kubectl
import Kubectl
, CORE_CLIENT
, RBAC_CLIENT
27 from .exceptions
import MethodNotImplemented
28 from n2vc
.utils
import base64_to_cacert
29 from n2vc
.libjuju
import Libjuju
31 from kubernetes
.client
.models
import (
41 from typing
import Dict
# Keys under which a k8s service-account secret exposes its data
# (token and cluster CA certificate) — read when extracting secret data.
SERVICE_ACCOUNT_TOKEN_KEY = "token"
SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
# Label key used to tag the RBAC resources created for a Juju credential,
# so they can be found and cleaned up later.
RBAC_LABEL_KEY_NAME = "rbac-id"
# Namespace used for the juju admin resources (see `namespace` defaults below).
ADMIN_NAMESPACE = "kube-system"
# Name prefix for the per-credential RBAC resources created in the cluster.
RBAC_STACK_PREFIX = "juju-credential"
50 # from juju.bundle import BundleHandler
53 # from .vnf import N2VC
def generate_rbac_id():
    """Generate a short random id for RBAC resource names.

    Draws 4 cryptographically-strong random bytes from ``os.urandom``
    and returns them hex-encoded.

    :return: an 8-character lowercase hexadecimal string.
    """
    # bytes.hex() is the idiomatic equivalent of
    # binascii.hexlify(...).decode() and avoids the extra module/decode step.
    return os.urandom(4).hex()
60 class K8sJujuConnector(K8sConnector
):
65 kubectl_command
: str = "/usr/bin/kubectl",
66 juju_command
: str = "/usr/bin/juju",
70 vca_config
: dict = None,
73 :param fs: file system for kubernetes and helm configuration
74 :param db: Database object
75 :param kubectl_command: path to kubectl executable
76 :param helm_command: path to helm executable
78 :param: loop: Asyncio loop
82 K8sConnector
.__init
__(
86 on_update_db
=on_update_db
,
90 self
.loop
= loop
or asyncio
.get_event_loop()
91 self
.log
.debug("Initializing K8S Juju connector")
93 required_vca_config
= [
99 if not vca_config
or not all(k
in vca_config
for k
in required_vca_config
):
100 raise N2VCBadArgumentsException(
101 message
="Missing arguments in vca_config: {}".format(vca_config
),
102 bad_args
=required_vca_config
,
104 port
= vca_config
["port"] if "port" in vca_config
else 17070
105 url
= "{}:{}".format(vca_config
["host"], port
)
106 model_config
= ModelConfig(vca_config
)
107 username
= vca_config
["user"]
108 secret
= vca_config
["secret"]
109 ca_cert
= base64_to_cacert(vca_config
["ca_cert"])
111 self
.libjuju
= Libjuju(
113 api_proxy
=None, # Not needed for k8s charms
114 model_config
=model_config
,
122 self
.log
.debug("K8S Juju connector initialized")
123 # TODO: Remove these commented lines:
124 # self.authenticated = False
126 # self.juju_secret = ""
133 namespace
: str = "kube-system",
134 reuse_cluster_uuid
: str = None,
137 It prepares a given K8s cluster environment to run Juju bundles.
139 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
141 :param namespace: optional namespace to be used for juju. By default,
142 'kube-system' will be used
143 :param reuse_cluster_uuid: existing cluster uuid for reuse
144 :return: uuid of the K8s cluster and True if connector has installed some
145 software in the cluster
146 (on error, an exception will be raised)
151 # Bootstrapping cannot be done, by design, through the API. We need to
158 # 1. Has the environment already been bootstrapped?
159 # - Check the database to see if we have a record for this env
161 # 2. If this is a new env, create it
162 # - Add the k8s cloud to Juju
164 # - Record it in the database
166 # 3. Connect to the Juju controller for this cloud
169 # cluster_uuid = reuse_cluster_uuid
170 # if not cluster_uuid:
171 # cluster_uuid = str(uuid4())
173 ##################################################
174 # TODO: Pull info from db based on the namespace #
175 ##################################################
177 ###################################################
178 # TODO: Make it idempotent, calling add-k8s and #
179 # bootstrap whenever reuse_cluster_uuid is passed #
181 # `init_env` is called to initialize the K8s #
182 # cluster for juju. If this initialization fails, #
183 # it can be called again by LCM with the param #
184 # reuse_cluster_uuid, e.g. to try to fix it. #
185 ###################################################
187 # This is a new cluster, so bootstrap it
189 cluster_uuid
= reuse_cluster_uuid
or str(uuid
.uuid4())
191 # Is a local k8s cluster?
192 # localk8s = self.is_local_k8s(k8s_creds)
194 # If the k8s is external, the juju controller needs a loadbalancer
195 # loadbalancer = False if localk8s else True
197 # Name the new k8s cloud
198 # k8s_cloud = "k8s-{}".format(cluster_uuid)
200 # self.log.debug("Adding k8s cloud {}".format(k8s_cloud))
201 # await self.add_k8s(k8s_cloud, k8s_creds)
203 # Bootstrap Juju controller
204 # self.log.debug("Bootstrapping...")
205 # await self.bootstrap(k8s_cloud, cluster_uuid, loadbalancer)
206 # self.log.debug("Bootstrap done.")
208 # Get the controller information
210 # Parse ~/.local/share/juju/controllers.yaml
211 # controllers.testing.api-endpoints|ca-cert|uuid
212 # self.log.debug("Getting controller endpoints")
213 # with open(os.path.expanduser("~/.local/share/juju/controllers.yaml")) as f:
214 # controllers = yaml.load(f, Loader=yaml.Loader)
215 # controller = controllers["controllers"][cluster_uuid]
216 # endpoints = controller["api-endpoints"]
217 # juju_endpoint = endpoints[0]
218 # juju_ca_cert = controller["ca-cert"]
220 # Parse ~/.local/share/juju/accounts
221 # controllers.testing.user|password
222 # self.log.debug("Getting accounts")
223 # with open(os.path.expanduser("~/.local/share/juju/accounts.yaml")) as f:
224 # controllers = yaml.load(f, Loader=yaml.Loader)
225 # controller = controllers["controllers"][cluster_uuid]
227 # juju_user = controller["user"]
228 # juju_secret = controller["password"]
231 # "endpoint": juju_endpoint,
232 # "username": juju_user,
233 # "secret": juju_secret,
234 # "cacert": juju_ca_cert,
235 # "loadbalancer": loadbalancer,
238 # Store the cluster configuration so it
239 # can be used for subsequent calls
240 kubecfg
= tempfile
.NamedTemporaryFile()
241 with
open(kubecfg
.name
, "w") as kubecfg_file
:
242 kubecfg_file
.write(k8s_creds
)
243 kubectl
= Kubectl(config_file
=kubecfg
.name
)
245 # CREATING RESOURCES IN K8S
246 rbac_id
= generate_rbac_id()
247 metadata_name
= "{}-{}".format(RBAC_STACK_PREFIX
, rbac_id
)
248 labels
= {RBAC_STACK_PREFIX
: rbac_id
}
250 # Create cleanup dictionary to clean up created resources
251 # if it fails in the middle of the process
254 self
._create
_cluster
_role
(
261 "delete": self
._delete
_cluster
_role
,
262 "args": (kubectl
, metadata_name
),
266 self
._create
_service
_account
(
273 "delete": self
._delete
_service
_account
,
274 "args": (kubectl
, metadata_name
),
278 self
._create
_cluster
_role
_binding
(
285 "delete": self
._delete
_service
_account
,
286 "args": (kubectl
, metadata_name
),
289 token
, client_cert_data
= await self
._get
_secret
_data
(
294 default_storage_class
= kubectl
.get_default_storage_class()
295 await self
.libjuju
.add_k8s(
299 client_cert_data
=client_cert_data
,
300 configuration
=kubectl
.configuration
,
301 storage_class
=default_storage_class
,
302 credential_name
=self
._get
_credential
_name
(cluster_uuid
),
304 # self.log.debug("Setting config")
305 # await self.set_config(cluster_uuid, config)
308 # controller = await self.get_controller(cluster_uuid)
309 # await controller.disconnect()
311 # TODO: Remove these commented lines
312 # raise Exception("EOL")
313 # self.juju_public_key = None
314 # Login to the k8s cluster
315 # if not self.authenticated:
316 # await self.login(cluster_uuid)
318 # We're creating a new cluster
319 # print("Getting model {}".format(self.get_namespace(cluster_uuid),
320 # cluster_uuid=cluster_uuid))
321 # model = await self.get_model(
322 # self.get_namespace(cluster_uuid),
323 # cluster_uuid=cluster_uuid
326 # Disconnect from the model
327 # if model and model.is_connected():
328 # await model.disconnect()
330 return cluster_uuid
, True
331 except Exception as e
:
332 self
.log
.error("Error initializing k8scluster: {}".format(e
))
333 if len(cleanup_data
) > 0:
334 self
.log
.debug("Cleaning up created resources in k8s cluster...")
335 for item
in cleanup_data
:
336 delete_function
= item
["delete"]
337 delete_args
= item
["args"]
338 delete_function(*delete_args
)
339 self
.log
.debug("Cleanup finished")
342 """Repo Management"""
348 _type
: str = "charm",
350 raise MethodNotImplemented()
    async def repo_list(self):
        """List known repositories.

        Repo management is not supported by the Juju k8s connector.

        :raises MethodNotImplemented: always.
        """
        raise MethodNotImplemented()
355 async def repo_remove(
359 raise MethodNotImplemented()
361 async def synchronize_repos(self
, cluster_uuid
: str, name
: str):
363 Returns None as currently add_repo is not implemented
370 self
, cluster_uuid
: str, force
: bool = False, uninstall_sw
: bool = False
374 Resets the Kubernetes cluster by removing the model that represents it.
376 :param cluster_uuid str: The UUID of the cluster to reset
377 :return: Returns True if successful or raises an exception.
381 # Remove k8scluster from database
382 # self.log.debug("[reset] Removing k8scluster from juju database")
383 # juju_db = self.db.get_one("admin", {"_id": "juju"})
385 # for k in juju_db["k8sclusters"]:
386 # if k["_id"] == cluster_uuid:
387 # juju_db["k8sclusters"].remove(k)
390 # q_filter={"_id": "juju"},
391 # update_dict={"k8sclusters": juju_db["k8sclusters"]},
395 # Destroy the controller (via CLI)
396 # self.log.debug("[reset] Destroying controller")
397 # await self.destroy_controller(cluster_uuid)
398 self
.log
.debug("[reset] Removing k8s cloud")
399 # k8s_cloud = "k8s-{}".format(cluster_uuid)
400 # await self.remove_cloud(k8s_cloud)
402 cloud_creds
= await self
.libjuju
.get_cloud_credentials(
404 self
._get
_credential
_name
(cluster_uuid
),
407 await self
.libjuju
.remove_cloud(cluster_uuid
)
409 kubecfg
= self
.get_credentials(cluster_uuid
=cluster_uuid
)
411 kubecfg_file
= tempfile
.NamedTemporaryFile()
412 with
open(kubecfg_file
.name
, "w") as f
:
414 kubectl
= Kubectl(config_file
=kubecfg_file
.name
)
417 self
._delete
_cluster
_role
_binding
,
418 self
._delete
_service
_account
,
419 self
._delete
_cluster
_role
,
422 credential_attrs
= cloud_creds
[0].result
["attrs"]
423 if RBAC_LABEL_KEY_NAME
in credential_attrs
:
424 rbac_id
= credential_attrs
[RBAC_LABEL_KEY_NAME
]
425 metadata_name
= "{}-{}".format(RBAC_STACK_PREFIX
, rbac_id
)
426 delete_args
= (kubectl
, metadata_name
)
427 for delete_func
in delete_functions
:
429 delete_func(*delete_args
)
430 except Exception as e
:
431 self
.log
.warning("Cannot remove resource in K8s {}".format(e
))
433 except Exception as e
:
434 self
.log
.debug("Caught exception during reset: {}".format(e
))
437 # TODO: Remove these commented lines
438 # if not self.authenticated:
439 # await self.login(cluster_uuid)
441 # if self.controller.is_connected():
442 # # Destroy the model
443 # namespace = self.get_namespace(cluster_uuid)
444 # if await self.has_model(namespace):
445 # self.log.debug("[reset] Destroying model")
446 # await self.controller.destroy_model(namespace, destroy_storage=True)
448 # # Disconnect from the controller
449 # self.log.debug("[reset] Disconnecting controller")
450 # await self.logout()
460 timeout
: float = 1800,
462 db_dict
: dict = None,
463 kdu_name
: str = None,
464 namespace
: str = None,
468 :param cluster_uuid str: The UUID of the cluster to install to
469 :param kdu_model str: The name or path of a bundle to install
470 :param kdu_instance: Kdu instance name
471 :param atomic bool: If set, waits until the model is active and resets
472 the cluster on failure.
473 :param timeout int: The time, in seconds, to wait for the install
475 :param params dict: Key-value pairs of instantiation parameters
476 :param kdu_name: Name of the KDU instance to be installed
477 :param namespace: K8s namespace to use for the KDU instance
479 :return: If successful, returns ?
483 # controller = await self.get_controller(cluster_uuid)
486 # Get or create the model, based on the NS
490 raise K8sException("db_dict must be set")
492 raise K8sException("bundle must be set")
494 if bundle
.startswith("cs:"):
496 elif bundle
.startswith("http"):
500 new_workdir
= kdu_model
.strip(kdu_model
.split("/")[-1])
501 os
.chdir(new_workdir
)
502 bundle
= "local:{}".format(kdu_model
)
504 self
.log
.debug("Checking for model named {}".format(kdu_instance
))
506 # Create the new model
507 self
.log
.debug("Adding model: {}".format(kdu_instance
))
508 await self
.libjuju
.add_model(
509 model_name
=kdu_instance
,
510 cloud_name
=cluster_uuid
,
511 credential_name
=self
._get
_credential
_name
(cluster_uuid
),
515 # TODO: Instantiation parameters
518 "Juju bundle that models the KDU, in any of the following ways:
519 - <juju-repo>/<juju-bundle>
520 - <juju-bundle folder under k8s_models folder in the package>
521 - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
523 - <URL_where_to_fetch_juju_bundle>
526 previous_workdir
= os
.getcwd()
527 except FileNotFoundError
:
528 previous_workdir
= "/app/storage"
530 self
.log
.debug("[install] deploying {}".format(bundle
))
531 await self
.libjuju
.deploy(
532 bundle
, model_name
=kdu_instance
, wait
=atomic
, timeout
=timeout
535 # Get the application
537 # # applications = model.applications
538 # self.log.debug("[install] Applications: {}".format(model.applications))
539 # for name in model.applications:
540 # self.log.debug("[install] Waiting for {} to settle".format(name))
541 # application = model.applications[name]
543 # # It's not enough to wait for all units to be active;
544 # # the application status needs to be active as well.
545 # self.log.debug("Waiting for all units to be active...")
546 # await model.block_until(
548 # unit.agent_status == "idle"
549 # and application.status in ["active", "unknown"]
550 # and unit.workload_status in ["active", "unknown"]
551 # for unit in application.units
555 # self.log.debug("All units active.")
557 # # TODO use asyncio.TimeoutError
558 # except concurrent.futures._base.TimeoutError:
559 # os.chdir(previous_workdir)
560 # self.log.debug("[install] Timeout exceeded; resetting cluster")
561 # await self.reset(cluster_uuid)
564 # Wait for the application to be active
565 # if model.is_connected():
566 # self.log.debug("[install] Disconnecting model")
567 # await model.disconnect()
568 # await controller.disconnect()
569 os
.chdir(previous_workdir
)
572 async def instances_list(self
, cluster_uuid
: str) -> list:
574 returns a list of deployed releases in a cluster
576 :param cluster_uuid: the cluster
585 kdu_model
: str = None,
590 :param cluster_uuid str: The UUID of the cluster to upgrade
591 :param kdu_instance str: The unique name of the KDU instance
592 :param kdu_model str: The name or path of the bundle to upgrade to
593 :param params dict: Key-value pairs of instantiation parameters
595 :return: If successful, reference to the new revision number of the
599 # TODO: Loop through the bundle and upgrade each charm individually
602 The API doesn't have a concept of bundle upgrades, because there are
603 many possible changes: charm revision, disk, number of units, etc.
605 As such, we are only supporting a limited subset of upgrades. We'll
606 upgrade the charm revision but leave storage and scale untouched.
608 Scale changes should happen through OSM constructs, and changes to
609 storage would require a redeployment of the service, at least in this
612 raise MethodNotImplemented()
613 # TODO: Remove these commented lines
615 # model = await self.get_model(namespace, cluster_uuid=cluster_uuid)
618 # namespace = self.get_namespace(cluster_uuid)
619 # controller = await self.get_controller(cluster_uuid)
622 # if namespace not in await controller.list_models():
623 # raise N2VCNotFound(message="Model {} does not exist".format(namespace))
625 # model = await controller.get_model(namespace)
626 # with open(kdu_model, "r") as f:
627 # bundle = yaml.safe_load(f)
631 # 'description': 'Test bundle',
632 # 'bundle': 'kubernetes',
635 # 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
638 # 'password': 'manopw',
639 # 'root_password': 'osm4u',
642 # 'series': 'kubernetes'
647 # # TODO: This should be returned in an agreed-upon format
648 # for name in bundle["applications"]:
649 # self.log.debug(model.applications)
650 # application = model.applications[name]
651 # self.log.debug(application)
653 # path = bundle["applications"][name]["charm"]
656 # await application.upgrade_charm(switch=path)
657 # except juju.errors.JujuError as ex:
658 # if "already running charm" in str(ex):
659 # # We're already running this version
663 # await model.disconnect()
664 # await controller.disconnect()
677 :param cluster_uuid str: The UUID of the cluster to rollback
678 :param kdu_instance str: The unique name of the KDU instance
679 :param revision int: The revision to revert to. If omitted, rolls back
680 the previous upgrade.
682 :return: If successful, returns the revision of active KDU instance,
683 or raises an exception
685 raise MethodNotImplemented()
689 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str) -> bool:
690 """Uninstall a KDU instance
692 :param cluster_uuid str: The UUID of the cluster
693 :param kdu_instance str: The unique name of the KDU instance
695 :return: Returns True if successful, or raises an exception
698 # controller = await self.get_controller(cluster_uuid)
700 self
.log
.debug("[uninstall] Destroying model")
702 await self
.libjuju
.destroy_model(kdu_instance
, total_timeout
=3600)
704 # self.log.debug("[uninstall] Model destroyed and disconnecting")
705 # await controller.disconnect()
708 # TODO: Remove these commented lines
709 # if not self.authenticated:
710 # self.log.debug("[uninstall] Connecting to controller")
711 # await self.login(cluster_uuid)
713 async def exec_primitive(
715 cluster_uuid
: str = None,
716 kdu_instance
: str = None,
717 primitive_name
: str = None,
718 timeout
: float = 300,
720 db_dict
: dict = None,
722 """Exec primitive (Juju action)
724 :param cluster_uuid str: The UUID of the cluster
725 :param kdu_instance str: The unique name of the KDU instance
726 :param primitive_name: Name of action that will be executed
727 :param timeout: Timeout for action execution
728 :param params: Dictionary of all the parameters needed for the action
729 :db_dict: Dictionary for any additional data
731 :return: Returns the output of the action
734 # controller = await self.get_controller(cluster_uuid)
736 if not params
or "application-name" not in params
:
738 "Missing application-name argument, \
739 argument needed for K8s actions"
743 "[exec_primitive] Getting model "
744 "kdu_instance: {}".format(kdu_instance
)
746 application_name
= params
["application-name"]
747 actions
= await self
.libjuju
.get_actions(application_name
, kdu_instance
)
748 if primitive_name
not in actions
:
749 raise K8sException("Primitive {} not found".format(primitive_name
))
750 output
, status
= await self
.libjuju
.execute_action(
751 application_name
, kdu_instance
, primitive_name
, **params
753 # model = await self.get_model(kdu_instance, controller=controller)
755 # application_name = params["application-name"]
756 # application = model.applications[application_name]
758 # actions = await application.get_actions()
759 # if primitive_name not in actions:
760 # raise K8sException("Primitive {} not found".format(primitive_name))
763 # for u in application.units:
764 # if await u.is_leader_from_status():
769 # raise K8sException("No leader unit found to execute action")
771 # self.log.debug("[exec_primitive] Running action: {}".format(primitive_name))
772 # action = await unit.run_action(primitive_name, **params)
774 # output = await model.get_action_output(action_uuid=action.entity_id)
775 # status = await model.get_action_status(uuid_or_prefix=action.entity_id)
778 # status[action.entity_id] if action.entity_id in status else "failed"
781 if status
!= "completed":
783 "status is not completed: {} output: {}".format(status
, output
)
788 except Exception as e
:
789 error_msg
= "Error executing primitive {}: {}".format(primitive_name
, e
)
790 self
.log
.error(error_msg
)
791 raise K8sException(message
=error_msg
)
793 # await controller.disconnect()
794 # TODO: Remove these commented lines:
795 # if not self.authenticated:
796 # self.log.debug("[exec_primitive] Connecting to controller")
797 # await self.login(cluster_uuid)
801 async def inspect_kdu(
807 Inspects a bundle and returns a dictionary of config parameters and
808 their default values.
810 :param kdu_model str: The name or path of the bundle to inspect.
812 :return: If successful, returns a dictionary of available parameters
813 and their default values.
817 if not os
.path
.exists(kdu_model
):
818 raise K8sException("file {} not found".format(kdu_model
))
820 with
open(kdu_model
, "r") as f
:
821 bundle
= yaml
.safe_load(f
.read())
825 'description': 'Test bundle',
826 'bundle': 'kubernetes',
829 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
832 'password': 'manopw',
833 'root_password': 'osm4u',
836 'series': 'kubernetes'
841 # TODO: This should be returned in an agreed-upon format
842 kdu
= bundle
["applications"]
852 If available, returns the README of the bundle.
854 :param kdu_model str: The name or path of a bundle
856 :return: If found, returns the contents of the README.
860 files
= ["README", "README.txt", "README.md"]
861 path
= os
.path
.dirname(kdu_model
)
862 for file in os
.listdir(path
):
864 with
open(file, "r") as f
:
870 async def status_kdu(
875 """Get the status of the KDU
877 Get the current status of the KDU instance.
879 :param cluster_uuid str: The UUID of the cluster
880 :param kdu_instance str: The unique id of the KDU instance
882 :return: Returns a dictionary containing namespace, state, resources,
886 # controller = await self.get_controller(cluster_uuid)
887 # model = await self.get_model(kdu_instance, controller=controller)
889 # model_status = await model.get_status()
890 # status = model_status.applications
891 model_status
= await self
.libjuju
.get_model_status(kdu_instance
)
892 for name
in model_status
.applications
:
893 application
= model_status
.applications
[name
]
894 status
[name
] = {"status": application
["status"]["status"]}
896 # await model.disconnect()
897 # await controller.disconnect()
901 async def get_services(
902 self
, cluster_uuid
: str, kdu_instance
: str, namespace
: str
904 """Return a list of services of a kdu_instance"""
906 credentials
= self
.get_credentials(cluster_uuid
=cluster_uuid
)
908 # config_path = "/tmp/{}".format(cluster_uuid)
909 # config_file = "{}/config".format(config_path)
911 # if not os.path.exists(config_path):
912 # os.makedirs(config_path)
913 # with open(config_file, "w") as f:
914 # f.write(credentials)
916 kubecfg
= tempfile
.NamedTemporaryFile()
917 with
open(kubecfg
.name
, "w") as kubecfg_file
:
918 kubecfg_file
.write(credentials
)
919 kubectl
= Kubectl(config_file
=kubecfg
.name
)
921 return kubectl
.get_services(
922 field_selector
="metadata.namespace={}".format(kdu_instance
)
925 async def get_service(
926 self
, cluster_uuid
: str, service_name
: str, namespace
: str
928 """Return data for a specific service inside a namespace"""
930 credentials
= self
.get_credentials(cluster_uuid
=cluster_uuid
)
932 # config_path = "/tmp/{}".format(cluster_uuid)
933 # config_file = "{}/config".format(config_path)
935 # if not os.path.exists(config_path):
936 # os.makedirs(config_path)
937 # with open(config_file, "w") as f:
938 # f.write(credentials)
940 kubecfg
= tempfile
.NamedTemporaryFile()
941 with
open(kubecfg
.name
, "w") as kubecfg_file
:
942 kubecfg_file
.write(credentials
)
943 kubectl
= Kubectl(config_file
=kubecfg
.name
)
945 return kubectl
.get_services(
946 field_selector
="metadata.name={},metadata.namespace={}".format(
947 service_name
, namespace
952 # async def add_k8s(self, cloud_name: str, credentials: str,) -> bool:
953 # """Add a k8s cloud to Juju
955 # Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
958 # :param cloud_name str: The name of the cloud to add.
959 # :param credentials dict: A dictionary representing the output of
960 # `kubectl config view --raw`.
962 # :returns: True if successful, otherwise raises an exception.
965 # cmd = [self.juju_command, "add-k8s", "--local", cloud_name]
966 # self.log.debug(cmd)
968 # process = await asyncio.create_subprocess_exec(
970 # stdout=asyncio.subprocess.PIPE,
971 # stderr=asyncio.subprocess.PIPE,
972 # stdin=asyncio.subprocess.PIPE,
975 # # Feed the process the credentials
976 # process.stdin.write(credentials.encode("utf-8"))
977 # await process.stdin.drain()
978 # process.stdin.close()
980 # _stdout, stderr = await process.communicate()
982 # return_code = process.returncode
984 # self.log.debug("add-k8s return code: {}".format(return_code))
986 # if return_code > 0:
987 # raise Exception(stderr)
991 # async def add_model(
992 # self, model_name: str, cluster_uuid: str, controller: Controller
994 # """Adds a model to the controller
996 # Adds a new model to the Juju controller
998 # :param model_name str: The name of the model to add.
999 # :param cluster_uuid str: ID of the cluster.
1000 # :param controller: Controller object in which the model will be added
1001 # :returns: The juju.model.Model object of the new model upon success or
1002 # raises an exception.
1006 # "Adding model '{}' to cluster_uuid '{}'".format(model_name, cluster_uuid)
1010 # if self.juju_public_key is not None:
1011 # model = await controller.add_model(
1012 # model_name, config={"authorized-keys": self.juju_public_key}
1015 # model = await controller.add_model(model_name)
1016 # except Exception as ex:
1017 # self.log.debug(ex)
1018 # self.log.debug("Caught exception: {}".format(ex))
1023 # async def bootstrap(
1024 # self, cloud_name: str, cluster_uuid: str, loadbalancer: bool
1026 # """Bootstrap a Kubernetes controller
1028 # Bootstrap a Juju controller inside the Kubernetes cluster
1030 # :param cloud_name str: The name of the cloud.
1031 # :param cluster_uuid str: The UUID of the cluster to bootstrap.
1032 # :param loadbalancer bool: If the controller should use loadbalancer or not.
1033 # :returns: True upon success or raises an exception.
1036 # if not loadbalancer:
1037 # cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid]
1040 # For public clusters, specify that the controller service is using a
1044 # self.juju_command,
1049 # "controller-service-type=loadbalancer",
1053 # "Bootstrapping controller {} in cloud {}".format(cluster_uuid, cloud_name)
1056 # process = await asyncio.create_subprocess_exec(
1057 # *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1060 # _stdout, stderr = await process.communicate()
1062 # return_code = process.returncode
1064 # if return_code > 0:
1066 # if b"already exists" not in stderr:
1067 # raise Exception(stderr)
1071 # async def destroy_controller(self, cluster_uuid: str) -> bool:
1072 # """Destroy a Kubernetes controller
1074 # Destroy an existing Kubernetes controller.
1076 # :param cluster_uuid str: The UUID of the cluster to bootstrap.
1077 # :returns: True upon success or raises an exception.
1080 # self.juju_command,
1081 # "destroy-controller",
1082 # "--destroy-all-models",
1083 # "--destroy-storage",
1088 # process = await asyncio.create_subprocess_exec(
1089 # *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1092 # _stdout, stderr = await process.communicate()
1094 # return_code = process.returncode
1096 # if return_code > 0:
1098 # if "already exists" not in stderr:
1099 # raise Exception(stderr)
1101 def get_credentials(self
, cluster_uuid
: str) -> str:
1103 Get Cluster Kubeconfig
1105 k8scluster
= self
.db
.get_one(
1106 "k8sclusters", q_filter
={"_id": cluster_uuid
}, fail_on_empty
=False
1109 self
.db
.encrypt_decrypt_fields(
1110 k8scluster
.get("credentials"),
1112 ["password", "secret"],
1113 schema_version
=k8scluster
["schema_version"],
1114 salt
=k8scluster
["_id"],
1117 return yaml
.safe_dump(k8scluster
.get("credentials"))
1119 def _get_credential_name(self
, cluster_uuid
: str) -> str:
1121 Get credential name for a k8s cloud
1123 We cannot use the cluster_uuid for the credential name directly,
1124 because it cannot start with a number, it must start with a letter.
1125 Therefore, the k8s cloud credential name will be "cred-" followed
1126 by the cluster uuid.
1128 :param: cluster_uuid: Cluster UUID of the kubernetes cloud (=cloud_name)
1130 :return: Name to use for the credential name.
1132 return "cred-{}".format(cluster_uuid
)
1134 # def get_config(self, cluster_uuid: str,) -> dict:
1135 # """Get the cluster configuration
1137 # Gets the configuration of the cluster
1139 # :param cluster_uuid str: The UUID of the cluster.
1140 # :return: A dict upon success, or raises an exception.
1143 # juju_db = self.db.get_one("admin", {"_id": "juju"})
1145 # for k in juju_db["k8sclusters"]:
1146 # if k["_id"] == cluster_uuid:
1147 # config = k["config"]
1148 # self.db.encrypt_decrypt_fields(
1151 # ["secret", "cacert"],
1152 # schema_version="1.1",
1158 # "Unable to locate configuration for cluster {}".format(cluster_uuid)
1162 # async def get_model(self, model_name: str, controller: Controller) -> Model:
1163 # """Get a model from the Juju Controller.
1165 # Note: Model objects returned must call disconnected() before it goes
1168 # :param model_name str: The name of the model to get
1169 # :param controller Controller: Controller object
1170 # :return The juju.model.Model object if found, or None.
1173 # models = await controller.list_models()
1174 # if model_name not in models:
1175 # raise N2VCNotFound("Model {} not found".format(model_name))
1176 # self.log.debug("Found model: {}".format(model_name))
1177 # return await controller.get_model(model_name)
1183 """Get the namespace UUID
1184 Gets the namespace's unique name
1186 :param cluster_uuid str: The UUID of the cluster
1187 :returns: The namespace UUID, or raises an exception
1189 # config = self.get_config(cluster_uuid)
1191 # Make sure the name is in the config
1192 # if "namespace" not in config:
1193 # raise Exception("Namespace not found.")
1195 # TODO: We want to make sure this is unique to the cluster, in case
1196 # the cluster is being reused.
1197 # Consider pre/appending the cluster id to the namespace string
1200 # TODO: Remove these lines of code
1201 # async def has_model(self, model_name: str) -> bool:
1202 # """Check if a model exists in the controller
1204 # Checks to see if a model exists in the connected Juju controller.
1206 # :param model_name str: The name of the model
1207 # :return: A boolean indicating if the model exists
1209 # models = await self.controller.list_models()
1211 # if model_name in models:
1215 # def is_local_k8s(self, credentials: str,) -> bool:
1216 # """Check if a cluster is local
1218 # Checks if a cluster is running in the local host
1220 # :param credentials dict: A dictionary containing the k8s credentials
1221 # :returns: A boolean if the cluster is running locally
1224 # creds = yaml.safe_load(credentials)
1226 # if creds and os.getenv("OSMLCM_VCA_APIPROXY"):
1227 # for cluster in creds["clusters"]:
1228 # if "server" in cluster["cluster"]:
1229 # if os.getenv("OSMLCM_VCA_APIPROXY") in cluster["cluster"]["server"]:
1234 # async def get_controller(self, cluster_uuid):
1235 # """Login to the Juju controller."""
1237 # config = self.get_config(cluster_uuid)
1239 # juju_endpoint = config["endpoint"]
1240 # juju_user = config["username"]
1241 # juju_secret = config["secret"]
1242 # juju_ca_cert = config["cacert"]
1244 # controller = Controller()
1248 # "Connecting to controller... ws://{} as {}".format(
1249 # juju_endpoint, juju_user,
1253 # await controller.connect(
1254 # endpoint=juju_endpoint,
1255 # username=juju_user,
1256 # password=juju_secret,
1257 # cacert=juju_ca_cert,
1259 # self.log.debug("JujuApi: Logged into controller")
1261 # except Exception as ex:
1262 # self.log.debug(ex)
1263 # self.log.debug("Caught exception: {}".format(ex))
1265 # self.log.fatal("VCA credentials not configured.")
1267 # TODO: Remove these commented lines
1268 # self.authenticated = False
1269 # if self.authenticated:
1272 # self.connecting = True
1273 # juju_public_key = None
1274 # self.authenticated = True
1275 # Test: Make sure we have the credentials loaded
1276 # async def logout(self):
1277 # """Logout of the Juju controller."""
1278 # self.log.debug("[logout]")
1279 # if not self.authenticated:
1282 # for model in self.models:
1283 # self.log.debug("Logging out of model {}".format(model))
1284 # await self.models[model].disconnect()
1286 # if self.controller:
1287 # self.log.debug("Disconnecting controller {}".format(self.controller))
1288 # await self.controller.disconnect()
1289 # self.controller = None
1291 # self.authenticated = False
1293 # async def remove_cloud(self, cloud_name: str,) -> bool:
1294 # """Remove a k8s cloud from Juju
1296 # Removes a Kubernetes cloud from Juju.
1298 # :param cloud_name str: The name of the cloud to add.
1300 # :returns: True if successful, otherwise raises an exception.
1303 # # Remove the bootstrapped controller
1304 # cmd = [self.juju_command, "remove-k8s", "--client", cloud_name]
1305 # process = await asyncio.create_subprocess_exec(
1306 # *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1309 # _stdout, stderr = await process.communicate()
1311 # return_code = process.returncode
1313 # if return_code > 0:
1314 # raise Exception(stderr)
1316 # # Remove the cloud from the local config
1317 # cmd = [self.juju_command, "remove-cloud", "--client", cloud_name]
1318 # process = await asyncio.create_subprocess_exec(
1319 # *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1322 # _stdout, stderr = await process.communicate()
1324 # return_code = process.returncode
1326 # if return_code > 0:
1327 # raise Exception(stderr)
1331 # async def set_config(self, cluster_uuid: str, config: dict,) -> bool:
1332 # """Save the cluster configuration
1334 # Saves the cluster information to the Mongo database
1336 # :param cluster_uuid str: The UUID of the cluster
1337 # :param config dict: A dictionary containing the cluster configuration
1340 # juju_db = self.db.get_one("admin", {"_id": "juju"})
1342 # k8sclusters = juju_db["k8sclusters"] if "k8sclusters" in juju_db else []
1343 # self.db.encrypt_decrypt_fields(
1346 # ["secret", "cacert"],
1347 # schema_version="1.1",
1348 # salt=cluster_uuid,
1350 # k8sclusters.append({"_id": cluster_uuid, "config": config})
1353 # q_filter={"_id": "juju"},
1354 # update_dict={"k8sclusters": k8sclusters},
1357 # Private methods to create/delete needed resources in the
1358 # Kubernetes cluster to create the K8s cloud in Juju
1360 def _create_cluster_role(
1364 labels
: Dict
[str, str],
1366 cluster_roles
= kubectl
.clients
[RBAC_CLIENT
].list_cluster_role(
1367 field_selector
="metadata.name={}".format(name
)
1370 if len(cluster_roles
.items
) > 0:
1372 "Cluster role with metadata.name={} already exists".format(name
)
1375 metadata
= V1ObjectMeta(name
=name
, labels
=labels
, namespace
=ADMIN_NAMESPACE
)
1377 cluster_role
= V1ClusterRole(
1380 V1PolicyRule(api_groups
=["*"], resources
=["*"], verbs
=["*"]),
1381 V1PolicyRule(non_resource_ur_ls
=["*"], verbs
=["*"]),
1385 kubectl
.clients
[RBAC_CLIENT
].create_cluster_role(cluster_role
)
1387 def _delete_cluster_role(self
, kubectl
: Kubectl
, name
: str):
1388 kubectl
.clients
[RBAC_CLIENT
].delete_cluster_role(name
)
1390 def _create_service_account(
1394 labels
: Dict
[str, str],
1396 service_accounts
= kubectl
.clients
[CORE_CLIENT
].list_namespaced_service_account(
1397 ADMIN_NAMESPACE
, field_selector
="metadata.name={}".format(name
)
1399 if len(service_accounts
.items
) > 0:
1401 "Service account with metadata.name={} already exists".format(name
)
1404 metadata
= V1ObjectMeta(name
=name
, labels
=labels
, namespace
=ADMIN_NAMESPACE
)
1405 service_account
= V1ServiceAccount(metadata
=metadata
)
1407 kubectl
.clients
[CORE_CLIENT
].create_namespaced_service_account(
1408 ADMIN_NAMESPACE
, service_account
1411 def _delete_service_account(self
, kubectl
: Kubectl
, name
: str):
1412 kubectl
.clients
[CORE_CLIENT
].delete_namespaced_service_account(
1413 name
, ADMIN_NAMESPACE
1416 def _create_cluster_role_binding(
1420 labels
: Dict
[str, str],
1422 role_bindings
= kubectl
.clients
[RBAC_CLIENT
].list_cluster_role_binding(
1423 field_selector
="metadata.name={}".format(name
)
1425 if len(role_bindings
.items
) > 0:
1426 raise Exception("Generated rbac id already exists")
1428 role_binding
= V1ClusterRoleBinding(
1429 metadata
=V1ObjectMeta(name
=name
, labels
=labels
),
1430 role_ref
=V1RoleRef(kind
="ClusterRole", name
=name
, api_group
=""),
1432 V1Subject(kind
="ServiceAccount", name
=name
, namespace
=ADMIN_NAMESPACE
)
1435 kubectl
.clients
[RBAC_CLIENT
].create_cluster_role_binding(role_binding
)
1437 def _delete_cluster_role_binding(self
, kubectl
: Kubectl
, name
: str):
1438 kubectl
.clients
[RBAC_CLIENT
].delete_cluster_role_binding(name
)
1440 async def _get_secret_data(self
, kubectl
: Kubectl
, name
: str) -> (str, str):
1441 v1_core
= kubectl
.clients
[CORE_CLIENT
]
1447 service_accounts
= v1_core
.list_namespaced_service_account(
1448 ADMIN_NAMESPACE
, field_selector
="metadata.name={}".format(name
)
1450 if len(service_accounts
.items
) == 0:
1452 "Service account not found with metadata.name={}".format(name
)
1454 service_account
= service_accounts
.items
[0]
1455 if service_account
.secrets
and len(service_account
.secrets
) > 0:
1456 secret_name
= service_account
.secrets
[0].name
1457 if secret_name
is not None or not retries_limit
:
1461 "Failed getting the secret from service account {}".format(name
)
1463 secret
= v1_core
.list_namespaced_secret(
1465 field_selector
="metadata.name={}".format(secret_name
),
1468 token
= secret
.data
[SERVICE_ACCOUNT_TOKEN_KEY
]
1469 client_certificate_data
= secret
.data
[SERVICE_ACCOUNT_ROOT_CA_KEY
]
1472 base64
.b64decode(token
).decode("utf-8"),
1473 base64
.b64decode(client_certificate_data
).decode("utf-8"),
1477 def generate_kdu_instance_name(**kwargs
):
1478 db_dict
= kwargs
.get("db_dict")
1479 kdu_name
= kwargs
.get("kdu_name", None)
1481 kdu_instance
= "{}-{}".format(kdu_name
, db_dict
["filter"]["_id"])
1483 kdu_instance
= db_dict
["filter"]["_id"]