blob: 93ba14b6464e66002d2f3271f7a724b85a74da5d [file] [log] [blame]
#######################################################################################
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#######################################################################################
import logging
from osm_lcm.lcm_utils import LcmBase
from osm_lcm.n2vc import kubectl
from osm_lcm.odu_libs import (
vim_mgmt as odu_vim_mgmt,
cluster_mgmt as odu_cluster_mgmt,
nodegroup as odu_nodegroup,
app as odu_app,
ksu as odu_ksu,
oka as odu_oka,
profiles as odu_profiles,
workflows,
render as odu_render,
common as odu_common,
)
class OduWorkflow(LcmBase):
"""
Class to manage the workflows for the OSM Deployment Unit (ODU).
This class is responsible for executing various workflows related to
cluster management, profile management, and other operations.
"""
def __init__(self, msg, lcm_tasks, config):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
:return: None
"""
self.logger = logging.getLogger("lcm.gitops")
self.lcm_tasks = lcm_tasks
self.logger.info("Msg: {} lcm_tasks: {} ".format(msg, lcm_tasks))
# self._kubeconfig = kubeconfig # TODO: get it from config
self.gitops_config = config["gitops"]
self.logger.debug(f"Gitops Config: {self.gitops_config}")
self._odu_checkloop_retry_time = 15
self._kubeconfig = self.gitops_config.get("mgmtcluster_kubeconfig")
self._kubectl = kubectl.Kubectl(config_file=self._kubeconfig)
self._repo_base_url = self.gitops_config.get("git_base_url")
self._repo_user = self.gitops_config.get("user")
self._repo_fleet_url = self.gitops_config.get(
"fleet_repo_url", f"{self._repo_base_url}/{self._repo_user}/fleet-osm.git"
)
self._repo_sw_catalogs_url = self.gitops_config.get(
"sw_catalogs_repo_url",
f"{self._repo_base_url}/{self._repo_user}/sw-catalogs-osm.git",
)
self._pubkey = self.gitops_config["pubkey"]
self._workflow_debug = str(self.gitops_config["workflow_debug"]).lower()
self._workflow_dry_run = str(self.gitops_config["workflow_dry_run"]).lower()
self._workflows = {
"create_cluster": {
"workflow_function": self.create_cluster,
"clean_function": self.clean_items_cluster_create,
},
"update_cluster": {
"workflow_function": self.update_cluster,
"clean_function": self.clean_items_cluster_update,
},
"delete_cluster": {
"workflow_function": self.delete_cluster,
},
"register_cluster": {
"workflow_function": self.register_cluster,
"clean_function": self.clean_items_cluster_register,
},
"deregister_cluster": {
"workflow_function": self.deregister_cluster,
},
"purge_cluster": {
"workflow_function": self.purge_cluster,
"clean_function": self.clean_items_cluster_purge,
},
"create_profile": {
"workflow_function": self.create_profile,
},
"delete_profile": {
"workflow_function": self.delete_profile,
},
"attach_profile_to_cluster": {
"workflow_function": self.attach_profile_to_cluster,
},
"detach_profile_from_cluster": {
"workflow_function": self.detach_profile_from_cluster,
},
"create_oka": {
"workflow_function": self.create_oka,
"clean_function": self.clean_items_oka_create,
},
"update_oka": {
"workflow_function": self.update_oka,
"clean_function": self.clean_items_oka_update,
},
"delete_oka": {
"workflow_function": self.delete_oka,
"clean_function": self.clean_items_oka_delete,
},
"create_ksus": {
"workflow_function": self.create_ksus,
"clean_function": self.clean_items_ksu_create,
},
"update_ksus": {
"workflow_function": self.update_ksus,
"clean_function": self.clean_items_ksu_update,
},
"delete_ksus": {
"workflow_function": self.delete_ksus,
},
"clone_ksu": {
"workflow_function": self.clone_ksu,
},
"move_ksu": {
"workflow_function": self.move_ksu,
},
"create_app": {
"workflow_function": self.create_app,
"clean_function": self.clean_items_app_launch,
},
"update_app": {
"workflow_function": self.update_app,
"clean_function": self.clean_items_app_launch,
},
"delete_app": {
"workflow_function": self.delete_app,
"clean_function": self.clean_items_app_launch,
},
"create_cloud_credentials": {
"workflow_function": self.create_cloud_credentials,
"clean_function": self.clean_items_cloud_credentials_create,
},
"update_cloud_credentials": {
"workflow_function": self.update_cloud_credentials,
"clean_function": self.clean_items_cloud_credentials_update,
},
"delete_cloud_credentials": {
"workflow_function": self.delete_cloud_credentials,
},
"dummy_operation": {
"workflow_function": self.dummy_operation,
},
"add_nodegroup": {
"workflow_function": self.add_nodegroup,
"clean_function": self.clean_items_nodegroup_add,
},
"scale_nodegroup": {
"workflow_function": self.scale_nodegroup,
},
"delete_nodegroup": {
"workflow_function": self.delete_nodegroup,
"clean_function": self.clean_items_nodegroup_delete,
},
}
super().__init__(msg, self.logger)
@property
def kubeconfig(self):
return self._kubeconfig
# Imported methods
create_cloud_credentials = odu_vim_mgmt.create_cloud_credentials
update_cloud_credentials = odu_vim_mgmt.update_cloud_credentials
delete_cloud_credentials = odu_vim_mgmt.delete_cloud_credentials
clean_items_cloud_credentials_create = (
odu_vim_mgmt.clean_items_cloud_credentials_create
)
clean_items_cloud_credentials_update = (
odu_vim_mgmt.clean_items_cloud_credentials_update
)
create_cluster = odu_cluster_mgmt.create_cluster
update_cluster = odu_cluster_mgmt.update_cluster
delete_cluster = odu_cluster_mgmt.delete_cluster
register_cluster = odu_cluster_mgmt.register_cluster
deregister_cluster = odu_cluster_mgmt.deregister_cluster
purge_cluster = odu_cluster_mgmt.purge_cluster
clean_items_cluster_create = odu_cluster_mgmt.clean_items_cluster_create
clean_items_cluster_update = odu_cluster_mgmt.clean_items_cluster_update
clean_items_cluster_register = odu_cluster_mgmt.clean_items_cluster_register
clean_items_cluster_purge = odu_cluster_mgmt.clean_items_cluster_purge
get_cluster_credentials = odu_cluster_mgmt.get_cluster_credentials
add_nodegroup = odu_nodegroup.add_nodegroup
scale_nodegroup = odu_nodegroup.scale_nodegroup
delete_nodegroup = odu_nodegroup.delete_nodegroup
clean_items_nodegroup_add = odu_nodegroup.clean_items_nodegroup_add
clean_items_nodegroup_delete = odu_nodegroup.clean_items_nodegroup_delete
create_ksus = odu_ksu.create_ksus
update_ksus = odu_ksu.update_ksus
delete_ksus = odu_ksu.delete_ksus
clone_ksu = odu_ksu.clone_ksu
move_ksu = odu_ksu.move_ksu
clean_items_ksu_create = odu_ksu.clean_items_ksu_create
clean_items_ksu_update = odu_ksu.clean_items_ksu_update
clean_items_ksu_delete = odu_ksu.clean_items_ksu_delete
create_oka = odu_oka.create_oka
update_oka = odu_oka.update_oka
delete_oka = odu_oka.delete_oka
clean_items_oka_create = odu_oka.clean_items_oka_create
clean_items_oka_update = odu_oka.clean_items_oka_update
clean_items_oka_delete = odu_oka.clean_items_oka_delete
create_profile = odu_profiles.create_profile
delete_profile = odu_profiles.delete_profile
attach_profile_to_cluster = odu_profiles.attach_profile_to_cluster
detach_profile_from_cluster = odu_profiles.detach_profile_from_cluster
check_workflow_status = workflows.check_workflow_status
readiness_loop = workflows.readiness_loop
render_jinja_template = odu_render.render_jinja_template
render_yaml_template = odu_render.render_yaml_template
create_secret = odu_common.create_secret
delete_secret = odu_common.delete_secret
create_configmap = odu_common.create_configmap
delete_configmap = odu_common.delete_configmap
create_app = odu_app.create_app
update_app = odu_app.update_app
delete_app = odu_app.delete_app
launch_app = odu_app.launch_app
clean_items_app_launch = odu_app.clean_items_app_launch
async def launch_workflow(self, key, op_id, op_params, content):
self.logger.info(
f"Workflow is getting into launch. Key: {key}. Operation: {op_id}"
)
# self.logger.debug(f"Operation Params: {op_params}")
# self.logger.debug(f"Content: {content}")
workflow_function = self._workflows[key]["workflow_function"]
self.logger.info("workflow function : {}".format(workflow_function))
try:
result, workflow_name, workflow_resources = await workflow_function(
op_id, op_params, content
)
return result, workflow_name, workflow_resources
except Exception as e:
self.logger.error(f"Error launching workflow: {e}")
return False, str(e), None
async def dummy_clean_items(self, op_id, op_params, content):
self.logger.info(
f"dummy_clean_items Enter. Operation {op_id}. Params: {op_params}"
)
self.logger.debug(f"Content: {content}")
return True, "OK"
async def clean_items_workflow(self, key, op_id, op_params, content):
self.logger.info(
f"Cleaning items created during workflow launch. Key: {key}. Operation: {op_id}. Params: {op_params}. Content: {content}"
)
clean_items_function = self._workflows[key].get(
"clean_function", self.dummy_clean_items
)
self.logger.info("clean items function : {}".format(clean_items_function))
return await clean_items_function(op_id, op_params, content)
async def dummy_operation(self, op_id, op_params, content):
self.logger.info("Empty operation status Enter")
self.logger.info(f"Operation {op_id}. Params: {op_params}. Content: {content}")
return True, content["workflow_name"], None
async def clean_items(self, items):
# Delete pods
for pod in items.get("pods", []):
name = pod["name"]
namespace = pod["namespace"]
self.logger.info(f"Deleting pod {name} in namespace {namespace}")
self.logger.debug(f"Testing kubectl: {self._kubectl}")
self.logger.debug(
f"Testing kubectl configuration: {self._kubectl.configuration}"
)
self.logger.debug(
f"Testing kubectl configuration Host: {self._kubectl.configuration.host}"
)
await self._kubectl.delete_pod(name, namespace)
# Delete secrets
for secret in items.get("secrets", []):
name = secret["name"]
namespace = secret["namespace"]
self.logger.info(f"Deleting secret {name} in namespace {namespace}")
self.logger.debug(f"Testing kubectl: {self._kubectl}")
self.logger.debug(
f"Testing kubectl configuration: {self._kubectl.configuration}"
)
self.logger.debug(
f"Testing kubectl configuration Host: {self._kubectl.configuration.host}"
)
self.delete_secret(name, namespace)
# Delete pvcs
for pvc in items.get("pvcs", []):
name = pvc["name"]
namespace = pvc["namespace"]
self.logger.info(f"Deleting pvc {name} in namespace {namespace}")
self.logger.debug(f"Testing kubectl: {self._kubectl}")
self.logger.debug(
f"Testing kubectl configuration: {self._kubectl.configuration}"
)
self.logger.debug(
f"Testing kubectl configuration Host: {self._kubectl.configuration.host}"
)
await self._kubectl.delete_pvc(name, namespace)
# Delete configmaps
for configmap in items.get("configmaps", []):
name = configmap["name"]
namespace = configmap["namespace"]
self.logger.info(f"Deleting configmap {name} in namespace {namespace}")
self.logger.debug(f"Testing kubectl: {self._kubectl}")
self.logger.debug(
f"Testing kubectl configuration: {self._kubectl.configuration}"
)
self.logger.debug(
f"Testing kubectl configuration Host: {self._kubectl.configuration.host}"
)
self.delete_configmap(name, namespace)
async def list_object(self, api_group, api_plural, api_version):
self.logger.info(
f"Api group: {api_group} Api plural: {api_plural} Api version: {api_version}"
)
generic_object = await self._kubectl.list_generic_object(
api_group=api_group,
api_plural=api_plural,
api_version=api_version,
namespace="",
)
return generic_object