1 # Copyright 2019 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
22 from n2vc
.config
import EnvironConfig
23 from n2vc
.exceptions
import K8sException
24 from n2vc
.k8s_conn
import K8sConnector
25 from n2vc
.kubectl
import Kubectl
26 from .exceptions
import MethodNotImplemented
27 from n2vc
.libjuju
import Libjuju
28 from n2vc
.utils
import obj_to_dict
, obj_to_yaml
29 from n2vc
.store
import MotorStore
30 from n2vc
.vca
.cloud
import Cloud
31 from n2vc
.vca
.connection
import get_connection
34 RBAC_LABEL_KEY_NAME
= "rbac-id"
35 RBAC_STACK_PREFIX
= "juju-credential"
38 def generate_rbac_id():
39 return binascii
.hexlify(os
.urandom(4)).decode()
42 class K8sJujuConnector(K8sConnector
):
49 kubectl_command
: str = "/usr/bin/kubectl",
50 juju_command
: str = "/usr/bin/juju",
56 :param fs: file system for kubernetes and helm configuration
57 :param db: Database object
58 :param kubectl_command: path to kubectl executable
59 :param helm_command: path to helm executable
61 :param: loop: Asyncio loop
65 K8sConnector
.__init
__(
69 on_update_db
=on_update_db
,
73 self
.loop
= loop
or asyncio
.get_event_loop()
74 self
.log
.debug("Initializing K8S Juju connector")
76 db_uri
= EnvironConfig(prefixes
=["OSMLCM_", "OSMMON_"]).get("database_uri")
77 self
._store
= MotorStore(db_uri
)
78 self
.loading_libjuju
= asyncio
.Lock(loop
=self
.loop
)
80 self
.log
.debug("K8S Juju connector initialized")
81 # TODO: Remove these commented lines:
82 # self.authenticated = False
84 # self.juju_secret = ""
91 namespace
: str = "kube-system",
92 reuse_cluster_uuid
: str = None,
96 It prepares a given K8s cluster environment to run Juju bundles.
98 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
100 :param namespace: optional namespace to be used for juju. By default,
101 'kube-system' will be used
102 :param reuse_cluster_uuid: existing cluster uuid for reuse
103 :param: kwargs: Additional parameters
106 :return: uuid of the K8s cluster and True if connector has installed some
107 software in the cluster
108 (on error, an exception will be raised)
110 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
112 cluster_uuid
= reuse_cluster_uuid
or str(uuid
.uuid4())
113 kubectl
= self
._get
_kubectl
(k8s_creds
)
115 # CREATING RESOURCES IN K8S
116 rbac_id
= generate_rbac_id()
117 metadata_name
= "{}-{}".format(RBAC_STACK_PREFIX
, rbac_id
)
118 labels
= {RBAC_STACK_PREFIX
: rbac_id
}
120 # Create cleanup dictionary to clean up created resources
121 # if it fails in the middle of the process
124 kubectl
.create_cluster_role(
130 "delete": kubectl
.delete_cluster_role
,
131 "args": (metadata_name
,),
135 kubectl
.create_service_account(
141 "delete": kubectl
.delete_service_account
,
142 "args": (metadata_name
,),
146 kubectl
.create_cluster_role_binding(
152 "delete": kubectl
.delete_service_account
,
153 "args": (metadata_name
,),
156 token
, client_cert_data
= await kubectl
.get_secret_data(
160 default_storage_class
= kubectl
.get_default_storage_class()
161 await libjuju
.add_k8s(
165 client_cert_data
=client_cert_data
,
166 configuration
=kubectl
.configuration
,
167 storage_class
=default_storage_class
,
168 credential_name
=self
._get
_credential
_name
(cluster_uuid
),
170 return cluster_uuid
, True
171 except Exception as e
:
172 self
.log
.error("Error initializing k8scluster: {}".format(e
))
173 if len(cleanup_data
) > 0:
174 self
.log
.debug("Cleaning up created resources in k8s cluster...")
175 for item
in cleanup_data
:
176 delete_function
= item
["delete"]
177 delete_args
= item
["args"]
178 delete_function(*delete_args
)
179 self
.log
.debug("Cleanup finished")
182 """Repo Management"""
188 _type
: str = "charm",
190 raise MethodNotImplemented()
192 async def repo_list(self
):
193 raise MethodNotImplemented()
195 async def repo_remove(
199 raise MethodNotImplemented()
201 async def synchronize_repos(self
, cluster_uuid
: str, name
: str):
203 Returns None as currently add_repo is not implemented
213 uninstall_sw
: bool = False,
218 Resets the Kubernetes cluster by removing the model that represents it.
220 :param cluster_uuid str: The UUID of the cluster to reset
221 :param force: Force reset
222 :param uninstall_sw: Boolean to uninstall sw
223 :param: kwargs: Additional parameters
226 :return: Returns True if successful or raises an exception.
230 self
.log
.debug("[reset] Removing k8s cloud")
231 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
233 cloud
= Cloud(cluster_uuid
, self
._get
_credential
_name
(cluster_uuid
))
235 cloud_creds
= await libjuju
.get_cloud_credentials(cloud
)
237 await libjuju
.remove_cloud(cluster_uuid
)
239 credentials
= self
.get_credentials(cluster_uuid
=cluster_uuid
)
241 kubectl
= self
._get
_kubectl
(credentials
)
244 kubectl
.delete_cluster_role_binding
,
245 kubectl
.delete_service_account
,
246 kubectl
.delete_cluster_role
,
249 credential_attrs
= cloud_creds
[0].result
["attrs"]
250 if RBAC_LABEL_KEY_NAME
in credential_attrs
:
251 rbac_id
= credential_attrs
[RBAC_LABEL_KEY_NAME
]
252 metadata_name
= "{}-{}".format(RBAC_STACK_PREFIX
, rbac_id
)
253 for delete_func
in delete_functions
:
255 delete_func(metadata_name
)
256 except Exception as e
:
257 self
.log
.warning("Cannot remove resource in K8s {}".format(e
))
259 except Exception as e
:
260 self
.log
.debug("Caught exception during reset: {}".format(e
))
272 timeout
: float = 1800,
274 db_dict
: dict = None,
275 kdu_name
: str = None,
276 namespace
: str = None,
281 :param cluster_uuid str: The UUID of the cluster to install to
282 :param kdu_model str: The name or path of a bundle to install
283 :param kdu_instance: Kdu instance name
284 :param atomic bool: If set, waits until the model is active and resets
285 the cluster on failure.
286 :param timeout int: The time, in seconds, to wait for the install
288 :param params dict: Key-value pairs of instantiation parameters
289 :param kdu_name: Name of the KDU instance to be installed
290 :param namespace: K8s namespace to use for the KDU instance
291 :param kwargs: Additional parameters
294 :return: If successful, returns ?
296 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
300 raise K8sException("db_dict must be set")
302 raise K8sException("bundle must be set")
304 if bundle
.startswith("cs:"):
306 elif bundle
.startswith("http"):
310 new_workdir
= kdu_model
.strip(kdu_model
.split("/")[-1])
311 os
.chdir(new_workdir
)
312 bundle
= "local:{}".format(kdu_model
)
314 self
.log
.debug("Checking for model named {}".format(kdu_instance
))
316 # Create the new model
317 self
.log
.debug("Adding model: {}".format(kdu_instance
))
318 cloud
= Cloud(cluster_uuid
, self
._get
_credential
_name
(cluster_uuid
))
319 await libjuju
.add_model(kdu_instance
, cloud
)
322 # TODO: Instantiation parameters
325 "Juju bundle that models the KDU, in any of the following ways:
326 - <juju-repo>/<juju-bundle>
327 - <juju-bundle folder under k8s_models folder in the package>
328 - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
330 - <URL_where_to_fetch_juju_bundle>
333 previous_workdir
= os
.getcwd()
334 except FileNotFoundError
:
335 previous_workdir
= "/app/storage"
337 self
.log
.debug("[install] deploying {}".format(bundle
))
338 await libjuju
.deploy(
339 bundle
, model_name
=kdu_instance
, wait
=atomic
, timeout
=timeout
341 os
.chdir(previous_workdir
)
342 if self
.on_update_db
:
343 await self
.on_update_db(
346 filter=db_dict
["filter"],
347 vca_id
=kwargs
.get("vca_id"),
356 total_timeout
: float = 1800,
359 """Scale an application in a model
361 :param: kdu_instance str: KDU instance name
362 :param: scale int: Scale to which to set this application
363 :param: resource_name str: Resource name (Application name)
364 :param: timeout float: The time, in seconds, to wait for the install
366 :param kwargs: Additional parameters
369 :return: If successful, returns True
373 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
374 await libjuju
.scale_application(
375 model_name
=kdu_instance
,
376 application_name
=resource_name
,
378 total_timeout
=total_timeout
,
380 except Exception as e
:
381 error_msg
= "Error scaling application {} in kdu instance {}: {}".format(
382 resource_name
, kdu_instance
, e
384 self
.log
.error(error_msg
)
385 raise K8sException(message
=error_msg
)
388 async def get_scale_count(
394 """Get an application scale count
396 :param: resource_name str: Resource name (Application name)
397 :param: kdu_instance str: KDU instance name
398 :param kwargs: Additional parameters
400 :return: Return application instance count
403 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
404 status
= await libjuju
.get_model_status(kdu_instance
)
405 return len(status
.applications
[resource_name
].units
)
406 except Exception as e
:
407 error_msg
= "Error getting scale count from application {} in kdu instance {}: {}".format(
408 resource_name
, kdu_instance
, e
410 self
.log
.error(error_msg
)
411 raise K8sException(message
=error_msg
)
413 async def instances_list(self
, cluster_uuid
: str) -> list:
415 returns a list of deployed releases in a cluster
417 :param cluster_uuid: the cluster
426 kdu_model
: str = None,
431 :param cluster_uuid str: The UUID of the cluster to upgrade
432 :param kdu_instance str: The unique name of the KDU instance
433 :param kdu_model str: The name or path of the bundle to upgrade to
434 :param params dict: Key-value pairs of instantiation parameters
436 :return: If successful, reference to the new revision number of the
440 # TODO: Loop through the bundle and upgrade each charm individually
443 The API doesn't have a concept of bundle upgrades, because there are
444 many possible changes: charm revision, disk, number of units, etc.
446 As such, we are only supporting a limited subset of upgrades. We'll
447 upgrade the charm revision but leave storage and scale untouched.
449 Scale changes should happen through OSM constructs, and changes to
450 storage would require a redeployment of the service, at least in this
453 raise MethodNotImplemented()
465 :param cluster_uuid str: The UUID of the cluster to rollback
466 :param kdu_instance str: The unique name of the KDU instance
467 :param revision int: The revision to revert to. If omitted, rolls back
468 the previous upgrade.
470 :return: If successful, returns the revision of active KDU instance,
471 or raises an exception
473 raise MethodNotImplemented()
483 """Uninstall a KDU instance
485 :param cluster_uuid str: The UUID of the cluster
486 :param kdu_instance str: The unique name of the KDU instance
487 :param kwargs: Additional parameters
490 :return: Returns True if successful, or raises an exception
493 self
.log
.debug("[uninstall] Destroying model")
494 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
496 await libjuju
.destroy_model(kdu_instance
, total_timeout
=3600)
498 # self.log.debug("[uninstall] Model destroyed and disconnecting")
499 # await controller.disconnect()
502 # TODO: Remove these commented lines
503 # if not self.authenticated:
504 # self.log.debug("[uninstall] Connecting to controller")
505 # await self.login(cluster_uuid)
507 async def exec_primitive(
509 cluster_uuid
: str = None,
510 kdu_instance
: str = None,
511 primitive_name
: str = None,
512 timeout
: float = 300,
514 db_dict
: dict = None,
517 """Exec primitive (Juju action)
519 :param cluster_uuid str: The UUID of the cluster
520 :param kdu_instance str: The unique name of the KDU instance
521 :param primitive_name: Name of action that will be executed
522 :param timeout: Timeout for action execution
523 :param params: Dictionary of all the parameters needed for the action
524 :param db_dict: Dictionary for any additional data
525 :param kwargs: Additional parameters
528 :return: Returns the output of the action
530 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
532 if not params
or "application-name" not in params
:
534 "Missing application-name argument, \
535 argument needed for K8s actions"
539 "[exec_primitive] Getting model "
540 "kdu_instance: {}".format(kdu_instance
)
542 application_name
= params
["application-name"]
543 actions
= await libjuju
.get_actions(application_name
, kdu_instance
)
544 if primitive_name
not in actions
:
545 raise K8sException("Primitive {} not found".format(primitive_name
))
546 output
, status
= await libjuju
.execute_action(
547 application_name
, kdu_instance
, primitive_name
, **params
550 if status
!= "completed":
552 "status is not completed: {} output: {}".format(status
, output
)
554 if self
.on_update_db
:
555 await self
.on_update_db(
556 cluster_uuid
, kdu_instance
, filter=db_dict
["filter"]
561 except Exception as e
:
562 error_msg
= "Error executing primitive {}: {}".format(primitive_name
, e
)
563 self
.log
.error(error_msg
)
564 raise K8sException(message
=error_msg
)
568 async def inspect_kdu(
574 Inspects a bundle and returns a dictionary of config parameters and
575 their default values.
577 :param kdu_model str: The name or path of the bundle to inspect.
579 :return: If successful, returns a dictionary of available parameters
580 and their default values.
584 if not os
.path
.exists(kdu_model
):
585 raise K8sException("file {} not found".format(kdu_model
))
587 with
open(kdu_model
, "r") as f
:
588 bundle
= yaml
.safe_load(f
.read())
592 'description': 'Test bundle',
593 'bundle': 'kubernetes',
596 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
599 'password': 'manopw',
600 'root_password': 'osm4u',
603 'series': 'kubernetes'
608 # TODO: This should be returned in an agreed-upon format
609 kdu
= bundle
["applications"]
619 If available, returns the README of the bundle.
621 :param kdu_model str: The name or path of a bundle
623 :return: If found, returns the contents of the README.
627 files
= ["README", "README.txt", "README.md"]
628 path
= os
.path
.dirname(kdu_model
)
629 for file in os
.listdir(path
):
631 with
open(file, "r") as f
:
637 async def status_kdu(
641 complete_status
: bool = False,
642 yaml_format
: bool = False,
645 """Get the status of the KDU
647 Get the current status of the KDU instance.
649 :param cluster_uuid str: The UUID of the cluster
650 :param kdu_instance str: The unique id of the KDU instance
651 :param complete_status: To get the complete_status of the KDU
652 :param yaml_format: To get the status in proper format for NSR record
653 :param: kwargs: Additional parameters
656 :return: Returns a dictionary containing namespace, state, resources,
657 and deployment_time and returns complete_status if complete_status is True
659 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
662 model_status
= await libjuju
.get_model_status(kdu_instance
)
664 if not complete_status
:
665 for name
in model_status
.applications
:
666 application
= model_status
.applications
[name
]
667 status
[name
] = {"status": application
["status"]["status"]}
670 return obj_to_yaml(model_status
)
672 return obj_to_dict(model_status
)
676 async def update_vca_status(self
, vcastatus
: dict, kdu_instance
: str, **kwargs
):
678 Add all configs, actions, executed actions of all applications in a model to vcastatus dict
680 :param vcastatus dict: dict containing vcastatus
681 :param kdu_instance str: The unique id of the KDU instance
682 :param: kwargs: Additional parameters
687 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
689 for model_name
in vcastatus
:
690 # Adding executed actions
691 vcastatus
[model_name
][
693 ] = await libjuju
.get_executed_actions(kdu_instance
)
695 for application
in vcastatus
[model_name
]["applications"]:
696 # Adding application actions
697 vcastatus
[model_name
]["applications"][application
][
699 ] = await libjuju
.get_actions(application
, kdu_instance
)
700 # Adding application configs
701 vcastatus
[model_name
]["applications"][application
][
703 ] = await libjuju
.get_application_configs(kdu_instance
, application
)
705 except Exception as e
:
706 self
.log
.debug("Error in updating vca status: {}".format(str(e
)))
708 async def get_services(
709 self
, cluster_uuid
: str, kdu_instance
: str, namespace
: str
711 """Return a list of services of a kdu_instance"""
713 credentials
= self
.get_credentials(cluster_uuid
=cluster_uuid
)
714 kubectl
= self
._get
_kubectl
(credentials
)
715 return kubectl
.get_services(
716 field_selector
="metadata.namespace={}".format(kdu_instance
)
719 async def get_service(
720 self
, cluster_uuid
: str, service_name
: str, namespace
: str
722 """Return data for a specific service inside a namespace"""
724 credentials
= self
.get_credentials(cluster_uuid
=cluster_uuid
)
725 kubectl
= self
._get
_kubectl
(credentials
)
726 return kubectl
.get_services(
727 field_selector
="metadata.name={},metadata.namespace={}".format(
728 service_name
, namespace
732 def get_credentials(self
, cluster_uuid
: str) -> str:
734 Get Cluster Kubeconfig
736 k8scluster
= self
.db
.get_one(
737 "k8sclusters", q_filter
={"_id": cluster_uuid
}, fail_on_empty
=False
740 self
.db
.encrypt_decrypt_fields(
741 k8scluster
.get("credentials"),
743 ["password", "secret"],
744 schema_version
=k8scluster
["schema_version"],
745 salt
=k8scluster
["_id"],
748 return yaml
.safe_dump(k8scluster
.get("credentials"))
750 def _get_credential_name(self
, cluster_uuid
: str) -> str:
752 Get credential name for a k8s cloud
754 We cannot use the cluster_uuid for the credential name directly,
755 because it cannot start with a number, it must start with a letter.
756 Therefore, the k8s cloud credential name will be "cred-" followed
759 :param: cluster_uuid: Cluster UUID of the kubernetes cloud (=cloud_name)
761 :return: Name to use for the credential name.
763 return "cred-{}".format(cluster_uuid
)
769 """Get the namespace UUID
770 Gets the namespace's unique name
772 :param cluster_uuid str: The UUID of the cluster
773 :returns: The namespace UUID, or raises an exception
778 def generate_kdu_instance_name(**kwargs
):
779 db_dict
= kwargs
.get("db_dict")
780 kdu_name
= kwargs
.get("kdu_name", None)
782 kdu_instance
= "{}-{}".format(kdu_name
, db_dict
["filter"]["_id"])
784 kdu_instance
= db_dict
["filter"]["_id"]
787 async def _get_libjuju(self
, vca_id
: str = None) -> Libjuju
:
791 :param: vca_id: VCA ID
792 If None, get a libjuju object with a Connection to the default VCA
793 Else, geta libjuju object with a Connection to the specified VCA
796 while self
.loading_libjuju
.locked():
797 await asyncio
.sleep(0.1)
799 async with self
.loading_libjuju
:
800 vca_connection
= await get_connection(self
._store
)
801 self
.libjuju
= Libjuju(vca_connection
, loop
=self
.loop
, log
=self
.log
)
804 vca_connection
= await get_connection(self
._store
, vca_id
)
812 def _get_kubectl(self
, credentials
: str) -> Kubectl
:
816 :param: kubeconfig_credentials: Kubeconfig credentials
818 kubecfg
= tempfile
.NamedTemporaryFile()
819 with
open(kubecfg
.name
, "w") as kubecfg_file
:
820 kubecfg_file
.write(credentials
)
821 return Kubectl(config_file
=kubecfg
.name
)