import os

import yaml

from osm_lcm import odu_workflows
from osm_lcm import vim_sdn
from osm_lcm.data_utils.list_utils import find_in_list
from osm_lcm.n2vc.kubectl import Kubectl
MAP_PROFILE = {
"infra_controller_profiles": "infra-controllers",
db_content["current_operation"] = None
return resource_status, db_content
- async def common_check_list(self, op_id, checkings_list, db_collection, db_item):
+ async def common_check_list(
+ self, op_id, checkings_list, db_collection, db_item, kubectl=None
+ ):
try:
for checking in checkings_list:
if checking["enable"]:
item=checking["item"],
name=checking["name"],
namespace=checking["namespace"],
- flag=checking["flag"],
+ flag=checking.get("flag"),
+ deleted=checking.get("deleted", False),
timeout=checking["timeout"],
+ kubectl=kubectl,
)
if not status:
error_message = "Resources not ready: "
async def check_resource_status(self, key, op_id, op_params, content):
self.logger.info(
- f"Check resource status. Key: {key}. Operation: {op_id}. Params: {op_params}. Content: {content}"
+ f"Check resource status. Key: {key}. Operation: {op_id}. Params: {op_params}."
)
+ self.logger.debug(f"Check resource status. Content: {content}")
check_resource_function = self._workflows.get(key, {}).get(
"check_resource_function"
)
)
return content_copy
+ def cluster_kubectl(self, db_cluster):
+ cluster_kubeconfig = db_cluster["credentials"]
+ kubeconfig_path = f"/tmp/{db_cluster['_id']}_kubeconfig.yaml"
+ with open(kubeconfig_path, "w") as kubeconfig_file:
+ yaml.safe_dump(cluster_kubeconfig, kubeconfig_file)
+ return Kubectl(config_file=kubeconfig_path)
+
class ClusterLcm(GitOpsLcm):
db_collection = "clusters"
"update_cluster": {
"check_resource_function": self.check_update_cluster,
},
+ "delete_cluster": {
+ "check_resource_function": self.check_delete_cluster,
+ },
}
self.regist = vim_sdn.K8sClusterLcm(msg, self.lcm_tasks, config)
op_id, checkings_list, "clusters", db_cluster
)
- async def check_register_cluster(self, op_id, op_params, content):
- self.logger.info(
- f"check_register_cluster Operation {op_id}. Params: {op_params}."
- )
- # self.logger.debug(f"Content: {content}")
- db_cluster = content["cluster"]
- cluster_name = db_cluster["git_name"].lower()
- cluster_kustomization_name = cluster_name
- bootstrap = op_params.get("bootstrap", True)
- checkings_list = [
- {
- "item": "kustomization",
- "name": f"{cluster_kustomization_name}-bstrp-fluxctrl",
- "namespace": "managed-resources",
- "flag": "Ready",
- "timeout": self._checkloop_kustomization_timeout,
- "enable": bootstrap,
- "resourceState": "IN_PROGRESS.BOOTSTRAP_OK",
- },
- ]
- return await self.common_check_list(
- op_id, checkings_list, "clusters", db_cluster
- )
-
- async def check_update_cluster(self, op_id, op_params, content):
- self.logger.info(
- f"check_create_cluster Operation {op_id}. Params: {op_params}."
- )
- # self.logger.debug(f"Content: {content}")
- db_cluster = content["cluster"]
- cluster_name = db_cluster["git_name"].lower()
- cluster_kustomization_name = cluster_name
- db_vim_account = content["vim_account"]
- cloud_type = db_vim_account["vim_type"]
- nodepool_name = ""
- if cloud_type == "aws":
- nodepool_name = f"{cluster_name}-nodegroup"
- cluster_name = f"{cluster_name}-cluster"
- elif cloud_type == "gcp":
- nodepool_name = f"nodepool-{cluster_name}"
- if cloud_type in ("azure", "gcp", "aws"):
- checkings_list = [
- {
- "item": "kustomization",
- "name": cluster_kustomization_name,
- "namespace": "managed-resources",
- "flag": "Ready",
- "timeout": self._checkloop_kustomization_timeout,
- "enable": True,
- "resourceState": "IN_PROGRESS.KUSTOMIZATION_READY",
- },
- {
- "item": f"cluster_{cloud_type}",
- "name": cluster_name,
- "namespace": "",
- "flag": "Synced",
- "timeout": self._checkloop_resource_timeout,
- "enable": True,
- "resourceState": "IN_PROGRESS.RESOURCE_SYNCED.CLUSTER",
- },
- {
- "item": f"cluster_{cloud_type}",
- "name": cluster_name,
- "namespace": "",
- "flag": "Ready",
- "timeout": self._checkloop_resource_timeout,
- "enable": True,
- "resourceState": "IN_PROGRESS.RESOURCE_READY.CLUSTER",
- },
- ]
- else:
- return False, "Not suitable VIM account to check cluster status"
- if nodepool_name:
- nodepool_check = {
- "item": f"nodepool_{cloud_type}",
- "name": nodepool_name,
- "namespace": "",
- "flag": "Ready",
- "timeout": self._checkloop_resource_timeout,
- "enable": True,
- "resourceState": "IN_PROGRESS.RESOURCE_READY.NODEPOOL",
- }
- checkings_list.append(nodepool_check)
- return await self.common_check_list(
- op_id, checkings_list, "clusters", db_cluster
- )
-
def update_profile_state(self, db_cluster, workflow_status, resource_status):
profiles = [
"infra_controller_profiles",
"cluster": self.decrypted_copy(db_cluster),
}
+ # To get the vim account details
+ db_vim = self.db.get_one("vim_accounts", {"name": db_cluster["vim_account"]})
+ workflow_content["vim_account"] = db_vim
+
# TODO: workaround until NBI rejects cluster deletion requests for registered clusters
# This if clause will be removed
if db_cluster["created"] == "false":
return
+ async def check_delete_cluster(self, op_id, op_params, content):
+ self.logger.info(
+ f"check_delete_cluster Operation {op_id}. Params: {op_params}."
+ )
+ self.logger.debug(f"Content: {content}")
+ db_cluster = content["cluster"]
+ cluster_name = db_cluster["git_name"].lower()
+ cluster_kustomization_name = cluster_name
+ db_vim_account = content["vim_account"]
+ cloud_type = db_vim_account["vim_type"]
+ if cloud_type == "aws":
+ cluster_name = f"{cluster_name}-cluster"
+ if cloud_type in ("azure", "gcp", "aws"):
+ checkings_list = [
+ {
+ "item": "kustomization",
+ "name": cluster_kustomization_name,
+ "namespace": "managed-resources",
+ "deleted": True,
+ "timeout": self._checkloop_kustomization_timeout,
+ "enable": True,
+ "resourceState": "IN_PROGRESS.KUSTOMIZATION_DELETED",
+ },
+ {
+ "item": f"cluster_{cloud_type}",
+ "name": cluster_name,
+ "namespace": "",
+ "deleted": True,
+ "timeout": self._checkloop_resource_timeout,
+ "enable": True,
+ "resourceState": "IN_PROGRESS.RESOURCE_DELETED.CLUSTER",
+ },
+ ]
+ else:
+ return False, "Not suitable VIM account to check cluster status"
+ return await self.common_check_list(
+ op_id, checkings_list, "clusters", db_cluster
+ )
+
def delete_cluster(self, db_cluster):
# Actually, item_content is equal to db_cluster
# item_content = self.db.get_one("clusters", {"_id": db_cluster["_id"]})
return
+ async def check_register_cluster(self, op_id, op_params, content):
+ self.logger.info(
+ f"check_register_cluster Operation {op_id}. Params: {op_params}."
+ )
+ # self.logger.debug(f"Content: {content}")
+ db_cluster = content["cluster"]
+ cluster_name = db_cluster["git_name"].lower()
+ cluster_kustomization_name = cluster_name
+ bootstrap = op_params.get("bootstrap", True)
+ checkings_list = [
+ {
+ "item": "kustomization",
+ "name": f"{cluster_kustomization_name}-bstrp-fluxctrl",
+ "namespace": "managed-resources",
+ "flag": "Ready",
+ "timeout": self._checkloop_kustomization_timeout,
+ "enable": bootstrap,
+ "resourceState": "IN_PROGRESS.BOOTSTRAP_OK",
+ },
+ ]
+ return await self.common_check_list(
+ op_id, checkings_list, "clusters", db_cluster
+ )
+
async def deregister(self, params, order_id):
self.logger.info("cluster deregister enter")
self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
return
+ async def check_update_cluster(self, op_id, op_params, content):
+ self.logger.info(
+ f"check_update_cluster Operation {op_id}. Params: {op_params}."
+ )
+ self.logger.debug(f"Content: {content}")
+ return await self.check_dummy_operation(op_id, op_params, content)
+ # db_cluster = content["cluster"]
+ # cluster_id = db_cluster["_id"]
+ # cluster_kubectl = self.cluster_kubectl(db_cluster)
+ # cluster_name = db_cluster["git_name"].lower()
+ # cluster_kustomization_name = cluster_name
+ # checkings_list = [
+ # {
+ # "item": "kustomization",
+ # "name": cluster_kustomization_name,
+ # "namespace": "managed-resources",
+ # "flag": "Ready",
+ # "timeout": self._checkloop_kustomization_timeout,
+ # "enable": True,
+ # "resourceState": "IN_PROGRESS.KUSTOMIZATION_READY",
+ # },
+ # ]
+ # return await self.common_check_list(
+ # op_id, checkings_list, "clusters", db_cluster, cluster_kubectl
+ # )
+
class CloudCredentialsLcm(GitOpsLcm):
db_collection = "vim_accounts"
:return: None
"""
super().__init__(msg, lcm_tasks, config)
+ self._workflows = {
+ "create_ksus": {
+ "check_resource_function": self.check_create_ksus,
+ },
+ "delete_ksus": {
+ "check_resource_function": self.check_delete_ksus,
+ },
+ }
+
+ def get_dbclusters_from_profile(self, profile_id, profile_type):
+ cluster_list = []
+ db_clusters = self.db.get_list("clusters")
+ self.logger.info(f"Getting list of clusters for {profile_type} {profile_id}")
+ for db_cluster in db_clusters:
+ if profile_id in db_cluster.get(profile_type, []):
+ self.logger.info(
+ f"Profile {profile_id} found in cluster {db_cluster['name']}"
+ )
+ cluster_list.append(db_cluster)
+ return cluster_list
async def create(self, params, order_id):
self.logger.info("ksu Create Enter")
] = f"{oka_type}/{db_oka['git_name'].lower()}/templates"
op_params.append(ksu_params)
+ # A single workflow is launched for all KSUs
_, workflow_name = await self.odu.launch_workflow(
"create_ksus", op_id, op_params, db_content
)
+ # Update workflow status in all KSUs
+ wf_status_list = []
for db_ksu, ksu_params in zip(db_content, op_params):
workflow_status = await self.check_workflow_and_update_db(
op_id, workflow_name, db_ksu
)
-
- if workflow_status:
+ wf_status_list.append(workflow_status)
+ # Update resource status in all KSUs
+ # TODO: Is an operation correct if n KSUs are right and 1 is not OK?
+ res_status_list = []
+ for db_ksu, ksu_params, wf_status in zip(db_content, op_params, wf_status_list):
+ if wf_status:
resource_status, db_ksu = await self.check_resource_and_update_db(
"create_ksus", op_id, ksu_params, db_ksu
)
-
+ else:
+ resource_status = False
+ res_status_list.append(resource_status)
self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu)
# Clean items used in the workflow, no matter if the workflow succeeded
self.logger.info(
f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
)
- self.logger.info(f"KSU Create EXIT with Resource Status {resource_status}")
+ self.logger.info(f"KSU Create EXIT with Resource Status {res_status_list}")
return
async def edit(self, params, order_id):
self.logger.info(f"KSU Move Exit with resource status: {resource_status}")
return
+
+ async def check_create_ksus(self, op_id, op_params, content):
+ self.logger.info(f"check_create_ksus Operation {op_id}. Params: {op_params}.")
+ self.logger.debug(f"Content: {content}")
+ db_ksu = content
+ kustomization_name = db_ksu["git_name"].lower()
+ oka_list = op_params["oka"]
+ oka_item = oka_list[0]
+ oka_params = oka_item.get("transformation", {})
+ target_ns = oka_params.get("namespace", "default")
+ profile_id = op_params.get("profile", {}).get("_id")
+ profile_type = op_params.get("profile", {}).get("profile_type")
+ self.logger.info(
+ f"Checking status of KSU {db_ksu['name']} for profile {profile_id}."
+ )
+ dbcluster_list = self.get_dbclusters_from_profile(profile_id, profile_type)
+ if not dbcluster_list:
+ self.logger.info(f"No clusters found for profile {profile_id}.")
+ for db_cluster in dbcluster_list:
+ try:
+ self.logger.info(
+ f"Checking status of KSU in cluster {db_cluster['name']}."
+ )
+ cluster_kubectl = self.cluster_kubectl(db_cluster)
+ checkings_list = [
+ {
+ "item": "kustomization",
+ "name": kustomization_name,
+ "namespace": target_ns,
+ "flag": "Ready",
+ "timeout": self._checkloop_kustomization_timeout,
+ "enable": True,
+ "resourceState": "IN_PROGRESS.KUSTOMIZATION_READY",
+ },
+ ]
+ self.logger.info(
+ f"Checking status of KSU {db_ksu['name']} for profile {profile_id}."
+ )
+ result, message = await self.common_check_list(
+ op_id, checkings_list, "ksus", db_ksu, kubectl=cluster_kubectl
+ )
+ if not result:
+ return False, message
+ except Exception as e:
+ self.logger.error(
+ f"Error checking KSU in cluster {db_cluster['name']}."
+ )
+ self.logger.error(e)
+ return False, f"Error checking KSU in cluster {db_cluster['name']}."
+ return True, "OK"
+
+ async def check_delete_ksus(self, op_id, op_params, content):
+ self.logger.info(f"check_delete_ksus Operation {op_id}. Params: {op_params}.")
+ self.logger.debug(f"Content: {content}")
+ db_ksu = content
+ kustomization_name = db_ksu["git_name"].lower()
+ oka_list = db_ksu["oka"]
+ oka_item = oka_list[0]
+ oka_params = oka_item.get("transformation", {})
+ target_ns = oka_params.get("namespace", "default")
+ profile_id = op_params.get("profile", {}).get("_id")
+ profile_type = op_params.get("profile", {}).get("profile_type")
+ self.logger.info(
+ f"Checking status of KSU {db_ksu['name']} for profile {profile_id}."
+ )
+ dbcluster_list = self.get_dbclusters_from_profile(profile_id, profile_type)
+ if not dbcluster_list:
+ self.logger.info(f"No clusters found for profile {profile_id}.")
+ for db_cluster in dbcluster_list:
+ try:
+ self.logger.info(
+ f"Checking status of KSU in cluster {db_cluster['name']}."
+ )
+ cluster_kubectl = self.cluster_kubectl(db_cluster)
+ checkings_list = [
+ {
+ "item": "kustomization",
+ "name": kustomization_name,
+ "namespace": target_ns,
+ "deleted": True,
+ "timeout": self._checkloop_kustomization_timeout,
+ "enable": True,
+ "resourceState": "IN_PROGRESS.KUSTOMIZATION_DELETED",
+ },
+ ]
+ self.logger.info(
+ f"Checking status of KSU {db_ksu['name']} for profile {profile_id}."
+ )
+ result, message = await self.common_check_list(
+ op_id, checkings_list, "ksus", db_ksu, kubectl=cluster_kubectl
+ )
+ if not result:
+ return False, message
+ except Exception as e:
+ self.logger.error(
+ f"Error checking KSU in cluster {db_cluster['name']}."
+ )
+ self.logger.error(e)
+ return False, f"Error checking KSU in cluster {db_cluster['name']}."
+ return True, "OK"