1 # Copyright 2019 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
16 from typing
import Union
23 from n2vc
.config
import EnvironConfig
24 from n2vc
.exceptions
import K8sException
25 from n2vc
.k8s_conn
import K8sConnector
26 from n2vc
.kubectl
import Kubectl
27 from .exceptions
import MethodNotImplemented
28 from n2vc
.libjuju
import Libjuju
29 from n2vc
.utils
import obj_to_dict
, obj_to_yaml
30 from n2vc
.store
import MotorStore
31 from n2vc
.vca
.cloud
import Cloud
32 from n2vc
.vca
.connection
import get_connection
35 RBAC_LABEL_KEY_NAME
= "rbac-id"
36 RBAC_STACK_PREFIX
= "juju-credential"
39 def generate_rbac_id():
40 return binascii
.hexlify(os
.urandom(4)).decode()
43 class K8sJujuConnector(K8sConnector
):
50 kubectl_command
: str = "/usr/bin/kubectl",
51 juju_command
: str = "/usr/bin/juju",
57 :param fs: file system for kubernetes and helm configuration
58 :param db: Database object
59 :param kubectl_command: path to kubectl executable
60 :param helm_command: path to helm executable
62 :param: loop: Asyncio loop
66 K8sConnector
.__init
__(
70 on_update_db
=on_update_db
,
74 self
.loop
= loop
or asyncio
.get_event_loop()
75 self
.log
.debug("Initializing K8S Juju connector")
77 db_uri
= EnvironConfig(prefixes
=["OSMLCM_", "OSMMON_"]).get("database_uri")
78 self
._store
= MotorStore(db_uri
)
79 self
.loading_libjuju
= asyncio
.Lock(loop
=self
.loop
)
81 self
.log
.debug("K8S Juju connector initialized")
82 # TODO: Remove these commented lines:
83 # self.authenticated = False
85 # self.juju_secret = ""
92 namespace
: str = "kube-system",
93 reuse_cluster_uuid
: str = None,
97 It prepares a given K8s cluster environment to run Juju bundles.
99 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
101 :param namespace: optional namespace to be used for juju. By default,
102 'kube-system' will be used
103 :param reuse_cluster_uuid: existing cluster uuid for reuse
104 :param: kwargs: Additional parameters
107 :return: uuid of the K8s cluster and True if connector has installed some
108 software in the cluster
109 (on error, an exception will be raised)
111 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
113 cluster_uuid
= reuse_cluster_uuid
or str(uuid
.uuid4())
114 kubectl
= self
._get
_kubectl
(k8s_creds
)
116 # CREATING RESOURCES IN K8S
117 rbac_id
= generate_rbac_id()
118 metadata_name
= "{}-{}".format(RBAC_STACK_PREFIX
, rbac_id
)
119 labels
= {RBAC_STACK_PREFIX
: rbac_id
}
121 # Create cleanup dictionary to clean up created resources
122 # if it fails in the middle of the process
125 kubectl
.create_cluster_role(
131 "delete": kubectl
.delete_cluster_role
,
132 "args": (metadata_name
,),
136 kubectl
.create_service_account(
142 "delete": kubectl
.delete_service_account
,
143 "args": (metadata_name
,),
147 kubectl
.create_cluster_role_binding(
153 "delete": kubectl
.delete_service_account
,
154 "args": (metadata_name
,),
157 token
, client_cert_data
= await kubectl
.get_secret_data(
161 default_storage_class
= kubectl
.get_default_storage_class()
162 await libjuju
.add_k8s(
166 client_cert_data
=client_cert_data
,
167 configuration
=kubectl
.configuration
,
168 storage_class
=default_storage_class
,
169 credential_name
=self
._get
_credential
_name
(cluster_uuid
),
171 return cluster_uuid
, True
172 except Exception as e
:
173 self
.log
.error("Error initializing k8scluster: {}".format(e
))
174 if len(cleanup_data
) > 0:
175 self
.log
.debug("Cleaning up created resources in k8s cluster...")
176 for item
in cleanup_data
:
177 delete_function
= item
["delete"]
178 delete_args
= item
["args"]
179 delete_function(*delete_args
)
180 self
.log
.debug("Cleanup finished")
183 """Repo Management"""
189 _type
: str = "charm",
191 raise MethodNotImplemented()
193 async def repo_list(self
):
194 raise MethodNotImplemented()
196 async def repo_remove(
200 raise MethodNotImplemented()
202 async def synchronize_repos(self
, cluster_uuid
: str, name
: str):
204 Returns None as currently add_repo is not implemented
214 uninstall_sw
: bool = False,
219 Resets the Kubernetes cluster by removing the model that represents it.
221 :param cluster_uuid str: The UUID of the cluster to reset
222 :param force: Force reset
223 :param uninstall_sw: Boolean to uninstall sw
224 :param: kwargs: Additional parameters
227 :return: Returns True if successful or raises an exception.
231 self
.log
.debug("[reset] Removing k8s cloud")
232 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
234 cloud
= Cloud(cluster_uuid
, self
._get
_credential
_name
(cluster_uuid
))
236 cloud_creds
= await libjuju
.get_cloud_credentials(cloud
)
238 await libjuju
.remove_cloud(cluster_uuid
)
240 credentials
= self
.get_credentials(cluster_uuid
=cluster_uuid
)
242 kubectl
= self
._get
_kubectl
(credentials
)
245 kubectl
.delete_cluster_role_binding
,
246 kubectl
.delete_service_account
,
247 kubectl
.delete_cluster_role
,
250 credential_attrs
= cloud_creds
[0].result
["attrs"]
251 if RBAC_LABEL_KEY_NAME
in credential_attrs
:
252 rbac_id
= credential_attrs
[RBAC_LABEL_KEY_NAME
]
253 metadata_name
= "{}-{}".format(RBAC_STACK_PREFIX
, rbac_id
)
254 for delete_func
in delete_functions
:
256 delete_func(metadata_name
)
257 except Exception as e
:
258 self
.log
.warning("Cannot remove resource in K8s {}".format(e
))
260 except Exception as e
:
261 self
.log
.debug("Caught exception during reset: {}".format(e
))
273 timeout
: float = 1800,
275 db_dict
: dict = None,
276 kdu_name
: str = None,
277 namespace
: str = None,
282 :param cluster_uuid str: The UUID of the cluster to install to
283 :param kdu_model str: The name or path of a bundle to install
284 :param kdu_instance: Kdu instance name
285 :param atomic bool: If set, waits until the model is active and resets
286 the cluster on failure.
287 :param timeout int: The time, in seconds, to wait for the install
289 :param params dict: Key-value pairs of instantiation parameters
290 :param kdu_name: Name of the KDU instance to be installed
291 :param namespace: K8s namespace to use for the KDU instance
292 :param kwargs: Additional parameters
295 :return: If successful, returns ?
297 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
301 raise K8sException("db_dict must be set")
303 raise K8sException("bundle must be set")
305 if bundle
.startswith("cs:"):
306 # For Juju Bundles provided by the Charm Store
308 elif bundle
.startswith("ch:"):
309 # For Juju Bundles provided by the Charm Hub (this only works for juju version >= 2.9)
311 elif bundle
.startswith("http"):
315 new_workdir
= kdu_model
.strip(kdu_model
.split("/")[-1])
316 os
.chdir(new_workdir
)
317 bundle
= "local:{}".format(kdu_model
)
319 self
.log
.debug("Checking for model named {}".format(kdu_instance
))
321 # Create the new model
322 self
.log
.debug("Adding model: {}".format(kdu_instance
))
323 cloud
= Cloud(cluster_uuid
, self
._get
_credential
_name
(cluster_uuid
))
324 await libjuju
.add_model(kdu_instance
, cloud
)
327 # TODO: Instantiation parameters
330 "Juju bundle that models the KDU, in any of the following ways:
331 - <juju-repo>/<juju-bundle>
332 - <juju-bundle folder under k8s_models folder in the package>
333 - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
335 - <URL_where_to_fetch_juju_bundle>
338 previous_workdir
= os
.getcwd()
339 except FileNotFoundError
:
340 previous_workdir
= "/app/storage"
342 self
.log
.debug("[install] deploying {}".format(bundle
))
343 await libjuju
.deploy(
344 bundle
, model_name
=kdu_instance
, wait
=atomic
, timeout
=timeout
346 os
.chdir(previous_workdir
)
347 if self
.on_update_db
:
348 await self
.on_update_db(
351 filter=db_dict
["filter"],
352 vca_id
=kwargs
.get("vca_id"),
361 total_timeout
: float = 1800,
364 """Scale an application in a model
366 :param: kdu_instance str: KDU instance name
367 :param: scale int: Scale to which to set this application
368 :param: resource_name str: Resource name (Application name)
369 :param: timeout float: The time, in seconds, to wait for the install
371 :param kwargs: Additional parameters
374 :return: If successful, returns True
378 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
379 await libjuju
.scale_application(
380 model_name
=kdu_instance
,
381 application_name
=resource_name
,
383 total_timeout
=total_timeout
,
385 except Exception as e
:
386 error_msg
= "Error scaling application {} in kdu instance {}: {}".format(
387 resource_name
, kdu_instance
, e
389 self
.log
.error(error_msg
)
390 raise K8sException(message
=error_msg
)
393 async def get_scale_count(
399 """Get an application scale count
401 :param: resource_name str: Resource name (Application name)
402 :param: kdu_instance str: KDU instance name
403 :param kwargs: Additional parameters
405 :return: Return application instance count
408 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
409 status
= await libjuju
.get_model_status(kdu_instance
)
410 return len(status
.applications
[resource_name
].units
)
411 except Exception as e
:
412 error_msg
= "Error getting scale count from application {} in kdu instance {}: {}".format(
413 resource_name
, kdu_instance
, e
415 self
.log
.error(error_msg
)
416 raise K8sException(message
=error_msg
)
418 async def instances_list(self
, cluster_uuid
: str) -> list:
420 returns a list of deployed releases in a cluster
422 :param cluster_uuid: the cluster
431 kdu_model
: str = None,
436 :param cluster_uuid str: The UUID of the cluster to upgrade
437 :param kdu_instance str: The unique name of the KDU instance
438 :param kdu_model str: The name or path of the bundle to upgrade to
439 :param params dict: Key-value pairs of instantiation parameters
441 :return: If successful, reference to the new revision number of the
445 # TODO: Loop through the bundle and upgrade each charm individually
448 The API doesn't have a concept of bundle upgrades, because there are
449 many possible changes: charm revision, disk, number of units, etc.
451 As such, we are only supporting a limited subset of upgrades. We'll
452 upgrade the charm revision but leave storage and scale untouched.
454 Scale changes should happen through OSM constructs, and changes to
455 storage would require a redeployment of the service, at least in this
458 raise MethodNotImplemented()
470 :param cluster_uuid str: The UUID of the cluster to rollback
471 :param kdu_instance str: The unique name of the KDU instance
472 :param revision int: The revision to revert to. If omitted, rolls back
473 the previous upgrade.
475 :return: If successful, returns the revision of active KDU instance,
476 or raises an exception
478 raise MethodNotImplemented()
488 """Uninstall a KDU instance
490 :param cluster_uuid str: The UUID of the cluster
491 :param kdu_instance str: The unique name of the KDU instance
492 :param kwargs: Additional parameters
495 :return: Returns True if successful, or raises an exception
498 self
.log
.debug("[uninstall] Destroying model")
499 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
501 await libjuju
.destroy_model(kdu_instance
, total_timeout
=3600)
503 # self.log.debug("[uninstall] Model destroyed and disconnecting")
504 # await controller.disconnect()
507 # TODO: Remove these commented lines
508 # if not self.authenticated:
509 # self.log.debug("[uninstall] Connecting to controller")
510 # await self.login(cluster_uuid)
512 async def exec_primitive(
514 cluster_uuid
: str = None,
515 kdu_instance
: str = None,
516 primitive_name
: str = None,
517 timeout
: float = 300,
519 db_dict
: dict = None,
522 """Exec primitive (Juju action)
524 :param cluster_uuid str: The UUID of the cluster
525 :param kdu_instance str: The unique name of the KDU instance
526 :param primitive_name: Name of action that will be executed
527 :param timeout: Timeout for action execution
528 :param params: Dictionary of all the parameters needed for the action
529 :param db_dict: Dictionary for any additional data
530 :param kwargs: Additional parameters
533 :return: Returns the output of the action
535 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
537 if not params
or "application-name" not in params
:
539 "Missing application-name argument, \
540 argument needed for K8s actions"
544 "[exec_primitive] Getting model "
545 "kdu_instance: {}".format(kdu_instance
)
547 application_name
= params
["application-name"]
548 actions
= await libjuju
.get_actions(application_name
, kdu_instance
)
549 if primitive_name
not in actions
:
550 raise K8sException("Primitive {} not found".format(primitive_name
))
551 output
, status
= await libjuju
.execute_action(
552 application_name
, kdu_instance
, primitive_name
, **params
555 if status
!= "completed":
557 "status is not completed: {} output: {}".format(status
, output
)
559 if self
.on_update_db
:
560 await self
.on_update_db(
561 cluster_uuid
, kdu_instance
, filter=db_dict
["filter"]
566 except Exception as e
:
567 error_msg
= "Error executing primitive {}: {}".format(primitive_name
, e
)
568 self
.log
.error(error_msg
)
569 raise K8sException(message
=error_msg
)
573 async def inspect_kdu(
579 Inspects a bundle and returns a dictionary of config parameters and
580 their default values.
582 :param kdu_model str: The name or path of the bundle to inspect.
584 :return: If successful, returns a dictionary of available parameters
585 and their default values.
589 if not os
.path
.exists(kdu_model
):
590 raise K8sException("file {} not found".format(kdu_model
))
592 with
open(kdu_model
, "r") as f
:
593 bundle
= yaml
.safe_load(f
.read())
597 'description': 'Test bundle',
598 'bundle': 'kubernetes',
601 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
604 'password': 'manopw',
605 'root_password': 'osm4u',
608 'series': 'kubernetes'
613 # TODO: This should be returned in an agreed-upon format
614 kdu
= bundle
["applications"]
624 If available, returns the README of the bundle.
626 :param kdu_model str: The name or path of a bundle
628 :return: If found, returns the contents of the README.
632 files
= ["README", "README.txt", "README.md"]
633 path
= os
.path
.dirname(kdu_model
)
634 for file in os
.listdir(path
):
636 with
open(file, "r") as f
:
642 async def status_kdu(
646 complete_status
: bool = False,
647 yaml_format
: bool = False,
649 ) -> Union
[str, dict]:
650 """Get the status of the KDU
652 Get the current status of the KDU instance.
654 :param cluster_uuid str: The UUID of the cluster
655 :param kdu_instance str: The unique id of the KDU instance
656 :param complete_status: To get the complete_status of the KDU
657 :param yaml_format: To get the status in proper format for NSR record
658 :param: kwargs: Additional parameters
661 :return: Returns a dictionary containing namespace, state, resources,
662 and deployment_time and returns complete_status if complete_status is True
664 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
667 model_status
= await libjuju
.get_model_status(kdu_instance
)
669 if not complete_status
:
670 for name
in model_status
.applications
:
671 application
= model_status
.applications
[name
]
672 status
[name
] = {"status": application
["status"]["status"]}
675 return obj_to_yaml(model_status
)
677 return obj_to_dict(model_status
)
681 async def update_vca_status(self
, vcastatus
: dict, kdu_instance
: str, **kwargs
):
683 Add all configs, actions, executed actions of all applications in a model to vcastatus dict
685 :param vcastatus dict: dict containing vcastatus
686 :param kdu_instance str: The unique id of the KDU instance
687 :param: kwargs: Additional parameters
692 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
694 for model_name
in vcastatus
:
695 # Adding executed actions
696 vcastatus
[model_name
][
698 ] = await libjuju
.get_executed_actions(kdu_instance
)
700 for application
in vcastatus
[model_name
]["applications"]:
701 # Adding application actions
702 vcastatus
[model_name
]["applications"][application
][
704 ] = await libjuju
.get_actions(application
, kdu_instance
)
705 # Adding application configs
706 vcastatus
[model_name
]["applications"][application
][
708 ] = await libjuju
.get_application_configs(kdu_instance
, application
)
710 except Exception as e
:
711 self
.log
.debug("Error in updating vca status: {}".format(str(e
)))
713 async def get_services(
714 self
, cluster_uuid
: str, kdu_instance
: str, namespace
: str
716 """Return a list of services of a kdu_instance"""
718 credentials
= self
.get_credentials(cluster_uuid
=cluster_uuid
)
719 kubectl
= self
._get
_kubectl
(credentials
)
720 return kubectl
.get_services(
721 field_selector
="metadata.namespace={}".format(kdu_instance
)
724 async def get_service(
725 self
, cluster_uuid
: str, service_name
: str, namespace
: str
727 """Return data for a specific service inside a namespace"""
729 credentials
= self
.get_credentials(cluster_uuid
=cluster_uuid
)
730 kubectl
= self
._get
_kubectl
(credentials
)
731 return kubectl
.get_services(
732 field_selector
="metadata.name={},metadata.namespace={}".format(
733 service_name
, namespace
737 def get_credentials(self
, cluster_uuid
: str) -> str:
739 Get Cluster Kubeconfig
741 k8scluster
= self
.db
.get_one(
742 "k8sclusters", q_filter
={"_id": cluster_uuid
}, fail_on_empty
=False
745 self
.db
.encrypt_decrypt_fields(
746 k8scluster
.get("credentials"),
748 ["password", "secret"],
749 schema_version
=k8scluster
["schema_version"],
750 salt
=k8scluster
["_id"],
753 return yaml
.safe_dump(k8scluster
.get("credentials"))
755 def _get_credential_name(self
, cluster_uuid
: str) -> str:
757 Get credential name for a k8s cloud
759 We cannot use the cluster_uuid for the credential name directly,
760 because it cannot start with a number, it must start with a letter.
761 Therefore, the k8s cloud credential name will be "cred-" followed
764 :param: cluster_uuid: Cluster UUID of the kubernetes cloud (=cloud_name)
766 :return: Name to use for the credential name.
768 return "cred-{}".format(cluster_uuid
)
774 """Get the namespace UUID
775 Gets the namespace's unique name
777 :param cluster_uuid str: The UUID of the cluster
778 :returns: The namespace UUID, or raises an exception
783 def generate_kdu_instance_name(**kwargs
):
784 db_dict
= kwargs
.get("db_dict")
785 kdu_name
= kwargs
.get("kdu_name", None)
787 kdu_instance
= "{}-{}".format(kdu_name
, db_dict
["filter"]["_id"])
789 kdu_instance
= db_dict
["filter"]["_id"]
792 async def _get_libjuju(self
, vca_id
: str = None) -> Libjuju
:
796 :param: vca_id: VCA ID
797 If None, get a libjuju object with a Connection to the default VCA
798 Else, geta libjuju object with a Connection to the specified VCA
801 while self
.loading_libjuju
.locked():
802 await asyncio
.sleep(0.1)
804 async with self
.loading_libjuju
:
805 vca_connection
= await get_connection(self
._store
)
806 self
.libjuju
= Libjuju(vca_connection
, loop
=self
.loop
, log
=self
.log
)
809 vca_connection
= await get_connection(self
._store
, vca_id
)
817 def _get_kubectl(self
, credentials
: str) -> Kubectl
:
821 :param: kubeconfig_credentials: Kubeconfig credentials
823 kubecfg
= tempfile
.NamedTemporaryFile()
824 with
open(kubecfg
.name
, "w") as kubecfg_file
:
825 kubecfg_file
.write(credentials
)
826 return Kubectl(config_file
=kubecfg
.name
)