1 # Copyright 2019 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
16 from typing
import Union
23 from n2vc
.config
import EnvironConfig
24 from n2vc
.definitions
import RelationEndpoint
25 from n2vc
.exceptions
import K8sException
26 from n2vc
.k8s_conn
import K8sConnector
27 from n2vc
.kubectl
import Kubectl
28 from .exceptions
import MethodNotImplemented
29 from n2vc
.libjuju
import Libjuju
30 from n2vc
.utils
import obj_to_dict
, obj_to_yaml
31 from n2vc
.store
import MotorStore
32 from n2vc
.vca
.cloud
import Cloud
33 from n2vc
.vca
.connection
import get_connection
# Key looked up in the cloud credential attributes to recover the RBAC id
# when the RBAC resources of a cluster are removed.
RBAC_LABEL_KEY_NAME = "rbac-id"
# Prefix of the metadata name ("<prefix>-<rbac id>") used for the RBAC
# resources in the K8s cluster; also used as the key of their labels dict.
RBAC_STACK_PREFIX = "juju-credential"
def generate_rbac_id():
    """
    Generate a random identifier for a set of RBAC resources

    :return: 8-character lowercase hexadecimal string built from 4 random bytes
    """
    random_bytes = os.urandom(4)
    return random_bytes.hex()
44 class K8sJujuConnector(K8sConnector
):
51 kubectl_command
: str = "/usr/bin/kubectl",
52 juju_command
: str = "/usr/bin/juju",
58 :param fs: file system for kubernetes and helm configuration
59 :param db: Database object
60 :param kubectl_command: path to kubectl executable
61 :param juju_command: path to juju executable
63 :param: loop: Asyncio loop
67 K8sConnector
.__init
__(
71 on_update_db
=on_update_db
,
75 self
.loop
= loop
or asyncio
.get_event_loop()
76 self
.log
.debug("Initializing K8S Juju connector")
78 db_uri
= EnvironConfig(prefixes
=["OSMLCM_", "OSMMON_"]).get("database_uri")
79 self
._store
= MotorStore(db_uri
)
80 self
.loading_libjuju
= asyncio
.Lock(loop
=self
.loop
)
82 self
.log
.debug("K8S Juju connector initialized")
83 # TODO: Remove these commented lines:
84 # self.authenticated = False
86 # self.juju_secret = ""
93 namespace
: str = "kube-system",
94 reuse_cluster_uuid
: str = None,
98 It prepares a given K8s cluster environment to run Juju bundles.
100 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
102 :param namespace: optional namespace to be used for juju. By default,
103 'kube-system' will be used
104 :param reuse_cluster_uuid: existing cluster uuid for reuse
105 :param: kwargs: Additional parameters
108 :return: uuid of the K8s cluster and True if connector has installed some
109 software in the cluster
110 (on error, an exception will be raised)
112 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
114 cluster_uuid
= reuse_cluster_uuid
or str(uuid
.uuid4())
115 kubectl
= self
._get
_kubectl
(k8s_creds
)
117 # CREATING RESOURCES IN K8S
118 rbac_id
= generate_rbac_id()
119 metadata_name
= "{}-{}".format(RBAC_STACK_PREFIX
, rbac_id
)
120 labels
= {RBAC_STACK_PREFIX
: rbac_id
}
122 # Create cleanup dictionary to clean up created resources
123 # if it fails in the middle of the process
126 self
.log
.debug("Initializing K8s cluster for juju")
127 kubectl
.create_cluster_role(
131 self
.log
.debug("Cluster role created")
134 "delete": kubectl
.delete_cluster_role
,
135 "args": (metadata_name
,),
139 kubectl
.create_service_account(
143 self
.log
.debug("Service account created")
146 "delete": kubectl
.delete_service_account
,
147 "args": (metadata_name
,),
151 kubectl
.create_cluster_role_binding(
155 self
.log
.debug("Role binding created")
158 "delete": kubectl
.delete_service_account
,
159 "args": (metadata_name
,),
162 token
, client_cert_data
= await kubectl
.get_secret_data(
166 default_storage_class
= kubectl
.get_default_storage_class()
167 self
.log
.debug("Default storage class: {}".format(default_storage_class
))
168 await libjuju
.add_k8s(
172 client_cert_data
=client_cert_data
,
173 configuration
=kubectl
.configuration
,
174 storage_class
=default_storage_class
,
175 credential_name
=self
._get
_credential
_name
(cluster_uuid
),
177 self
.log
.debug("K8s cluster added to juju controller")
178 return cluster_uuid
, True
179 except Exception as e
:
180 self
.log
.error("Error initializing k8scluster: {}".format(e
), exc_info
=True)
181 if len(cleanup_data
) > 0:
182 self
.log
.debug("Cleaning up created resources in k8s cluster...")
183 for item
in cleanup_data
:
184 delete_function
= item
["delete"]
185 delete_args
= item
["args"]
186 delete_function(*delete_args
)
187 self
.log
.debug("Cleanup finished")
190 """Repo Management"""
196 _type
: str = "charm",
199 password
: str = None,
201 raise MethodNotImplemented()
async def repo_list(self):
    """
    List repositories (not implemented by this connector)

    :raises MethodNotImplemented: always
    """
    raise MethodNotImplemented()
206 async def repo_remove(
210 raise MethodNotImplemented()
212 async def synchronize_repos(self
, cluster_uuid
: str, name
: str):
214 Returns None as currently add_repo is not implemented
224 uninstall_sw
: bool = False,
229 Resets the Kubernetes cluster by removing the model that represents it.
231 :param cluster_uuid str: The UUID of the cluster to reset
232 :param force: Force reset
233 :param uninstall_sw: Boolean to uninstall sw
234 :param: kwargs: Additional parameters
237 :return: Returns True if successful or raises an exception.
241 self
.log
.debug("[reset] Removing k8s cloud")
242 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
244 cloud
= Cloud(cluster_uuid
, self
._get
_credential
_name
(cluster_uuid
))
246 cloud_creds
= await libjuju
.get_cloud_credentials(cloud
)
248 await libjuju
.remove_cloud(cluster_uuid
)
250 credentials
= self
.get_credentials(cluster_uuid
=cluster_uuid
)
252 kubectl
= self
._get
_kubectl
(credentials
)
255 kubectl
.delete_cluster_role_binding
,
256 kubectl
.delete_service_account
,
257 kubectl
.delete_cluster_role
,
260 credential_attrs
= cloud_creds
[0].result
["attrs"]
261 if RBAC_LABEL_KEY_NAME
in credential_attrs
:
262 rbac_id
= credential_attrs
[RBAC_LABEL_KEY_NAME
]
263 metadata_name
= "{}-{}".format(RBAC_STACK_PREFIX
, rbac_id
)
264 for delete_func
in delete_functions
:
266 delete_func(metadata_name
)
267 except Exception as e
:
268 self
.log
.warning("Cannot remove resource in K8s {}".format(e
))
270 except Exception as e
:
271 self
.log
.debug("Caught exception during reset: {}".format(e
))
283 timeout
: float = 1800,
285 db_dict
: dict = None,
286 kdu_name
: str = None,
287 namespace
: str = None,
292 :param cluster_uuid str: The UUID of the cluster to install to
293 :param kdu_model str: The name or path of a bundle to install
294 :param kdu_instance: Kdu instance name
295 :param atomic bool: If set, waits until the model is active and resets
296 the cluster on failure.
297 :param timeout float: The time, in seconds, to wait for the install
299 :param params dict: Key-value pairs of instantiation parameters
300 :param kdu_name: Name of the KDU instance to be installed
301 :param namespace: K8s namespace to use for the KDU instance
302 :param kwargs: Additional parameters
305 :return: If successful, returns ?
307 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
311 raise K8sException("db_dict must be set")
313 raise K8sException("bundle must be set")
315 if bundle
.startswith("cs:"):
316 # For Juju Bundles provided by the Charm Store
318 elif bundle
.startswith("ch:"):
319 # For Juju Bundles provided by the Charm Hub (this only works for juju version >= 2.9)
321 elif bundle
.startswith("http"):
325 new_workdir
= kdu_model
.strip(kdu_model
.split("/")[-1])
326 os
.chdir(new_workdir
)
327 bundle
= "local:{}".format(kdu_model
)
329 self
.log
.debug("Checking for model named {}".format(kdu_instance
))
331 # Create the new model
332 self
.log
.debug("Adding model: {}".format(kdu_instance
))
333 cloud
= Cloud(cluster_uuid
, self
._get
_credential
_name
(cluster_uuid
))
334 await libjuju
.add_model(kdu_instance
, cloud
)
337 # TODO: Instantiation parameters
340 "Juju bundle that models the KDU, in any of the following ways:
341 - <juju-repo>/<juju-bundle>
342 - <juju-bundle folder under k8s_models folder in the package>
343 - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
345 - <URL_where_to_fetch_juju_bundle>
348 previous_workdir
= os
.getcwd()
349 except FileNotFoundError
:
350 previous_workdir
= "/app/storage"
352 self
.log
.debug("[install] deploying {}".format(bundle
))
353 await libjuju
.deploy(
354 bundle
, model_name
=kdu_instance
, wait
=atomic
, timeout
=timeout
356 os
.chdir(previous_workdir
)
357 if self
.on_update_db
:
358 await self
.on_update_db(
361 filter=db_dict
["filter"],
362 vca_id
=kwargs
.get("vca_id"),
371 total_timeout
: float = 1800,
374 """Scale an application in a model
376 :param: kdu_instance str: KDU instance name
377 :param: scale int: Scale to which to set the application
378 :param: resource_name str: The application name in the Juju Bundle
379 :param: timeout float: The time, in seconds, to wait for the install
381 :param kwargs: Additional parameters
384 :return: If successful, returns True
388 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
389 await libjuju
.scale_application(
390 model_name
=kdu_instance
,
391 application_name
=resource_name
,
393 total_timeout
=total_timeout
,
395 except Exception as e
:
396 error_msg
= "Error scaling application {} in kdu instance {}: {}".format(
397 resource_name
, kdu_instance
, e
399 self
.log
.error(error_msg
)
400 raise K8sException(message
=error_msg
)
403 async def get_scale_count(
409 """Get an application scale count
411 :param: resource_name str: The application name in the Juju Bundle
412 :param: kdu_instance str: KDU instance name
413 :param kwargs: Additional parameters
415 :return: Return application instance count
419 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
420 status
= await libjuju
.get_model_status(kdu_instance
)
421 return len(status
.applications
[resource_name
].units
)
422 except Exception as e
:
423 error_msg
= "Error getting scale count from application {} in kdu instance {}: {}".format(
424 resource_name
, kdu_instance
, e
426 self
.log
.error(error_msg
)
427 raise K8sException(message
=error_msg
)
429 async def instances_list(self
, cluster_uuid
: str) -> list:
431 returns a list of deployed releases in a cluster
433 :param cluster_uuid: the cluster
442 kdu_model
: str = None,
447 :param cluster_uuid str: The UUID of the cluster to upgrade
448 :param kdu_instance str: The unique name of the KDU instance
449 :param kdu_model str: The name or path of the bundle to upgrade to
450 :param params dict: Key-value pairs of instantiation parameters
452 :return: If successful, reference to the new revision number of the
456 # TODO: Loop through the bundle and upgrade each charm individually
459 The API doesn't have a concept of bundle upgrades, because there are
460 many possible changes: charm revision, disk, number of units, etc.
462 As such, we are only supporting a limited subset of upgrades. We'll
463 upgrade the charm revision but leave storage and scale untouched.
465 Scale changes should happen through OSM constructs, and changes to
466 storage would require a redeployment of the service, at least in this
469 raise MethodNotImplemented()
481 :param cluster_uuid str: The UUID of the cluster to rollback
482 :param kdu_instance str: The unique name of the KDU instance
483 :param revision int: The revision to revert to. If omitted, rolls back
484 the previous upgrade.
486 :return: If successful, returns the revision of active KDU instance,
487 or raises an exception
489 raise MethodNotImplemented()
499 """Uninstall a KDU instance
501 :param cluster_uuid str: The UUID of the cluster
502 :param kdu_instance str: The unique name of the KDU instance
503 :param kwargs: Additional parameters
506 :return: Returns True if successful, or raises an exception
509 self
.log
.debug("[uninstall] Destroying model")
510 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
512 await libjuju
.destroy_model(kdu_instance
, total_timeout
=3600)
514 # self.log.debug("[uninstall] Model destroyed and disconnecting")
515 # await controller.disconnect()
518 # TODO: Remove these commented lines
519 # if not self.authenticated:
520 # self.log.debug("[uninstall] Connecting to controller")
521 # await self.login(cluster_uuid)
523 async def exec_primitive(
525 cluster_uuid
: str = None,
526 kdu_instance
: str = None,
527 primitive_name
: str = None,
528 timeout
: float = 300,
530 db_dict
: dict = None,
533 """Exec primitive (Juju action)
535 :param cluster_uuid str: The UUID of the cluster
536 :param kdu_instance str: The unique name of the KDU instance
537 :param primitive_name: Name of action that will be executed
538 :param timeout: Timeout for action execution
539 :param params: Dictionary of all the parameters needed for the action
540 :param db_dict: Dictionary for any additional data
541 :param kwargs: Additional parameters
544 :return: Returns the output of the action
546 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
548 if not params
or "application-name" not in params
:
550 "Missing application-name argument, \
551 argument needed for K8s actions"
555 "[exec_primitive] Getting model "
556 "kdu_instance: {}".format(kdu_instance
)
558 application_name
= params
["application-name"]
559 actions
= await libjuju
.get_actions(application_name
, kdu_instance
)
560 if primitive_name
not in actions
:
561 raise K8sException("Primitive {} not found".format(primitive_name
))
562 output
, status
= await libjuju
.execute_action(
563 application_name
, kdu_instance
, primitive_name
, **params
566 if status
!= "completed":
568 "status is not completed: {} output: {}".format(status
, output
)
570 if self
.on_update_db
:
571 await self
.on_update_db(
572 cluster_uuid
, kdu_instance
, filter=db_dict
["filter"]
577 except Exception as e
:
578 error_msg
= "Error executing primitive {}: {}".format(primitive_name
, e
)
579 self
.log
.error(error_msg
)
580 raise K8sException(message
=error_msg
)
584 async def inspect_kdu(
590 Inspects a bundle and returns a dictionary of config parameters and
591 their default values.
593 :param kdu_model str: The name or path of the bundle to inspect.
595 :return: If successful, returns a dictionary of available parameters
596 and their default values.
600 if not os
.path
.exists(kdu_model
):
601 raise K8sException("file {} not found".format(kdu_model
))
603 with
open(kdu_model
, "r") as f
:
604 bundle
= yaml
.safe_load(f
.read())
608 'description': 'Test bundle',
609 'bundle': 'kubernetes',
612 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
615 'password': 'manopw',
616 'root_password': 'osm4u',
619 'series': 'kubernetes'
624 # TODO: This should be returned in an agreed-upon format
625 kdu
= bundle
["applications"]
635 If available, returns the README of the bundle.
637 :param kdu_model str: The name or path of a bundle
639 :return: If found, returns the contents of the README.
643 files
= ["README", "README.txt", "README.md"]
644 path
= os
.path
.dirname(kdu_model
)
645 for file in os
.listdir(path
):
647 with
open(file, "r") as f
:
653 async def status_kdu(
657 complete_status
: bool = False,
658 yaml_format
: bool = False,
660 ) -> Union
[str, dict]:
661 """Get the status of the KDU
663 Get the current status of the KDU instance.
665 :param cluster_uuid str: The UUID of the cluster
666 :param kdu_instance str: The unique id of the KDU instance
667 :param complete_status: To get the complete_status of the KDU
668 :param yaml_format: To get the status in proper format for NSR record
669 :param: kwargs: Additional parameters
672 :return: Returns a dictionary containing namespace, state, resources,
673 and deployment_time and returns complete_status if complete_status is True
675 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
678 model_status
= await libjuju
.get_model_status(kdu_instance
)
680 if not complete_status
:
681 for name
in model_status
.applications
:
682 application
= model_status
.applications
[name
]
683 status
[name
] = {"status": application
["status"]["status"]}
686 return obj_to_yaml(model_status
)
688 return obj_to_dict(model_status
)
692 async def add_relation(
694 provider
: RelationEndpoint
,
695 requirer
: RelationEndpoint
,
698 Add relation between two charmed endpoints
700 :param: provider: Provider relation endpoint
701 :param: requirer: Requirer relation endpoint
703 self
.log
.debug(f
"adding new relation between {provider} and {requirer}")
704 cross_model_relation
= (
705 provider
.model_name
!= requirer
.model_name
706 or requirer
.vca_id
!= requirer
.vca_id
709 if cross_model_relation
:
710 # Cross-model relation
711 provider_libjuju
= await self
._get
_libjuju
(provider
.vca_id
)
712 requirer_libjuju
= await self
._get
_libjuju
(requirer
.vca_id
)
713 offer
= await provider_libjuju
.offer(provider
)
715 saas_name
= await requirer_libjuju
.consume(
716 requirer
.model_name
, offer
, provider_libjuju
718 await requirer_libjuju
.add_relation(
725 vca_id
= provider
.vca_id
726 model
= provider
.model_name
727 libjuju
= await self
._get
_libjuju
(vca_id
)
728 # add juju relations between two applications
729 await libjuju
.add_relation(
731 endpoint_1
=provider
.endpoint
,
732 endpoint_2
=requirer
.endpoint
,
734 except Exception as e
:
735 message
= f
"Error adding relation between {provider} and {requirer}: {e}"
736 self
.log
.error(message
)
737 raise Exception(message
=message
)
739 async def update_vca_status(self
, vcastatus
: dict, kdu_instance
: str, **kwargs
):
741 Add all configs, actions, executed actions of all applications in a model to vcastatus dict
743 :param vcastatus dict: dict containing vcastatus
744 :param kdu_instance str: The unique id of the KDU instance
745 :param: kwargs: Additional parameters
750 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
752 for model_name
in vcastatus
:
753 # Adding executed actions
754 vcastatus
[model_name
][
756 ] = await libjuju
.get_executed_actions(kdu_instance
)
758 for application
in vcastatus
[model_name
]["applications"]:
759 # Adding application actions
760 vcastatus
[model_name
]["applications"][application
][
762 ] = await libjuju
.get_actions(application
, kdu_instance
)
763 # Adding application configs
764 vcastatus
[model_name
]["applications"][application
][
766 ] = await libjuju
.get_application_configs(kdu_instance
, application
)
768 except Exception as e
:
769 self
.log
.debug("Error in updating vca status: {}".format(str(e
)))
771 async def get_services(
772 self
, cluster_uuid
: str, kdu_instance
: str, namespace
: str
774 """Return a list of services of a kdu_instance"""
776 credentials
= self
.get_credentials(cluster_uuid
=cluster_uuid
)
777 kubectl
= self
._get
_kubectl
(credentials
)
778 return kubectl
.get_services(
779 field_selector
="metadata.namespace={}".format(kdu_instance
)
782 async def get_service(
783 self
, cluster_uuid
: str, service_name
: str, namespace
: str
785 """Return data for a specific service inside a namespace"""
787 credentials
= self
.get_credentials(cluster_uuid
=cluster_uuid
)
788 kubectl
= self
._get
_kubectl
(credentials
)
789 return kubectl
.get_services(
790 field_selector
="metadata.name={},metadata.namespace={}".format(
791 service_name
, namespace
795 def get_credentials(self
, cluster_uuid
: str) -> str:
797 Get Cluster Kubeconfig
799 k8scluster
= self
.db
.get_one(
800 "k8sclusters", q_filter
={"_id": cluster_uuid
}, fail_on_empty
=False
803 self
.db
.encrypt_decrypt_fields(
804 k8scluster
.get("credentials"),
806 ["password", "secret"],
807 schema_version
=k8scluster
["schema_version"],
808 salt
=k8scluster
["_id"],
811 return yaml
.safe_dump(k8scluster
.get("credentials"))
def _get_credential_name(self, cluster_uuid: str) -> str:
    """
    Build the credential name of a k8s cloud

    The cluster uuid is not used as-is because a credential name must
    start with a letter and cannot start with a number, so the name is
    the cluster uuid prefixed with "cred-".

    :param: cluster_uuid: Cluster UUID of the kubernetes cloud (=cloud_name)

    :return: Name to use for the credential name.
    """
    return f"cred-{cluster_uuid}"
832 """Get the namespace UUID
833 Gets the namespace's unique name
835 :param cluster_uuid str: The UUID of the cluster
836 :returns: The namespace UUID, or raises an exception
841 def generate_kdu_instance_name(**kwargs
):
842 db_dict
= kwargs
.get("db_dict")
843 kdu_name
= kwargs
.get("kdu_name", None)
845 kdu_instance
= "{}-{}".format(kdu_name
, db_dict
["filter"]["_id"])
847 kdu_instance
= db_dict
["filter"]["_id"]
850 async def _get_libjuju(self
, vca_id
: str = None) -> Libjuju
:
854 :param: vca_id: VCA ID
855 If None, get a libjuju object with a Connection to the default VCA
856 Else, get a libjuju object with a Connection to the specified VCA
859 while self
.loading_libjuju
.locked():
860 await asyncio
.sleep(0.1)
862 async with self
.loading_libjuju
:
863 vca_connection
= await get_connection(self
._store
)
864 self
.libjuju
= Libjuju(vca_connection
, loop
=self
.loop
, log
=self
.log
)
867 vca_connection
= await get_connection(self
._store
, vca_id
)
def _get_kubectl(self, credentials: str) -> Kubectl:
    """
    Get Kubectl object

    The credentials are written to a temporary kubeconfig file whose path
    is handed to Kubectl. The file only exists while this method runs.

    :param: credentials: Kubeconfig credentials (content of the kubeconfig file)

    :return: Kubectl object created with the temporary kubeconfig file
    """
    # Write through the temporary file's own handle: opening a
    # NamedTemporaryFile a second time by name is not portable, and the
    # "with" block removes the file deterministically instead of relying
    # on garbage collection of a local variable.
    with tempfile.NamedTemporaryFile(mode="w") as kubecfg:
        kubecfg.write(credentials)
        # Flush so that the content is visible to whoever opens the file by name
        kubecfg.flush()
        # NOTE(review): assumes Kubectl reads config_file during construction
        # (the file was already short-lived before this change) - confirm.
        return Kubectl(config_file=kubecfg.name)