+
+ def get_k8s_cloud_credential(
+ self,
+ configuration: Configuration,
+ client_cert_data: str,
+ token: str = None,
+ ) -> client.CloudCredential:
+ attrs = {}
+ # TODO: Test with AKS
+ key = None # open(configuration.key_file, "r").read()
+ username = configuration.username
+ password = configuration.password
+
+ if client_cert_data:
+ attrs["ClientCertificateData"] = client_cert_data
+ if key:
+ attrs["ClientKeyData"] = key
+ if token:
+ if username or password:
+ raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
+ attrs["Token"] = token
+
+ auth_type = None
+ if key:
+ auth_type = "oauth2"
+ if client_cert_data:
+ auth_type = "oauth2withcert"
+ if not token:
+ raise JujuInvalidK8sConfiguration(
+ "missing token for auth type {}".format(auth_type)
+ )
+ elif username:
+ if not password:
+ self.log.debug(
+ "credential for user {} has empty password".format(username)
+ )
+ attrs["username"] = username
+ attrs["password"] = password
+ if client_cert_data:
+ auth_type = "userpasswithcert"
+ else:
+ auth_type = "userpass"
+ elif client_cert_data and token:
+ auth_type = "certificate"
+ else:
+ raise JujuInvalidK8sConfiguration("authentication method not supported")
+ return client.CloudCredential(auth_type=auth_type, attrs=attrs)