blob: 1f43649dc65e801a640103e8e2ef71da4d442647 [file] [log] [blame]
#######################################################################################
# Copyright ETSI Contributors and Others.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#######################################################################################
import yaml
async def create_ksus(self, op_id, op_params_list, content_list):
self.logger.info("Create KSU workflow Enter")
self.logger.info(
f"Operation {op_id}. Params: {op_params_list}. Content: {content_list}"
)
if len(content_list) > 1:
raise Exception("There is no ODU workflow yet able to manage multiple KSUs")
db_ksu = content_list[0]
ksu_params = op_params_list[0]
oka_list = ksu_params["oka"]
if len(oka_list) > 1:
raise Exception(
"There is no ODU workflow yet able to manage multiple OKAs for a KSU"
)
oka_path = oka_list[0]["sw_catalog_path"]
workflow_template = "launcher-create-ksu-hr.j2"
workflow_name = f"create-ksus-{op_id}"
ksu_name = db_ksu["git_name"].lower()
# Additional params for the workflow
osm_project_name = "osm_admin" # TODO: get project name from db_ksu
kustomization_name = ksu_name
helmrelease_name = ksu_name
target_ns = ksu_params.get("namespace")
profile_type = ksu_params.get("profile", {}).get("profile_type")
profile_name = ksu_params.get("profile", {}).get("name")
age_public_key = ksu_params.get("profile", {}).get("age_pubkey")
substitute_environment = ksu_params.get("substitute_environment", "false")
substitution_filter = ksu_params.get("substitution_filter", "")
custom_env_vars = ksu_params.get("custom_env_vars", "")
if custom_env_vars:
custom_env_vars = "|\n" + "\n".join(
[" " * 12 + f"{k}={v}" for k, v in custom_env_vars.items()]
)
inline_values = ksu_params.get("inline_values", "")
if inline_values:
yaml_string = yaml.safe_dump(
inline_values, sort_keys=False, default_flow_style=False
)
inline_values = "|\n" + "\n".join(
[" " * 8 + line for line in yaml_string.splitlines()]
)
is_preexisting_cm = "false"
values_configmap_name = f"cm-{ksu_name}"
cm_values = ksu_params.get("configmap_values", "")
if cm_values:
yaml_string = yaml.safe_dump(
cm_values, sort_keys=False, default_flow_style=False
)
custom_env_vars = "|\n" + "\n".join(
[" " * 8 + line for line in yaml_string.splitlines()]
)
is_preexisting_secret = "false"
secret_values = ksu_params.get("secret_values", "")
if secret_values:
values_secret_name = f"secret-{ksu_name}"
reference_secret_for_values = f"ref-secret-{ksu_name}"
reference_key_for_values = f"ref-key-{ksu_name}"
secret_values = yaml.safe_dump(
secret_values, sort_keys=False, default_flow_style=False
)
else:
values_secret_name = ""
reference_secret_for_values = ""
reference_key_for_values = ""
sync = "true"
if secret_values:
secret_namespace = "osm-workflows"
# Create secret
await self.create_secret(
reference_secret_for_values,
secret_namespace,
reference_key_for_values,
secret_values,
)
# Render workflow
manifest = self.render_jinja_template(
workflow_template,
output_file=None,
workflow_name=workflow_name,
git_fleet_url=f"{self._repo_base_url}/{self._repo_user}/fleet-osm.git",
git_sw_catalogs_url=f"{self._repo_base_url}/{self._repo_user}/sw-catalogs-osm.git",
templates_path=oka_path,
substitute_environment=substitute_environment,
substitution_filter=substitution_filter,
custom_env_vars=custom_env_vars,
kustomization_name=kustomization_name,
helmrelease_name=helmrelease_name,
inline_values=inline_values,
is_preexisting_secret=is_preexisting_secret,
target_ns=target_ns,
age_public_key=age_public_key,
values_secret_name=values_secret_name,
reference_secret_for_values=reference_secret_for_values,
reference_key_for_values=reference_key_for_values,
is_preexisting_cm=is_preexisting_cm,
values_configmap_name=values_configmap_name,
cm_values=cm_values,
ksu_name=ksu_name,
profile_name=profile_name,
profile_type=profile_type,
osm_project_name=osm_project_name,
sync=sync,
workflow_debug=self._workflow_debug,
workflow_dry_run=self._workflow_dry_run,
)
self.logger.debug(f"Workflow manifest: {manifest}")
# Submit workflow
self._kubectl.create_generic_object(
namespace="osm-workflows",
manifest_dict=yaml.safe_load(manifest),
api_group="argoproj.io",
api_plural="workflows",
api_version="v1alpha1",
)
return workflow_name
async def update_ksus(self, op_id, op_params_list, content_list):
self.logger.info("Update KSU workflow Enter")
self.logger.info(
f"Operation {op_id}. Params: {op_params_list}. Content: {content_list}"
)
if len(content_list) > 1:
raise Exception("There is no ODU workflow yet able to manage multiple KSUs")
db_ksu = content_list[0]
ksu_params = op_params_list[0]
oka_list = ksu_params["oka"]
if len(oka_list) > 1:
raise Exception(
"There is no ODU workflow yet able to manage multiple OKAs for a KSU"
)
oka_path = oka_list[0]["sw_catalog_path"]
workflow_template = "launcher-update-ksu-hr.j2"
workflow_name = f"update-ksus-{op_id}"
ksu_name = db_ksu["git_name"].lower()
# Additional params for the workflow
osm_project_name = "osm_admin" # TODO: get project name from db_ksu
kustomization_name = ksu_name
helmrelease_name = ksu_name
target_ns = ksu_params.get("namespace")
profile_type = ksu_params.get("profile", {}).get("profile_type")
profile_name = ksu_params.get("profile", {}).get("name")
age_public_key = ksu_params.get("profile", {}).get("age_pubkey")
substitute_environment = ksu_params.get("substitute_environment", "false")
substitution_filter = ksu_params.get("substitution_filter", "")
custom_env_vars = ksu_params.get("custom_env_vars", "")
if custom_env_vars:
custom_env_vars = "|\n" + "\n".join(
[" " * 12 + f"{k}={v}" for k, v in custom_env_vars.items()]
)
inline_values = ksu_params.get("inline_values", "")
if inline_values:
yaml_string = yaml.safe_dump(
inline_values, sort_keys=False, default_flow_style=False
)
inline_values = "|\n" + "\n".join(
[" " * 8 + line for line in yaml_string.splitlines()]
)
is_preexisting_cm = "false"
values_configmap_name = f"cm-{ksu_name}"
cm_values = ksu_params.get("configmap_values", "")
if cm_values:
yaml_string = yaml.safe_dump(
cm_values, sort_keys=False, default_flow_style=False
)
custom_env_vars = "|\n" + "\n".join(
[" " * 8 + line for line in yaml_string.splitlines()]
)
is_preexisting_secret = "false"
secret_values = ksu_params.get("secret_values", "")
if secret_values:
values_secret_name = f"secret-{ksu_name}"
reference_secret_for_values = f"ref-secret-{ksu_name}"
reference_key_for_values = f"ref-key-{ksu_name}"
secret_values = yaml.safe_dump(
secret_values, sort_keys=False, default_flow_style=False
)
else:
values_secret_name = ""
reference_secret_for_values = ""
reference_key_for_values = ""
if secret_values:
secret_namespace = "osm-workflows"
# Create secret
await self.create_secret(
reference_secret_for_values,
secret_namespace,
reference_key_for_values,
secret_values,
)
# Render workflow
manifest = self.render_jinja_template(
workflow_template,
output_file=None,
workflow_name=workflow_name,
git_fleet_url=f"{self._repo_base_url}/{self._repo_user}/fleet-osm.git",
git_sw_catalogs_url=f"{self._repo_base_url}/{self._repo_user}/sw-catalogs-osm.git",
templates_path=oka_path,
substitute_environment=substitute_environment,
substitution_filter=substitution_filter,
custom_env_vars=custom_env_vars,
kustomization_name=kustomization_name,
helmrelease_name=helmrelease_name,
inline_values=inline_values,
is_preexisting_secret=is_preexisting_secret,
target_ns=target_ns,
age_public_key=age_public_key,
values_secret_name=values_secret_name,
reference_secret_for_values=reference_secret_for_values,
reference_key_for_values=reference_key_for_values,
is_preexisting_cm=is_preexisting_cm,
values_configmap_name=values_configmap_name,
cm_values=cm_values,
ksu_name=ksu_name,
profile_name=profile_name,
profile_type=profile_type,
osm_project_name=osm_project_name,
workflow_debug=self._workflow_debug,
workflow_dry_run=self._workflow_dry_run,
)
self.logger.debug(f"Workflow manifest: {manifest}")
# Submit workflow
self._kubectl.create_generic_object(
namespace="osm-workflows",
manifest_dict=yaml.safe_load(manifest),
api_group="argoproj.io",
api_plural="workflows",
api_version="v1alpha1",
)
return workflow_name
async def delete_ksus(self, op_id, op_params_list, content_list):
self.logger.info("Delete KSU workflow Enter")
self.logger.info(
f"Operation {op_id}. Params: {op_params_list}. Content: {content_list}"
)
if len(content_list) > 1:
raise Exception("There is no ODU workflow yet able to manage multiple KSUs")
db_ksu = content_list[0]
ksu_params = op_params_list[0]
workflow_template = "launcher-delete-ksu.j2"
workflow_name = f"delete-ksus-{op_id}"
ksu_name = db_ksu["git_name"].lower()
# Additional params for the workflow
osm_project_name = "osm_admin" # TODO: get project name from db_ksu
profile_name = ksu_params.get("profile", {}).get("name")
profile_type = ksu_params.get("profile", {}).get("profile_type")
# Render workflow
manifest = self.render_jinja_template(
workflow_template,
output_file=None,
workflow_name=workflow_name,
git_fleet_url=f"{self._repo_base_url}/{self._repo_user}/fleet-osm.git",
git_sw_catalogs_url=f"{self._repo_base_url}/{self._repo_user}/sw-catalogs-osm.git",
ksu_name=ksu_name,
profile_name=profile_name,
profile_type=profile_type,
osm_project_name=osm_project_name,
workflow_debug=self._workflow_debug,
workflow_dry_run=self._workflow_dry_run,
)
self.logger.debug(f"Workflow manifest: {manifest}")
# Submit workflow
self._kubectl.create_generic_object(
namespace="osm-workflows",
manifest_dict=yaml.safe_load(manifest),
api_group="argoproj.io",
api_plural="workflows",
api_version="v1alpha1",
)
return workflow_name
async def clone_ksu(self, op_id, op_params, content):
self.logger.info("Clone KSU workflow Enter")
self.logger.info(f"Operation {op_id}. Params: {op_params}. Content: {content}")
workflow_name = f"clone-ksu-{content['_id']}"
return workflow_name
async def move_ksu(self, op_id, op_params, content):
self.logger.info("Move KSU workflow Enter")
self.logger.info(f"Operation {op_id}. Params: {op_params}. Content: {content}")
workflow_name = f"move-ksu-{content['_id']}"
return workflow_name
async def check_create_ksus(self, op_id, op_params, content):
self.logger.info(f"Operation {op_id}. Params: {op_params}. Content: {content}")
return True, "OK"
async def check_update_ksus(self, op_id, op_params, content):
self.logger.info(f"Operation {op_id}. Params: {op_params}. Content: {content}")
return True, "OK"
async def check_delete_ksus(self, op_id, op_params, content):
self.logger.info(f"Operation {op_id}. Params: {op_params}. Content: {content}")
return True, "OK"
async def check_clone_ksu(self, op_id, op_params, content):
self.logger.info(f"Operation {op_id}. Params: {op_params}. Content: {content}")
return True, "OK"
async def check_move_ksu(self, op_id, op_params, content):
self.logger.info(f"Operation {op_id}. Params: {op_params}. Content: {content}")
return True, "OK"