1 # Copyright 2019 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
23 from n2vc
.config
import ModelConfig
24 from n2vc
.exceptions
import K8sException
, N2VCBadArgumentsException
25 from n2vc
.k8s_conn
import K8sConnector
26 from n2vc
.kubectl
import Kubectl
, CORE_CLIENT
, RBAC_CLIENT
27 from .exceptions
import MethodNotImplemented
28 from n2vc
.utils
import base64_to_cacert
29 from n2vc
.libjuju
import Libjuju
31 from kubernetes
.client
.models
import (
41 from typing
import Dict
43 SERVICE_ACCOUNT_TOKEN_KEY
= "token"
44 SERVICE_ACCOUNT_ROOT_CA_KEY
= "ca.crt"
45 RBAC_LABEL_KEY_NAME
= "rbac-id"
47 ADMIN_NAMESPACE
= "kube-system"
48 RBAC_STACK_PREFIX
= "juju-credential"
51 def generate_rbac_id():
52 return binascii
.hexlify(os
.urandom(4)).decode()
55 class K8sJujuConnector(K8sConnector
):
60 kubectl_command
: str = "/usr/bin/kubectl",
61 juju_command
: str = "/usr/bin/juju",
65 vca_config
: dict = None,
68 :param fs: file system for kubernetes and helm configuration
69 :param db: Database object
70 :param kubectl_command: path to kubectl executable
71 :param helm_command: path to helm executable
73 :param: loop: Asyncio loop
77 K8sConnector
.__init
__(
81 on_update_db
=on_update_db
,
85 self
.loop
= loop
or asyncio
.get_event_loop()
86 self
.log
.debug("Initializing K8S Juju connector")
88 required_vca_config
= [
94 if not vca_config
or not all(k
in vca_config
for k
in required_vca_config
):
95 raise N2VCBadArgumentsException(
96 message
="Missing arguments in vca_config: {}".format(vca_config
),
97 bad_args
=required_vca_config
,
99 port
= vca_config
["port"] if "port" in vca_config
else 17070
100 url
= "{}:{}".format(vca_config
["host"], port
)
101 model_config
= ModelConfig(vca_config
)
102 username
= vca_config
["user"]
103 secret
= vca_config
["secret"]
104 ca_cert
= base64_to_cacert(vca_config
["ca_cert"])
106 self
.libjuju
= Libjuju(
108 api_proxy
=None, # Not needed for k8s charms
109 model_config
=model_config
,
117 self
.log
.debug("K8S Juju connector initialized")
118 # TODO: Remove these commented lines:
119 # self.authenticated = False
121 # self.juju_secret = ""
128 namespace
: str = "kube-system",
129 reuse_cluster_uuid
: str = None,
132 It prepares a given K8s cluster environment to run Juju bundles.
134 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
136 :param namespace: optional namespace to be used for juju. By default,
137 'kube-system' will be used
138 :param reuse_cluster_uuid: existing cluster uuid for reuse
139 :return: uuid of the K8s cluster and True if connector has installed some
140 software in the cluster
141 (on error, an exception will be raised)
144 cluster_uuid
= reuse_cluster_uuid
or str(uuid
.uuid4())
146 kubecfg
= tempfile
.NamedTemporaryFile()
147 with
open(kubecfg
.name
, "w") as kubecfg_file
:
148 kubecfg_file
.write(k8s_creds
)
149 kubectl
= Kubectl(config_file
=kubecfg
.name
)
151 # CREATING RESOURCES IN K8S
152 rbac_id
= generate_rbac_id()
153 metadata_name
= "{}-{}".format(RBAC_STACK_PREFIX
, rbac_id
)
154 labels
= {RBAC_STACK_PREFIX
: rbac_id
}
156 # Create cleanup dictionary to clean up created resources
157 # if it fails in the middle of the process
160 self
._create
_cluster
_role
(
167 "delete": self
._delete
_cluster
_role
,
168 "args": (kubectl
, metadata_name
),
172 self
._create
_service
_account
(
179 "delete": self
._delete
_service
_account
,
180 "args": (kubectl
, metadata_name
),
184 self
._create
_cluster
_role
_binding
(
191 "delete": self
._delete
_service
_account
,
192 "args": (kubectl
, metadata_name
),
195 token
, client_cert_data
= await self
._get
_secret
_data
(
200 default_storage_class
= kubectl
.get_default_storage_class()
201 await self
.libjuju
.add_k8s(
205 client_cert_data
=client_cert_data
,
206 configuration
=kubectl
.configuration
,
207 storage_class
=default_storage_class
,
208 credential_name
=self
._get
_credential
_name
(cluster_uuid
),
210 return cluster_uuid
, True
211 except Exception as e
:
212 self
.log
.error("Error initializing k8scluster: {}".format(e
))
213 if len(cleanup_data
) > 0:
214 self
.log
.debug("Cleaning up created resources in k8s cluster...")
215 for item
in cleanup_data
:
216 delete_function
= item
["delete"]
217 delete_args
= item
["args"]
218 delete_function(*delete_args
)
219 self
.log
.debug("Cleanup finished")
222 """Repo Management"""
228 _type
: str = "charm",
230 raise MethodNotImplemented()
232 async def repo_list(self
):
233 raise MethodNotImplemented()
235 async def repo_remove(
239 raise MethodNotImplemented()
241 async def synchronize_repos(self
, cluster_uuid
: str, name
: str):
243 Returns None as currently add_repo is not implemented
250 self
, cluster_uuid
: str, force
: bool = False, uninstall_sw
: bool = False
254 Resets the Kubernetes cluster by removing the model that represents it.
256 :param cluster_uuid str: The UUID of the cluster to reset
257 :return: Returns True if successful or raises an exception.
261 self
.log
.debug("[reset] Removing k8s cloud")
263 cloud_creds
= await self
.libjuju
.get_cloud_credentials(
265 self
._get
_credential
_name
(cluster_uuid
),
268 await self
.libjuju
.remove_cloud(cluster_uuid
)
270 kubecfg
= self
.get_credentials(cluster_uuid
=cluster_uuid
)
272 kubecfg_file
= tempfile
.NamedTemporaryFile()
273 with
open(kubecfg_file
.name
, "w") as f
:
275 kubectl
= Kubectl(config_file
=kubecfg_file
.name
)
278 self
._delete
_cluster
_role
_binding
,
279 self
._delete
_service
_account
,
280 self
._delete
_cluster
_role
,
283 credential_attrs
= cloud_creds
[0].result
["attrs"]
284 if RBAC_LABEL_KEY_NAME
in credential_attrs
:
285 rbac_id
= credential_attrs
[RBAC_LABEL_KEY_NAME
]
286 metadata_name
= "{}-{}".format(RBAC_STACK_PREFIX
, rbac_id
)
287 delete_args
= (kubectl
, metadata_name
)
288 for delete_func
in delete_functions
:
290 delete_func(*delete_args
)
291 except Exception as e
:
292 self
.log
.warning("Cannot remove resource in K8s {}".format(e
))
294 except Exception as e
:
295 self
.log
.debug("Caught exception during reset: {}".format(e
))
307 timeout
: float = 1800,
309 db_dict
: dict = None,
310 kdu_name
: str = None,
311 namespace
: str = None,
315 :param cluster_uuid str: The UUID of the cluster to install to
316 :param kdu_model str: The name or path of a bundle to install
317 :param kdu_instance: Kdu instance name
318 :param atomic bool: If set, waits until the model is active and resets
319 the cluster on failure.
320 :param timeout int: The time, in seconds, to wait for the install
322 :param params dict: Key-value pairs of instantiation parameters
323 :param kdu_name: Name of the KDU instance to be installed
324 :param namespace: K8s namespace to use for the KDU instance
326 :return: If successful, returns ?
331 raise K8sException("db_dict must be set")
333 raise K8sException("bundle must be set")
335 if bundle
.startswith("cs:"):
337 elif bundle
.startswith("http"):
341 new_workdir
= kdu_model
.strip(kdu_model
.split("/")[-1])
342 os
.chdir(new_workdir
)
343 bundle
= "local:{}".format(kdu_model
)
345 self
.log
.debug("Checking for model named {}".format(kdu_instance
))
347 # Create the new model
348 self
.log
.debug("Adding model: {}".format(kdu_instance
))
349 await self
.libjuju
.add_model(
350 model_name
=kdu_instance
,
351 cloud_name
=cluster_uuid
,
352 credential_name
=self
._get
_credential
_name
(cluster_uuid
),
356 # TODO: Instantiation parameters
359 "Juju bundle that models the KDU, in any of the following ways:
360 - <juju-repo>/<juju-bundle>
361 - <juju-bundle folder under k8s_models folder in the package>
362 - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
364 - <URL_where_to_fetch_juju_bundle>
367 previous_workdir
= os
.getcwd()
368 except FileNotFoundError
:
369 previous_workdir
= "/app/storage"
371 self
.log
.debug("[install] deploying {}".format(bundle
))
372 await self
.libjuju
.deploy(
373 bundle
, model_name
=kdu_instance
, wait
=atomic
, timeout
=timeout
375 os
.chdir(previous_workdir
)
378 async def instances_list(self
, cluster_uuid
: str) -> list:
380 returns a list of deployed releases in a cluster
382 :param cluster_uuid: the cluster
391 kdu_model
: str = None,
396 :param cluster_uuid str: The UUID of the cluster to upgrade
397 :param kdu_instance str: The unique name of the KDU instance
398 :param kdu_model str: The name or path of the bundle to upgrade to
399 :param params dict: Key-value pairs of instantiation parameters
401 :return: If successful, reference to the new revision number of the
405 # TODO: Loop through the bundle and upgrade each charm individually
408 The API doesn't have a concept of bundle upgrades, because there are
409 many possible changes: charm revision, disk, number of units, etc.
411 As such, we are only supporting a limited subset of upgrades. We'll
412 upgrade the charm revision but leave storage and scale untouched.
414 Scale changes should happen through OSM constructs, and changes to
415 storage would require a redeployment of the service, at least in this
418 raise MethodNotImplemented()
430 :param cluster_uuid str: The UUID of the cluster to rollback
431 :param kdu_instance str: The unique name of the KDU instance
432 :param revision int: The revision to revert to. If omitted, rolls back
433 the previous upgrade.
435 :return: If successful, returns the revision of active KDU instance,
436 or raises an exception
438 raise MethodNotImplemented()
442 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str) -> bool:
443 """Uninstall a KDU instance
445 :param cluster_uuid str: The UUID of the cluster
446 :param kdu_instance str: The unique name of the KDU instance
448 :return: Returns True if successful, or raises an exception
451 self
.log
.debug("[uninstall] Destroying model")
453 await self
.libjuju
.destroy_model(kdu_instance
, total_timeout
=3600)
455 # self.log.debug("[uninstall] Model destroyed and disconnecting")
456 # await controller.disconnect()
459 # TODO: Remove these commented lines
460 # if not self.authenticated:
461 # self.log.debug("[uninstall] Connecting to controller")
462 # await self.login(cluster_uuid)
464 async def exec_primitive(
466 cluster_uuid
: str = None,
467 kdu_instance
: str = None,
468 primitive_name
: str = None,
469 timeout
: float = 300,
471 db_dict
: dict = None,
473 """Exec primitive (Juju action)
475 :param cluster_uuid str: The UUID of the cluster
476 :param kdu_instance str: The unique name of the KDU instance
477 :param primitive_name: Name of action that will be executed
478 :param timeout: Timeout for action execution
479 :param params: Dictionary of all the parameters needed for the action
480 :db_dict: Dictionary for any additional data
482 :return: Returns the output of the action
485 if not params
or "application-name" not in params
:
487 "Missing application-name argument, \
488 argument needed for K8s actions"
492 "[exec_primitive] Getting model "
493 "kdu_instance: {}".format(kdu_instance
)
495 application_name
= params
["application-name"]
496 actions
= await self
.libjuju
.get_actions(application_name
, kdu_instance
)
497 if primitive_name
not in actions
:
498 raise K8sException("Primitive {} not found".format(primitive_name
))
499 output
, status
= await self
.libjuju
.execute_action(
500 application_name
, kdu_instance
, primitive_name
, **params
503 if status
!= "completed":
505 "status is not completed: {} output: {}".format(status
, output
)
510 except Exception as e
:
511 error_msg
= "Error executing primitive {}: {}".format(primitive_name
, e
)
512 self
.log
.error(error_msg
)
513 raise K8sException(message
=error_msg
)
517 async def inspect_kdu(
523 Inspects a bundle and returns a dictionary of config parameters and
524 their default values.
526 :param kdu_model str: The name or path of the bundle to inspect.
528 :return: If successful, returns a dictionary of available parameters
529 and their default values.
533 if not os
.path
.exists(kdu_model
):
534 raise K8sException("file {} not found".format(kdu_model
))
536 with
open(kdu_model
, "r") as f
:
537 bundle
= yaml
.safe_load(f
.read())
541 'description': 'Test bundle',
542 'bundle': 'kubernetes',
545 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
548 'password': 'manopw',
549 'root_password': 'osm4u',
552 'series': 'kubernetes'
557 # TODO: This should be returned in an agreed-upon format
558 kdu
= bundle
["applications"]
568 If available, returns the README of the bundle.
570 :param kdu_model str: The name or path of a bundle
572 :return: If found, returns the contents of the README.
576 files
= ["README", "README.txt", "README.md"]
577 path
= os
.path
.dirname(kdu_model
)
578 for file in os
.listdir(path
):
580 with
open(file, "r") as f
:
586 async def status_kdu(
591 """Get the status of the KDU
593 Get the current status of the KDU instance.
595 :param cluster_uuid str: The UUID of the cluster
596 :param kdu_instance str: The unique id of the KDU instance
598 :return: Returns a dictionary containing namespace, state, resources,
602 model_status
= await self
.libjuju
.get_model_status(kdu_instance
)
603 for name
in model_status
.applications
:
604 application
= model_status
.applications
[name
]
605 status
[name
] = {"status": application
["status"]["status"]}
609 async def get_services(
610 self
, cluster_uuid
: str, kdu_instance
: str, namespace
: str
612 """Return a list of services of a kdu_instance"""
614 credentials
= self
.get_credentials(cluster_uuid
=cluster_uuid
)
616 kubecfg
= tempfile
.NamedTemporaryFile()
617 with
open(kubecfg
.name
, "w") as kubecfg_file
:
618 kubecfg_file
.write(credentials
)
619 kubectl
= Kubectl(config_file
=kubecfg
.name
)
621 return kubectl
.get_services(
622 field_selector
="metadata.namespace={}".format(kdu_instance
)
625 async def get_service(
626 self
, cluster_uuid
: str, service_name
: str, namespace
: str
628 """Return data for a specific service inside a namespace"""
630 credentials
= self
.get_credentials(cluster_uuid
=cluster_uuid
)
632 kubecfg
= tempfile
.NamedTemporaryFile()
633 with
open(kubecfg
.name
, "w") as kubecfg_file
:
634 kubecfg_file
.write(credentials
)
635 kubectl
= Kubectl(config_file
=kubecfg
.name
)
637 return kubectl
.get_services(
638 field_selector
="metadata.name={},metadata.namespace={}".format(
639 service_name
, namespace
643 def get_credentials(self
, cluster_uuid
: str) -> str:
645 Get Cluster Kubeconfig
647 k8scluster
= self
.db
.get_one(
648 "k8sclusters", q_filter
={"_id": cluster_uuid
}, fail_on_empty
=False
651 self
.db
.encrypt_decrypt_fields(
652 k8scluster
.get("credentials"),
654 ["password", "secret"],
655 schema_version
=k8scluster
["schema_version"],
656 salt
=k8scluster
["_id"],
659 return yaml
.safe_dump(k8scluster
.get("credentials"))
661 def _get_credential_name(self
, cluster_uuid
: str) -> str:
663 Get credential name for a k8s cloud
665 We cannot use the cluster_uuid for the credential name directly,
666 because it cannot start with a number, it must start with a letter.
667 Therefore, the k8s cloud credential name will be "cred-" followed
670 :param: cluster_uuid: Cluster UUID of the kubernetes cloud (=cloud_name)
672 :return: Name to use for the credential name.
674 return "cred-{}".format(cluster_uuid
)
680 """Get the namespace UUID
681 Gets the namespace's unique name
683 :param cluster_uuid str: The UUID of the cluster
684 :returns: The namespace UUID, or raises an exception
688 def _create_cluster_role(
692 labels
: Dict
[str, str],
694 cluster_roles
= kubectl
.clients
[RBAC_CLIENT
].list_cluster_role(
695 field_selector
="metadata.name={}".format(name
)
698 if len(cluster_roles
.items
) > 0:
700 "Cluster role with metadata.name={} already exists".format(name
)
703 metadata
= V1ObjectMeta(name
=name
, labels
=labels
, namespace
=ADMIN_NAMESPACE
)
705 cluster_role
= V1ClusterRole(
708 V1PolicyRule(api_groups
=["*"], resources
=["*"], verbs
=["*"]),
709 V1PolicyRule(non_resource_ur_ls
=["*"], verbs
=["*"]),
713 kubectl
.clients
[RBAC_CLIENT
].create_cluster_role(cluster_role
)
715 def _delete_cluster_role(self
, kubectl
: Kubectl
, name
: str):
716 kubectl
.clients
[RBAC_CLIENT
].delete_cluster_role(name
)
718 def _create_service_account(
722 labels
: Dict
[str, str],
724 service_accounts
= kubectl
.clients
[CORE_CLIENT
].list_namespaced_service_account(
725 ADMIN_NAMESPACE
, field_selector
="metadata.name={}".format(name
)
727 if len(service_accounts
.items
) > 0:
729 "Service account with metadata.name={} already exists".format(name
)
732 metadata
= V1ObjectMeta(name
=name
, labels
=labels
, namespace
=ADMIN_NAMESPACE
)
733 service_account
= V1ServiceAccount(metadata
=metadata
)
735 kubectl
.clients
[CORE_CLIENT
].create_namespaced_service_account(
736 ADMIN_NAMESPACE
, service_account
739 def _delete_service_account(self
, kubectl
: Kubectl
, name
: str):
740 kubectl
.clients
[CORE_CLIENT
].delete_namespaced_service_account(
741 name
, ADMIN_NAMESPACE
744 def _create_cluster_role_binding(
748 labels
: Dict
[str, str],
750 role_bindings
= kubectl
.clients
[RBAC_CLIENT
].list_cluster_role_binding(
751 field_selector
="metadata.name={}".format(name
)
753 if len(role_bindings
.items
) > 0:
754 raise Exception("Generated rbac id already exists")
756 role_binding
= V1ClusterRoleBinding(
757 metadata
=V1ObjectMeta(name
=name
, labels
=labels
),
758 role_ref
=V1RoleRef(kind
="ClusterRole", name
=name
, api_group
=""),
760 V1Subject(kind
="ServiceAccount", name
=name
, namespace
=ADMIN_NAMESPACE
)
763 kubectl
.clients
[RBAC_CLIENT
].create_cluster_role_binding(role_binding
)
765 def _delete_cluster_role_binding(self
, kubectl
: Kubectl
, name
: str):
766 kubectl
.clients
[RBAC_CLIENT
].delete_cluster_role_binding(name
)
768 async def _get_secret_data(self
, kubectl
: Kubectl
, name
: str) -> (str, str):
769 v1_core
= kubectl
.clients
[CORE_CLIENT
]
775 service_accounts
= v1_core
.list_namespaced_service_account(
776 ADMIN_NAMESPACE
, field_selector
="metadata.name={}".format(name
)
778 if len(service_accounts
.items
) == 0:
780 "Service account not found with metadata.name={}".format(name
)
782 service_account
= service_accounts
.items
[0]
783 if service_account
.secrets
and len(service_account
.secrets
) > 0:
784 secret_name
= service_account
.secrets
[0].name
785 if secret_name
is not None or not retries_limit
:
789 "Failed getting the secret from service account {}".format(name
)
791 secret
= v1_core
.list_namespaced_secret(
793 field_selector
="metadata.name={}".format(secret_name
),
796 token
= secret
.data
[SERVICE_ACCOUNT_TOKEN_KEY
]
797 client_certificate_data
= secret
.data
[SERVICE_ACCOUNT_ROOT_CA_KEY
]
800 base64
.b64decode(token
).decode("utf-8"),
801 base64
.b64decode(client_certificate_data
).decode("utf-8"),
805 def generate_kdu_instance_name(**kwargs
):
806 db_dict
= kwargs
.get("db_dict")
807 kdu_name
= kwargs
.get("kdu_name", None)
809 kdu_instance
= "{}-{}".format(kdu_name
, db_dict
["filter"]["_id"])
811 kdu_instance
= db_dict
["filter"]["_id"]