import logging
from time import time
+import traceback
from osm_lcm.lcm_utils import LcmBase
from copy import deepcopy
from osm_lcm import odu_workflows
from osm_lcm import vim_sdn
-class ClusterLcm(LcmBase):
+class GitOpsLcm(LcmBase):
+ def __init__(self, msg, lcm_tasks, config):
+ self.logger = logging.getLogger("lcm.gitops")
+ self.lcm_tasks = lcm_tasks
+ self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)
+ self._checkloop_kustomization_timeout = 900
+ self._checkloop_resource_timeout = 900
+ self._workflows = {}
+ super().__init__(msg, self.logger)
+
+ async def check_dummy_operation(self, op_id, op_params, content):
+ self.logger.info(f"Operation {op_id}. Params: {op_params}. Content: {content}")
+ return True, "OK"
+
+ async def common_check_list(self, checkings_list):
+ try:
+ for checking in checkings_list:
+ if checking["enable"]:
+ status, message = await self.odu.readiness_loop(
+ item=checking["item"],
+ name=checking["name"],
+ namespace=checking["namespace"],
+ flag=checking["flag"],
+ timeout=checking["timeout"],
+ )
+ if not status:
+ return status, message
+ except Exception as e:
+ self.logger.debug(traceback.format_exc())
+ self.logger.debug(f"Exception: {e}", exc_info=True)
+ return False, f"Unexpected exception: {e}"
+ return True, "OK"
+
+ async def check_resource_status(self, key, op_id, op_params, content):
+ self.logger.info(
+ f"Check resource status. Key: {key}. Operation: {op_id}. Params: {op_params}. Content: {content}"
+ )
+ check_resource_function = self._workflows.get(key, {}).get(
+ "check_resource_function"
+ )
+ self.logger.info("check_resource function : {}".format(check_resource_function))
+ if check_resource_function:
+ return await check_resource_function(op_id, op_params, content)
+ else:
+ return await self.check_dummy_operation(op_id, op_params, content)
+
+
+class ClusterLcm(GitOpsLcm):
db_collection = "clusters"
def __init__(self, msg, lcm_tasks, config):
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
:return: None
"""
-
- self.logger = logging.getLogger("lcm.gitops")
- self.lcm_tasks = lcm_tasks
- self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)
+ super().__init__(msg, lcm_tasks, config)
+ self._workflows = {
+ "create_cluster": {
+ "check_resource_function": self.check_create_cluster,
+ },
+ "deregister_cluster": {
+ "check_resource_function": self.check_deregister_cluster,
+ },
+ }
self.regist = vim_sdn.K8sClusterLcm(msg, self.lcm_tasks, config)
- super().__init__(msg, self.logger)
-
async def create(self, op_id, op_params, content):
self.logger.info("cluster Create Enter")
db_cluster = content["cluster"]
)
if workflow_status:
- resource_status, resource_msg = await self.odu.check_resource_status(
+ resource_status, resource_msg = await self.check_resource_status(
"create_cluster", op_id, op_params, content
)
self.logger.info(
self.update_profile_state(db_cluster, workflow_status, resource_status)
return
+ async def check_create_cluster(self, op_id, op_params, content):
+ self.logger.info(f"Operation {op_id}. Params: {op_params}. Content: {content}")
+ db_cluster = content["cluster"]
+ cluster_name = db_cluster["git_name"].lower()
+ cluster_kustomization_name = cluster_name
+ db_vim_account = content["vim_account"]
+ cloud_type = db_vim_account["vim_type"]
+ nodepool_name = ""
+ if cloud_type == "aws":
+ nodepool_name = f"{cluster_name}-nodegroup"
+ cluster_name = f"{cluster_name}-cluster"
+ elif cloud_type == "gcp":
+ nodepool_name = f"nodepool-{cluster_name}"
+ bootstrap = op_params.get("bootstrap", True)
+ if cloud_type in ("azure", "gcp", "aws"):
+ checkings_list = [
+ {
+ "item": "kustomization",
+ "name": cluster_kustomization_name,
+ "namespace": "managed-resources",
+ "flag": "Ready",
+ "timeout": self._checkloop_kustomization_timeout,
+ "enable": True,
+ },
+ {
+ "item": f"cluster_{cloud_type}",
+ "name": cluster_name,
+ "namespace": "",
+ "flag": "Synced",
+ "timeout": self._checkloop_resource_timeout,
+ "enable": True,
+ },
+ {
+ "item": f"cluster_{cloud_type}",
+ "name": cluster_name,
+ "namespace": "",
+ "flag": "Ready",
+ "timeout": self._checkloop_resource_timeout,
+ "enable": True,
+ },
+ {
+ "item": "kustomization",
+ "name": f"{cluster_kustomization_name}-bstrp-fluxctrl",
+ "namespace": "managed-resources",
+ "flag": "Ready",
+ "timeout": self._checkloop_kustomization_timeout,
+ "enable": bootstrap,
+ },
+ ]
+ else:
+ return False, "Not suitable VIM account to check cluster status"
+ if nodepool_name:
+ nodepool_check = {
+ "item": f"nodepool_{cloud_type}",
+ "name": nodepool_name,
+ "namespace": "",
+ "flag": "Ready",
+ "timeout": self._checkloop_resource_timeout,
+ "enable": True,
+ }
+ checkings_list.insert(3, nodepool_check)
+ return await self.common_check_list(checkings_list)
+
def update_profile_state(self, db_cluster, workflow_status, resource_status):
profiles = [
"infra_controller_profiles",
self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
if workflow_status:
- resource_status, resource_msg = await self.odu.check_resource_status(
+ resource_status, resource_msg = await self.check_resource_status(
"delete_cluster", op_id, op_params, content
)
self.logger.info(
self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
if workflow_status:
- resource_status, resource_msg = await self.odu.check_resource_status(
+ resource_status, resource_msg = await self.check_resource_status(
"attach_profile_to_cluster", op_id, op_params, content
)
self.logger.info(
self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
if workflow_status:
- resource_status, resource_msg = await self.odu.check_resource_status(
+ resource_status, resource_msg = await self.check_resource_status(
"detach_profile_from_cluster", op_id, op_params, content
)
self.logger.info(
)
if workflow_status:
- resource_status, resource_msg = await self.odu.check_resource_status(
+ resource_status, resource_msg = await self.check_resource_status(
"register_cluster", op_id, op_params, content
)
self.logger.info(
self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
if workflow_status:
- resource_status, resource_msg = await self.odu.check_resource_status(
+ resource_status, resource_msg = await self.check_resource_status(
"deregister_cluster", op_id, op_params, content
)
self.logger.info(
self.db.del_one("clusters", {"_id": db_cluster["_id"]})
return
+ async def check_deregister_cluster(self, op_id, op_params, content):
+ self.logger.info("check_deregister_cluster Enter")
+ self.logger.info(f"Operation {op_id}. Params: {op_params}. Content: {content}")
+ # Clean secrets
+ self.logger.info("Cleaning kubeconfig")
+ cluster_name = content["cluster"]["git_name"].lower()
+ items = {
+ "secrets": [
+ {
+ "name": f"kubeconfig-{cluster_name}",
+ "namespace": "managed-resources",
+ },
+ ]
+ }
+
+ try:
+ await self.odu.clean_items(items)
+ except Exception as e:
+ return False, f"Error while cleaning items: {e}"
+ return True, "OK"
+
async def get_creds(self, op_id, db_cluster):
self.logger.info("Cluster get creds Enter")
result, cluster_creds = await self.odu.get_cluster_credentials(db_cluster)
f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
)
if workflow_status:
- resource_status, resource_msg = await self.odu.check_resource_status(
+ resource_status, resource_msg = await self.check_resource_status(
"update_cluster", op_id, op_params, content
)
self.logger.info(
return
-class CloudCredentialsLcm(LcmBase):
+class CloudCredentialsLcm(GitOpsLcm):
db_collection = "vim_accounts"
def __init__(self, msg, lcm_tasks, config):
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
:return: None
"""
-
- self.logger = logging.getLogger("lcm.gitops")
- self.lcm_tasks = lcm_tasks
- self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)
-
- super().__init__(msg, self.logger)
+ super().__init__(msg, lcm_tasks, config)
async def add(self, op_id, op_params, content):
self.logger.info("Cloud Credentials create")
)
if workflow_status:
- resource_status, resource_msg = await self.odu.check_resource_status(
+ resource_status, resource_msg = await self.check_resource_status(
"create_cloud_credentials", op_id, op_params, content
)
self.logger.info(
)
if workflow_status:
- resource_status, resource_msg = await self.odu.check_resource_status(
+ resource_status, resource_msg = await self.check_resource_status(
"update_cloud_credentials", op_id, op_params, content
)
self.logger.info(
)
if workflow_status:
- resource_status, resource_msg = await self.odu.check_resource_status(
+ resource_status, resource_msg = await self.check_resource_status(
"delete_cloud_credentials", op_id, op_params, content
)
self.logger.info(
return
-class K8sAppLcm(LcmBase):
+class K8sAppLcm(GitOpsLcm):
def __init__(self, msg, lcm_tasks, config):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
:return: None
"""
-
- self.logger = logging.getLogger("lcm.gitops")
- self.lcm_tasks = lcm_tasks
- self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)
-
- super().__init__(msg, self.logger)
+ super().__init__(msg, lcm_tasks, config)
async def create(self, op_id, op_params, content):
self.logger.info("App Create Enter")
self.db.set_one("k8sapp", {"_id": content["_id"]}, content)
if workflow_status:
- resource_status, resource_msg = await self.odu.check_resource_status(
+ resource_status, resource_msg = await self.check_resource_status(
"create_profile", op_id, op_params, content
)
self.logger.info(
self.db.set_one("k8sapp", {"_id": content["_id"]}, content)
if workflow_status:
- resource_status, resource_msg = await self.odu.check_resource_status(
+ resource_status, resource_msg = await self.check_resource_status(
"delete_profile", op_id, op_params, content
)
self.logger.info(
return
-class K8sResourceLcm(LcmBase):
+class K8sResourceLcm(GitOpsLcm):
def __init__(self, msg, lcm_tasks, config):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
:return: None
"""
-
- self.logger = logging.getLogger("lcm.gitops")
- self.lcm_tasks = lcm_tasks
- self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)
-
- super().__init__(msg, self.logger)
+ super().__init__(msg, lcm_tasks, config)
async def create(self, op_id, op_params, content):
self.logger.info("Resource Create Enter")
self.db.set_one("k8sresource", {"_id": content["_id"]}, content)
if workflow_status:
- resource_status, resource_msg = await self.odu.check_resource_status(
+ resource_status, resource_msg = await self.check_resource_status(
"create_profile", op_id, op_params, content
)
self.logger.info(
self.db.set_one("k8sresource", {"_id": content["_id"]}, content)
if workflow_status:
- resource_status, resource_msg = await self.odu.check_resource_status(
+ resource_status, resource_msg = await self.check_resource_status(
"delete_profile", op_id, op_params, content
)
self.logger.info(
return
-class K8sInfraControllerLcm(LcmBase):
+class K8sInfraControllerLcm(GitOpsLcm):
def __init__(self, msg, lcm_tasks, config):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
:return: None
"""
-
- self.logger = logging.getLogger("lcm.gitops")
- self.lcm_tasks = lcm_tasks
- self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)
-
- super().__init__(msg, self.logger)
+ super().__init__(msg, lcm_tasks, config)
async def create(self, op_id, op_params, content):
self.logger.info("Infra controller Create Enter")
self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, content)
if workflow_status:
- resource_status, resource_msg = await self.odu.check_resource_status(
+ resource_status, resource_msg = await self.check_resource_status(
"create_profile", op_id, op_params, content
)
self.logger.info(
self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, content)
if workflow_status:
- resource_status, resource_msg = await self.odu.check_resource_status(
+ resource_status, resource_msg = await self.check_resource_status(
"delete_profile", op_id, op_params, content
)
self.logger.info(
return
-class K8sInfraConfigLcm(LcmBase):
+class K8sInfraConfigLcm(GitOpsLcm):
def __init__(self, msg, lcm_tasks, config):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
:return: None
"""
-
- self.logger = logging.getLogger("lcm.gitops")
- self.lcm_tasks = lcm_tasks
- self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)
-
- super().__init__(msg, self.logger)
+ super().__init__(msg, lcm_tasks, config)
async def create(self, op_id, op_params, content):
self.logger.info("Infra config Create Enter")
self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, content)
if workflow_status:
- resource_status, resource_msg = await self.odu.check_resource_status(
+ resource_status, resource_msg = await self.check_resource_status(
"create_profile", op_id, op_params, content
)
self.logger.info(
content = self.update_operation_history(content, workflow_status, None)
self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, content)
- resource_status, resource_msg = await self.odu.check_resource_status(
+ resource_status, resource_msg = await self.check_resource_status(
"delete_profile", op_id, op_params, content
)
self.logger.info(
return
-class OkaLcm(LcmBase):
+class OkaLcm(GitOpsLcm):
db_collection = "okas"
def __init__(self, msg, lcm_tasks, config):
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
:return: None
"""
-
- self.logger = logging.getLogger("lcm.gitops")
- self.lcm_tasks = lcm_tasks
- self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)
-
- super().__init__(msg, self.logger)
+ super().__init__(msg, lcm_tasks, config)
async def create(self, op_id, op_params, content):
self.logger.info("OKA Create Enter")
self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
if workflow_status:
- resource_status, resource_msg = await self.odu.check_resource_status(
+ resource_status, resource_msg = await self.check_resource_status(
"create_oka", op_id, op_params, db_content
)
self.logger.info(
self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
if workflow_status:
- resource_status, resource_msg = await self.odu.check_resource_status(
+ resource_status, resource_msg = await self.check_resource_status(
"update_oka", op_id, op_params, db_content
)
self.logger.info(
self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
if workflow_status:
- resource_status, resource_msg = await self.odu.check_resource_status(
+ resource_status, resource_msg = await self.check_resource_status(
"delete_oka", op_id, op_params, db_content
)
self.logger.info(
return
-class KsuLcm(LcmBase):
+class KsuLcm(GitOpsLcm):
db_collection = "ksus"
def __init__(self, msg, lcm_tasks, config):
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
:return: None
"""
-
- self.logger = logging.getLogger("lcm.gitops")
- self.lcm_tasks = lcm_tasks
- self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)
-
- super().__init__(msg, self.logger)
+ super().__init__(msg, lcm_tasks, config)
async def create(self, op_id, op_params, content):
self.logger.info("ksu Create Enter")
)
if workflow_status:
- resource_status, resource_msg = await self.odu.check_resource_status(
+ resource_status, resource_msg = await self.check_resource_status(
"create_ksus", op_id, op_params, content
)
self.logger.info(
f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
)
if workflow_status:
- resource_status, resource_msg = await self.odu.check_resource_status(
+ resource_status, resource_msg = await self.check_resource_status(
"update_ksus", op_id, op_params, content
)
self.logger.info(
self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu)
if workflow_status:
- resource_status, resource_msg = await self.odu.check_resource_status(
+ resource_status, resource_msg = await self.check_resource_status(
"delete_ksus", op_id, op_params, content
)
self.logger.info(
self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
if workflow_status:
- resource_status, resource_msg = await self.odu.check_resource_status(
+ resource_status, resource_msg = await self.check_resource_status(
"clone_ksus", op_id, op_params, db_content
)
self.logger.info(
self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
if workflow_status:
- resource_status, resource_msg = await self.odu.check_resource_status(
+ resource_status, resource_msg = await self.check_resource_status(
"move_ksus", op_id, op_params, db_content
)
self.logger.info(
# self._kubeconfig = kubeconfig # TODO: get it from config
self.gitops_config = config["gitops"]
self.logger.debug(f"Config: {self.gitops_config}")
- self._kubeconfig = self.gitops_config["mgmtcluster_kubeconfig"]
- self._odu_checkloop_kustomization_timeout = 900
- self._odu_checkloop_resource_timeout = 900
self._odu_checkloop_retry_time = 15
+ self._kubeconfig = self.gitops_config["mgmtcluster_kubeconfig"]
self._kubectl = kubectl.Kubectl(config_file=self._kubeconfig)
self._repo_base_url = self.gitops_config["git_base_url"]
self._repo_user = self.gitops_config["user"]
"create_cluster": {
"workflow_function": self.create_cluster,
"clean_function": self.clean_items_cluster_create,
- "check_resource_function": self.check_create_cluster,
},
"update_cluster": {
"workflow_function": self.update_cluster,
"clean_function": self.clean_items_cluster_update,
- "check_resource_function": self.check_update_cluster,
},
"delete_cluster": {
"workflow_function": self.delete_cluster,
- "check_resource_function": self.check_delete_cluster,
},
"register_cluster": {
"workflow_function": self.register_cluster,
"clean_function": self.clean_items_cluster_register,
- "check_resource_function": self.check_register_cluster,
},
"deregister_cluster": {
"workflow_function": self.deregister_cluster,
- "check_resource_function": self.check_deregister_cluster,
},
"create_profile": {
"workflow_function": self.create_profile,
- "check_resource_function": self.check_create_profile,
},
"delete_profile": {
"workflow_function": self.delete_profile,
- "check_resource_function": self.check_delete_profile,
},
"attach_profile_to_cluster": {
"workflow_function": self.attach_profile_to_cluster,
- "check_resource_function": self.check_attach_profile_to_cluster,
},
"detach_profile_from_cluster": {
"workflow_function": self.detach_profile_from_cluster,
- "check_resource_function": self.check_detach_profile_from_cluster,
},
"create_oka": {
"workflow_function": self.create_oka,
- "check_resource_function": self.check_create_oka,
},
"update_oka": {
"workflow_function": self.update_oka,
- "check_resource_function": self.check_update_oka,
},
"delete_oka": {
"workflow_function": self.delete_oka,
- "check_resource_function": self.check_delete_oka,
},
"create_ksus": {
"workflow_function": self.create_ksus,
"clean_function": self.clean_items_ksu_create,
- "check_resource_function": self.check_create_ksus,
},
"update_ksus": {
"workflow_function": self.update_ksus,
"clean_function": self.clean_items_ksu_update,
- "check_resource_function": self.check_update_ksus,
},
"delete_ksus": {
"workflow_function": self.delete_ksus,
- "check_resource_function": self.check_delete_ksus,
},
"clone_ksu": {
"workflow_function": self.clone_ksu,
- "check_resource_function": self.check_clone_ksu,
},
"move_ksu": {
"workflow_function": self.move_ksu,
- "check_resource_function": self.check_move_ksu,
},
"create_cloud_credentials": {
"workflow_function": self.create_cloud_credentials,
"clean_function": self.clean_items_cloud_credentials_create,
- "check_resource_function": self.check_create_cloud_credentials,
},
"update_cloud_credentials": {
"workflow_function": self.update_cloud_credentials,
"clean_function": self.clean_items_cloud_credentials_update,
- "check_resource_function": self.check_update_cloud_credentials,
},
"delete_cloud_credentials": {
"workflow_function": self.delete_cloud_credentials,
- "check_resource_function": self.check_delete_cloud_credentials,
},
"dummy_operation": {
"workflow_function": self.dummy_operation,
- "check_resource_function": self.check_dummy_operation,
},
}
delete_cloud_credentials,
clean_items_cloud_credentials_create,
clean_items_cloud_credentials_update,
- check_create_cloud_credentials,
- check_update_cloud_credentials,
- check_delete_cloud_credentials,
)
from osm_lcm.odu_libs.cluster_mgmt import (
create_cluster,
clean_items_cluster_create,
clean_items_cluster_update,
clean_items_cluster_register,
- check_create_cluster,
- check_update_cluster,
- check_delete_cluster,
- check_register_cluster,
- check_deregister_cluster,
get_cluster_credentials,
)
from osm_lcm.odu_libs.ksu import (
move_ksu,
clean_items_ksu_create,
clean_items_ksu_update,
- check_create_ksus,
- check_update_ksus,
- check_delete_ksus,
- check_clone_ksu,
- check_move_ksu,
)
from osm_lcm.odu_libs.oka import (
create_oka,
update_oka,
delete_oka,
- check_create_oka,
- check_update_oka,
- check_delete_oka,
)
from osm_lcm.odu_libs.profiles import (
create_profile,
delete_profile,
attach_profile_to_cluster,
detach_profile_from_cluster,
- check_create_profile,
- check_delete_profile,
- check_attach_profile_to_cluster,
- check_detach_profile_from_cluster,
)
from osm_lcm.odu_libs.workflows import (
check_workflow_status,
readiness_loop,
- common_check_list,
)
from osm_lcm.odu_libs.render import (
render_jinja_template,
self.logger.info(f"Operation {op_id}. Params: {op_params}. Content: {content}")
return content["workflow_name"]
- async def check_resource_status(self, key, op_id, op_params, content):
- self.logger.info(
- f"Check resource status. Key: {key}. Operation: {op_id}. Params: {op_params}. Content: {content}"
- )
- check_resource_function = self._workflows[key]["check_resource_function"]
- self.logger.info("check_resource function : {}".format(check_resource_function))
- return await check_resource_function(op_id, op_params, content)
-
- async def check_dummy_operation(self, op_id, op_params, content):
- self.logger.info(f"Operation {op_id}. Params: {op_params}. Content: {content}")
- return True, "OK"
-
async def clean_items(self, items):
# Delete secrets
for secret in items.get("secrets", []):