blob: b23fb2a776e9ad83786f5f57b08a7f5bac126e71 [file] [log] [blame]
#######################################################################################
# Copyright ETSI Contributors and Others.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#######################################################################################
import yaml
import tempfile
import os
MAP_PROFILE = {
"infra_controller_profiles": "infra-controller-profiles",
"infra_config_profiles": "infra-config-profiles",
"resource_profiles": "managed-resources",
"app_profiles": "app-profiles",
}
def merge_model(base, override):
"""Recursively merge override dictionary into base dictionary."""
merge_model = base.copy()
for k, v in override.get("spec", {}).items():
if k != "ksus":
merge_model["spec"][k] = v
for ksu_override in override.get("spec", {}).get("ksus", []):
for ksu_base in merge_model.get("spec", {}).get("ksus", []):
if ksu_base.get("name") == ksu_override.get("name"):
for k, v in ksu_override.items():
if k != "patterns":
ksu_base[k] = v
continue
for pattern_override in ksu_override.get("patterns", []):
for pattern_base in ksu_base.get("patterns", []):
if pattern_base.get("name") == pattern_override.get("name"):
for kp, vp in pattern_override.items():
if kp != "bricks":
pattern_base[kp] = vp
continue
for brick_override in pattern_override.get(
"bricks", []
):
for brick_base in pattern_base.get(
"bricks", []
):
if brick_base.get(
"name"
) == brick_override.get("name"):
for kb, vb in brick_override.items():
if kb != "hrset-values":
brick_base[kb] = vb
continue
for (
hrset_override
) in brick_override.get(
"hrset-values", []
):
for (
hrset_base
) in brick_base.get(
"hrset-values", []
):
if hrset_base.get(
"name"
) == hrset_override.get(
"name"
):
hrset_base |= (
hrset_override
)
break
else:
brick_base[
"hrset-values"
].append(hrset_override)
break
else:
pattern_base["bricks"].append(
brick_override
)
break
else:
ksu_base["patterns"].append(pattern_override)
break
else:
merge_model["spec"]["ksus"].append(ksu_override)
return merge_model
async def launch_app(self, op_id, op_params, workflow_content, operation_type):
self.logger.info(
f"launch_app Enter. Operation {op_id}. Operation Type: {operation_type}"
)
# self.logger.debug(f"Operation Params: {op_params}")
# self.logger.debug(f"Content: {workflow_content}")
db_app = workflow_content["app"]
db_profile = workflow_content.get("profile")
profile_t = db_app.get("profile_type")
profile_type = MAP_PROFILE[profile_t]
profile_name = db_profile.get("git_name").lower()
app_name = db_app["git_name"].lower()
app_command = f"app {operation_type} $environment"
age_public_key = db_profile.get("age_pubkey")
sw_catalog_model = workflow_content.get("model")
self.logger.debug(f"SW catalog model: {sw_catalog_model}")
# Update the app model, extending it also with the model from op_params
if operation_type == "update":
model = op_params.get("model", db_app.get("app_model", {}))
else:
model = op_params.get("model", {})
app_model = merge_model(sw_catalog_model, model)
app_model["kind"] = "AppInstantiation"
app_model["metadata"]["name"] = app_name
for ksu in app_model.get("spec", {}).get("ksus", []):
for pattern in ksu.get("patterns", []):
for brick in pattern.get("bricks", []):
brick["public-age-key"] = age_public_key
self.logger.debug(f"App model: {app_model}")
if operation_type == "update":
params = op_params.get("params", db_app.get("params", {}))
else:
params = op_params.get("params", {})
params["PROFILE_TYPE"] = profile_type
params["PROFILE_NAME"] = profile_name
params["APPNAME"] = app_name
self.logger.debug(f"Params: {params}")
if operation_type == "update":
secret_params = op_params.get("secret_params", db_app.get("secret_params", {}))
else:
secret_params = op_params.get("secret_params", {})
self.logger.debug(f"Secret Params: {secret_params}")
# Create temporary folder for the app model and the parameters
temp_dir = tempfile.mkdtemp(prefix=f"app-{operation_type}-{op_id}-")
self.logger.debug(f"Temporary dir created: {temp_dir}")
with open(f"{temp_dir}/app_instance_model.yaml", "w") as f:
yaml.safe_dump(
app_model, f, indent=2, default_flow_style=False, sort_keys=False
)
os.makedirs(f"{temp_dir}/parameters/clear", exist_ok=True)
with open(f"{temp_dir}/parameters/clear/environment.yaml", "w") as f:
yaml.safe_dump(params, f, indent=2, default_flow_style=False, sort_keys=False)
# Create PVC and copy app model and parameters to PVC
app_model_pvc = f"temp-pvc-app-{op_id}"
src_files = [
f"{temp_dir}/app_instance_model.yaml",
f"{temp_dir}/parameters/clear/environment.yaml",
]
dest_files = [
"app_instance_model.yaml",
"parameters/clear/environment.yaml",
]
self.logger.debug(
f"Copying files to PVC {app_model_pvc}: {src_files} -> {dest_files}"
)
await self._kubectl.create_pvc_with_content(
name=app_model_pvc,
namespace="osm-workflows",
src_files=src_files,
dest_files=dest_files,
)
# Create secret with secret_params
secret_name = f"secret-app-{op_id}"
secret_namespace = "osm-workflows"
secret_key = "environment.yaml"
secret_value = yaml.safe_dump(
secret_params, indent=2, default_flow_style=False, sort_keys=False
)
try:
self.logger.debug(f"Testing kubectl: {self._kubectl}")
self.logger.debug(
f"Testing kubectl configuration: {self._kubectl.configuration}"
)
self.logger.debug(
f"Testing kubectl configuration Host: {self._kubectl.configuration.host}"
)
self.logger.debug(
f"Creating secret {secret_name} in namespace {secret_namespace}"
)
await self.create_secret(
secret_name,
secret_namespace,
secret_key,
secret_value,
)
except Exception as e:
self.logger.info(
f"Cannot create secret {secret_name} in namespace {secret_namespace}: {e}"
)
return (
False,
f"Cannot create secret {secret_name} in namespace {secret_namespace}: {e}",
)
# Create workflow to launch the app
workflow_template = "launcher-app.j2"
workflow_name = f"{operation_type}-app-{op_id}"
# Additional params for the workflow
osm_project_name = workflow_content.get("project_name", "osm_admin")
# Render workflow
manifest = self.render_jinja_template(
workflow_template,
output_file=None,
workflow_name=workflow_name,
app_command=app_command,
app_model_pvc=app_model_pvc,
app_secret_name=secret_name,
git_fleet_url=self._repo_fleet_url,
git_sw_catalogs_url=self._repo_sw_catalogs_url,
app_name=app_name,
profile_name=profile_name,
profile_type=profile_type,
osm_project_name=osm_project_name,
workflow_debug=self._workflow_debug,
workflow_dry_run=self._workflow_dry_run,
)
self.logger.debug(f"Workflow manifest: {manifest}")
# Submit workflow
self._kubectl.create_generic_object(
namespace="osm-workflows",
manifest_dict=yaml.safe_load(manifest),
api_group="argoproj.io",
api_plural="workflows",
api_version="v1alpha1",
)
workflow_resources = {
"app_model": app_model,
"secret_params": secret_params,
"params": params,
}
return True, workflow_name, workflow_resources
async def create_app(self, op_id, op_params, content):
self.logger.info(f"create_app Enter. Operation {op_id}")
# self.logger.debug(f"Operation Params: {op_params}")
# self.logger.debug(f"Content: {workflow_content}")
return await self.launch_app(op_id, op_params, content, "create")
async def update_app(self, op_id, op_params, content):
self.logger.info(f"update_app Enter. Operation {op_id}")
# self.logger.debug(f"Operation Params: {op_params}")
# self.logger.debug(f"Content: {workflow_content}")
return await self.launch_app(op_id, op_params, content, "update")
async def delete_app(self, op_id, op_params, content):
self.logger.info(f"delete_app Enter. Operation {op_id}")
# self.logger.debug(f"Operation Params: {op_params}")
# self.logger.debug(f"Content: {workflow_content}")
return await self.launch_app(op_id, op_params, content, "delete")
async def clean_items_app_launch(self, op_id, op_params, workflow_content):
self.logger.info(f"clean_items_app_launch Enter. Operation {op_id}")
# self.logger.debug(f"Operation Params: {op_params}")
# self.logger.debug(f"Content: {workflow_content}")
try:
secret_name = f"secret-app-{op_id}"
volume_name = f"temp-pvc-app-{op_id}"
items = {
"secrets": [
{
"name": secret_name,
"namespace": "osm-workflows",
}
],
"pvcs": [
{
"name": volume_name,
"namespace": "osm-workflows",
}
],
}
await self.clean_items(items)
return True, "OK"
except Exception as e:
return False, f"Error while cleaning items: {e}"