| # -*- coding: utf-8 -*- |
| |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
| # implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| __author__ = ( |
| "Shrinithi R <shrinithi.r@tataelxsi.co.in>", |
| "Shahithya Y <shahithya.y@tataelxsi.co.in>", |
| ) |
| |
| import logging |
| from osm_lcm.lcm_utils import LcmBase |
| from copy import deepcopy |
| from osm_lcm import odu_workflows |
| from osm_lcm import vim_sdn |
| |
| |
class ClusterLcm(LcmBase):
    """Lifecycle operations for clusters, driven through ODU workflows.

    Every operation has the same two phases: a workflow is launched and
    awaited, and only if it succeeded the resulting resource status is
    checked. The cluster record in the ``clusters`` collection is written
    after each phase.
    """

    db_collection = "clusters"

    # Cluster field listing the attached profile ids -> DB collection that
    # stores profiles of that type. Insertion order is the processing order.
    _PROFILE_COLLECTIONS = {
        "infra_controller_profiles": "k8sinfra_controller",
        "infra_config_profiles": "k8sinfra_config",
        "app_profiles": "k8sapp",
        "resource_profiles": "k8sresource",
    }

    def __init__(self, msg, lcm_tasks, config):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param msg: messaging object, forwarded to the base class and to the workflow helpers
        :param lcm_tasks: stored on the instance and forwarded to the workflow helpers
        :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
        :return: None
        """

        self.logger = logging.getLogger("lcm.gitops")
        self.lcm_tasks = lcm_tasks
        self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)
        self.regist = vim_sdn.K8sClusterLcm(msg, self.lcm_tasks, config)

        super().__init__(msg, self.logger)

    async def _run_operation(
        self,
        workflow,
        op_id,
        op_params,
        content,
        states=None,
        log_workflow_name=True,
        workflow_log="workflow_status is :{} and workflow_msg is :{}",
        resource_log="resource_status is :{} and resource_msg is :{}",
    ):
        """Run the workflow and resource-check phases shared by all operations.

        :param workflow: workflow key passed to the ODU helper (e.g. "create_cluster")
        :param op_id: operation id, forwarded untouched to the ODU helper
        :param op_params: operation parameters, forwarded untouched
        :param content: dict holding the cluster record under the "cluster" key
        :param states: optional ``(success_state, failure_state)`` pair written to
            ``state`` after the workflow phase; ``None`` leaves ``state`` alone
        :param log_workflow_name: whether to log the launched workflow name
        :param workflow_log: format string for the workflow status log line
        :param resource_log: format string for the resource status log line
        :return: ``(db_cluster, workflow_status, resource_status)``; the record
            has ``operatingState`` set to "IDLE" and its operation history
            updated, but the final DB write is left to the caller.
            ``resource_status`` is ``None`` when the workflow failed.
        """
        db_cluster = content["cluster"]

        workflow_name = await self.odu.launch_workflow(
            workflow, op_id, op_params, content
        )
        if log_workflow_name:
            self.logger.info("workflow_name is :{}".format(workflow_name))

        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )
        self.logger.info(workflow_log.format(workflow_status, workflow_msg))

        if states is not None:
            success_state, failure_state = states
            db_cluster["state"] = success_state if workflow_status else failure_state
        db_cluster["resourceState"] = (
            "IN_PROGRESS.GIT_SYNCED" if workflow_status else "ERROR"
        )
        # Persist the intermediate state before the resource check.
        db_cluster = self.update_operation_history(db_cluster, workflow_status, None)
        self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)

        # Must be bound even when the workflow failed: it is read below and by
        # the callers. Previously a failed workflow raised UnboundLocalError.
        resource_status = None
        if workflow_status:
            resource_status, resource_msg = await self.odu.check_resource_status(
                workflow, op_id, op_params, content
            )
            self.logger.info(resource_log.format(resource_status, resource_msg))
            db_cluster["resourceState"] = "READY" if resource_status else "ERROR"

        db_cluster["operatingState"] = "IDLE"
        db_cluster = self.update_operation_history(
            db_cluster, workflow_status, resource_status
        )
        return db_cluster, workflow_status, resource_status

    async def create(self, op_id, op_params, content):
        """Create a cluster and propagate the resulting state to its profiles.

        :param op_id: operation id
        :param op_params: operation parameters
        :param content: dict with the cluster record under "cluster"
        :return: None
        """
        self.logger.info("cluster Create Enter")
        db_cluster, workflow_status, resource_status = await self._run_operation(
            "create_cluster",
            op_id,
            op_params,
            content,
            states=("CREATED", "FAILED_CREATION"),
        )
        self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
        self.update_profile_state(db_cluster, workflow_status, resource_status)
        return

    def update_profile_state(self, db_cluster, workflow_status, resource_status):
        """Copy the cluster's state fields onto every profile it references.

        Each profile field of the cluster is treated as a list of ids (that is
        how ``delete_cluster``/``attach_profile`` use it) and every id is looked
        up individually; a bare scalar id is accepted too. Missing or empty
        profile fields are skipped.

        :param db_cluster: cluster record providing state, resourceState and
            operatingState
        :param workflow_status: forwarded to ``update_operation_history``
        :param resource_status: forwarded to ``update_operation_history``
        :return: None
        """
        for profile_type, db_collection in self._PROFILE_COLLECTIONS.items():
            profile_ids = db_cluster.get(profile_type)
            self.logger.info("profile id is : {}".format(profile_ids))
            self.logger.info("the db_collection is :{}".format(db_collection))
            if not profile_ids:
                continue
            if not isinstance(profile_ids, (list, tuple)):
                profile_ids = [profile_ids]
            for profile_id in profile_ids:
                db_profile = self.db.get_one(db_collection, {"_id": profile_id})
                self.logger.info("the db_profile is :{}".format(db_profile))
                db_profile["state"] = db_cluster["state"]
                db_profile["resourceState"] = db_cluster["resourceState"]
                db_profile["operatingState"] = db_cluster["operatingState"]
                db_profile = self.update_operation_history(
                    db_profile, workflow_status, resource_status
                )
                self.logger.info("the db_profile is :{}".format(db_profile))
                self.db.set_one(db_collection, {"_id": db_profile["_id"]}, db_profile)

    async def delete(self, op_id, op_params, content):
        """Delete a cluster; on workflow success also remove it from the DB.

        :param op_id: operation id
        :param op_params: operation parameters
        :param content: dict with the cluster record under "cluster"
        :return: None
        """
        self.logger.info("cluster delete Enter")
        db_cluster, _, _ = await self._run_operation(
            "delete_cluster",
            op_id,
            op_params,
            content,
            states=("DELETED", "FAILED_DELETION"),
        )
        self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)

        # To delete it from DB
        if db_cluster["state"] == "DELETED":
            self.delete_cluster(db_cluster)
        return

    def delete_cluster(self, db_cluster):
        """Detach the cluster's profiles and delete the cluster record.

        A profile whose name equals the cluster name is deleted from its
        collection; any other profile is only removed from the cluster's id
        list.

        :param db_cluster: cluster record to remove
        :return: None
        """
        for profile_type, db_collection in self._PROFILE_COLLECTIONS.items():
            profile_ids = db_cluster.get(profile_type)
            if not profile_ids:
                continue
            self.logger.info("the profile_type is :{}".format(profile_type))
            self.logger.info("the profile_ids is :{}".format(profile_ids))
            # Iterate over a copy because ids are removed from the live list.
            profile_ids_copy = deepcopy(profile_ids)
            self.logger.info("the profile_ids_copy is :{}".format(profile_ids_copy))
            for profile_id in profile_ids_copy:
                self.logger.info("the profile_id is :{}".format(profile_id))
                self.logger.info("the db_collection is :{}".format(db_collection))
                db_profile = self.db.get_one(db_collection, {"_id": profile_id})
                self.logger.info("the db_profile is :{}".format(db_profile))
                self.logger.info(
                    "the item_content name is :{}".format(db_cluster["name"])
                )
                self.logger.info(
                    "the db_profile name is :{}".format(db_profile["name"])
                )
                if db_cluster["name"] == db_profile["name"]:
                    self.logger.info("it is getting into if default")
                    self.db.del_one(db_collection, {"_id": profile_id})
                else:
                    self.logger.info("it is getting into else non default")
                    profile_ids.remove(profile_id)
                    update_dict = {profile_type: profile_ids}
                    self.logger.info(f"the update dict is :{update_dict}")
                    self.db.set_one(
                        "clusters", {"_id": db_cluster["_id"]}, update_dict
                    )
        self.db.del_one("clusters", {"_id": db_cluster["_id"]})
        self.logger.info("the id is :{}".format(db_cluster["_id"]))

    async def attach_profile(self, op_id, op_params, content):
        """Attach a profile to a cluster.

        The profile id is added to the cluster's list for that profile type
        only when the resource check succeeded, and only if it is not already
        listed.

        :param op_id: operation id
        :param op_params: operation parameters
        :param content: dict with the records under "cluster" and "profile"
        :return: None
        """
        self.logger.info("profile attach Enter")
        db_profile = content["profile"]
        profile_type = db_profile["profile_type"]
        profile_id = db_profile["_id"]
        self.logger.info("profile type is : {}".format(profile_type))
        self.logger.info("profile id is : {}".format(profile_id))

        db_cluster, _, resource_status = await self._run_operation(
            "attach_profile_to_cluster", op_id, op_params, content
        )
        profile_list = db_cluster[profile_type]
        self.logger.info("profile list is : {}".format(profile_list))
        if resource_status:
            self.logger.info("it is getting into resource status true")
            # Avoid listing the same profile twice on a repeated attach.
            if profile_id not in profile_list:
                profile_list.append(profile_id)
            self.logger.info("profile list is : {}".format(profile_list))
            db_cluster[profile_type] = profile_list
            # NOTE(review): this logs the whole cluster record at INFO level;
            # confirm it cannot contain credentials.
            self.logger.info("db cluster is : {}".format(db_cluster))
        self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)

        return

    async def detach_profile(self, op_id, op_params, content):
        """Detach a profile from a cluster.

        The profile id is removed from the cluster's list for that profile
        type only when the resource check succeeded; an id that is not listed
        is ignored instead of raising ``ValueError``.

        :param op_id: operation id
        :param op_params: operation parameters
        :param content: dict with the records under "cluster" and "profile"
        :return: None
        """
        self.logger.info("profile dettach Enter")
        db_profile = content["profile"]
        profile_type = db_profile["profile_type"]
        profile_id = db_profile["_id"]
        self.logger.info("profile type is : {}".format(profile_type))
        self.logger.info("profile id is : {}".format(profile_id))

        db_cluster, _, resource_status = await self._run_operation(
            "detach_profile_from_cluster", op_id, op_params, content
        )
        profile_list = db_cluster[profile_type]
        self.logger.info("profile list is : {}".format(profile_list))
        if resource_status:
            self.logger.info("it is getting into resource status true")
            # Guarded so the final DB write below is never skipped.
            if profile_id in profile_list:
                profile_list.remove(profile_id)
            self.logger.info("profile list is : {}".format(profile_list))
            db_cluster[profile_type] = profile_list
            self.logger.info("db cluster is : {}".format(db_cluster))
        self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)

        return

    async def register(self, op_id, op_params, content):
        """Register a cluster and propagate the resulting state to its profiles.

        :param op_id: operation id
        :param op_params: operation parameters
        :param content: dict with the cluster record under "cluster"
        :return: None
        """
        self.logger.info("cluster register enter")
        db_cluster, workflow_status, resource_status = await self._run_operation(
            "register_cluster",
            op_id,
            op_params,
            content,
            states=("CREATED", "FAILED_CREATION"),
        )
        self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
        self.update_profile_state(db_cluster, workflow_status, resource_status)
        return

    async def deregister(self, op_id, op_params, content):
        """Deregister a cluster; on workflow success delete its DB record.

        :param op_id: operation id
        :param op_params: operation parameters
        :param content: dict with the cluster record under "cluster"
        :return: None
        """
        self.logger.info("cluster deregister enter")
        self.logger.info("db_cluster is : {}".format(content["cluster"]))

        db_cluster, _, _ = await self._run_operation(
            "deregister_cluster",
            op_id,
            op_params,
            content,
            states=("DELETED", "FAILED_DELETION"),
        )
        self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)

        # To delete it from DB
        if db_cluster["state"] == "DELETED":
            self.db.del_one("clusters", {"_id": db_cluster["_id"]})
        return

    async def get_creds(self, db_cluster):
        """Fetch the cluster credentials and store them on the cluster record.

        :param db_cluster: cluster record; updated in place when credentials
            were obtained
        :return: None
        """
        self.logger.info("Cluster get creds Enter")
        result, cluster_creds = await self.odu.get_cluster_credentials(db_cluster)
        if result:
            db_cluster["credentials"] = cluster_creds
            # NOTE(review): written only on success; the nesting of this write
            # could not be confirmed from the source.
            self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
        return

    async def update(self, op_id, op_params, content):
        """Update a cluster and record the new version and/or node count.

        :param op_id: operation id
        :param op_params: operation parameters; "k8s_version" and "node_count"
            are copied onto the cluster record when the workflow succeeded
        :param content: dict with the cluster record under "cluster"
        :return: None
        """
        self.logger.info("Cluster update Enter")
        db_cluster, workflow_status, _ = await self._run_operation(
            "update_cluster",
            op_id,
            op_params,
            content,
            log_workflow_name=False,
            workflow_log="Workflow Status: {} Workflow Message: {}",
            resource_log="Resource Status: {} Resource Message: {}",
        )
        # TODO: verify this condition
        # For the moment, if the workflow completed successfully, then we update the db accordingly.
        if workflow_status:
            # Independent checks: a request carrying both fields updates both
            # (the former elif silently dropped node_count).
            for field in ("k8s_version", "node_count"):
                if field in op_params:
                    db_cluster[field] = op_params[field]
        self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
        return
| |
| |
class CloudCredentialsLcm(LcmBase):
    """Add, edit and remove cloud credentials through ODU workflows."""

    db_collection = "vim_accounts"

    def __init__(self, msg, lcm_tasks, config):
        """
        Set up the logger and the workflow helper, then initialise the base class
        :param msg: messaging object, forwarded to the base class and the workflow helper
        :param lcm_tasks: stored on the instance and forwarded to the workflow helper
        :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
        :return: None
        """
        self.lcm_tasks = lcm_tasks
        self.logger = logging.getLogger("lcm.gitops")
        self.odu = odu_workflows.OduWorkflow(msg, lcm_tasks, config)

        super().__init__(msg, self.logger)

    async def _launch_and_check(self, workflow, op_id, op_params, content):
        """Launch `workflow`, log its status and, on success, the resource status."""
        launched = await self.odu.launch_workflow(workflow, op_id, op_params, content)
        wf_ok, wf_msg = await self.odu.check_workflow_status(launched)
        self.logger.info(f"Workflow Status: {wf_ok} Workflow Msg: {wf_msg}")

        if not wf_ok:
            return

        res_ok, res_msg = await self.odu.check_resource_status(
            workflow, op_id, op_params, content
        )
        self.logger.info(f"Resource Status: {res_ok} Resource Message: {res_msg}")

    async def add(self, op_id, op_params, content):
        """Run the "create_cloud_credentials" workflow and log its outcome."""
        self.logger.info("Cloud Credentials create")
        await self._launch_and_check(
            "create_cloud_credentials", op_id, op_params, content
        )

    async def edit(self, op_id, op_params, content):
        """Run the "update_cloud_credentials" workflow and log its outcome."""
        await self._launch_and_check(
            "update_cloud_credentials", op_id, op_params, content
        )

    async def remove(self, op_id, op_params, content):
        """Run the "delete_cloud_credentials" workflow, then delete the DB record."""
        self.logger.info("Cloud Credentials delete")
        await self._launch_and_check(
            "delete_cloud_credentials", op_id, op_params, content
        )
        # NOTE(review): the record is deleted regardless of the workflow
        # outcome; confirm this is the intended nesting.
        self.db.del_one(self.db_collection, {"_id": content["_id"]})
| |
| |
class K8sAppLcm(LcmBase):
    """Create and delete operations for the records stored in ``k8sapp``."""

    # DB collection holding the records handled by this class.
    _COLLECTION = "k8sapp"

    def __init__(self, msg, lcm_tasks, config):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param msg: messaging object, forwarded to the base class and the workflow helper
        :param lcm_tasks: stored on the instance and forwarded to the workflow helper
        :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
        :return: None
        """

        self.logger = logging.getLogger("lcm.gitops")
        self.lcm_tasks = lcm_tasks
        self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)

        super().__init__(msg, self.logger)

    async def _run_operation(
        self, workflow, op_id, op_params, content, success_state, failure_state
    ):
        """Run the workflow and resource-check phases and persist both results.

        :param workflow: workflow key passed to the ODU helper
        :param op_id: operation id, forwarded untouched
        :param op_params: operation parameters, forwarded untouched
        :param content: the record being operated on
        :param success_state: value for ``state`` when the workflow succeeded
        :param failure_state: value for ``state`` when the workflow failed
        :return: the record as last written to the DB
        """
        workflow_name = await self.odu.launch_workflow(
            workflow, op_id, op_params, content
        )
        self.logger.info("workflow_name is :{}".format(workflow_name))

        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )
        self.logger.info(
            "workflow_status is :{} and workflow_msg is :{}".format(
                workflow_status, workflow_msg
            )
        )
        if workflow_status:
            content["state"] = success_state
            content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
        else:
            content["state"] = failure_state
            content["resourceState"] = "ERROR"
        # Persist the intermediate state before the resource check.
        content = self.update_operation_history(content, workflow_status, None)
        self.db.set_one(self._COLLECTION, {"_id": content["_id"]}, content)

        # Must be bound even when the workflow failed; it is read below.
        # Previously a failed workflow raised UnboundLocalError here.
        resource_status = None
        if workflow_status:
            resource_status, resource_msg = await self.odu.check_resource_status(
                workflow, op_id, op_params, content
            )
            self.logger.info(
                "resource_status is :{} and resource_msg is :{}".format(
                    resource_status, resource_msg
                )
            )
            content["resourceState"] = "READY" if resource_status else "ERROR"

        content["operatingState"] = "IDLE"
        content = self.update_operation_history(
            content, workflow_status, resource_status
        )
        self.db.set_one(self._COLLECTION, {"_id": content["_id"]}, content)
        return content

    async def create(self, op_id, op_params, content):
        """Run the "create_profile" workflow and store the resulting state.

        :param op_id: operation id
        :param op_params: operation parameters
        :param content: the record to create
        :return: None
        """
        self.logger.info("App Create Enter")
        await self._run_operation(
            "create_profile", op_id, op_params, content, "CREATED", "FAILED_CREATION"
        )
        return

    async def delete(self, op_id, op_params, content):
        """Run the "delete_profile" workflow; on success delete the DB record.

        :param op_id: operation id
        :param op_params: operation parameters
        :param content: the record to delete
        :return: None
        """
        self.logger.info("App delete Enter")
        content = await self._run_operation(
            "delete_profile", op_id, op_params, content, "DELETED", "FAILED_DELETION"
        )

        # To delete it from DB
        if content["state"] == "DELETED":
            self.db.del_one(self._COLLECTION, {"_id": content["_id"]})
        return
| |
| |
class K8sResourceLcm(LcmBase):
    """Create and delete operations for the records stored in ``k8sresource``."""

    # DB collection holding the records handled by this class.
    _COLLECTION = "k8sresource"

    def __init__(self, msg, lcm_tasks, config):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param msg: messaging object, forwarded to the base class and the workflow helper
        :param lcm_tasks: stored on the instance and forwarded to the workflow helper
        :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
        :return: None
        """

        self.logger = logging.getLogger("lcm.gitops")
        self.lcm_tasks = lcm_tasks
        self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)

        super().__init__(msg, self.logger)

    async def _run_operation(
        self, workflow, op_id, op_params, content, success_state, failure_state
    ):
        """Run the workflow and resource-check phases and persist both results.

        :param workflow: workflow key passed to the ODU helper
        :param op_id: operation id, forwarded untouched
        :param op_params: operation parameters, forwarded untouched
        :param content: the record being operated on
        :param success_state: value for ``state`` when the workflow succeeded
        :param failure_state: value for ``state`` when the workflow failed
        :return: the record as last written to the DB
        """
        workflow_name = await self.odu.launch_workflow(
            workflow, op_id, op_params, content
        )
        self.logger.info("workflow_name is :{}".format(workflow_name))

        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )
        self.logger.info(
            "workflow_status is :{} and workflow_msg is :{}".format(
                workflow_status, workflow_msg
            )
        )
        if workflow_status:
            content["state"] = success_state
            content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
        else:
            content["state"] = failure_state
            content["resourceState"] = "ERROR"
        # Persist the intermediate state before the resource check.
        content = self.update_operation_history(content, workflow_status, None)
        self.db.set_one(self._COLLECTION, {"_id": content["_id"]}, content)

        # Must be bound even when the workflow failed; it is read below.
        # Previously a failed workflow raised UnboundLocalError here.
        resource_status = None
        if workflow_status:
            resource_status, resource_msg = await self.odu.check_resource_status(
                workflow, op_id, op_params, content
            )
            self.logger.info(
                "resource_status is :{} and resource_msg is :{}".format(
                    resource_status, resource_msg
                )
            )
            content["resourceState"] = "READY" if resource_status else "ERROR"

        content["operatingState"] = "IDLE"
        content = self.update_operation_history(
            content, workflow_status, resource_status
        )
        self.db.set_one(self._COLLECTION, {"_id": content["_id"]}, content)
        return content

    async def create(self, op_id, op_params, content):
        """Run the "create_profile" workflow and store the resulting state.

        :param op_id: operation id
        :param op_params: operation parameters
        :param content: the record to create
        :return: None
        """
        self.logger.info("Resource Create Enter")
        await self._run_operation(
            "create_profile", op_id, op_params, content, "CREATED", "FAILED_CREATION"
        )
        return

    async def delete(self, op_id, op_params, content):
        """Run the "delete_profile" workflow; on success delete the DB record.

        The record is re-read from the DB by its ``_id`` before the workflow
        is launched, so the stored version is the one operated on.

        :param op_id: operation id
        :param op_params: operation parameters
        :param content: dict carrying at least the ``_id`` of the record
        :return: None
        """
        self.logger.info("Resource delete Enter")
        content = self.db.get_one(self._COLLECTION, {"_id": content["_id"]})

        content = await self._run_operation(
            "delete_profile", op_id, op_params, content, "DELETED", "FAILED_DELETION"
        )

        # To delete it from DB
        if content["state"] == "DELETED":
            self.db.del_one(self._COLLECTION, {"_id": content["_id"]})
        return
| |
| |
class K8sInfraControllerLcm(LcmBase):
    """Create and delete operations for records in ``k8sinfra_controller``."""

    # DB collection holding the records handled by this class.
    _COLLECTION = "k8sinfra_controller"

    def __init__(self, msg, lcm_tasks, config):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param msg: messaging object, forwarded to the base class and the workflow helper
        :param lcm_tasks: stored on the instance and forwarded to the workflow helper
        :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
        :return: None
        """

        self.logger = logging.getLogger("lcm.gitops")
        self.lcm_tasks = lcm_tasks
        self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)

        super().__init__(msg, self.logger)

    async def _run_operation(
        self, workflow, op_id, op_params, content, success_state, failure_state
    ):
        """Run the workflow and resource-check phases and persist both results.

        :param workflow: workflow key passed to the ODU helper
        :param op_id: operation id, forwarded untouched
        :param op_params: operation parameters, forwarded untouched
        :param content: the record being operated on
        :param success_state: value for ``state`` when the workflow succeeded
        :param failure_state: value for ``state`` when the workflow failed
        :return: the record as last written to the DB
        """
        workflow_name = await self.odu.launch_workflow(
            workflow, op_id, op_params, content
        )
        self.logger.info("workflow_name is :{}".format(workflow_name))

        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )
        self.logger.info(
            "workflow_status is :{} and workflow_msg is :{}".format(
                workflow_status, workflow_msg
            )
        )
        if workflow_status:
            content["state"] = success_state
            content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
        else:
            content["state"] = failure_state
            content["resourceState"] = "ERROR"
        # Persist the intermediate state before the resource check.
        content = self.update_operation_history(content, workflow_status, None)
        self.db.set_one(self._COLLECTION, {"_id": content["_id"]}, content)

        # Must be bound even when the workflow failed; it is read below.
        # Previously a failed workflow raised UnboundLocalError here.
        resource_status = None
        if workflow_status:
            resource_status, resource_msg = await self.odu.check_resource_status(
                workflow, op_id, op_params, content
            )
            self.logger.info(
                "resource_status is :{} and resource_msg is :{}".format(
                    resource_status, resource_msg
                )
            )
            content["resourceState"] = "READY" if resource_status else "ERROR"

        content["operatingState"] = "IDLE"
        content = self.update_operation_history(
            content, workflow_status, resource_status
        )
        self.db.set_one(self._COLLECTION, {"_id": content["_id"]}, content)
        return content

    async def create(self, op_id, op_params, content):
        """Run the "create_profile" workflow and store the resulting state.

        :param op_id: operation id
        :param op_params: operation parameters
        :param content: the record to create
        :return: None
        """
        self.logger.info("Infra controller Create Enter")
        await self._run_operation(
            "create_profile", op_id, op_params, content, "CREATED", "FAILED_CREATION"
        )
        return

    async def delete(self, op_id, op_params, content):
        """Run the "delete_profile" workflow; on success delete the DB record.

        :param op_id: operation id
        :param op_params: operation parameters
        :param content: the record to delete
        :return: None
        """
        self.logger.info("Infra controller delete Enter")
        content = await self._run_operation(
            "delete_profile", op_id, op_params, content, "DELETED", "FAILED_DELETION"
        )

        # To delete it from DB
        if content["state"] == "DELETED":
            self.db.del_one(self._COLLECTION, {"_id": content["_id"]})
        return
| |
| |
class K8sInfraConfigLcm(LcmBase):
    """Handlers for create/delete operations on ``k8sinfra_config`` records."""

    def __init__(self, msg, lcm_tasks, config):
        """
        Set up the logger and the ODU workflow client, then initialise LcmBase.

        :param msg: forwarded to OduWorkflow and to LcmBase
        :param lcm_tasks: stored on the instance and forwarded to OduWorkflow
        :param config: two level dictionary with configuration, forwarded to
            OduWorkflow. Top level should contain 'database', 'storage',
        :return: None
        """

        self.logger = logging.getLogger("lcm.gitops")
        self.lcm_tasks = lcm_tasks
        self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)

        super().__init__(msg, self.logger)

    async def create(self, op_id, op_params, content):
        """
        Run the ``create_profile`` workflow for an infra-config profile.

        The workflow outcome is written to ``content["state"]`` and
        ``content["resourceState"]`` and persisted in the ``k8sinfra_config``
        collection. The resource status is checked only when the workflow
        succeeded; in every case the record ends with ``operatingState`` set
        to ``IDLE``.

        :param op_id: operation identifier, forwarded to the ODU calls
        :param op_params: operation parameters, forwarded to the ODU calls
        :param content: profile record (dict); must contain ``_id``
        :return: None
        """
        self.logger.info("Infra config Create Enter")

        workflow_name = await self.odu.launch_workflow(
            "create_profile", op_id, op_params, content
        )
        self.logger.info("workflow_name is :{}".format(workflow_name))

        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )
        self.logger.info(
            "workflow_status is :{} and workflow_msg is :{}".format(
                workflow_status, workflow_msg
            )
        )
        if workflow_status:
            content["state"] = "CREATED"
            content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
        else:
            content["state"] = "FAILED_CREATION"
            content["resourceState"] = "ERROR"
        # update_operation_history returns the record to keep working with.
        content = self.update_operation_history(content, workflow_status, None)
        self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, content)

        # Bound up front: it is passed to update_operation_history below even
        # when the workflow failed and the resource check is skipped.
        resource_status = None
        if workflow_status:
            resource_status, resource_msg = await self.odu.check_resource_status(
                "create_profile", op_id, op_params, content
            )
            self.logger.info(
                "resource_status is :{} and resource_msg is :{}".format(
                    resource_status, resource_msg
                )
            )
            if resource_status:
                content["resourceState"] = "READY"
            else:
                content["resourceState"] = "ERROR"

        content["operatingState"] = "IDLE"
        content = self.update_operation_history(
            content, workflow_status, resource_status
        )
        self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, content)

        return

    async def delete(self, op_id, op_params, content):
        """
        Run the ``delete_profile`` workflow for an infra-config profile.

        The workflow outcome is written to ``content["state"]`` and
        ``content["resourceState"]`` and persisted in the ``k8sinfra_config``
        collection. The resource status is checked only when the workflow
        succeeded, so a failed workflow leaves ``resourceState`` at ``ERROR``.
        The record ends with ``operatingState`` set to ``IDLE`` and is removed
        from the collection when its final state is ``DELETED``.

        :param op_id: operation identifier, forwarded to the ODU calls
        :param op_params: operation parameters, forwarded to the ODU calls
        :param content: profile record (dict); must contain ``_id``
        :return: None
        """
        self.logger.info("Infra config delete Enter")

        workflow_name = await self.odu.launch_workflow(
            "delete_profile", op_id, op_params, content
        )
        self.logger.info("workflow_name is :{}".format(workflow_name))

        workflow_status, workflow_msg = await self.odu.check_workflow_status(
            workflow_name
        )
        self.logger.info(
            "workflow_status is :{} and workflow_msg is :{}".format(
                workflow_status, workflow_msg
            )
        )
        if workflow_status:
            content["state"] = "DELETED"
            content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
        else:
            content["state"] = "FAILED_DELETION"
            content["resourceState"] = "ERROR"
        # update_operation_history returns the record to keep working with.
        content = self.update_operation_history(content, workflow_status, None)
        self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, content)

        # Skip the resource check after a failed workflow so that the ERROR
        # resourceState recorded above cannot be overwritten with READY.
        resource_status = None
        if workflow_status:
            resource_status, resource_msg = await self.odu.check_resource_status(
                "delete_profile", op_id, op_params, content
            )
            self.logger.info(
                "resource_status is :{} and resource_msg is :{}".format(
                    resource_status, resource_msg
                )
            )
            if resource_status:
                content["resourceState"] = "READY"
            else:
                content["resourceState"] = "ERROR"

        content["operatingState"] = "IDLE"
        content = self.update_operation_history(
            content, workflow_status, resource_status
        )
        self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, content)

        # Drop the record once the deletion workflow has succeeded.
        if content["state"] == "DELETED":
            self.db.del_one("k8sinfra_config", {"_id": content["_id"]})
        return
| |
| |
class OkaLcm(LcmBase):
    """Handlers for create/edit/delete operations on ``okas`` records."""

    db_collection = "okas"

    def __init__(self, msg, lcm_tasks, config):
        """
        Set up the logger and the ODU workflow client, then initialise LcmBase.

        :param msg: forwarded to OduWorkflow and to LcmBase
        :param lcm_tasks: stored on the instance and forwarded to OduWorkflow
        :param config: two level dictionary with configuration, forwarded to
            OduWorkflow. Top level should contain 'database', 'storage',
        :return: None
        """
        gitops_logger = logging.getLogger("lcm.gitops")
        self.logger = gitops_logger
        self.lcm_tasks = lcm_tasks
        self.odu = odu_workflows.OduWorkflow(msg, lcm_tasks, config)

        super().__init__(msg, gitops_logger)

    async def create(self, op_id, op_params, content):
        """
        Run the ``create_oka`` workflow and record its outcome.

        ``state`` and ``resourceState`` are set from the workflow result and
        persisted. When the workflow succeeded, the resource status is checked
        and ``resourceState`` becomes ``READY`` or ``ERROR``. The record is
        finally persisted with ``operatingState`` set to ``IDLE``.

        :param op_id: operation identifier, forwarded to the ODU calls
        :param op_params: operation parameters, forwarded to the ODU calls
        :param content: OKA record (dict); must contain ``_id``
        :return: None
        """
        self.logger.info("OKA Create Enter")
        oka = content

        wf_name = await self.odu.launch_workflow("create_oka", op_id, op_params, oka)
        wf_ok, wf_msg = await self.odu.check_workflow_status(wf_name)
        self.logger.info(
            "Workflow Status: {} Workflow Message: {}".format(wf_ok, wf_msg)
        )

        oka["state"] = "CREATED" if wf_ok else "FAILED_CREATION"
        oka["resourceState"] = "IN_PROGRESS.GIT_SYNCED" if wf_ok else "ERROR"
        oka = self.update_operation_history(oka, wf_ok, None)
        self.db.set_one(self.db_collection, {"_id": oka["_id"]}, oka)

        if wf_ok:
            res_ok, res_msg = await self.odu.check_resource_status(
                "create_oka", op_id, op_params, oka
            )
            self.logger.info(
                "Resource Status: {} Resource Message: {}".format(res_ok, res_msg)
            )
            oka["resourceState"] = "READY" if res_ok else "ERROR"
            oka = self.update_operation_history(oka, wf_ok, res_ok)

        oka["operatingState"] = "IDLE"
        self.db.set_one(self.db_collection, {"_id": oka["_id"]}, oka)

    async def edit(self, op_id, op_params, content):
        """
        Run the ``update_oka`` workflow and record its outcome.

        Only ``resourceState`` is updated from the workflow and resource
        results; ``state`` is left untouched. The record is finally persisted
        with ``operatingState`` set to ``IDLE``.

        :param op_id: operation identifier, forwarded to the ODU calls
        :param op_params: operation parameters, forwarded to the ODU calls
        :param content: OKA record (dict); must contain ``_id``
        :return: None
        """
        self.logger.info("OKA Edit Enter")
        oka = content

        wf_name = await self.odu.launch_workflow("update_oka", op_id, op_params, oka)
        wf_ok, wf_msg = await self.odu.check_workflow_status(wf_name)
        self.logger.info(
            "Workflow Status: {} Workflow Message: {}".format(wf_ok, wf_msg)
        )

        oka["resourceState"] = "IN_PROGRESS.GIT_SYNCED" if wf_ok else "ERROR"
        oka = self.update_operation_history(oka, wf_ok, None)
        self.db.set_one(self.db_collection, {"_id": oka["_id"]}, oka)

        if wf_ok:
            res_ok, res_msg = await self.odu.check_resource_status(
                "update_oka", op_id, op_params, oka
            )
            self.logger.info(
                "Resource Status: {} Resource Message: {}".format(res_ok, res_msg)
            )
            oka["resourceState"] = "READY" if res_ok else "ERROR"
            oka = self.update_operation_history(oka, wf_ok, res_ok)

        oka["operatingState"] = "IDLE"
        self.db.set_one(self.db_collection, {"_id": oka["_id"]}, oka)

    async def delete(self, op_id, op_params, content):
        """
        Run the ``delete_oka`` workflow and record its outcome.

        ``state`` and ``resourceState`` are set from the workflow result and
        persisted. When the workflow succeeded, the resource status is checked
        as well. The record is persisted with ``operatingState`` set to
        ``IDLE`` and then removed from the collection if its state is
        ``DELETED``.

        :param op_id: operation identifier, forwarded to the ODU calls
        :param op_params: operation parameters, forwarded to the ODU calls
        :param content: OKA record (dict); must contain ``_id``
        :return: None
        """
        self.logger.info("OKA delete Enter")
        oka = content

        wf_name = await self.odu.launch_workflow("delete_oka", op_id, op_params, oka)
        wf_ok, wf_msg = await self.odu.check_workflow_status(wf_name)
        self.logger.info(
            "Workflow Status: {} Workflow Message: {}".format(wf_ok, wf_msg)
        )

        oka["state"] = "DELETED" if wf_ok else "FAILED_DELETION"
        oka["resourceState"] = "IN_PROGRESS.GIT_SYNCED" if wf_ok else "ERROR"
        oka = self.update_operation_history(oka, wf_ok, None)
        self.db.set_one(self.db_collection, {"_id": oka["_id"]}, oka)

        if wf_ok:
            res_ok, res_msg = await self.odu.check_resource_status(
                "delete_oka", op_id, op_params, oka
            )
            self.logger.info(
                "Resource Status: {} Resource Message: {}".format(res_ok, res_msg)
            )
            oka["resourceState"] = "READY" if res_ok else "ERROR"
            oka = self.update_operation_history(oka, wf_ok, res_ok)

        oka["operatingState"] = "IDLE"
        record_filter = {"_id": oka["_id"]}
        self.db.set_one(self.db_collection, record_filter, oka)

        if oka["state"] == "DELETED":
            self.db.del_one(self.db_collection, {"_id": oka["_id"]})
| |
| |
class KsuLcm(LcmBase):
    """Handlers for the operations on records of the ``ksus`` collection."""

    db_collection = "ksus"

    def __init__(self, msg, lcm_tasks, config):
        """
        Set up the logger and the ODU workflow client, then initialise LcmBase.

        :param msg: forwarded to OduWorkflow and to LcmBase
        :param lcm_tasks: stored on the instance and forwarded to OduWorkflow
        :param config: two level dictionary with configuration, forwarded to
            OduWorkflow. Top level should contain 'database', 'storage',
        :return: None
        """
        gitops_logger = logging.getLogger("lcm.gitops")
        self.logger = gitops_logger
        self.lcm_tasks = lcm_tasks
        self.odu = odu_workflows.OduWorkflow(msg, lcm_tasks, config)

        super().__init__(msg, gitops_logger)

    async def create(self, op_id, op_params, content):
        """
        Run the ``create_ksus`` workflow for a batch of KSU records.

        Every record gets ``state``/``resourceState`` from the workflow result
        and is persisted. When the workflow succeeded, the resource status is
        checked once for the whole batch and applied to every record. Finally
        each record is persisted with ``operatingState`` set to ``IDLE``.

        :param op_id: operation identifier, forwarded to the ODU calls
        :param op_params: operation parameters, forwarded to the ODU calls
        :param content: iterable of KSU records (dicts containing ``_id``)
        :return: None
        """
        self.logger.info("ksu Create Enter")

        wf_name = await self.odu.launch_workflow(
            "create_ksus", op_id, op_params, content
        )
        wf_ok, wf_msg = await self.odu.check_workflow_status(wf_name)
        self.logger.info(
            "Workflow Status: {} Workflow Message: {}".format(wf_ok, wf_msg)
        )

        for ksu in content:
            ksu["state"] = "CREATED" if wf_ok else "FAILED_CREATION"
            ksu["resourceState"] = "IN_PROGRESS.GIT_SYNCED" if wf_ok else "ERROR"
            ksu = self.update_operation_history(ksu, wf_ok, None)
            self.db.set_one(self.db_collection, {"_id": ksu["_id"]}, ksu)

        if wf_ok:
            res_ok, res_msg = await self.odu.check_resource_status(
                "create_ksus", op_id, op_params, content
            )
            self.logger.info(
                "Resource Status: {} Resource Message: {}".format(res_ok, res_msg)
            )
            for ksu in content:
                ksu["resourceState"] = "READY" if res_ok else "ERROR"
                # Return value is not kept: the loop below persists the
                # entries of ``content`` themselves.
                self.update_operation_history(ksu, wf_ok, res_ok)

        for ksu in content:
            ksu["operatingState"] = "IDLE"
            self.db.set_one(self.db_collection, {"_id": ksu["_id"]}, ksu)

    async def edit(self, op_id, op_params, content):
        """
        Run the ``update_ksus`` workflow for a batch of KSU records.

        ``resourceState`` of every record follows the workflow and resource
        results; ``state`` is left untouched. Records and ``op_params`` are
        then walked pairwise. Each record gets ``operatingState`` ``IDLE``
        and, when the workflow succeeded, its ``name``, ``description``,
        ``profile`` (``profile_type`` and ``_id``) and ``oka`` are copied from
        the matching parameters before it is persisted.

        :param op_id: operation identifier, forwarded to the ODU calls
        :param op_params: per-KSU parameters, paired with ``content`` by
            position
        :param content: iterable of KSU records (dicts containing ``_id``)
        :return: None
        """
        self.logger.info("ksu edit Enter")

        wf_name = await self.odu.launch_workflow(
            "update_ksus", op_id, op_params, content
        )
        wf_ok, wf_msg = await self.odu.check_workflow_status(wf_name)
        self.logger.info(
            "Workflow Status: {} Workflow Message: {}".format(wf_ok, wf_msg)
        )

        for ksu in content:
            ksu["resourceState"] = "IN_PROGRESS.GIT_SYNCED" if wf_ok else "ERROR"
            ksu = self.update_operation_history(ksu, wf_ok, None)
            self.db.set_one(self.db_collection, {"_id": ksu["_id"]}, ksu)

        if wf_ok:
            res_ok, res_msg = await self.odu.check_resource_status(
                "update_ksus", op_id, op_params, content
            )
            self.logger.info(
                "Resource Status: {} Resource Message: {}".format(res_ok, res_msg)
            )
            for ksu in content:
                ksu["resourceState"] = "READY" if res_ok else "ERROR"
                # Return value is not kept: the loop below persists the
                # entries of ``content`` themselves.
                self.update_operation_history(ksu, wf_ok, res_ok)

        for ksu, params in zip(content, op_params):
            ksu["operatingState"] = "IDLE"
            if wf_ok:
                ksu["name"] = params["name"]
                ksu["description"] = params["description"]
                new_profile = params["profile"]
                ksu["profile"]["profile_type"] = new_profile["profile_type"]
                ksu["profile"]["_id"] = new_profile["_id"]
                ksu["oka"] = params["oka"]
            self.db.set_one(self.db_collection, {"_id": ksu["_id"]}, ksu)

    async def delete(self, op_id, op_params, content):
        """
        Run the ``delete_ksus`` workflow for a batch of KSU records.

        Every record gets ``state``/``resourceState`` from the workflow result
        and is persisted. When the workflow succeeded, the resource status is
        checked once for the whole batch. Finally each record is persisted
        with ``operatingState`` ``IDLE`` and removed from the collection if
        its state is ``DELETED``.

        :param op_id: operation identifier, forwarded to the ODU calls
        :param op_params: operation parameters, forwarded to the ODU calls
        :param content: iterable of KSU records (dicts containing ``_id``)
        :return: None
        """
        self.logger.info("ksu delete Enter")

        wf_name = await self.odu.launch_workflow(
            "delete_ksus", op_id, op_params, content
        )
        wf_ok, wf_msg = await self.odu.check_workflow_status(wf_name)
        self.logger.info(
            "Workflow Status: {} Workflow Message: {}".format(wf_ok, wf_msg)
        )

        for ksu in content:
            ksu["state"] = "DELETED" if wf_ok else "FAILED_DELETION"
            ksu["resourceState"] = "IN_PROGRESS.GIT_SYNCED" if wf_ok else "ERROR"
            ksu = self.update_operation_history(ksu, wf_ok, None)
            self.db.set_one(self.db_collection, {"_id": ksu["_id"]}, ksu)

        if wf_ok:
            res_ok, res_msg = await self.odu.check_resource_status(
                "delete_ksus", op_id, op_params, content
            )
            self.logger.info(
                "Resource Status: {} Resource Message: {}".format(res_ok, res_msg)
            )
            for ksu in content:
                ksu["resourceState"] = "READY" if res_ok else "ERROR"
                # Return value is not kept: the loop below persists the
                # entries of ``content`` themselves.
                self.update_operation_history(ksu, wf_ok, res_ok)

        for ksu in content:
            ksu["operatingState"] = "IDLE"
            self.db.set_one(self.db_collection, {"_id": ksu["_id"]}, ksu)
            if ksu["state"] == "DELETED":
                self.db.del_one(self.db_collection, {"_id": ksu["_id"]})

    async def clone(self, op_id, op_params, db_content):
        """
        Run the ``clone_ksus`` workflow for a single KSU record.

        ``resourceState`` follows the workflow result and, when the workflow
        succeeded, the resource status. The record is finally persisted with
        ``operatingState`` set to ``IDLE``.

        :param op_id: operation identifier, forwarded to the ODU calls
        :param op_params: operation parameters, forwarded to the ODU calls
        :param db_content: KSU record (dict); must contain ``_id``
        :return: None
        """
        self.logger.info("ksu clone Enter")

        wf_name = await self.odu.launch_workflow(
            "clone_ksus", op_id, op_params, db_content
        )
        wf_ok, wf_msg = await self.odu.check_workflow_status(wf_name)
        self.logger.info(
            "Workflow Status: {} Workflow Message: {}".format(wf_ok, wf_msg)
        )

        db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED" if wf_ok else "ERROR"
        db_content = self.update_operation_history(db_content, wf_ok, None)
        self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)

        if wf_ok:
            res_ok, res_msg = await self.odu.check_resource_status(
                "clone_ksus", op_id, op_params, db_content
            )
            self.logger.info(
                "Resource Status: {} Resource Message: {}".format(res_ok, res_msg)
            )
            db_content["resourceState"] = "READY" if res_ok else "ERROR"
            db_content = self.update_operation_history(db_content, wf_ok, res_ok)

        db_content["operatingState"] = "IDLE"
        self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)

    async def move(self, op_id, op_params, db_content):
        """
        Run the ``move_ksus`` workflow for a single KSU record.

        ``resourceState`` follows the workflow result and, when the workflow
        succeeded, the resource status. The record is finally persisted with
        ``operatingState`` set to ``IDLE``.

        :param op_id: operation identifier, forwarded to the ODU calls
        :param op_params: operation parameters, forwarded to the ODU calls
        :param db_content: KSU record (dict); must contain ``_id``
        :return: None
        """
        self.logger.info("ksu move Enter")

        wf_name = await self.odu.launch_workflow(
            "move_ksus", op_id, op_params, db_content
        )
        wf_ok, wf_msg = await self.odu.check_workflow_status(wf_name)
        self.logger.info(
            "Workflow Status: {} Workflow Message: {}".format(wf_ok, wf_msg)
        )

        db_content["resourceState"] = "IN_PROGRESS.GIT_SYNCED" if wf_ok else "ERROR"
        db_content = self.update_operation_history(db_content, wf_ok, None)
        self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)

        if wf_ok:
            res_ok, res_msg = await self.odu.check_resource_status(
                "move_ksus", op_id, op_params, db_content
            )
            self.logger.info(
                "Resource Status: {} Resource Message: {}".format(res_ok, res_msg)
            )
            db_content["resourceState"] = "READY" if res_ok else "ERROR"
            db_content = self.update_operation_history(db_content, wf_ok, res_ok)

        db_content["operatingState"] = "IDLE"
        self.db.set_one(self.db_collection, {"_id": db_content["_id"]}, db_content)