##
import abc
import asyncio
+from typing import Union
import random
import time
import shlex
if self._stable_repo_url == "None":
self._stable_repo_url = None
+ # Lock to avoid concurrent execution of helm commands
+ self.cmd_lock = asyncio.Lock()
+
def _get_namespace(self, cluster_uuid: str) -> str:
"""
Obtains the namespace used by the cluster with the uuid passed by argument
# sync local dir
self.fs.sync(from_path=cluster_uuid)
- # helm repo update
- command = "env KUBECONFIG={} {} repo update".format(
- paths["kube_config"], self._helm_command
- )
- self.log.debug("updating repo: {}".format(command))
- await self._local_async_exec(
- command=command, raise_exception_on_error=False, env=env
- )
-
# helm repo add name url
command = ("env KUBECONFIG={} {} repo add {} {}").format(
paths["kube_config"], self._helm_command, name, url
if cert:
temp_cert_file = os.path.join(
- self.fs.path, "{}/helmcerts/".format(cluster_id), "temp.crt"
+ self.fs.path, "{}/helmcerts/".format(cluster_uuid), "temp.crt"
)
os.makedirs(os.path.dirname(temp_cert_file), exist_ok=True)
with open(temp_cert_file, "w") as the_cert:
command=command, raise_exception_on_error=True, env=env
)
+ # helm repo update
+ command = "env KUBECONFIG={} {} repo update {}".format(
+ paths["kube_config"], self._helm_command, name
+ )
+ self.log.debug("updating repo: {}".format(command))
+ await self._local_async_exec(
+ command=command, raise_exception_on_error=False, env=env
+ )
+
+ # sync fs
+ self.fs.reverse_sync(from_path=cluster_uuid)
+
+ async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"):
+ self.log.debug(
+ "Cluster {}, updating {} repository {}".format(
+ cluster_uuid, repo_type, name
+ )
+ )
+
+ # init_env
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
+
+ # sync local dir
+ self.fs.sync(from_path=cluster_uuid)
+
+ # helm repo update
+ command = "{} repo update {}".format(self._helm_command, name)
+ self.log.debug("updating repo: {}".format(command))
+ await self._local_async_exec(
+ command=command, raise_exception_on_error=False, env=env
+ )
+
# sync fs
self.fs.reverse_sync(from_path=cluster_uuid)
return True
+ def _is_helm_chart_a_file(self, chart_name: str):
+ return chart_name.count("/") > 1
+
async def _install_impl(
self,
cluster_id: str,
# version
kdu_model, version = self._split_version(kdu_model)
+ repo = self._split_repo(kdu_model)
+ if repo:
+ await self.repo_update(cluster_id, repo)
+
command = self._get_install_command(
kdu_model,
kdu_instance,
namespace=namespace,
db_dict=db_dict,
operation="install",
- run_once=False,
)
)
namespace=namespace,
db_dict=db_dict,
operation="install",
- run_once=True,
- check_every=0,
)
if rc != 0:
# version
kdu_model, version = self._split_version(kdu_model)
+ repo = self._split_repo(kdu_model)
+ if repo:
+ await self.repo_update(cluster_uuid, repo)
+
command = self._get_upgrade_command(
kdu_model,
kdu_instance,
namespace=instance_info["namespace"],
db_dict=db_dict,
operation="upgrade",
- run_once=False,
)
)
namespace=instance_info["namespace"],
db_dict=db_dict,
operation="upgrade",
- run_once=True,
- check_every=0,
)
if rc != 0:
kdu_model, version = self._split_version(kdu_model)
repo_url = await self._find_repo(kdu_model, cluster_uuid)
- if not repo_url:
- raise K8sException(
- "Repository not found for kdu_model {}".format(kdu_model)
- )
_, replica_str = await self._get_replica_count_url(
kdu_model, repo_url, resource_name
namespace=instance_info["namespace"],
db_dict=db_dict,
operation="scale",
- run_once=False,
)
)
namespace=instance_info["namespace"],
db_dict=db_dict,
operation="scale",
- run_once=True,
- check_every=0,
)
if rc != 0:
cluster_uuid: The UUID of the cluster
resource_name: Resource name
kdu_instance: KDU instance name
- kdu_model: The name or path of a bundle
+ kdu_model: The name or path of an Helm Chart
kwargs: Additional parameters
Returns:
)
replicas = await self._get_replica_count_instance(
- kdu_instance, instance_info["namespace"], paths["kube_config"]
+ kdu_instance=kdu_instance,
+ namespace=instance_info["namespace"],
+ kubeconfig=paths["kube_config"],
+ resource_name=resource_name,
)
# Get default value if scale count is not found from provided values
if not replicas:
- repo_url = await self._find_repo(kdu_model, cluster_uuid)
- if not repo_url:
- raise K8sException(
- "Repository not found for kdu_model {}".format(kdu_model)
- )
-
+ repo_url = await self._find_repo(
+ kdu_model=kdu_model, cluster_uuid=cluster_uuid
+ )
replicas, _ = await self._get_replica_count_url(
- kdu_model, repo_url, resource_name
+ kdu_model=kdu_model, repo_url=repo_url, resource_name=resource_name
)
if not replicas:
namespace=instance_info["namespace"],
db_dict=db_dict,
operation="rollback",
- run_once=False,
)
)
namespace=instance_info["namespace"],
db_dict=db_dict,
operation="rollback",
- run_once=True,
- check_every=0,
)
if rc != 0:
self.log.debug("Instance {} not found".format(kdu_instance))
return None
+ async def upgrade_charm(
+ self,
+ ee_id: str = None,
+ path: str = None,
+ charm_id: str = None,
+ charm_type: str = None,
+ timeout: float = None,
+ ) -> str:
+ """This method upgrade charms in VNFs
+
+ Args:
+ ee_id: Execution environment id
+ path: Local path to the charm
+ charm_id: charm-id
+ charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
+ timeout: (Float) Timeout for the ns update operation
+
+ Returns:
+ The output of the update operation if status equals to "completed"
+ """
+ raise K8sException("KDUs deployed with Helm do not support charm upgrade")
+
async def exec_primitive(
self,
cluster_uuid: str = None,
return service
- async def status_kdu(self, cluster_uuid: str, kdu_instance: str, **kwargs) -> str:
+ async def status_kdu(
+ self, cluster_uuid: str, kdu_instance: str, yaml_format: str = False, **kwargs
+ ) -> Union[str, dict]:
"""
This call would retrieve tha current state of a given KDU instance. It would be
would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
:param cluster_uuid: UUID of a K8s cluster known by OSM
:param kdu_instance: unique name for the KDU instance
:param kwargs: Additional parameters (None yet)
+ :param yaml_format: if the return shall be returned as an YAML string or as a
+ dictionary
:return: If successful, it will return the following vector of arguments:
- K8s `namespace` in the cluster where the KDU lives
- `state` of the KDU instance. It can be:
cluster_id=cluster_uuid,
kdu_instance=kdu_instance,
namespace=instance["namespace"],
+ yaml_format=yaml_format,
show_error_log=True,
- return_text=True,
)
# sync fs
)
async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
+ """Method to obtain the Helm Chart package's values
+
+ Args:
+ kdu_model: The name or path of an Helm Chart
+ repo_url: Helm Chart repository url
+
+ Returns:
+ str: the values of the Helm Chart package
+ """
self.log.debug(
"inspect kdu_model values {} from (optional) repo: {}".format(
cluster_id: str,
kdu_instance: str,
namespace: str = None,
+ yaml_format: bool = False,
show_error_log: bool = False,
- return_text: bool = False,
- ):
+ ) -> Union[str, dict]:
"""
Implements the helm version dependent method to obtain status of a helm instance
"""
def _get_inspect_command(
self, show_command: str, kdu_model: str, repo_str: str, version: str
):
- """
- Obtain command to be executed to obtain information about the kdu
+ """Generates the command to obtain the information about an Helm Chart package
+ (´helm show ...´ command)
+
+ Args:
+ show_command: the second part of the command (`helm show <show_command>`)
+ kdu_model: The name or path of an Helm Chart
+ repo_url: Helm Chart repository url
+ version: constraint with specific version of the Chart to use
+
+ Returns:
+ str: the generated Helm Chart command
"""
@abc.abstractmethod
environ.update(env)
try:
- process = await asyncio.create_subprocess_exec(
- *command,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
- env=environ,
- )
+ async with self.cmd_lock:
+ process = await asyncio.create_subprocess_exec(
+ *command,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.PIPE,
+ env=environ,
+ )
- # wait for command terminate
- stdout, stderr = await process.communicate()
+ # wait for command terminate
+ stdout, stderr = await process.communicate()
- return_code = process.returncode
+ return_code = process.returncode
output = ""
if stdout:
environ.update(env)
try:
- read, write = os.pipe()
- await asyncio.create_subprocess_exec(*command1, stdout=write, env=environ)
- os.close(write)
- process_2 = await asyncio.create_subprocess_exec(
- *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
- )
- os.close(read)
- stdout, stderr = await process_2.communicate()
+ async with self.cmd_lock:
+ read, write = os.pipe()
+ await asyncio.create_subprocess_exec(
+ *command1, stdout=write, env=environ
+ )
+ os.close(write)
+ process_2 = await asyncio.create_subprocess_exec(
+ *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
+ )
+ os.close(read)
+ stdout, stderr = await process_2.communicate()
- return_code = process_2.returncode
+ return_code = process_2.returncode
output = ""
if stdout:
async def _exec_inspect_command(
self, inspect_command: str, kdu_model: str, repo_url: str = None
):
- """Obtains information about a kdu, no cluster (no env)."""
+ """Obtains information about an Helm Chart package (´helm show´ command)
+
+ Args:
+ inspect_command: the Helm sub command (`helm show <inspect_command> ...`)
+ kdu_model: The name or path of an Helm Chart
+ repo_url: Helm Chart repository url
+
+ Returns:
+ str: the requested info about the Helm Chart package
+ """
repo_str = ""
if repo_url:
async def _get_replica_count_url(
self,
kdu_model: str,
- repo_url: str,
+ repo_url: str = None,
resource_name: str = None,
):
"""Get the replica count value in the Helm Chart Values.
Args:
- kdu_model: The name or path of a bundle
+ kdu_model: The name or path of an Helm Chart
repo_url: Helm Chart repository url
resource_name: Resource name
"""
kdu_values = yaml.load(
- await self.values_kdu(kdu_model, repo_url), Loader=yaml.SafeLoader
+ await self.values_kdu(kdu_model=kdu_model, repo_url=repo_url),
+ Loader=yaml.SafeLoader,
)
if not kdu_values:
operation: str,
kdu_instance: str,
namespace: str = None,
- check_every: float = 10,
db_dict: dict = None,
- run_once: bool = False,
- ):
- while True:
- try:
- await asyncio.sleep(check_every)
- detailed_status = await self._status_kdu(
- cluster_id=cluster_id,
- kdu_instance=kdu_instance,
- namespace=namespace,
- return_text=False,
- )
- status = detailed_status.get("info").get("description")
- self.log.debug("KDU {} STATUS: {}.".format(kdu_instance, status))
- # write status to db
- result = await self.write_app_status_to_db(
- db_dict=db_dict,
- status=str(status),
- detailed_status=str(detailed_status),
- operation=operation,
- )
- if not result:
- self.log.info("Error writing in database. Task exiting...")
- return
- except asyncio.CancelledError:
- self.log.debug("Task cancelled")
- return
- except Exception as e:
- self.log.debug(
- "_store_status exception: {}".format(str(e)), exc_info=True
- )
- pass
- finally:
- if run_once:
- return
+ ) -> None:
+ """
+ Obtains the status of the KDU instance based on Helm Charts, and stores it in the database.
+
+ :param cluster_id (str): the cluster where the KDU instance is deployed
+ :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
+ :param kdu_instance (str): The KDU instance in relation to which the status is obtained
+ :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
+ :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
+ values for the keys:
+ - "collection": The Mongo DB collection to write to
+ - "filter": The query filter to use in the update process
+ - "path": The dot separated keys which targets the object to be updated
+ Defaults to None.
+ """
+
+ try:
+ detailed_status = await self._status_kdu(
+ cluster_id=cluster_id,
+ kdu_instance=kdu_instance,
+ yaml_format=False,
+ namespace=namespace,
+ )
+
+ status = detailed_status.get("info").get("description")
+ self.log.debug(f"Status for KDU {kdu_instance} obtained: {status}.")
+
+ # write status to db
+ result = await self.write_app_status_to_db(
+ db_dict=db_dict,
+ status=str(status),
+ detailed_status=str(detailed_status),
+ operation=operation,
+ )
+
+ if not result:
+ self.log.info("Error writing in database. Task exiting...")
+
+ except asyncio.CancelledError as e:
+ self.log.warning(
+ f"Exception in method {self._store_status.__name__} (task cancelled): {e}"
+ )
+ except Exception as e:
+ self.log.warning(f"Exception in method {self._store_status.__name__}: {e}")
# params for use in -f file
# returns values file option and filename (in order to delete it at the end)
for key in params:
value = params.get(key)
if "!!yaml" in str(value):
- value = yaml.load(value[7:])
+ value = yaml.safe_load(value[7:])
params2[key] = value
values_file = get_random_number() + ".yaml"
def _split_version(self, kdu_model: str) -> (str, str):
version = None
- if ":" in kdu_model:
+ if not self._is_helm_chart_a_file(kdu_model) and ":" in kdu_model:
parts = kdu_model.split(sep=":")
if len(parts) == 2:
version = str(parts[1])
kdu_model = parts[0]
return kdu_model, version
+ def _split_repo(self, kdu_model: str) -> str:
+ repo_name = None
+ idx = kdu_model.find("/")
+ if idx >= 0:
+ repo_name = kdu_model[:idx]
+ return repo_name
+
async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str:
+ """Obtain the Helm repository for an Helm Chart
+
+ Args:
+ kdu_model (str): the KDU model associated with the Helm Chart instantiation
+ cluster_uuid (str): The cluster UUID associated with the Helm Chart instantiation
+
+ Returns:
+ str: the repository URL; if Helm Chart is a local one, the function returns None
+ """
+
repo_url = None
idx = kdu_model.find("/")
if idx >= 0: