self._helm_command = helm_command
self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
+ # raise an exception if the main post-renderer executable is not present
+ self.main_post_renderer_path = EnvironConfig(prefixes=["OSMLCM_"]).get(
+ "mainpostrendererpath"
+ )
+ if self.main_post_renderer_path:
+ self._check_file_exists(
+ filename=self.main_post_renderer_path, exception_if_not_exists=True
+ )
+
+ # raise an exception if the podLabels post-renderer executable is not present
+ self.podLabels_post_renderer_path = EnvironConfig(prefixes=["OSMLCM_"]).get(
+ "podlabelspostrendererpath"
+ )
+ if self.podLabels_post_renderer_path:
+ self._check_file_exists(
+ filename=self.podLabels_post_renderer_path, exception_if_not_exists=True
+ )
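+ # Note (illustrative assumption): with EnvironConfig(prefixes=["OSMLCM_"]) these
+ # paths are expected to come from environment variables such as
+ # OSMLCM_MAINPOSTRENDERERPATH and OSMLCM_PODLABELSPOSTRENDERERPATH, and to
+ # point at executables later handed to helm via its --post-renderer flag.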
+
# obtain stable repo url from config or apply default
self._stable_repo_url = self.config.get("stablerepourl")
if self._stable_repo_url == "None":
timeout: float = 300,
params: dict = None,
db_dict: dict = None,
+ labels: dict = None,
kdu_name: str = None,
namespace: str = None,
):
kdu_model,
kdu_instance,
namespace,
+ labels,
params_str,
version,
atomic,
params: dict = None,
db_dict: dict = None,
namespace: str = None,
+ reset_values: bool = False,
+ reuse_values: bool = True,
+ reset_then_reuse_values: bool = False,
force: bool = False,
):
self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))
kdu_model, version = await self._prepare_helm_chart(kdu_model, cluster_uuid)
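+ # re-apply the OSM tracking labels on upgrade only when the release
+ # manifests already carry them (i.e. they were set at install time)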
+ labels_dict = None
+ if db_dict and await self._contains_labels(
+ kdu_instance, namespace, paths["kube_config"], env
+ ):
+ labels_dict = await self._labels_dict(db_dict, kdu_instance)
+
command = self._get_upgrade_command(
kdu_model,
kdu_instance,
namespace,
params_str,
+ labels_dict,
version,
atomic,
timeout,
paths["kube_config"],
+ reset_values,
+ reuse_values,
+ reset_then_reuse_values,
force,
)
kdu_model, repo_url, resource_name
)
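+ # same pattern as in the upgrade path: only re-apply the OSM labels
+ # if the release already carries them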
+ labels_dict = None
+ if db_dict and await self._contains_labels(
+ kdu_instance, instance_info["namespace"], paths["kube_config"], env
+ ):
+ labels_dict = await self._labels_dict(db_dict, kdu_instance)
+
command = self._get_upgrade_scale_command(
kdu_model,
kdu_instance,
instance_info["namespace"],
scale,
+ labels_dict,
version,
atomic,
replica_str,
kdu_model,
kdu_instance,
namespace,
+ labels,
params_str,
version,
atomic,
kdu_instance,
namespace,
count,
+ labels,
version,
atomic,
replicas,
kdu_instance,
namespace,
params_str,
+ labels,
version,
atomic,
timeout,
kubeconfig,
+ reset_values,
+ reuse_values,
+ reset_then_reuse_values,
force,
) -> str:
"""Generates the command to upgrade a Helm Chart release
The --wait flag will be set automatically if --atomic is used
timeout (float): The time, in seconds, to wait
kubeconfig (str): Kubeconfig file path
+ reset_values (bool): If set, helm resets values instead of reusing previous values.
+ reuse_values (bool): If set, helm reuses previous values.
+ reset_then_reuse_values (bool): If set, helm resets values, then applies the last release's values.
force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods.
Returns:
str: command to upgrade a Helm Chart release
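+ Note: reset_values, reuse_values and reset_then_reuse_values select
+ alternative value-handling strategies; presumably they map onto the
+ helm upgrade flags --reset-values, --reuse-values and
+ --reset-then-reuse-values (the latter needs a recent Helm 3 release).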
return replicas
+ async def _labels_dict(self, db_dict, kdu_instance):
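+ """Builds the OSM tracking labels for a KDU instance by looking up
+ the corresponding nsr, kdur and vnfr records in the database.
+ Returns {} when no kdur matches kdu_instance (e.g. an EE chart).
+ """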
+ # get the network service registry
+ ns_id = db_dict["filter"]["_id"]
+ try:
+ db_nsr = self.db.get_one("nsrs", {"_id": ns_id})
+ except Exception as e:
+ self.log.error("nsr {} not found: {}".format(ns_id, e))
+ raise
+ nsd_id = db_nsr["nsd"]["_id"]
+
+ # get the kdu registry
+ for kdu in db_nsr["_admin"]["deployed"]["K8s"]:
+ if kdu["kdu-instance"] == kdu_instance:
+ db_kdur = kdu
+ break
+ else:
+ # No kdur found, could be the case of an EE chart
+ return {}
+
+ kdu_name = db_kdur["kdu-name"]
+ member_vnf_index = db_kdur["member-vnf-index"]
+ # get the vnf registry
+ try:
+ db_vnfr = self.db.get_one(
+ "vnfrs",
+ {"nsr-id-ref": ns_id, "member-vnf-index-ref": member_vnf_index},
+ )
+ except Exception as e:
+ self.log.error("vnfr {} not found: {}".format(member_vnf_index, e))
+ raise
+
+ vnf_id = db_vnfr["_id"]
+ vnfd_id = db_vnfr["vnfd-id"]
+
+ return {
+ "managed-by": "osm.etsi.org",
+ "osm.etsi.org/ns-id": ns_id,
+ "osm.etsi.org/nsd-id": nsd_id,
+ "osm.etsi.org/vnf-id": vnf_id,
+ "osm.etsi.org/vnfd-id": vnfd_id,
+ "osm.etsi.org/kdu-id": kdu_instance,
+ "osm.etsi.org/kdu-name": kdu_name,
+ }
+
+ async def _contains_labels(self, kdu_instance, namespace, kube_config, env):
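+ """Returns True if at least one manifest of the release already
+ carries the full set of OSM tracking labels, False otherwise.
+ """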
+ command = "env KUBECONFIG={} {} get manifest {} --namespace={}".format(
+ kube_config,
+ self._helm_command,
+ quote(kdu_instance),
+ quote(namespace),
+ )
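+ # e.g.: env KUBECONFIG=/etc/osm/kube.cfg helm get manifest myrelease --namespace=osm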
+ output, rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=False, env=env
+ )
+ if rc != 0:
+ # the manifest could not be retrieved (e.g. the release does not exist yet)
+ return False
+ manifests = yaml.safe_load_all(output)
+ for manifest in manifests:
+ # Check if the manifest has metadata and labels
+ if (
+ manifest is not None
+ and "metadata" in manifest
+ and "labels" in manifest["metadata"]
+ ):
+ labels = {
+ "managed-by",
+ "osm.etsi.org/kdu-id",
+ "osm.etsi.org/kdu-name",
+ "osm.etsi.org/ns-id",
+ "osm.etsi.org/nsd-id",
+ "osm.etsi.org/vnf-id",
+ "osm.etsi.org/vnfd-id",
+ }
+ if labels.issubset(manifest["metadata"]["labels"].keys()):
+ return True
+ return False
+
async def _store_status(
self,
cluster_id: str,