1 # Copyright 2019 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
16 from typing
import Union
23 from n2vc
.config
import EnvironConfig
24 from n2vc
.definitions
import RelationEndpoint
25 from n2vc
.exceptions
import K8sException
26 from n2vc
.k8s_conn
import K8sConnector
27 from n2vc
.kubectl
import Kubectl
28 from .exceptions
import MethodNotImplemented
29 from n2vc
.libjuju
import Libjuju
30 from n2vc
.utils
import obj_to_dict
, obj_to_yaml
31 from n2vc
.store
import MotorStore
32 from n2vc
.vca
.cloud
import Cloud
33 from n2vc
.vca
.connection
import get_connection
# Key looked up in the attributes of the stored cloud credential (see reset)
# to recover the RBAC id that was generated for a cluster.
RBAC_LABEL_KEY_NAME = "rbac-id"
# Prefix joined with the RBAC id as "<prefix>-<rbac_id>" to form the name
# passed to the kubectl delete helpers; init_env also uses it as the key of
# the labels dict it builds.
RBAC_STACK_PREFIX = "juju-credential"
def generate_rbac_id():
    """Return a random identifier made of 8 lowercase hexadecimal characters.

    The identifier is the hex encoding of 4 bytes read from os.urandom.
    """
    random_bytes = os.urandom(4)
    return random_bytes.hex()
44 class K8sJujuConnector(K8sConnector
):
51 kubectl_command
: str = "/usr/bin/kubectl",
52 juju_command
: str = "/usr/bin/juju",
58 :param fs: file system for kubernetes and helm configuration
59 :param db: Database object
60 :param kubectl_command: path to kubectl executable
:param juju_command: path to juju executable
63 :param: loop: Asyncio loop
67 K8sConnector
.__init
__(
71 on_update_db
=on_update_db
,
75 self
.loop
= loop
or asyncio
.get_event_loop()
76 self
.log
.debug("Initializing K8S Juju connector")
78 db_uri
= EnvironConfig(prefixes
=["OSMLCM_", "OSMMON_"]).get("database_uri")
79 self
._store
= MotorStore(db_uri
)
80 self
.loading_libjuju
= asyncio
.Lock(loop
=self
.loop
)
81 self
.uninstall_locks
= {}
83 self
.log
.debug("K8S Juju connector initialized")
84 # TODO: Remove these commented lines:
85 # self.authenticated = False
87 # self.juju_secret = ""
94 namespace
: str = "kube-system",
95 reuse_cluster_uuid
: str = None,
99 It prepares a given K8s cluster environment to run Juju bundles.
101 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
103 :param namespace: optional namespace to be used for juju. By default,
104 'kube-system' will be used
105 :param reuse_cluster_uuid: existing cluster uuid for reuse
106 :param: kwargs: Additional parameters
109 :return: uuid of the K8s cluster and True if connector has installed some
110 software in the cluster
111 (on error, an exception will be raised)
113 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
115 cluster_uuid
= reuse_cluster_uuid
or str(uuid
.uuid4())
116 kubectl
= self
._get
_kubectl
(k8s_creds
)
118 # CREATING RESOURCES IN K8S
119 rbac_id
= generate_rbac_id()
120 metadata_name
= "{}-{}".format(RBAC_STACK_PREFIX
, rbac_id
)
121 labels
= {RBAC_STACK_PREFIX
: rbac_id
}
123 # Create cleanup dictionary to clean up created resources
124 # if it fails in the middle of the process
127 self
.log
.debug("Initializing K8s cluster for juju")
128 kubectl
.create_cluster_role(
132 self
.log
.debug("Cluster role created")
135 "delete": kubectl
.delete_cluster_role
,
136 "args": (metadata_name
,),
140 kubectl
.create_service_account(
144 self
.log
.debug("Service account created")
147 "delete": kubectl
.delete_service_account
,
148 "args": (metadata_name
,),
152 kubectl
.create_cluster_role_binding(
156 self
.log
.debug("Role binding created")
159 "delete": kubectl
.delete_service_account
,
160 "args": (metadata_name
,),
163 token
, client_cert_data
= await kubectl
.get_secret_data(
167 default_storage_class
= kubectl
.get_default_storage_class()
168 self
.log
.debug("Default storage class: {}".format(default_storage_class
))
169 await libjuju
.add_k8s(
173 client_cert_data
=client_cert_data
,
174 configuration
=kubectl
.configuration
,
175 storage_class
=default_storage_class
,
176 credential_name
=self
._get
_credential
_name
(cluster_uuid
),
178 self
.log
.debug("K8s cluster added to juju controller")
179 return cluster_uuid
, True
180 except Exception as e
:
181 self
.log
.error("Error initializing k8scluster: {}".format(e
), exc_info
=True)
182 if len(cleanup_data
) > 0:
183 self
.log
.debug("Cleaning up created resources in k8s cluster...")
184 for item
in cleanup_data
:
185 delete_function
= item
["delete"]
186 delete_args
= item
["args"]
187 delete_function(*delete_args
)
188 self
.log
.debug("Cleanup finished")
191 """Repo Management"""
197 _type
: str = "charm",
200 password
: str = None,
202 raise MethodNotImplemented()
async def repo_list(self):
    """List repositories (not supported by this connector).

    :raises MethodNotImplemented: always
    """
    raise MethodNotImplemented()
207 async def repo_remove(
211 raise MethodNotImplemented()
213 async def synchronize_repos(self
, cluster_uuid
: str, name
: str):
215 Returns None as currently add_repo is not implemented
225 uninstall_sw
: bool = False,
230 Resets the Kubernetes cluster by removing the model that represents it.
232 :param cluster_uuid str: The UUID of the cluster to reset
233 :param force: Force reset
234 :param uninstall_sw: Boolean to uninstall sw
235 :param: kwargs: Additional parameters
238 :return: Returns True if successful or raises an exception.
242 self
.log
.debug("[reset] Removing k8s cloud")
243 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
245 cloud
= Cloud(cluster_uuid
, self
._get
_credential
_name
(cluster_uuid
))
247 cloud_creds
= await libjuju
.get_cloud_credentials(cloud
)
249 await libjuju
.remove_cloud(cluster_uuid
)
251 credentials
= self
.get_credentials(cluster_uuid
=cluster_uuid
)
253 kubectl
= self
._get
_kubectl
(credentials
)
256 kubectl
.delete_cluster_role_binding
,
257 kubectl
.delete_service_account
,
258 kubectl
.delete_cluster_role
,
261 credential_attrs
= cloud_creds
[0].result
["attrs"]
262 if RBAC_LABEL_KEY_NAME
in credential_attrs
:
263 rbac_id
= credential_attrs
[RBAC_LABEL_KEY_NAME
]
264 metadata_name
= "{}-{}".format(RBAC_STACK_PREFIX
, rbac_id
)
265 for delete_func
in delete_functions
:
267 delete_func(metadata_name
)
268 except Exception as e
:
269 self
.log
.warning("Cannot remove resource in K8s {}".format(e
))
271 except Exception as e
:
272 self
.log
.debug("Caught exception during reset: {}".format(e
))
284 timeout
: float = 1800,
286 db_dict
: dict = None,
287 kdu_name
: str = None,
288 namespace
: str = None,
293 :param cluster_uuid str: The UUID of the cluster to install to
294 :param kdu_model str: The name or path of a bundle to install
295 :param kdu_instance: Kdu instance name
296 :param atomic bool: If set, waits until the model is active and resets
297 the cluster on failure.
298 :param timeout int: The time, in seconds, to wait for the install
300 :param params dict: Key-value pairs of instantiation parameters
301 :param kdu_name: Name of the KDU instance to be installed
302 :param namespace: K8s namespace to use for the KDU instance
303 :param kwargs: Additional parameters
306 :return: If successful, returns ?
308 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
312 raise K8sException("db_dict must be set")
314 raise K8sException("bundle must be set")
316 if bundle
.startswith("cs:"):
317 # For Juju Bundles provided by the Charm Store
319 elif bundle
.startswith("ch:"):
320 # For Juju Bundles provided by the Charm Hub (this only works for juju version >= 2.9)
322 elif bundle
.startswith("http"):
326 new_workdir
= kdu_model
.strip(kdu_model
.split("/")[-1])
327 os
.chdir(new_workdir
)
328 bundle
= "local:{}".format(kdu_model
)
330 self
.log
.debug("Checking for model named {}".format(kdu_instance
))
332 # Create the new model
333 self
.log
.debug("Adding model: {}".format(kdu_instance
))
334 cloud
= Cloud(cluster_uuid
, self
._get
_credential
_name
(cluster_uuid
))
335 await libjuju
.add_model(kdu_instance
, cloud
)
338 # TODO: Instantiation parameters
341 "Juju bundle that models the KDU, in any of the following ways:
342 - <juju-repo>/<juju-bundle>
343 - <juju-bundle folder under k8s_models folder in the package>
344 - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
346 - <URL_where_to_fetch_juju_bundle>
349 previous_workdir
= os
.getcwd()
350 except FileNotFoundError
:
351 previous_workdir
= "/app/storage"
353 self
.log
.debug("[install] deploying {}".format(bundle
))
354 await libjuju
.deploy(
355 bundle
, model_name
=kdu_instance
, wait
=atomic
, timeout
=timeout
357 os
.chdir(previous_workdir
)
358 if self
.on_update_db
:
359 await self
.on_update_db(
362 filter=db_dict
["filter"],
363 vca_id
=kwargs
.get("vca_id"),
372 total_timeout
: float = 1800,
375 """Scale an application in a model
377 :param: kdu_instance str: KDU instance name
378 :param: scale int: Scale to which to set the application
379 :param: resource_name str: The application name in the Juju Bundle
:param: total_timeout float: The time, in seconds, to wait for the install
382 :param kwargs: Additional parameters
385 :return: If successful, returns True
389 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
390 await libjuju
.scale_application(
391 model_name
=kdu_instance
,
392 application_name
=resource_name
,
394 total_timeout
=total_timeout
,
396 except Exception as e
:
397 error_msg
= "Error scaling application {} in kdu instance {}: {}".format(
398 resource_name
, kdu_instance
, e
400 self
.log
.error(error_msg
)
401 raise K8sException(message
=error_msg
)
404 async def get_scale_count(
410 """Get an application scale count
412 :param: resource_name str: The application name in the Juju Bundle
413 :param: kdu_instance str: KDU instance name
414 :param kwargs: Additional parameters
416 :return: Return application instance count
420 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
421 status
= await libjuju
.get_model_status(kdu_instance
)
422 return len(status
.applications
[resource_name
].units
)
423 except Exception as e
:
424 error_msg
= "Error getting scale count from application {} in kdu instance {}: {}".format(
425 resource_name
, kdu_instance
, e
427 self
.log
.error(error_msg
)
428 raise K8sException(message
=error_msg
)
430 async def instances_list(self
, cluster_uuid
: str) -> list:
432 returns a list of deployed releases in a cluster
434 :param cluster_uuid: the cluster
443 kdu_model
: str = None,
448 :param cluster_uuid str: The UUID of the cluster to upgrade
449 :param kdu_instance str: The unique name of the KDU instance
450 :param kdu_model str: The name or path of the bundle to upgrade to
451 :param params dict: Key-value pairs of instantiation parameters
453 :return: If successful, reference to the new revision number of the
457 # TODO: Loop through the bundle and upgrade each charm individually
460 The API doesn't have a concept of bundle upgrades, because there are
461 many possible changes: charm revision, disk, number of units, etc.
463 As such, we are only supporting a limited subset of upgrades. We'll
464 upgrade the charm revision but leave storage and scale untouched.
466 Scale changes should happen through OSM constructs, and changes to
467 storage would require a redeployment of the service, at least in this
470 raise MethodNotImplemented()
482 :param cluster_uuid str: The UUID of the cluster to rollback
483 :param kdu_instance str: The unique name of the KDU instance
484 :param revision int: The revision to revert to. If omitted, rolls back
485 the previous upgrade.
487 :return: If successful, returns the revision of active KDU instance,
488 or raises an exception
490 raise MethodNotImplemented()
500 """Uninstall a KDU instance
502 :param cluster_uuid str: The UUID of the cluster
503 :param kdu_instance str: The unique name of the KDU instance
504 :param kwargs: Additional parameters
507 :return: Returns True if successful, or raises an exception
510 self
.log
.debug("[uninstall] Destroying model")
512 will_not_delete
= False
513 if kdu_instance
not in self
.uninstall_locks
:
514 self
.uninstall_locks
[kdu_instance
] = asyncio
.Lock(loop
=self
.loop
)
515 delete_lock
= self
.uninstall_locks
[kdu_instance
]
517 while delete_lock
.locked():
518 will_not_delete
= True
519 await asyncio
.sleep(0.1)
522 self
.log
.info("Model {} deleted by another worker.".format(kdu_instance
))
526 async with delete_lock
:
527 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
529 await libjuju
.destroy_model(kdu_instance
, total_timeout
=3600)
531 self
.uninstall_locks
.pop(kdu_instance
)
533 self
.log
.debug(f
"[uninstall] Model {kdu_instance} destroyed")
536 async def upgrade_charm(
540 charm_id
: str = None,
541 charm_type
: str = None,
542 timeout
: float = None,
544 """This method upgrade charms in VNFs
547 ee_id: Execution environment id
548 path: Local path to the charm
550 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
551 timeout: (Float) Timeout for the ns update operation
554 The output of the update operation if status equals to "completed"
557 "KDUs deployed with Juju Bundle do not support charm upgrade"
560 async def exec_primitive(
562 cluster_uuid
: str = None,
563 kdu_instance
: str = None,
564 primitive_name
: str = None,
565 timeout
: float = 300,
567 db_dict
: dict = None,
570 """Exec primitive (Juju action)
572 :param cluster_uuid str: The UUID of the cluster
573 :param kdu_instance str: The unique name of the KDU instance
574 :param primitive_name: Name of action that will be executed
575 :param timeout: Timeout for action execution
576 :param params: Dictionary of all the parameters needed for the action
577 :param db_dict: Dictionary for any additional data
578 :param kwargs: Additional parameters
581 :return: Returns the output of the action
583 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
585 if not params
or "application-name" not in params
:
587 "Missing application-name argument, \
588 argument needed for K8s actions"
592 "[exec_primitive] Getting model "
593 "kdu_instance: {}".format(kdu_instance
)
595 application_name
= params
["application-name"]
596 actions
= await libjuju
.get_actions(application_name
, kdu_instance
)
597 if primitive_name
not in actions
:
598 raise K8sException("Primitive {} not found".format(primitive_name
))
599 output
, status
= await libjuju
.execute_action(
600 application_name
, kdu_instance
, primitive_name
, **params
603 if status
!= "completed":
605 "status is not completed: {} output: {}".format(status
, output
)
607 if self
.on_update_db
:
608 await self
.on_update_db(
609 cluster_uuid
, kdu_instance
, filter=db_dict
["filter"]
614 except Exception as e
:
615 error_msg
= "Error executing primitive {}: {}".format(primitive_name
, e
)
616 self
.log
.error(error_msg
)
617 raise K8sException(message
=error_msg
)
621 async def inspect_kdu(
627 Inspects a bundle and returns a dictionary of config parameters and
628 their default values.
630 :param kdu_model str: The name or path of the bundle to inspect.
632 :return: If successful, returns a dictionary of available parameters
633 and their default values.
637 if not os
.path
.exists(kdu_model
):
638 raise K8sException("file {} not found".format(kdu_model
))
640 with
open(kdu_model
, "r") as f
:
641 bundle
= yaml
.safe_load(f
.read())
645 'description': 'Test bundle',
646 'bundle': 'kubernetes',
649 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
652 'password': 'manopw',
653 'root_password': 'osm4u',
656 'series': 'kubernetes'
661 # TODO: This should be returned in an agreed-upon format
662 kdu
= bundle
["applications"]
672 If available, returns the README of the bundle.
674 :param kdu_model str: The name or path of a bundle
676 :return: If found, returns the contents of the README.
680 files
= ["README", "README.txt", "README.md"]
681 path
= os
.path
.dirname(kdu_model
)
682 for file in os
.listdir(path
):
684 with
open(file, "r") as f
:
690 async def status_kdu(
694 complete_status
: bool = False,
695 yaml_format
: bool = False,
697 ) -> Union
[str, dict]:
698 """Get the status of the KDU
700 Get the current status of the KDU instance.
702 :param cluster_uuid str: The UUID of the cluster
703 :param kdu_instance str: The unique id of the KDU instance
704 :param complete_status: To get the complete_status of the KDU
705 :param yaml_format: To get the status in proper format for NSR record
706 :param: kwargs: Additional parameters
709 :return: Returns a dictionary containing namespace, state, resources,
710 and deployment_time and returns complete_status if complete_status is True
712 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
715 model_status
= await libjuju
.get_model_status(kdu_instance
)
717 if not complete_status
:
718 for name
in model_status
.applications
:
719 application
= model_status
.applications
[name
]
720 status
[name
] = {"status": application
["status"]["status"]}
723 return obj_to_yaml(model_status
)
725 return obj_to_dict(model_status
)
729 async def add_relation(
731 provider
: RelationEndpoint
,
732 requirer
: RelationEndpoint
,
735 Add relation between two charmed endpoints
737 :param: provider: Provider relation endpoint
738 :param: requirer: Requirer relation endpoint
740 self
.log
.debug(f
"adding new relation between {provider} and {requirer}")
741 cross_model_relation
= (
742 provider
.model_name
!= requirer
.model_name
743 or requirer
.vca_id
!= requirer
.vca_id
746 if cross_model_relation
:
747 # Cross-model relation
748 provider_libjuju
= await self
._get
_libjuju
(provider
.vca_id
)
749 requirer_libjuju
= await self
._get
_libjuju
(requirer
.vca_id
)
750 offer
= await provider_libjuju
.offer(provider
)
752 saas_name
= await requirer_libjuju
.consume(
753 requirer
.model_name
, offer
, provider_libjuju
755 await requirer_libjuju
.add_relation(
762 vca_id
= provider
.vca_id
763 model
= provider
.model_name
764 libjuju
= await self
._get
_libjuju
(vca_id
)
765 # add juju relations between two applications
766 await libjuju
.add_relation(
768 endpoint_1
=provider
.endpoint
,
769 endpoint_2
=requirer
.endpoint
,
771 except Exception as e
:
772 message
= f
"Error adding relation between {provider} and {requirer}: {e}"
773 self
.log
.error(message
)
774 raise Exception(message
=message
)
776 async def update_vca_status(self
, vcastatus
: dict, kdu_instance
: str, **kwargs
):
778 Add all configs, actions, executed actions of all applications in a model to vcastatus dict
780 :param vcastatus dict: dict containing vcastatus
781 :param kdu_instance str: The unique id of the KDU instance
782 :param: kwargs: Additional parameters
787 libjuju
= await self
._get
_libjuju
(kwargs
.get("vca_id"))
789 for model_name
in vcastatus
:
790 # Adding executed actions
791 vcastatus
[model_name
][
793 ] = await libjuju
.get_executed_actions(kdu_instance
)
795 for application
in vcastatus
[model_name
]["applications"]:
796 # Adding application actions
797 vcastatus
[model_name
]["applications"][application
]["actions"] = {}
798 # Adding application configs
799 vcastatus
[model_name
]["applications"][application
][
801 ] = await libjuju
.get_application_configs(kdu_instance
, application
)
803 except Exception as e
:
804 self
.log
.debug("Error in updating vca status: {}".format(str(e
)))
806 async def get_services(
807 self
, cluster_uuid
: str, kdu_instance
: str, namespace
: str
809 """Return a list of services of a kdu_instance"""
811 credentials
= self
.get_credentials(cluster_uuid
=cluster_uuid
)
812 kubectl
= self
._get
_kubectl
(credentials
)
813 return kubectl
.get_services(
814 field_selector
="metadata.namespace={}".format(kdu_instance
)
817 async def get_service(
818 self
, cluster_uuid
: str, service_name
: str, namespace
: str
820 """Return data for a specific service inside a namespace"""
822 credentials
= self
.get_credentials(cluster_uuid
=cluster_uuid
)
823 kubectl
= self
._get
_kubectl
(credentials
)
824 return kubectl
.get_services(
825 field_selector
="metadata.name={},metadata.namespace={}".format(
826 service_name
, namespace
830 def get_credentials(self
, cluster_uuid
: str) -> str:
832 Get Cluster Kubeconfig
834 k8scluster
= self
.db
.get_one(
835 "k8sclusters", q_filter
={"_id": cluster_uuid
}, fail_on_empty
=False
838 self
.db
.encrypt_decrypt_fields(
839 k8scluster
.get("credentials"),
841 ["password", "secret"],
842 schema_version
=k8scluster
["schema_version"],
843 salt
=k8scluster
["_id"],
846 return yaml
.safe_dump(k8scluster
.get("credentials"))
848 def _get_credential_name(self
, cluster_uuid
: str) -> str:
850 Get credential name for a k8s cloud
852 We cannot use the cluster_uuid for the credential name directly,
853 because it cannot start with a number, it must start with a letter.
854 Therefore, the k8s cloud credential name will be "cred-" followed
857 :param: cluster_uuid: Cluster UUID of the kubernetes cloud (=cloud_name)
859 :return: Name to use for the credential name.
861 return "cred-{}".format(cluster_uuid
)
867 """Get the namespace UUID
868 Gets the namespace's unique name
870 :param cluster_uuid str: The UUID of the cluster
871 :returns: The namespace UUID, or raises an exception
876 def generate_kdu_instance_name(**kwargs
):
877 db_dict
= kwargs
.get("db_dict")
878 kdu_name
= kwargs
.get("kdu_name", None)
880 kdu_instance
= "{}-{}".format(kdu_name
, db_dict
["filter"]["_id"])
882 kdu_instance
= db_dict
["filter"]["_id"]
885 async def _get_libjuju(self
, vca_id
: str = None) -> Libjuju
:
889 :param: vca_id: VCA ID
890 If None, get a libjuju object with a Connection to the default VCA
Else, get a libjuju object with a Connection to the specified VCA
894 while self
.loading_libjuju
.locked():
895 await asyncio
.sleep(0.1)
897 async with self
.loading_libjuju
:
898 vca_connection
= await get_connection(self
._store
)
899 self
.libjuju
= Libjuju(vca_connection
, loop
=self
.loop
, log
=self
.log
)
902 vca_connection
= await get_connection(self
._store
, vca_id
)
def _get_kubectl(self, credentials: str) -> Kubectl:
    """
    Get Kubectl object

    The credentials are written to a temporary kubeconfig file whose path
    is handed to Kubectl. The file is removed as soon as this method
    returns.

    :param: credentials: Kubeconfig credentials (file content)

    :return: Kubectl object built from the temporary kubeconfig file
    """
    # Write through the temporary file's own handle instead of reopening it
    # by name, and let the context manager close and delete it
    # deterministically rather than relying on garbage collection.
    # NOTE(review): assumes Kubectl reads config_file in its constructor;
    # the previous code had the same requirement under CPython, where the
    # file was deleted once the local reference died on return.
    with tempfile.NamedTemporaryFile(mode="w") as kubecfg_file:
        kubecfg_file.write(credentials)
        # Make sure the content is on disk before Kubectl opens the path.
        kubecfg_file.flush()
        return Kubectl(config_file=kubecfg_file.name)