| # -*- coding: utf-8 -*- |
| |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
| # implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| __author__ = ( |
| "Shrinithi R <shrinithi.r@tataelxsi.co.in>", |
| "Shahithya Y <shahithya.y@tataelxsi.co.in>", |
| ) |
| |
| import copy |
| import logging |
| from time import time |
| import traceback |
| from osm_lcm.lcm_utils import LcmBase |
| from copy import deepcopy |
| from osm_lcm import odu_workflows |
| from osm_lcm import vim_sdn |
| |
| |
class GitOpsLcm(LcmBase):
    """Common base for GitOps-driven LCM handlers (clusters, profiles, credentials).

    Wraps the ODU workflow engine and provides shared helpers to record
    operation results in ``operationHistory`` and to poll resources until
    they become ready.
    """

    def __init__(self, msg, lcm_tasks, config):
        """Initialize logger, task registry and the ODU workflow client.

        :param msg: message bus handler, passed through to LcmBase and ODU
        :param lcm_tasks: task registry shared across LCM modules
        :param config: LCM configuration dictionary
        """
        self.logger = logging.getLogger("lcm.gitops")
        self.lcm_tasks = lcm_tasks
        self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)
        # Timeouts (seconds) used by the readiness-check loops
        self._checkloop_kustomization_timeout = 900
        self._checkloop_resource_timeout = 900
        # Per-operation hooks, filled in by subclasses:
        # {"<workflow key>": {"check_resource_function": <coroutine>}}
        self._workflows = {}
        super().__init__(msg, self.logger)

    async def check_dummy_operation(self, op_id, op_params, content):
        """Fallback resource check used when no specific hook is registered.

        Always succeeds; it only logs the received operation data.
        """
        self.logger.info(f"Operation {op_id}. Params: {op_params}. Content: {content}")
        return True, "OK"

    def update_operation_history(
        self, content, workflow_status=None, resource_status=None
    ):
        """Update the current operation entry inside ``content["operationHistory"]``.

        :param content: DB record containing "current_operation" and
            "operationHistory"
        :param workflow_status: truthy when the workflow completed, falsy on error
        :param resource_status: truthy when the resource is READY, falsy otherwise.
            NOTE(review): callers pass None for workflow-only updates, which
            transiently marks the entry NOT_READY/FAILED until a later call
            overwrites it — confirm this intermediate DB state is intended.
        :return: the updated ``content`` dict
        """
        self.logger.info(
            f"Update Operation History in DB. Workflow status: {workflow_status}. Resource status: {resource_status}"
        )
        self.logger.debug(f"Content: {content}")

        op_id = content["current_operation"]
        self.logger.debug("OP_id: {}".format(op_id))
        # enumerate() replaces the original manually maintained counter
        for op_num, operation in enumerate(content["operationHistory"]):
            self.logger.debug("Operations: {}".format(operation))
            if operation["op_id"] == op_id:
                self.logger.debug("Found operation number: {}".format(op_num))
                now = time()
                entry = content["operationHistory"][op_num]
                if workflow_status:
                    entry["workflowState"] = "COMPLETED"
                    entry["result"] = True
                else:
                    entry["workflowState"] = "ERROR"
                    entry["operationState"] = "FAILED"
                    entry["result"] = False

                if resource_status:
                    entry["resourceState"] = "READY"
                    entry["operationState"] = "COMPLETED"
                    entry["result"] = True
                else:
                    entry["resourceState"] = "NOT_READY"
                    entry["operationState"] = "FAILED"
                    entry["result"] = False

                entry["endDate"] = now
                break
        self.logger.debug("content: {}".format(content))

        return content

    async def common_check_list(self, checkings_list, db_collection, db_item):
        """Run a sequence of readiness checks, updating the DB after each one.

        :param checkings_list: list of dicts with keys item/name/namespace/
            flag/timeout/enable/resourceState describing each check
        :param db_collection: DB collection where ``db_item`` is stored
        :param db_item: DB record whose "resourceState" advances as checks pass
        :return: (status, message); False on the first failed check or on an
            unexpected exception
        """
        try:
            for checking in checkings_list:
                if not checking["enable"]:
                    # Disabled checks are skipped entirely
                    continue
                status, message = await self.odu.readiness_loop(
                    item=checking["item"],
                    name=checking["name"],
                    namespace=checking["namespace"],
                    flag=checking["flag"],
                    timeout=checking["timeout"],
                )
                if not status:
                    return status, message
                # Record the intermediate state reached by this check
                db_item["resourceState"] = checking["resourceState"]
                db_item = self.update_operation_history(
                    db_item, "COMPLETED", checking["resourceState"]
                )
                self.db.set_one(db_collection, {"_id": db_item["_id"]}, db_item)
        except Exception as e:
            self.logger.debug(traceback.format_exc())
            self.logger.debug(f"Exception: {e}", exc_info=True)
            return False, f"Unexpected exception: {e}"
        return True, "OK"

    async def check_resource_status(self, key, op_id, op_params, content):
        """Dispatch to the resource-check hook registered for ``key``.

        Falls back to :meth:`check_dummy_operation` when no hook exists.
        """
        self.logger.info(
            f"Check resource status. Key: {key}. Operation: {op_id}. Params: {op_params}. Content: {content}"
        )
        check_resource_function = self._workflows.get(key, {}).get(
            "check_resource_function"
        )
        self.logger.info("check_resource function : {}".format(check_resource_function))
        if check_resource_function:
            return await check_resource_function(op_id, op_params, content)
        else:
            return await self.check_dummy_operation(op_id, op_params, content)

    def decrypting_key(self, content):
        """Return a deep copy of ``content`` with its age keys decrypted.

        The copy is wrapped as {"cluster": <copy>} for the ODU workflows;
        the original record keeps its encrypted keys.
        """
        # This deep copy is for to be passed to ODU workflows.
        cluster_copy = copy.deepcopy(content)

        # decrypting the key
        self.db.encrypt_decrypt_fields(
            cluster_copy,
            "decrypt",
            ["age_pubkey", "age_privkey"],
            schema_version="1.11",
            salt=cluster_copy["_id"],
        )
        db_cluster_copy = {
            "cluster": cluster_copy,
        }
        return db_cluster_copy
| |
| |
class ClusterLcm(GitOpsLcm):
    """LCM handler for K8s cluster lifecycle operations driven via GitOps."""

    db_collection = "clusters"

    def __init__(self, msg, lcm_tasks, config):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
        :return: None
        """
        super().__init__(msg, lcm_tasks, config)
        # Resource-check hooks consumed by GitOpsLcm.check_resource_status()
        self._workflows = {
            "create_cluster": {
                "check_resource_function": self.check_create_cluster,
            },
        }
        self.regist = vim_sdn.K8sClusterLcm(msg, self.lcm_tasks, config)

    async def create(self, op_id, op_params, content):
        """Create a cluster through the ODU "create_cluster" workflow.

        Launches the workflow, records the workflow result, cleans workflow
        items, and (on success) polls the created resources and propagates
        the final state to the cluster and its default profiles.

        :param op_id: id of the current operation in operationHistory
        :param op_params: parameters of the operation
        :param content: dict containing the "cluster" DB record
        """
        self.logger.info("cluster Create Enter")
        db_cluster = content["cluster"]

        # Deep copy with decrypted age keys, to be passed to ODU workflows
        db_cluster_copy = self.decrypting_key(db_cluster)

        # vim account details
        db_vim = self.db.get_one("vim_accounts", {"name": db_cluster["vim_account"]})
        db_cluster_copy["vim_account"] = db_vim

        _, workflow_name = await self.odu.launch_workflow(
            "create_cluster", op_id, op_params, db_cluster_copy
        )
        self.logger.info("workflow_name is :{}".format(workflow_name))

        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )
        self.logger.info(
            "workflow_status is :{} and workflow_msg is :{}".format(
                workflow_status, workflow_msg
            )
        )
        if workflow_status:
            db_cluster["state"] = "CREATED"
            db_cluster["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
        else:
            db_cluster["state"] = "FAILED_CREATION"
            db_cluster["resourceState"] = "ERROR"
        # Persist the workflow outcome in the operation history
        db_cluster = self.update_operation_history(db_cluster, workflow_status, None)
        self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)

        # Clean items used in the workflow, no matter if the workflow succeeded
        clean_status, clean_msg = await self.odu.clean_items_workflow(
            "create_cluster", op_id, op_params, db_cluster_copy
        )
        self.logger.info(
            f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
        )

        if workflow_status:
            resource_status, resource_msg = await self.check_resource_status(
                "create_cluster", op_id, op_params, db_cluster_copy
            )
            self.logger.info(
                "resource_status is :{} and resource_msg is :{}".format(
                    resource_status, resource_msg
                )
            )
            if resource_status:
                db_cluster["resourceState"] = "READY"
            else:
                db_cluster["resourceState"] = "ERROR"

            db_cluster["operatingState"] = "IDLE"
            db_cluster = self.update_operation_history(
                db_cluster, workflow_status, resource_status
            )
            self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
            self.update_profile_state(db_cluster, workflow_status, resource_status)
        return

    async def check_create_cluster(self, op_id, op_params, content):
        """Build and run the readiness-check list for a newly created cluster.

        The checks depend on the VIM type (azure/gcp/aws); AWS and GCP
        additionally verify a nodepool resource, and the bootstrap
        kustomization check can be disabled via op_params["bootstrap"].

        :return: (status, message) from common_check_list, or an error tuple
            for unsupported VIM types
        """
        self.logger.info(
            f"check_create_cluster Operation {op_id}. Params: {op_params}."
        )
        # self.logger.debug(f"Content: {content}")
        db_cluster = content["cluster"]
        cluster_name = db_cluster["git_name"].lower()
        cluster_kustomization_name = cluster_name
        db_vim_account = content["vim_account"]
        cloud_type = db_vim_account["vim_type"]
        nodepool_name = ""
        if cloud_type == "aws":
            # AWS names the managed cluster and nodegroup with suffixes
            nodepool_name = f"{cluster_name}-nodegroup"
            cluster_name = f"{cluster_name}-cluster"
        elif cloud_type == "gcp":
            nodepool_name = f"nodepool-{cluster_name}"
        bootstrap = op_params.get("bootstrap", True)
        if cloud_type in ("azure", "gcp", "aws"):
            checkings_list = [
                {
                    "item": "kustomization",
                    "name": cluster_kustomization_name,
                    "namespace": "managed-resources",
                    "flag": "Ready",
                    "timeout": self._checkloop_kustomization_timeout,
                    "enable": True,
                    "resourceState": "IN_PROGRESS.KUSTOMIZATION_READY",
                },
                {
                    "item": f"cluster_{cloud_type}",
                    "name": cluster_name,
                    "namespace": "",
                    "flag": "Synced",
                    "timeout": self._checkloop_resource_timeout,
                    "enable": True,
                    "resourceState": "IN_PROGRESS.RESOURCE_SYNCED.CLUSTER",
                },
                {
                    "item": f"cluster_{cloud_type}",
                    "name": cluster_name,
                    "namespace": "",
                    "flag": "Ready",
                    "timeout": self._checkloop_resource_timeout,
                    "enable": True,
                    "resourceState": "IN_PROGRESS.RESOURCE_READY.CLUSTER",
                },
                {
                    "item": "kustomization",
                    "name": f"{cluster_kustomization_name}-bstrp-fluxctrl",
                    "namespace": "managed-resources",
                    "flag": "Ready",
                    "timeout": self._checkloop_kustomization_timeout,
                    "enable": bootstrap,
                    "resourceState": "IN_PROGRESS.BOOTSTRAP_OK",
                },
            ]
        else:
            return False, "Not suitable VIM account to check cluster status"
        if nodepool_name:
            nodepool_check = {
                "item": f"nodepool_{cloud_type}",
                "name": nodepool_name,
                "namespace": "",
                "flag": "Ready",
                "timeout": self._checkloop_resource_timeout,
                "enable": True,
                "resourceState": "IN_PROGRESS.RESOURCE_READY.NODEPOOL",
            }
            # The nodepool must be ready before the bootstrap check runs
            checkings_list.insert(3, nodepool_check)
        return await self.common_check_list(checkings_list, "clusters", db_cluster)

    def update_profile_state(self, db_cluster, workflow_status, resource_status):
        """Propagate the cluster state, keys and operation result to its profiles.

        NOTE(review): db_cluster[profile_type] is used here as a single
        profile id, while delete_cluster() treats it as a list — confirm the
        record shape at cluster-creation time.
        """
        profiles = [
            "infra_controller_profiles",
            "infra_config_profiles",
            "app_profiles",
            "resource_profiles",
        ]
        profiles_collection = {
            "infra_controller_profiles": "k8sinfra_controller",
            "infra_config_profiles": "k8sinfra_config",
            "app_profiles": "k8sapp",
            "resource_profiles": "k8sresource",
        }
        for profile_type in profiles:
            profile_id = db_cluster[profile_type]
            self.logger.info("profile id is : {}".format(profile_id))
            db_collection = profiles_collection[profile_type]
            self.logger.info("the db_collection is :{}".format(db_collection))
            db_profile = self.db.get_one(db_collection, {"_id": profile_id})
            self.logger.info("the db_profile is :{}".format(db_profile))
            db_profile["state"] = db_cluster["state"]
            db_profile["resourceState"] = db_cluster["resourceState"]
            db_profile["operatingState"] = db_cluster["operatingState"]
            db_profile["age_pubkey"] = db_cluster["age_pubkey"]
            # Bug fix: copy the private key from the cluster as well; this
            # line was previously a no-op self-assignment of db_profile.
            db_profile["age_privkey"] = db_cluster["age_privkey"]
            db_profile = self.update_operation_history(
                db_profile, workflow_status, resource_status
            )
            self.logger.info("the db_profile is :{}".format(db_profile))
            self.db.set_one(db_collection, {"_id": db_profile["_id"]}, db_profile)

    async def delete(self, op_id, op_params, content):
        """Delete a cluster through the ODU "delete_cluster" workflow.

        On success the cluster record and its default profiles are removed
        from the DB via :meth:`delete_cluster`.
        """
        self.logger.info("cluster delete Enter")
        db_cluster = content["cluster"]

        _, workflow_name = await self.odu.launch_workflow(
            "delete_cluster", op_id, op_params, content
        )
        self.logger.info("workflow_name is :{}".format(workflow_name))

        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )
        self.logger.info(
            "workflow_status is :{} and workflow_msg is :{}".format(
                workflow_status, workflow_msg
            )
        )
        if workflow_status:
            db_cluster["state"] = "DELETED"
            db_cluster["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
        else:
            db_cluster["state"] = "FAILED_DELETION"
            db_cluster["resourceState"] = "ERROR"
        # Persist the workflow outcome in the operation history
        db_cluster = self.update_operation_history(db_cluster, workflow_status, None)
        self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)

        if workflow_status:
            resource_status, resource_msg = await self.check_resource_status(
                "delete_cluster", op_id, op_params, content
            )
            self.logger.info(
                "resource_status is :{} and resource_msg is :{}".format(
                    resource_status, resource_msg
                )
            )
            if resource_status:
                db_cluster["resourceState"] = "READY"
            else:
                db_cluster["resourceState"] = "ERROR"

            db_cluster["operatingState"] = "IDLE"
            db_cluster = self.update_operation_history(
                db_cluster, workflow_status, resource_status
            )
            self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)

        # To delete it from DB
        if db_cluster["state"] == "DELETED":
            self.delete_cluster(db_cluster)
        return

    def delete_cluster(self, db_cluster):
        """Remove the cluster record and its default profiles from the DB.

        Default profiles (those whose name matches the cluster's name) are
        deleted outright; non-default profiles are only detached from the
        cluster record.
        """
        # detach profiles
        update_dict = None
        profiles_to_detach = [
            "infra_controller_profiles",
            "infra_config_profiles",
            "app_profiles",
            "resource_profiles",
        ]
        profiles_collection = {
            "infra_controller_profiles": "k8sinfra_controller",
            "infra_config_profiles": "k8sinfra_config",
            "app_profiles": "k8sapp",
            "resource_profiles": "k8sresource",
        }
        for profile_type in profiles_to_detach:
            if db_cluster.get(profile_type):
                self.logger.debug("the profile_type is :{}".format(profile_type))
                profile_ids = db_cluster[profile_type]
                self.logger.debug("the profile_ids is :{}".format(profile_ids))
                # Iterate over a copy because profile_ids is mutated below
                profile_ids_copy = deepcopy(profile_ids)
                self.logger.debug(
                    "the profile_ids_copy is :{}".format(profile_ids_copy)
                )
                for profile_id in profile_ids_copy:
                    self.logger.debug("the profile_id is :{}".format(profile_id))
                    db_collection = profiles_collection[profile_type]
                    self.logger.debug("the db_collection is :{}".format(db_collection))
                    db_profile = self.db.get_one(db_collection, {"_id": profile_id})
                    self.logger.debug("the db_profile is :{}".format(db_profile))
                    self.logger.debug(
                        "the item_content name is :{}".format(db_cluster["name"])
                    )
                    self.logger.debug(
                        "the db_profile name is :{}".format(db_profile["name"])
                    )
                    if db_cluster["name"] == db_profile["name"]:
                        # Default profile: delete it together with the cluster
                        self.logger.debug("it is getting into if default")
                        self.db.del_one(db_collection, {"_id": profile_id})
                    else:
                        # Non-default profile: only detach it from the cluster
                        self.logger.debug("it is getting into else non default")
                        profile_ids.remove(profile_id)
                        update_dict = {profile_type: profile_ids}
                        self.logger.debug(f"the update dict is :{update_dict}")
                        self.db.set_one(
                            "clusters", {"_id": db_cluster["_id"]}, update_dict
                        )
        self.db.del_one("clusters", {"_id": db_cluster["_id"]})
        self.logger.debug("the id is :{}".format(db_cluster["_id"]))

    async def attach_profile(self, op_id, op_params, content):
        """Attach an existing profile to a cluster via the ODU workflow.

        On success, the profile id is appended to the cluster's profile list.
        """
        self.logger.info("profile attach Enter")
        db_cluster = content["cluster"]
        db_profile = content["profile"]
        profile_type = db_profile["profile_type"]
        profile_id = db_profile["_id"]
        self.logger.info("profile type is : {}".format(profile_type))
        self.logger.info("profile id is : {}".format(profile_id))

        _, workflow_name = await self.odu.launch_workflow(
            "attach_profile_to_cluster", op_id, op_params, content
        )
        self.logger.info("workflow_name is :{}".format(workflow_name))

        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )
        self.logger.info(
            "workflow_status is :{} and workflow_msg is :{}".format(
                workflow_status, workflow_msg
            )
        )
        if workflow_status:
            db_cluster["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
        else:
            db_cluster["resourceState"] = "ERROR"
        # Persist the workflow outcome in the operation history
        db_cluster = self.update_operation_history(db_cluster, workflow_status, None)
        self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)

        if workflow_status:
            resource_status, resource_msg = await self.check_resource_status(
                "attach_profile_to_cluster", op_id, op_params, content
            )
            self.logger.info(
                "resource_status is :{} and resource_msg is :{}".format(
                    resource_status, resource_msg
                )
            )
            if resource_status:
                db_cluster["resourceState"] = "READY"
            else:
                db_cluster["resourceState"] = "ERROR"

            db_cluster["operatingState"] = "IDLE"
            db_cluster = self.update_operation_history(
                db_cluster, workflow_status, resource_status
            )
            profile_list = db_cluster[profile_type]
            self.logger.info("profile list is : {}".format(profile_list))
            if resource_status:
                self.logger.info("it is getting into resource status true")
                profile_list.append(profile_id)
                self.logger.info("profile list is : {}".format(profile_list))
                db_cluster[profile_type] = profile_list
                self.logger.info("db cluster is : {}".format(db_cluster))
            self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)

        return

    async def detach_profile(self, op_id, op_params, content):
        """Detach a profile from a cluster via the ODU workflow.

        On success, the profile id is removed from the cluster's profile list.
        """
        self.logger.info("profile dettach Enter")
        db_cluster = content["cluster"]
        db_profile = content["profile"]
        profile_type = db_profile["profile_type"]
        profile_id = db_profile["_id"]
        self.logger.info("profile type is : {}".format(profile_type))
        self.logger.info("profile id is : {}".format(profile_id))

        _, workflow_name = await self.odu.launch_workflow(
            "detach_profile_from_cluster", op_id, op_params, content
        )
        self.logger.info("workflow_name is :{}".format(workflow_name))

        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )
        self.logger.info(
            "workflow_status is :{} and workflow_msg is :{}".format(
                workflow_status, workflow_msg
            )
        )
        if workflow_status:
            db_cluster["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
        else:
            db_cluster["resourceState"] = "ERROR"
        # Persist the workflow outcome in the operation history
        db_cluster = self.update_operation_history(db_cluster, workflow_status, None)
        self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)

        if workflow_status:
            resource_status, resource_msg = await self.check_resource_status(
                "detach_profile_from_cluster", op_id, op_params, content
            )
            self.logger.info(
                "resource_status is :{} and resource_msg is :{}".format(
                    resource_status, resource_msg
                )
            )
            if resource_status:
                db_cluster["resourceState"] = "READY"
            else:
                db_cluster["resourceState"] = "ERROR"

            db_cluster["operatingState"] = "IDLE"
            db_cluster = self.update_operation_history(
                db_cluster, workflow_status, resource_status
            )
            profile_list = db_cluster[profile_type]
            self.logger.info("profile list is : {}".format(profile_list))
            if resource_status:
                self.logger.info("it is getting into resource status true")
                profile_list.remove(profile_id)
                self.logger.info("profile list is : {}".format(profile_list))
                db_cluster[profile_type] = profile_list
                self.logger.info("db cluster is : {}".format(db_cluster))
            self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)

        return

    async def register(self, op_id, op_params, content):
        """Register an externally created cluster via "register_cluster"."""
        self.logger.info("cluster register enter")
        db_cluster = content["cluster"]

        # Deep copy with decrypted age keys, to be passed to ODU workflows
        db_cluster_copy = self.decrypting_key(db_cluster)

        _, workflow_name = await self.odu.launch_workflow(
            "register_cluster", op_id, op_params, db_cluster_copy
        )
        self.logger.info("workflow_name is :{}".format(workflow_name))

        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )
        self.logger.info(
            "workflow_status is :{} and workflow_msg is :{}".format(
                workflow_status, workflow_msg
            )
        )
        if workflow_status:
            db_cluster["state"] = "CREATED"
            db_cluster["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
        else:
            db_cluster["state"] = "FAILED_CREATION"
            db_cluster["resourceState"] = "ERROR"
        # Persist the workflow outcome in the operation history
        db_cluster = self.update_operation_history(db_cluster, workflow_status, None)
        self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)

        # Clean items used in the workflow, no matter if the workflow succeeded
        clean_status, clean_msg = await self.odu.clean_items_workflow(
            "register_cluster", op_id, op_params, db_cluster_copy
        )
        self.logger.info(
            f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
        )

        if workflow_status:
            resource_status, resource_msg = await self.check_resource_status(
                "register_cluster", op_id, op_params, db_cluster_copy
            )
            self.logger.info(
                "resource_status is :{} and resource_msg is :{}".format(
                    resource_status, resource_msg
                )
            )
            if resource_status:
                db_cluster["resourceState"] = "READY"
            else:
                db_cluster["resourceState"] = "ERROR"

            db_cluster["operatingState"] = "IDLE"
            db_cluster = self.update_operation_history(
                db_cluster, workflow_status, resource_status
            )
            self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
        return

    async def deregister(self, op_id, op_params, content):
        """Deregister a cluster and, on success, remove its DB record."""
        self.logger.info("cluster deregister enter")
        db_cluster = content["cluster"]

        self.logger.info("db_cluster is : {}".format(db_cluster))

        _, workflow_name = await self.odu.launch_workflow(
            "deregister_cluster", op_id, op_params, content
        )
        self.logger.info("workflow_name is :{}".format(workflow_name))

        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )
        self.logger.info(
            "workflow_status is :{} and workflow_msg is :{}".format(
                workflow_status, workflow_msg
            )
        )
        if workflow_status:
            db_cluster["state"] = "DELETED"
            db_cluster["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
        else:
            db_cluster["state"] = "FAILED_DELETION"
            db_cluster["resourceState"] = "ERROR"
        # Persist the workflow outcome in the operation history
        db_cluster = self.update_operation_history(db_cluster, workflow_status, None)
        self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)

        # Clean items used in the workflow or in the cluster, no matter if the workflow succeeded
        clean_status, clean_msg = await self.odu.clean_items_workflow(
            "deregister_cluster", op_id, op_params, content
        )
        self.logger.info(
            f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
        )

        if workflow_status:
            resource_status, resource_msg = await self.check_resource_status(
                "deregister_cluster", op_id, op_params, content
            )
            self.logger.info(
                "resource_status is :{} and resource_msg is :{}".format(
                    resource_status, resource_msg
                )
            )
            if resource_status:
                db_cluster["resourceState"] = "READY"
            else:
                db_cluster["resourceState"] = "ERROR"

            db_cluster["operatingState"] = "IDLE"
            db_cluster = self.update_operation_history(
                db_cluster, workflow_status, resource_status
            )
            self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)

        # To delete it from DB
        if db_cluster["state"] == "DELETED":
            self.db.del_one("clusters", {"_id": db_cluster["_id"]})
        return

    async def get_creds(self, op_id, db_cluster):
        """Fetch cluster credentials from ODU and store them in the DB record."""
        self.logger.info("Cluster get creds Enter")
        result, cluster_creds = await self.odu.get_cluster_credentials(db_cluster)
        if result:
            db_cluster["credentials"] = cluster_creds
        # Record the result in the matching operation-history entry
        for op_num, operation in enumerate(db_cluster["operationHistory"]):
            if operation["op_id"] == op_id:
                db_cluster["operationHistory"][op_num]["result"] = result
                db_cluster["operationHistory"][op_num]["endDate"] = time()
        self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
        return

    async def update(self, op_id, op_params, content):
        """Update a cluster (k8s version or node count) via "update_cluster"."""
        self.logger.info("Cluster update Enter")
        db_cluster = content["cluster"]

        # Deep copy with decrypted age keys, to be passed to ODU workflows
        db_cluster_copy = self.decrypting_key(db_cluster)

        # vim account details
        db_vim = self.db.get_one("vim_accounts", {"name": db_cluster["vim_account"]})
        db_cluster_copy["vim_account"] = db_vim

        _, workflow_name = await self.odu.launch_workflow(
            "update_cluster", op_id, op_params, db_cluster_copy
        )
        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )
        self.logger.info(
            "Workflow Status: {} Workflow Message: {}".format(
                workflow_status, workflow_msg
            )
        )

        if workflow_status:
            db_cluster["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
        else:
            db_cluster["resourceState"] = "ERROR"

        db_cluster = self.update_operation_history(db_cluster, workflow_status, None)
        self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)

        # Clean items used in the workflow, no matter if the workflow succeeded
        clean_status, clean_msg = await self.odu.clean_items_workflow(
            "update_cluster", op_id, op_params, db_cluster_copy
        )
        self.logger.info(
            f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
        )
        if workflow_status:
            resource_status, resource_msg = await self.check_resource_status(
                "update_cluster", op_id, op_params, db_cluster_copy
            )
            self.logger.info(
                "Resource Status: {} Resource Message: {}".format(
                    resource_status, resource_msg
                )
            )

            if resource_status:
                db_cluster["resourceState"] = "READY"
            else:
                db_cluster["resourceState"] = "ERROR"

            db_cluster["operatingState"] = "IDLE"
            db_cluster = self.update_operation_history(
                db_cluster, workflow_status, resource_status
            )
        # TODO: verify enxtcondition
        # For the moment, if the workflow completed successfully, then we update the db accordingly.
        if workflow_status:
            if "k8s_version" in op_params:
                db_cluster["k8s_version"] = op_params["k8s_version"]
            elif "node_count" in op_params:
                db_cluster["node_count"] = op_params["node_count"]
        self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
        return
| |
| |
class CloudCredentialsLcm(GitOpsLcm):
    """LCM handler for cloud credentials (vim_accounts) via GitOps workflows."""

    db_collection = "vim_accounts"

    def __init__(self, msg, lcm_tasks, config):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
        :return: None
        """
        super().__init__(msg, lcm_tasks, config)

    async def add(self, op_id, op_params, content):
        """Create cloud credentials and, on success, enable the VIM account.

        :param op_id: id of the current operation
        :param op_params: parameters of the operation
        :param content: the vim_accounts DB record
        """
        self.logger.info("Cloud Credentials create")
        _, workflow_name = await self.odu.launch_workflow(
            "create_cloud_credentials", op_id, op_params, content
        )

        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )

        self.logger.info(
            f"Workflow Status: {workflow_status} Workflow Msg: {workflow_msg}"
        )

        # Clean items used in the workflow, no matter if the workflow succeeded
        clean_status, clean_msg = await self.odu.clean_items_workflow(
            "create_cloud_credentials", op_id, op_params, content
        )
        self.logger.info(
            f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
        )

        if workflow_status:
            resource_status, resource_msg = await self.check_resource_status(
                "create_cloud_credentials", op_id, op_params, content
            )
            self.logger.info(
                f"Resource Status: {resource_status} Resource Message: {resource_msg}"
            )

            # Mark the account and its "create" operation as enabled
            content["_admin"]["operationalState"] = "ENABLED"
            for operation in content["_admin"]["operations"]:
                if operation["lcmOperationType"] == "create":
                    operation["operationState"] = "ENABLED"
            self.logger.info(f"Content : {content}")
            self.db.set_one("vim_accounts", {"_id": content["_id"]}, content)

        return

    async def edit(self, op_id, op_params, content):
        """Update cloud credentials through the ODU workflow."""
        _, workflow_name = await self.odu.launch_workflow(
            "update_cloud_credentials", op_id, op_params, content
        )
        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )
        self.logger.info(
            f"Workflow Status: {workflow_status} Workflow Msg: {workflow_msg}"
        )

        # Clean items used in the workflow, no matter if the workflow succeeded
        clean_status, clean_msg = await self.odu.clean_items_workflow(
            "update_cloud_credentials", op_id, op_params, content
        )
        self.logger.info(
            f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
        )

        if workflow_status:
            resource_status, resource_msg = await self.check_resource_status(
                "update_cloud_credentials", op_id, op_params, content
            )
            self.logger.info(
                f"Resource Status: {resource_status} Resource Message: {resource_msg}"
            )
        return

    async def remove(self, op_id, op_params, content):
        """Delete cloud credentials and remove the account record from the DB."""
        self.logger.info("Cloud Credentials delete")
        _, workflow_name = await self.odu.launch_workflow(
            "delete_cloud_credentials", op_id, op_params, content
        )
        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )
        self.logger.info(
            f"Workflow Status: {workflow_status} Workflow Msg: {workflow_msg}"
        )

        if workflow_status:
            resource_status, resource_msg = await self.check_resource_status(
                "delete_cloud_credentials", op_id, op_params, content
            )
            self.logger.info(
                f"Resource Status: {resource_status} Resource Message: {resource_msg}"
            )
            # NOTE(review): record deletion happens only on workflow success;
            # confirm a failed deletion workflow should keep the DB record.
            self.db.del_one(self.db_collection, {"_id": content["_id"]})
        return
| |
| |
class K8sAppLcm(GitOpsLcm):
    """LCM handler for K8s applications stored in the "k8sapp" collection."""

    def __init__(self, msg, lcm_tasks, config):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
        :return: None
        """
        super().__init__(msg, lcm_tasks, config)

    async def create(self, op_id, op_params, content):
        """Create a K8s app through the "create_profile" ODU workflow.

        Records the workflow outcome in the DB record ("state"/"resourceState"),
        then, only when the workflow succeeded, polls the resource status and
        persists the final state.

        :param op_id: identifier of the current operation
        :param op_params: parameters of the operation
        :param content: DB record of the app being created
        :return: None
        """
        self.logger.info("App Create Enter")

        _, workflow_name = await self.odu.launch_workflow(
            "create_profile", op_id, op_params, content
        )
        self.logger.info("workflow_name is :{}".format(workflow_name))

        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )
        self.logger.info(
            "workflow_status is :{} and workflow_msg is :{}".format(
                workflow_status, workflow_msg
            )
        )
        if workflow_status:
            content["state"] = "CREATED"
            content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
        else:
            content["state"] = "FAILED_CREATION"
            content["resourceState"] = "ERROR"
        # has to call update_operation_history return content
        content = self.update_operation_history(content, workflow_status, None)
        self.db.set_one("k8sapp", {"_id": content["_id"]}, content)

        if workflow_status:
            # Resource status is only checked after a successful workflow
            resource_status, resource_msg = await self.check_resource_status(
                "create_profile", op_id, op_params, content
            )
            self.logger.info(
                "resource_status is :{} and resource_msg is :{}".format(
                    resource_status, resource_msg
                )
            )
            if resource_status:
                content["resourceState"] = "READY"
            else:
                content["resourceState"] = "ERROR"

            # NOTE(review): operatingState is reset to IDLE only on the success
            # path — confirm failed operations are unblocked elsewhere.
            content["operatingState"] = "IDLE"
            content = self.update_operation_history(
                content, workflow_status, resource_status
            )
            self.db.set_one("k8sapp", {"_id": content["_id"]}, content)

        return

    async def delete(self, op_id, op_params, content):
        """Delete a K8s app through the "delete_profile" ODU workflow.

        Mirrors create(): records the workflow outcome, optionally checks the
        resource status, and finally removes the DB record when the deletion
        succeeded.

        :param op_id: identifier of the current operation
        :param op_params: parameters of the operation
        :param content: DB record of the app being deleted
        :return: None
        """
        self.logger.info("App delete Enter")

        _, workflow_name = await self.odu.launch_workflow(
            "delete_profile", op_id, op_params, content
        )
        self.logger.info("workflow_name is :{}".format(workflow_name))

        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )
        self.logger.info(
            "workflow_status is :{} and workflow_msg is :{}".format(
                workflow_status, workflow_msg
            )
        )
        if workflow_status:
            content["state"] = "DELETED"
            content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
        else:
            content["state"] = "FAILED_DELETION"
            content["resourceState"] = "ERROR"
        # has to call update_operation_history return content
        content = self.update_operation_history(content, workflow_status, None)
        self.db.set_one("k8sapp", {"_id": content["_id"]}, content)

        if workflow_status:
            resource_status, resource_msg = await self.check_resource_status(
                "delete_profile", op_id, op_params, content
            )
            self.logger.info(
                "resource_status is :{} and resource_msg is :{}".format(
                    resource_status, resource_msg
                )
            )
            if resource_status:
                content["resourceState"] = "READY"
            else:
                content["resourceState"] = "ERROR"

            content["operatingState"] = "IDLE"
            content = self.update_operation_history(
                content, workflow_status, resource_status
            )
            self.db.set_one("k8sapp", {"_id": content["_id"]}, content)

        # To delete it from DB
        # "DELETED" is only set when the workflow succeeded, so this is the
        # success-path cleanup of the record itself.
        if content["state"] == "DELETED":
            self.db.del_one("k8sapp", {"_id": content["_id"]})
        return
| |
| |
class K8sResourceLcm(GitOpsLcm):
    """LCM handler for K8s resources stored in the "k8sresource" collection."""

    def __init__(self, msg, lcm_tasks, config):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
        :return: None
        """
        super().__init__(msg, lcm_tasks, config)

    async def create(self, op_id, op_params, content):
        """Create a K8s resource through the "create_profile" ODU workflow.

        Records the workflow outcome in the DB record ("state"/"resourceState"),
        then, only when the workflow succeeded, polls the resource status and
        persists the final state.

        :param op_id: identifier of the current operation
        :param op_params: parameters of the operation
        :param content: DB record of the resource being created
        :return: None
        """
        self.logger.info("Resource Create Enter")

        _, workflow_name = await self.odu.launch_workflow(
            "create_profile", op_id, op_params, content
        )
        self.logger.info("workflow_name is :{}".format(workflow_name))

        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )
        self.logger.info(
            "workflow_status is :{} and workflow_msg is :{}".format(
                workflow_status, workflow_msg
            )
        )
        if workflow_status:
            content["state"] = "CREATED"
            content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
        else:
            content["state"] = "FAILED_CREATION"
            content["resourceState"] = "ERROR"
        # has to call update_operation_history return content
        content = self.update_operation_history(content, workflow_status, None)
        self.db.set_one("k8sresource", {"_id": content["_id"]}, content)

        if workflow_status:
            # Resource status is only checked after a successful workflow
            resource_status, resource_msg = await self.check_resource_status(
                "create_profile", op_id, op_params, content
            )
            self.logger.info(
                "resource_status is :{} and resource_msg is :{}".format(
                    resource_status, resource_msg
                )
            )
            if resource_status:
                content["resourceState"] = "READY"
            else:
                content["resourceState"] = "ERROR"

            content["operatingState"] = "IDLE"
            content = self.update_operation_history(
                content, workflow_status, resource_status
            )
            self.db.set_one("k8sresource", {"_id": content["_id"]}, content)

        return

    async def delete(self, op_id, op_params, content):
        """Delete a K8s resource through the "delete_profile" ODU workflow.

        Unlike the sibling classes, the record is re-read from the DB first so
        the workflow operates on the freshest copy.

        :param op_id: identifier of the current operation
        :param op_params: parameters of the operation
        :param content: DB record of the resource being deleted
        :return: None
        """
        self.logger.info("Resource delete Enter")
        # Refresh the record from the DB before launching the workflow
        content = self.db.get_one("k8sresource", {"_id": content["_id"]})

        _, workflow_name = await self.odu.launch_workflow(
            "delete_profile", op_id, op_params, content
        )
        self.logger.info("workflow_name is :{}".format(workflow_name))

        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )
        self.logger.info(
            "workflow_status is :{} and workflow_msg is :{}".format(
                workflow_status, workflow_msg
            )
        )
        if workflow_status:
            content["state"] = "DELETED"
            content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
        else:
            content["state"] = "FAILED_DELETION"
            content["resourceState"] = "ERROR"
        # has to call update_operation_history return content
        content = self.update_operation_history(content, workflow_status, None)
        self.db.set_one("k8sresource", {"_id": content["_id"]}, content)

        if workflow_status:
            resource_status, resource_msg = await self.check_resource_status(
                "delete_profile", op_id, op_params, content
            )
            self.logger.info(
                "resource_status is :{} and resource_msg is :{}".format(
                    resource_status, resource_msg
                )
            )
            if resource_status:
                content["resourceState"] = "READY"
            else:
                content["resourceState"] = "ERROR"

            content["operatingState"] = "IDLE"
            content = self.update_operation_history(
                content, workflow_status, resource_status
            )
            self.db.set_one("k8sresource", {"_id": content["_id"]}, content)

        # To delete it from DB
        # "DELETED" is only set when the workflow succeeded
        if content["state"] == "DELETED":
            self.db.del_one("k8sresource", {"_id": content["_id"]})
        return
| |
| |
class K8sInfraControllerLcm(GitOpsLcm):
    """LCM handler for K8s infra controllers ("k8sinfra_controller" collection)."""

    def __init__(self, msg, lcm_tasks, config):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
        :return: None
        """
        super().__init__(msg, lcm_tasks, config)

    async def create(self, op_id, op_params, content):
        """Create an infra controller through the "create_profile" ODU workflow.

        Records the workflow outcome in the DB record ("state"/"resourceState"),
        then, only when the workflow succeeded, polls the resource status and
        persists the final state.

        :param op_id: identifier of the current operation
        :param op_params: parameters of the operation
        :param content: DB record of the infra controller being created
        :return: None
        """
        self.logger.info("Infra controller Create Enter")

        _, workflow_name = await self.odu.launch_workflow(
            "create_profile", op_id, op_params, content
        )
        self.logger.info("workflow_name is :{}".format(workflow_name))

        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )
        self.logger.info(
            "workflow_status is :{} and workflow_msg is :{}".format(
                workflow_status, workflow_msg
            )
        )
        if workflow_status:
            content["state"] = "CREATED"
            content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
        else:
            content["state"] = "FAILED_CREATION"
            content["resourceState"] = "ERROR"
        # has to call update_operation_history return content
        content = self.update_operation_history(content, workflow_status, None)
        self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, content)

        if workflow_status:
            # Resource status is only checked after a successful workflow
            resource_status, resource_msg = await self.check_resource_status(
                "create_profile", op_id, op_params, content
            )
            self.logger.info(
                "resource_status is :{} and resource_msg is :{}".format(
                    resource_status, resource_msg
                )
            )
            if resource_status:
                content["resourceState"] = "READY"
            else:
                content["resourceState"] = "ERROR"

            content["operatingState"] = "IDLE"
            content = self.update_operation_history(
                content, workflow_status, resource_status
            )
            self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, content)

        return

    async def delete(self, op_id, op_params, content):
        """Delete an infra controller through the "delete_profile" ODU workflow.

        Mirrors create(): records the workflow outcome, optionally checks the
        resource status, and finally removes the DB record when the deletion
        succeeded.

        :param op_id: identifier of the current operation
        :param op_params: parameters of the operation
        :param content: DB record of the infra controller being deleted
        :return: None
        """
        self.logger.info("Infra controller delete Enter")

        _, workflow_name = await self.odu.launch_workflow(
            "delete_profile", op_id, op_params, content
        )
        self.logger.info("workflow_name is :{}".format(workflow_name))

        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )
        self.logger.info(
            "workflow_status is :{} and workflow_msg is :{}".format(
                workflow_status, workflow_msg
            )
        )
        if workflow_status:
            content["state"] = "DELETED"
            content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
        else:
            content["state"] = "FAILED_DELETION"
            content["resourceState"] = "ERROR"
        # has to call update_operation_history return content
        content = self.update_operation_history(content, workflow_status, None)
        self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, content)

        if workflow_status:
            resource_status, resource_msg = await self.check_resource_status(
                "delete_profile", op_id, op_params, content
            )
            self.logger.info(
                "resource_status is :{} and resource_msg is :{}".format(
                    resource_status, resource_msg
                )
            )
            if resource_status:
                content["resourceState"] = "READY"
            else:
                content["resourceState"] = "ERROR"

            content["operatingState"] = "IDLE"
            content = self.update_operation_history(
                content, workflow_status, resource_status
            )
            self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, content)

        # To delete it from DB
        # "DELETED" is only set when the workflow succeeded
        if content["state"] == "DELETED":
            self.db.del_one("k8sinfra_controller", {"_id": content["_id"]})
        return
| |
| |
class K8sInfraConfigLcm(GitOpsLcm):
    """LCM handler for K8s infra configs stored in "k8sinfra_config"."""

    def __init__(self, msg, lcm_tasks, config):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
        :return: None
        """
        super().__init__(msg, lcm_tasks, config)

    async def create(self, op_id, op_params, content):
        """Create an infra config through the "create_profile" ODU workflow.

        Records the workflow outcome in the DB record ("state"/"resourceState"),
        then, only when the workflow succeeded, polls the resource status and
        persists the final state.

        :param op_id: identifier of the current operation
        :param op_params: parameters of the operation
        :param content: DB record of the infra config being created
        :return: None
        """
        self.logger.info("Infra config Create Enter")

        _, workflow_name = await self.odu.launch_workflow(
            "create_profile", op_id, op_params, content
        )
        self.logger.info("workflow_name is :{}".format(workflow_name))

        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )
        self.logger.info(
            "workflow_status is :{} and workflow_msg is :{}".format(
                workflow_status, workflow_msg
            )
        )
        if workflow_status:
            content["state"] = "CREATED"
            content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
        else:
            content["state"] = "FAILED_CREATION"
            content["resourceState"] = "ERROR"
        # has to call update_operation_history return content
        content = self.update_operation_history(content, workflow_status, None)
        self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, content)

        if workflow_status:
            # Resource status is only checked after a successful workflow
            resource_status, resource_msg = await self.check_resource_status(
                "create_profile", op_id, op_params, content
            )
            self.logger.info(
                "resource_status is :{} and resource_msg is :{}".format(
                    resource_status, resource_msg
                )
            )
            if resource_status:
                content["resourceState"] = "READY"
            else:
                content["resourceState"] = "ERROR"

            content["operatingState"] = "IDLE"
            content = self.update_operation_history(
                content, workflow_status, resource_status
            )
            self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, content)

        return

    async def delete(self, op_id, op_params, content):
        """Delete an infra config through the "delete_profile" ODU workflow.

        Records the workflow outcome, then checks the resource status ONLY
        when the workflow succeeded, and finally removes the DB record when
        the deletion succeeded.

        Fix: the resource-status poll is now guarded by ``workflow_status``,
        matching every sibling LCM class.  Previously it ran unconditionally,
        which could block for up to the checkloop timeout (900 s) polling a
        resource whose deletion workflow had already failed.

        :param op_id: identifier of the current operation
        :param op_params: parameters of the operation
        :param content: DB record of the infra config being deleted
        :return: None
        """
        self.logger.info("Infra config delete Enter")

        _, workflow_name = await self.odu.launch_workflow(
            "delete_profile", op_id, op_params, content
        )
        self.logger.info("workflow_name is :{}".format(workflow_name))

        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )
        self.logger.info(
            "workflow_status is :{} and workflow_msg is :{}".format(
                workflow_status, workflow_msg
            )
        )
        if workflow_status:
            content["state"] = "DELETED"
            content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
        else:
            content["state"] = "FAILED_DELETION"
            content["resourceState"] = "ERROR"
        # has to call update_operation_history return content
        content = self.update_operation_history(content, workflow_status, None)
        self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, content)

        if workflow_status:
            # Only poll the resource when the workflow succeeded (consistency
            # fix — see docstring)
            resource_status, resource_msg = await self.check_resource_status(
                "delete_profile", op_id, op_params, content
            )
            self.logger.info(
                "resource_status is :{} and resource_msg is :{}".format(
                    resource_status, resource_msg
                )
            )
            if resource_status:
                content["resourceState"] = "READY"
            else:
                content["resourceState"] = "ERROR"

            content["operatingState"] = "IDLE"
            content = self.update_operation_history(
                content, workflow_status, resource_status
            )
            self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, content)

        # To delete it from DB
        # "DELETED" is only set when the workflow succeeded
        if content["state"] == "DELETED":
            self.db.del_one("k8sinfra_config", {"_id": content["_id"]})
        return
| |
| |
class OkaLcm(GitOpsLcm):
    """LCM handler for OKA (OSM Kubernetes Application) packages."""

    # DB collection holding OKA records
    db_collection = "okas"

    def __init__(self, msg, lcm_tasks, config):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
        :return: None
        """
        super().__init__(msg, lcm_tasks, config)

    async def create(self, op_id, op_params, content):
        """Create an OKA through the "create_oka" ODU workflow.

        Records the workflow outcome on the DB record, optionally polls the
        resource status, and always resets operatingState to IDLE at the end.

        :param op_id: identifier of the current operation
        :param op_params: parameters of the operation
        :param content: DB record of the OKA being created
        :return: None
        """
        self.logger.info("OKA Create Enter")
        # Alias (not a copy): mutations below also affect the caller's dict
        db_content = content

        _, workflow_name = await self.odu.launch_workflow(
            "create_oka", op_id, op_params, db_content
        )
        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )
        self.logger.info(
            "Workflow Status: {} Workflow Message: {}".format(
                workflow_status, workflow_msg
            )
        )

        if workflow_status:
            db_content["state"] = "CREATED"
            db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
        else:
            db_content["state"] = "FAILED_CREATION"
            db_content["resourceState"] = "ERROR"

        db_content = self.update_operation_history(db_content, workflow_status, None)
        self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)

        if workflow_status:
            # Resource status is only checked after a successful workflow
            resource_status, resource_msg = await self.check_resource_status(
                "create_oka", op_id, op_params, db_content
            )
            self.logger.info(
                "Resource Status: {} Resource Message: {}".format(
                    resource_status, resource_msg
                )
            )

            if resource_status:
                db_content["resourceState"] = "READY"
            else:
                db_content["resourceState"] = "ERROR"

            # self.logger.info("Db content: {}".format(db_content))
            db_content = self.update_operation_history(
                db_content, workflow_status, resource_status
            )

        # Always release the record back to IDLE and persist the final state
        db_content["operatingState"] = "IDLE"
        self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)

        return

    async def edit(self, op_id, op_params, content):
        """Update an OKA through the "update_oka" ODU workflow.

        Same flow as create() but without touching "state": only
        "resourceState" and the operation history are updated.

        :param op_id: identifier of the current operation
        :param op_params: parameters of the operation
        :param content: DB record of the OKA being updated
        :return: None
        """
        self.logger.info("OKA Edit Enter")
        # Alias (not a copy): mutations below also affect the caller's dict
        db_content = content

        _, workflow_name = await self.odu.launch_workflow(
            "update_oka", op_id, op_params, content
        )
        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )
        self.logger.info(
            "Workflow Status: {} Workflow Message: {}".format(
                workflow_status, workflow_msg
            )
        )

        if workflow_status:
            db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
        else:
            db_content["resourceState"] = "ERROR"

        db_content = self.update_operation_history(db_content, workflow_status, None)
        # self.logger.info("Db content: {}".format(db_content))
        self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)

        if workflow_status:
            resource_status, resource_msg = await self.check_resource_status(
                "update_oka", op_id, op_params, db_content
            )
            self.logger.info(
                "Resource Status: {} Resource Message: {}".format(
                    resource_status, resource_msg
                )
            )

            if resource_status:
                db_content["resourceState"] = "READY"
            else:
                db_content["resourceState"] = "ERROR"

            db_content = self.update_operation_history(
                db_content, workflow_status, resource_status
            )

        # Always release the record back to IDLE and persist the final state
        db_content["operatingState"] = "IDLE"
        self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
        return

    async def delete(self, op_id, op_params, content):
        """Delete an OKA through the "delete_oka" ODU workflow.

        Records the workflow outcome, optionally checks the resource status,
        resets operatingState, and finally removes the DB record when the
        deletion succeeded.

        :param op_id: identifier of the current operation
        :param op_params: parameters of the operation
        :param content: DB record of the OKA being deleted
        :return: None
        """
        self.logger.info("OKA delete Enter")
        # Alias (not a copy): mutations below also affect the caller's dict
        db_content = content

        _, workflow_name = await self.odu.launch_workflow(
            "delete_oka", op_id, op_params, content
        )
        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )
        self.logger.info(
            "Workflow Status: {} Workflow Message: {}".format(
                workflow_status, workflow_msg
            )
        )

        if workflow_status:
            db_content["state"] = "DELETED"
            db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
        else:
            db_content["state"] = "FAILED_DELETION"
            db_content["resourceState"] = "ERROR"

        db_content = self.update_operation_history(db_content, workflow_status, None)
        self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)

        if workflow_status:
            resource_status, resource_msg = await self.check_resource_status(
                "delete_oka", op_id, op_params, db_content
            )
            self.logger.info(
                "Resource Status: {} Resource Message: {}".format(
                    resource_status, resource_msg
                )
            )

            if resource_status:
                db_content["resourceState"] = "READY"
            else:
                db_content["resourceState"] = "ERROR"

            db_content = self.update_operation_history(
                db_content, workflow_status, resource_status
            )

        db_content["operatingState"] = "IDLE"
        self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)

        # "DELETED" is only set when the workflow succeeded
        if db_content["state"] == "DELETED":
            self.db.del_one(self.db_collection, {"_id": db_content["_id"]})
        return
| |
| |
class KsuLcm(GitOpsLcm):
    """LCM handler for KSUs; operations act on LISTS of KSU records."""

    # DB collection holding KSU records
    db_collection = "ksus"

    def __init__(self, msg, lcm_tasks, config):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
        :return: None
        """
        super().__init__(msg, lcm_tasks, config)

    async def create(self, op_id, op_params, content):
        """Create one or more KSUs through the "create_ksus" ODU workflow.

        :param op_id: identifier of the current operation
        :param op_params: per-KSU operation parameters
        :param content: list of KSU DB records affected by this operation
        :return: None
        """
        self.logger.info("ksu Create Enter")

        _, workflow_name = await self.odu.launch_workflow(
            "create_ksus", op_id, op_params, content
        )
        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )
        self.logger.info(
            "Workflow Status: {} Workflow Message: {}".format(
                workflow_status, workflow_msg
            )
        )

        # Record the workflow outcome on every affected KSU
        for db_ksu in content:
            if workflow_status:
                db_ksu["state"] = "CREATED"
                db_ksu["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
            else:
                db_ksu["state"] = "FAILED_CREATION"
                db_ksu["resourceState"] = "ERROR"

            db_ksu = self.update_operation_history(db_ksu, workflow_status, None)
            self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu)

        # Clean items used in the workflow, no matter if the workflow succeeded
        clean_status, clean_msg = await self.odu.clean_items_workflow(
            "create_ksus", op_id, op_params, content
        )
        self.logger.info(
            f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
        )

        if workflow_status:
            # Resource status is only checked after a successful workflow
            resource_status, resource_msg = await self.check_resource_status(
                "create_ksus", op_id, op_params, content
            )
            self.logger.info(
                "Resource Status: {} Resource Message: {}".format(
                    resource_status, resource_msg
                )
            )

            for db_ksu in content:
                if resource_status:
                    db_ksu["resourceState"] = "READY"
                else:
                    db_ksu["resourceState"] = "ERROR"

                db_ksu = self.update_operation_history(
                    db_ksu, workflow_status, resource_status
                )

        # Always release every KSU back to IDLE and persist the final state
        for db_ksu in content:
            db_ksu["operatingState"] = "IDLE"
            self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu)

        return

    async def edit(self, op_id, op_params, content):
        """Update one or more KSUs through the "update_ksus" ODU workflow.

        On success, the requested field changes (name, description, profile,
        oka) are copied from op_params into each DB record.

        :param op_id: identifier of the current operation
        :param op_params: per-KSU operation parameters (zipped with content)
        :param content: list of KSU DB records affected by this operation
        :return: None
        """
        self.logger.info("ksu edit Enter")

        _, workflow_name = await self.odu.launch_workflow(
            "update_ksus", op_id, op_params, content
        )
        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )
        self.logger.info(
            "Workflow Status: {} Workflow Message: {}".format(
                workflow_status, workflow_msg
            )
        )

        for db_ksu in content:
            if workflow_status:
                db_ksu["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
            else:
                db_ksu["resourceState"] = "ERROR"

            db_ksu = self.update_operation_history(db_ksu, workflow_status, None)
            self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu)

        # Clean items used in the workflow, no matter if the workflow succeeded
        # NOTE(review): cleans with key "create_ksus" rather than "update_ksus"
        # — looks like a copy-paste from create(); confirm it is intentional.
        clean_status, clean_msg = await self.odu.clean_items_workflow(
            "create_ksus", op_id, op_params, content
        )
        self.logger.info(
            f"clean_status is :{clean_status} and clean_msg is :{clean_msg}"
        )
        if workflow_status:
            resource_status, resource_msg = await self.check_resource_status(
                "update_ksus", op_id, op_params, content
            )
            self.logger.info(
                "Resource Status: {} Resource Message: {}".format(
                    resource_status, resource_msg
                )
            )

            for db_ksu in content:
                if resource_status:
                    db_ksu["resourceState"] = "READY"
                else:
                    db_ksu["resourceState"] = "ERROR"

                db_ksu = self.update_operation_history(
                    db_ksu, workflow_status, resource_status
                )

        # Release every KSU and, on success, apply the requested edits
        for db_ksu, ksu_params in zip(content, op_params):
            db_ksu["operatingState"] = "IDLE"
            if workflow_status:
                db_ksu["name"] = ksu_params["name"]
                db_ksu["description"] = ksu_params["description"]
                db_ksu["profile"]["profile_type"] = ksu_params["profile"][
                    "profile_type"
                ]
                db_ksu["profile"]["_id"] = ksu_params["profile"]["_id"]
                db_ksu["oka"] = ksu_params["oka"]
            self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu)

        return

    async def delete(self, op_id, op_params, content):
        """Delete one or more KSUs through the "delete_ksus" ODU workflow.

        Successfully deleted KSUs are removed from the DB at the end.

        :param op_id: identifier of the current operation
        :param op_params: per-KSU operation parameters
        :param content: list of KSU DB records affected by this operation
        :return: None
        """
        self.logger.info("ksu delete Enter")

        _, workflow_name = await self.odu.launch_workflow(
            "delete_ksus", op_id, op_params, content
        )
        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )
        self.logger.info(
            "Workflow Status: {} Workflow Message: {}".format(
                workflow_status, workflow_msg
            )
        )

        for db_ksu in content:
            if workflow_status:
                db_ksu["state"] = "DELETED"
                db_ksu["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
            else:
                db_ksu["state"] = "FAILED_DELETION"
                db_ksu["resourceState"] = "ERROR"

            db_ksu = self.update_operation_history(db_ksu, workflow_status, None)
            self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu)

        if workflow_status:
            resource_status, resource_msg = await self.check_resource_status(
                "delete_ksus", op_id, op_params, content
            )
            self.logger.info(
                "Resource Status: {} Resource Message: {}".format(
                    resource_status, resource_msg
                )
            )

            for db_ksu in content:
                if resource_status:
                    db_ksu["resourceState"] = "READY"
                else:
                    db_ksu["resourceState"] = "ERROR"

                db_ksu = self.update_operation_history(
                    db_ksu, workflow_status, resource_status
                )

        # Release every KSU; drop from the DB those actually deleted
        for db_ksu in content:
            db_ksu["operatingState"] = "IDLE"
            self.db.set_one(self.db_collection, {"_id": db_ksu["_id"]}, db_ksu)

            if db_ksu["state"] == "DELETED":
                self.db.del_one(self.db_collection, {"_id": db_ksu["_id"]})
        return

    async def clone(self, op_id, op_params, db_content):
        """Clone a KSU through the "clone_ksus" ODU workflow.

        :param op_id: identifier of the current operation
        :param op_params: parameters of the operation
        :param db_content: DB record of the KSU being cloned (single record)
        :return: None
        """
        self.logger.info("ksu clone Enter")

        _, workflow_name = await self.odu.launch_workflow(
            "clone_ksus", op_id, op_params, db_content
        )
        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )
        self.logger.info(
            "Workflow Status: {} Workflow Message: {}".format(
                workflow_status, workflow_msg
            )
        )

        if workflow_status:
            db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
        else:
            db_content["resourceState"] = "ERROR"

        db_content = self.update_operation_history(db_content, workflow_status, None)
        self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)

        if workflow_status:
            resource_status, resource_msg = await self.check_resource_status(
                "clone_ksus", op_id, op_params, db_content
            )
            self.logger.info(
                "Resource Status: {} Resource Message: {}".format(
                    resource_status, resource_msg
                )
            )

            if resource_status:
                db_content["resourceState"] = "READY"
            else:
                db_content["resourceState"] = "ERROR"

            db_content = self.update_operation_history(
                db_content, workflow_status, resource_status
            )

        # Always release the record back to IDLE and persist the final state
        db_content["operatingState"] = "IDLE"
        self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
        return

    async def move(self, op_id, op_params, db_content):
        """Move a KSU through the "move_ksus" ODU workflow.

        :param op_id: identifier of the current operation
        :param op_params: parameters of the operation
        :param db_content: DB record of the KSU being moved (single record)
        :return: None
        """
        self.logger.info("ksu move Enter")

        _, workflow_name = await self.odu.launch_workflow(
            "move_ksus", op_id, op_params, db_content
        )
        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )
        self.logger.info(
            "Workflow Status: {} Workflow Message: {}".format(
                workflow_status, workflow_msg
            )
        )

        if workflow_status:
            db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
        else:
            db_content["resourceState"] = "ERROR"

        db_content = self.update_operation_history(db_content, workflow_status, None)
        self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)

        if workflow_status:
            resource_status, resource_msg = await self.check_resource_status(
                "move_ksus", op_id, op_params, db_content
            )
            self.logger.info(
                "Resource Status: {} Resource Message: {}".format(
                    resource_status, resource_msg
                )
            )
            if resource_status:
                db_content["resourceState"] = "READY"
            else:
                db_content["resourceState"] = "ERROR"

            db_content = self.update_operation_history(
                db_content, workflow_status, resource_status
            )

        # Always release the record back to IDLE and persist the final state
        db_content["operatingState"] = "IDLE"
        self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)
        return