1 # Copyright 2019 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
16 from typing
import Union
23 from n2vc
.config
import EnvironConfig
24 from n2vc
.definitions
import RelationEndpoint
25 from n2vc
.exceptions
import K8sException
26 from n2vc
.k8s_conn
import K8sConnector
27 from n2vc
.kubectl
import Kubectl
28 from .exceptions
import MethodNotImplemented
29 from n2vc
.libjuju
import Libjuju
30 from n2vc
.utils
import obj_to_dict
, obj_to_yaml
31 from n2vc
.store
import MotorStore
32 from n2vc
.vca
.cloud
import Cloud
33 from n2vc
.vca
.connection
import get_connection
# Key looked up in the cloud credential attributes during reset() to recover
# the RBAC id of the resources that have to be deleted.
RBAC_LABEL_KEY_NAME = "rbac-id"
# Prefix of the "<prefix>-<rbac id>" name used for the K8s RBAC resources
# (cluster role, service account, cluster role binding); init_env() also uses
# it as the key of the labels dict it builds.
RBAC_STACK_PREFIX = "juju-credential"
def generate_rbac_id():
    """Return a random identifier made of 8 lowercase hexadecimal characters.

    Four bytes are read from ``os.urandom`` and rendered as hex text.
    """
    random_bytes = os.urandom(4)
    return random_bytes.hex()
44 class K8sJujuConnector(K8sConnector
):
51 kubectl_command
: str = "/usr/bin/kubectl",
52 juju_command
: str = "/usr/bin/juju",
58 :param fs: file system for kubernetes and helm configuration
59 :param db: Database object
60 :param kubectl_command: path to kubectl executable
61 :param juju_command: path to juju executable
63 :param: loop: Asyncio loop
67 K8sConnector
.__init
__(
71 on_update_db
=on_update_db
,
75 self
.loop
= loop
or asyncio
.get_event_loop()
76 self
.log
.debug("Initializing K8S Juju connector")
78 db_uri
= EnvironConfig(prefixes
=["OSMLCM_", "OSMMON_"]).get("database_uri")
79 self
._store
= MotorStore(db_uri
)
80 self
.loading_libjuju
= asyncio
.Lock(loop
=self
.loop
)
82 self
.log
.debug("K8S Juju connector initialized")
83 # TODO: Remove these commented lines:
84 # self.authenticated = False
86 # self.juju_secret = ""
93 namespace
: str = "kube-system",
94 reuse_cluster_uuid
: str = None,
98 It prepares a given K8s cluster environment to run Juju bundles.
100 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
102 :param namespace: optional namespace to be used for juju. By default,
103 'kube-system' will be used
104 :param reuse_cluster_uuid: existing cluster uuid for reuse
105 :param: kwargs: Additional parameters
108 :return: uuid of the K8s cluster and True if connector has installed some
109 software in the cluster
110 (on error, an exception will be raised)
112 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
114 cluster_uuid
= reuse_cluster_uuid
or str(uuid
.uuid4())
115 kubectl
= self
._get
_kubectl
(k8s_creds
)
117 # CREATING RESOURCES IN K8S
118 rbac_id
= generate_rbac_id()
119 metadata_name
= "{}-{}".format(RBAC_STACK_PREFIX
, rbac_id
)
120 labels
= {RBAC_STACK_PREFIX
: rbac_id
}
122 # Create cleanup dictionary to clean up created resources
123 # if it fails in the middle of the process
126 kubectl
.create_cluster_role(
132 "delete": kubectl
.delete_cluster_role
,
133 "args": (metadata_name
,),
137 kubectl
.create_service_account(
143 "delete": kubectl
.delete_service_account
,
144 "args": (metadata_name
,),
148 kubectl
.create_cluster_role_binding(
154 "delete": kubectl
.delete_service_account
,
155 "args": (metadata_name
,),
158 token
, client_cert_data
= await kubectl
.get_secret_data(
162 default_storage_class
= kubectl
.get_default_storage_class()
163 await libjuju
.add_k8s(
167 client_cert_data
=client_cert_data
,
168 configuration
=kubectl
.configuration
,
169 storage_class
=default_storage_class
,
170 credential_name
=self
._get
_credential
_name
(cluster_uuid
),
172 return cluster_uuid
, True
173 except Exception as e
:
174 self
.log
.error("Error initializing k8scluster: {}".format(e
))
175 if len(cleanup_data
) > 0:
176 self
.log
.debug("Cleaning up created resources in k8s cluster...")
177 for item
in cleanup_data
:
178 delete_function
= item
["delete"]
179 delete_args
= item
["args"]
180 delete_function(*delete_args
)
181 self
.log
.debug("Cleanup finished")
184 """Repo Management"""
190 _type
: str = "charm",
192 raise MethodNotImplemented()
async def repo_list(self):
    """Repository listing is not supported by this connector.

    :raises MethodNotImplemented: always
    """
    raise MethodNotImplemented()
197 async def repo_remove(
201 raise MethodNotImplemented()
203 async def synchronize_repos(self
, cluster_uuid
: str, name
: str):
205 Returns None as currently add_repo is not implemented
215 uninstall_sw
: bool = False,
220 Resets the Kubernetes cluster by removing the model that represents it.
222 :param cluster_uuid str: The UUID of the cluster to reset
223 :param force: Force reset
224 :param uninstall_sw: Boolean to uninstall sw
225 :param: kwargs: Additional parameters
228 :return: Returns True if successful or raises an exception.
232 self
.log
.debug("[reset] Removing k8s cloud")
233 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
235 cloud
= Cloud(cluster_uuid
, self
._get
_credential
_name
(cluster_uuid
))
237 cloud_creds
= await libjuju
.get_cloud_credentials(cloud
)
239 await libjuju
.remove_cloud(cluster_uuid
)
241 credentials
= self
.get_credentials(cluster_uuid
=cluster_uuid
)
243 kubectl
= self
._get
_kubectl
(credentials
)
246 kubectl
.delete_cluster_role_binding
,
247 kubectl
.delete_service_account
,
248 kubectl
.delete_cluster_role
,
251 credential_attrs
= cloud_creds
[0].result
["attrs"]
252 if RBAC_LABEL_KEY_NAME
in credential_attrs
:
253 rbac_id
= credential_attrs
[RBAC_LABEL_KEY_NAME
]
254 metadata_name
= "{}-{}".format(RBAC_STACK_PREFIX
, rbac_id
)
255 for delete_func
in delete_functions
:
257 delete_func(metadata_name
)
258 except Exception as e
:
259 self
.log
.warning("Cannot remove resource in K8s {}".format(e
))
261 except Exception as e
:
262 self
.log
.debug("Caught exception during reset: {}".format(e
))
274 timeout
: float = 1800,
276 db_dict
: dict = None,
277 kdu_name
: str = None,
278 namespace
: str = None,
283 :param cluster_uuid str: The UUID of the cluster to install to
284 :param kdu_model str: The name or path of a bundle to install
285 :param kdu_instance: Kdu instance name
286 :param atomic bool: If set, waits until the model is active and resets
287 the cluster on failure.
288 :param timeout int: The time, in seconds, to wait for the install
290 :param params dict: Key-value pairs of instantiation parameters
291 :param kdu_name: Name of the KDU instance to be installed
292 :param namespace: K8s namespace to use for the KDU instance
293 :param kwargs: Additional parameters
296 :return: If successful, returns ?
298 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
302 raise K8sException("db_dict must be set")
304 raise K8sException("bundle must be set")
306 if bundle
.startswith("cs:"):
307 # For Juju Bundles provided by the Charm Store
309 elif bundle
.startswith("ch:"):
310 # For Juju Bundles provided by the Charm Hub (this only works for juju version >= 2.9)
312 elif bundle
.startswith("http"):
316 new_workdir
= kdu_model
.strip(kdu_model
.split("/")[-1])
317 os
.chdir(new_workdir
)
318 bundle
= "local:{}".format(kdu_model
)
320 self
.log
.debug("Checking for model named {}".format(kdu_instance
))
322 # Create the new model
323 self
.log
.debug("Adding model: {}".format(kdu_instance
))
324 cloud
= Cloud(cluster_uuid
, self
._get
_credential
_name
(cluster_uuid
))
325 await libjuju
.add_model(kdu_instance
, cloud
)
328 # TODO: Instantiation parameters
331 "Juju bundle that models the KDU, in any of the following ways:
332 - <juju-repo>/<juju-bundle>
333 - <juju-bundle folder under k8s_models folder in the package>
334 - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
336 - <URL_where_to_fetch_juju_bundle>
339 previous_workdir
= os
.getcwd()
340 except FileNotFoundError
:
341 previous_workdir
= "/app/storage"
343 self
.log
.debug("[install] deploying {}".format(bundle
))
344 await libjuju
.deploy(
345 bundle
, model_name
=kdu_instance
, wait
=atomic
, timeout
=timeout
347 os
.chdir(previous_workdir
)
348 if self
.on_update_db
:
349 await self
.on_update_db(
352 filter=db_dict
["filter"],
353 vca_id
=kwargs
.get("vca_id"),
362 total_timeout
: float = 1800,
365 """Scale an application in a model
367 :param: kdu_instance str: KDU instance name
368 :param: scale int: Scale to which to set this application
369 :param: resource_name str: Resource name (Application name)
370 :param: timeout float: The time, in seconds, to wait for the install
372 :param kwargs: Additional parameters
375 :return: If successful, returns True
379 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
380 await libjuju
.scale_application(
381 model_name
=kdu_instance
,
382 application_name
=resource_name
,
384 total_timeout
=total_timeout
,
386 except Exception as e
:
387 error_msg
= "Error scaling application {} in kdu instance {}: {}".format(
388 resource_name
, kdu_instance
, e
390 self
.log
.error(error_msg
)
391 raise K8sException(message
=error_msg
)
394 async def get_scale_count(
400 """Get an application scale count
402 :param: resource_name str: Resource name (Application name)
403 :param: kdu_instance str: KDU instance name
404 :param kwargs: Additional parameters
406 :return: Return application instance count
409 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
410 status
= await libjuju
.get_model_status(kdu_instance
)
411 return len(status
.applications
[resource_name
].units
)
412 except Exception as e
:
413 error_msg
= "Error getting scale count from application {} in kdu instance {}: {}".format(
414 resource_name
, kdu_instance
, e
416 self
.log
.error(error_msg
)
417 raise K8sException(message
=error_msg
)
419 async def instances_list(self
, cluster_uuid
: str) -> list:
421 returns a list of deployed releases in a cluster
423 :param cluster_uuid: the cluster
432 kdu_model
: str = None,
437 :param cluster_uuid str: The UUID of the cluster to upgrade
438 :param kdu_instance str: The unique name of the KDU instance
439 :param kdu_model str: The name or path of the bundle to upgrade to
440 :param params dict: Key-value pairs of instantiation parameters
442 :return: If successful, reference to the new revision number of the
446 # TODO: Loop through the bundle and upgrade each charm individually
449 The API doesn't have a concept of bundle upgrades, because there are
450 many possible changes: charm revision, disk, number of units, etc.
452 As such, we are only supporting a limited subset of upgrades. We'll
453 upgrade the charm revision but leave storage and scale untouched.
455 Scale changes should happen through OSM constructs, and changes to
456 storage would require a redeployment of the service, at least in this
459 raise MethodNotImplemented()
471 :param cluster_uuid str: The UUID of the cluster to rollback
472 :param kdu_instance str: The unique name of the KDU instance
473 :param revision int: The revision to revert to. If omitted, rolls back
474 the previous upgrade.
476 :return: If successful, returns the revision of active KDU instance,
477 or raises an exception
479 raise MethodNotImplemented()
489 """Uninstall a KDU instance
491 :param cluster_uuid str: The UUID of the cluster
492 :param kdu_instance str: The unique name of the KDU instance
493 :param kwargs: Additional parameters
496 :return: Returns True if successful, or raises an exception
499 self
.log
.debug("[uninstall] Destroying model")
500 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
502 await libjuju
.destroy_model(kdu_instance
, total_timeout
=3600)
504 # self.log.debug("[uninstall] Model destroyed and disconnecting")
505 # await controller.disconnect()
508 # TODO: Remove these commented lines
509 # if not self.authenticated:
510 # self.log.debug("[uninstall] Connecting to controller")
511 # await self.login(cluster_uuid)
513 async def exec_primitive(
515 cluster_uuid
: str = None,
516 kdu_instance
: str = None,
517 primitive_name
: str = None,
518 timeout
: float = 300,
520 db_dict
: dict = None,
523 """Exec primitive (Juju action)
525 :param cluster_uuid str: The UUID of the cluster
526 :param kdu_instance str: The unique name of the KDU instance
527 :param primitive_name: Name of action that will be executed
528 :param timeout: Timeout for action execution
529 :param params: Dictionary of all the parameters needed for the action
530 :param db_dict: Dictionary for any additional data
531 :param kwargs: Additional parameters
534 :return: Returns the output of the action
536 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
538 if not params
or "application-name" not in params
:
540 "Missing application-name argument, \
541 argument needed for K8s actions"
545 "[exec_primitive] Getting model "
546 "kdu_instance: {}".format(kdu_instance
)
548 application_name
= params
["application-name"]
549 actions
= await libjuju
.get_actions(application_name
, kdu_instance
)
550 if primitive_name
not in actions
:
551 raise K8sException("Primitive {} not found".format(primitive_name
))
552 output
, status
= await libjuju
.execute_action(
553 application_name
, kdu_instance
, primitive_name
, **params
556 if status
!= "completed":
558 "status is not completed: {} output: {}".format(status
, output
)
560 if self
.on_update_db
:
561 await self
.on_update_db(
562 cluster_uuid
, kdu_instance
, filter=db_dict
["filter"]
567 except Exception as e
:
568 error_msg
= "Error executing primitive {}: {}".format(primitive_name
, e
)
569 self
.log
.error(error_msg
)
570 raise K8sException(message
=error_msg
)
574 async def inspect_kdu(
580 Inspects a bundle and returns a dictionary of config parameters and
581 their default values.
583 :param kdu_model str: The name or path of the bundle to inspect.
585 :return: If successful, returns a dictionary of available parameters
586 and their default values.
590 if not os
.path
.exists(kdu_model
):
591 raise K8sException("file {} not found".format(kdu_model
))
593 with
open(kdu_model
, "r") as f
:
594 bundle
= yaml
.safe_load(f
.read())
598 'description': 'Test bundle',
599 'bundle': 'kubernetes',
602 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
605 'password': 'manopw',
606 'root_password': 'osm4u',
609 'series': 'kubernetes'
614 # TODO: This should be returned in an agreed-upon format
615 kdu
= bundle
["applications"]
625 If available, returns the README of the bundle.
627 :param kdu_model str: The name or path of a bundle
629 :return: If found, returns the contents of the README.
633 files
= ["README", "README.txt", "README.md"]
634 path
= os
.path
.dirname(kdu_model
)
635 for file in os
.listdir(path
):
637 with
open(file, "r") as f
:
643 async def status_kdu(
647 complete_status
: bool = False,
648 yaml_format
: bool = False,
650 ) -> Union
[str, dict]:
651 """Get the status of the KDU
653 Get the current status of the KDU instance.
655 :param cluster_uuid str: The UUID of the cluster
656 :param kdu_instance str: The unique id of the KDU instance
657 :param complete_status: To get the complete_status of the KDU
658 :param yaml_format: To get the status in proper format for NSR record
659 :param: kwargs: Additional parameters
662 :return: Returns a dictionary containing namespace, state, resources,
663 and deployment_time and returns complete_status if complete_status is True
665 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
668 model_status
= await libjuju
.get_model_status(kdu_instance
)
670 if not complete_status
:
671 for name
in model_status
.applications
:
672 application
= model_status
.applications
[name
]
673 status
[name
] = {"status": application
["status"]["status"]}
676 return obj_to_yaml(model_status
)
678 return obj_to_dict(model_status
)
async def add_relation(
    self,
    provider: RelationEndpoint,
    requirer: RelationEndpoint,
):
    """
    Add relation between two charmed endpoints

    If both endpoints belong to the same model on the same VCA, a standard
    relation is added in that model. Otherwise a cross-model relation is set
    up: the provider endpoint is offered, the offer is consumed from the
    requirer model, and the requirer endpoint is related to the consumed
    offer.

    :param: provider: Provider relation endpoint
    :param: requirer: Requirer relation endpoint

    :raises K8sException: if the relation cannot be added
    """
    self.log.debug(f"adding new relation between {provider} and {requirer}")
    # Both attributes must be compared provider-vs-requirer: endpoints that
    # share a model name but live on different VCAs still need a cross-model
    # relation.
    cross_model_relation = (
        provider.model_name != requirer.model_name
        or provider.vca_id != requirer.vca_id
    )
    try:
        if cross_model_relation:
            # Cross-model relation
            provider_libjuju = await self._get_libjuju(provider.vca_id)
            requirer_libjuju = await self._get_libjuju(requirer.vca_id)
            offer = await provider_libjuju.offer(provider)
            if offer:
                saas_name = await requirer_libjuju.consume(
                    requirer.model_name, offer, provider_libjuju
                )
                await requirer_libjuju.add_relation(
                    requirer.model_name,
                    requirer.endpoint,
                    saas_name,
                )
        else:
            # Standard relation
            vca_id = provider.vca_id
            model = provider.model_name
            libjuju = await self._get_libjuju(vca_id)
            # add juju relations between two applications
            await libjuju.add_relation(
                model_name=model,
                endpoint_1=provider.endpoint,
                endpoint_2=requirer.endpoint,
            )
    except Exception as e:
        message = f"Error adding relation between {provider} and {requirer}: {e}"
        self.log.error(message)
        # Exception() accepts no keyword arguments, so the connector's own
        # exception type is raised, as the other methods of this class do.
        raise K8sException(message=message) from e
729 async def update_vca_status(self
, vcastatus
: dict, kdu_instance
: str, **kwargs
):
731 Add all configs, actions, executed actions of all applications in a model to vcastatus dict
733 :param vcastatus dict: dict containing vcastatus
734 :param kdu_instance str: The unique id of the KDU instance
735 :param: kwargs: Additional parameters
740 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
742 for model_name
in vcastatus
:
743 # Adding executed actions
744 vcastatus
[model_name
][
746 ] = await libjuju
.get_executed_actions(kdu_instance
)
748 for application
in vcastatus
[model_name
]["applications"]:
749 # Adding application actions
750 vcastatus
[model_name
]["applications"][application
][
752 ] = await libjuju
.get_actions(application
, kdu_instance
)
753 # Adding application configs
754 vcastatus
[model_name
]["applications"][application
][
756 ] = await libjuju
.get_application_configs(kdu_instance
, application
)
758 except Exception as e
:
759 self
.log
.debug("Error in updating vca status: {}".format(str(e
)))
761 async def get_services(
762 self
, cluster_uuid
: str, kdu_instance
: str, namespace
: str
764 """Return a list of services of a kdu_instance"""
766 credentials
= self
.get_credentials(cluster_uuid
=cluster_uuid
)
767 kubectl
= self
._get
_kubectl
(credentials
)
768 return kubectl
.get_services(
769 field_selector
="metadata.namespace={}".format(kdu_instance
)
772 async def get_service(
773 self
, cluster_uuid
: str, service_name
: str, namespace
: str
775 """Return data for a specific service inside a namespace"""
777 credentials
= self
.get_credentials(cluster_uuid
=cluster_uuid
)
778 kubectl
= self
._get
_kubectl
(credentials
)
779 return kubectl
.get_services(
780 field_selector
="metadata.name={},metadata.namespace={}".format(
781 service_name
, namespace
785 def get_credentials(self
, cluster_uuid
: str) -> str:
787 Get Cluster Kubeconfig
789 k8scluster
= self
.db
.get_one(
790 "k8sclusters", q_filter
={"_id": cluster_uuid
}, fail_on_empty
=False
793 self
.db
.encrypt_decrypt_fields(
794 k8scluster
.get("credentials"),
796 ["password", "secret"],
797 schema_version
=k8scluster
["schema_version"],
798 salt
=k8scluster
["_id"],
801 return yaml
.safe_dump(k8scluster
.get("credentials"))
803 def _get_credential_name(self
, cluster_uuid
: str) -> str:
805 Get credential name for a k8s cloud
807 We cannot use the cluster_uuid for the credential name directly,
808 because it cannot start with a number, it must start with a letter.
809 Therefore, the k8s cloud credential name will be "cred-" followed
812 :param: cluster_uuid: Cluster UUID of the kubernetes cloud (=cloud_name)
814 :return: Name to use for the credential name.
816 return "cred-{}".format(cluster_uuid
)
822 """Get the namespace UUID
823 Gets the namespace's unique name
825 :param cluster_uuid str: The UUID of the cluster
826 :returns: The namespace UUID, or raises an exception
831 def generate_kdu_instance_name(**kwargs
):
832 db_dict
= kwargs
.get("db_dict")
833 kdu_name
= kwargs
.get("kdu_name", None)
835 kdu_instance
= "{}-{}".format(kdu_name
, db_dict
["filter"]["_id"])
837 kdu_instance
= db_dict
["filter"]["_id"]
840 async def _get_libjuju(self
, vca_id
: str = None) -> Libjuju
:
844 :param: vca_id: VCA ID
845 If None, get a libjuju object with a Connection to the default VCA
846 Else, get a libjuju object with a Connection to the specified VCA
849 while self
.loading_libjuju
.locked():
850 await asyncio
.sleep(0.1)
852 async with self
.loading_libjuju
:
853 vca_connection
= await get_connection(self
._store
)
854 self
.libjuju
= Libjuju(vca_connection
, loop
=self
.loop
, log
=self
.log
)
857 vca_connection
= await get_connection(self
._store
, vca_id
)
def _get_kubectl(self, credentials: str) -> Kubectl:
    """
    Get Kubectl object

    The kubeconfig text is written to a temporary file whose path is passed
    to Kubectl as its config file.

    :param: credentials: Kubeconfig credentials (content of the kubeconfig)

    :return: Kubectl object built from the temporary kubeconfig file
    """
    # The context manager removes the file holding the credentials as soon as
    # Kubectl has been built, instead of leaving that to garbage collection.
    # NOTE(review): assumes Kubectl reads the config file during construction;
    # the previous code also dropped the file once this method returned.
    with tempfile.NamedTemporaryFile(mode="w") as kubecfg_file:
        kubecfg_file.write(credentials)
        # Flush so the content is on disk before the file is opened by name.
        kubecfg_file.flush()
        return Kubectl(config_file=kubecfg_file.name)