- # TODO: Remove these lines of code
- # async def has_model(self, model_name: str) -> bool:
- # """Check if a model exists in the controller
-
- # Checks to see if a model exists in the connected Juju controller.
-
- # :param model_name str: The name of the model
- # :return: A boolean indicating if the model exists
- # """
- # models = await self.controller.list_models()
-
- # if model_name in models:
- # return True
- # return False
-
- # def is_local_k8s(self, credentials: str,) -> bool:
- # """Check if a cluster is local
-
- # Checks if a cluster is running in the local host
-
- # :param credentials dict: A dictionary containing the k8s credentials
- # :returns: A boolean if the cluster is running locally
- # """
-
- # creds = yaml.safe_load(credentials)
-
- # if creds and os.getenv("OSMLCM_VCA_APIPROXY"):
- # for cluster in creds["clusters"]:
- # if "server" in cluster["cluster"]:
- # if os.getenv("OSMLCM_VCA_APIPROXY") in cluster["cluster"]["server"]:
- # return True
-
- # return False
-
- # async def get_controller(self, cluster_uuid):
- # """Login to the Juju controller."""
-
- # config = self.get_config(cluster_uuid)
-
- # juju_endpoint = config["endpoint"]
- # juju_user = config["username"]
- # juju_secret = config["secret"]
- # juju_ca_cert = config["cacert"]
-
- # controller = Controller()
-
- # if juju_secret:
- # self.log.debug(
- # "Connecting to controller... ws://{} as {}".format(
- # juju_endpoint, juju_user,
- # )
- # )
- # try:
- # await controller.connect(
- # endpoint=juju_endpoint,
- # username=juju_user,
- # password=juju_secret,
- # cacert=juju_ca_cert,
- # )
- # self.log.debug("JujuApi: Logged into controller")
- # return controller
- # except Exception as ex:
- # self.log.debug(ex)
- # self.log.debug("Caught exception: {}".format(ex))
- # else:
- # self.log.fatal("VCA credentials not configured.")
-
- # TODO: Remove these commented lines
- # self.authenticated = False
- # if self.authenticated:
- # return
-
- # self.connecting = True
- # juju_public_key = None
- # self.authenticated = True
- # Test: Make sure we have the credentials loaded
- # async def logout(self):
- # """Logout of the Juju controller."""
- # self.log.debug("[logout]")
- # if not self.authenticated:
- # return False
-
- # for model in self.models:
- # self.log.debug("Logging out of model {}".format(model))
- # await self.models[model].disconnect()
-
- # if self.controller:
- # self.log.debug("Disconnecting controller {}".format(self.controller))
- # await self.controller.disconnect()
- # self.controller = None
-
- # self.authenticated = False
-
- # async def remove_cloud(self, cloud_name: str,) -> bool:
- # """Remove a k8s cloud from Juju
-
- # Removes a Kubernetes cloud from Juju.
-
- # :param cloud_name str: The name of the cloud to add.
-
- # :returns: True if successful, otherwise raises an exception.
- # """
-
- # # Remove the bootstrapped controller
- # cmd = [self.juju_command, "remove-k8s", "--client", cloud_name]
- # process = await asyncio.create_subprocess_exec(
- # *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
- # )
-
- # _stdout, stderr = await process.communicate()
-
- # return_code = process.returncode
-
- # if return_code > 0:
- # raise Exception(stderr)
-
- # # Remove the cloud from the local config
- # cmd = [self.juju_command, "remove-cloud", "--client", cloud_name]
- # process = await asyncio.create_subprocess_exec(
- # *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
- # )
-
- # _stdout, stderr = await process.communicate()
-
- # return_code = process.returncode
-
- # if return_code > 0:
- # raise Exception(stderr)
-
- # return True
-
- # async def set_config(self, cluster_uuid: str, config: dict,) -> bool:
- # """Save the cluster configuration
-
- # Saves the cluster information to the Mongo database
-
- # :param cluster_uuid str: The UUID of the cluster
- # :param config dict: A dictionary containing the cluster configuration
- # """
-
- # juju_db = self.db.get_one("admin", {"_id": "juju"})
-
- # k8sclusters = juju_db["k8sclusters"] if "k8sclusters" in juju_db else []
- # self.db.encrypt_decrypt_fields(
- # config,
- # "encrypt",
- # ["secret", "cacert"],
- # schema_version="1.1",
- # salt=cluster_uuid,
- # )
- # k8sclusters.append({"_id": cluster_uuid, "config": config})
- # self.db.set_one(
- # table="admin",
- # q_filter={"_id": "juju"},
- # update_dict={"k8sclusters": k8sclusters},
- # )
-
- # Private methods to create/delete needed resources in the
- # Kubernetes cluster to create the K8s cloud in Juju
-
- def _create_cluster_role(
- self,
- kubectl: Kubectl,
- name: str,
- labels: Dict[str, str],
- ):
- cluster_roles = kubectl.clients[RBAC_CLIENT].list_cluster_role(
- field_selector="metadata.name={}".format(name)
- )
-
- if len(cluster_roles.items) > 0:
- raise Exception(
- "Cluster role with metadata.name={} already exists".format(name)
- )
-
- metadata = V1ObjectMeta(name=name, labels=labels, namespace=ADMIN_NAMESPACE)
- # Cluster role
- cluster_role = V1ClusterRole(
- metadata=metadata,
- rules=[
- V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
- V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
- ],
- )
-
- kubectl.clients[RBAC_CLIENT].create_cluster_role(cluster_role)
-
- def _delete_cluster_role(self, kubectl: Kubectl, name: str):
- kubectl.clients[RBAC_CLIENT].delete_cluster_role(name)
-
- def _create_service_account(
- self,
- kubectl: Kubectl,
- name: str,
- labels: Dict[str, str],
- ):
- service_accounts = kubectl.clients[CORE_CLIENT].list_namespaced_service_account(
- ADMIN_NAMESPACE, field_selector="metadata.name={}".format(name)
- )
- if len(service_accounts.items) > 0:
- raise Exception(
- "Service account with metadata.name={} already exists".format(name)
- )
-
- metadata = V1ObjectMeta(name=name, labels=labels, namespace=ADMIN_NAMESPACE)
- service_account = V1ServiceAccount(metadata=metadata)
-
- kubectl.clients[CORE_CLIENT].create_namespaced_service_account(
- ADMIN_NAMESPACE, service_account
- )
-
- def _delete_service_account(self, kubectl: Kubectl, name: str):
- kubectl.clients[CORE_CLIENT].delete_namespaced_service_account(
- name, ADMIN_NAMESPACE
- )
-
- def _create_cluster_role_binding(
- self,
- kubectl: Kubectl,
- name: str,
- labels: Dict[str, str],
- ):
- role_bindings = kubectl.clients[RBAC_CLIENT].list_cluster_role_binding(
- field_selector="metadata.name={}".format(name)
- )
- if len(role_bindings.items) > 0:
- raise Exception("Generated rbac id already exists")
-
- role_binding = V1ClusterRoleBinding(
- metadata=V1ObjectMeta(name=name, labels=labels),
- role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
- subjects=[
- V1Subject(kind="ServiceAccount", name=name, namespace=ADMIN_NAMESPACE)
- ],
- )
- kubectl.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)
-
- def _delete_cluster_role_binding(self, kubectl: Kubectl, name: str):
- kubectl.clients[RBAC_CLIENT].delete_cluster_role_binding(name)