| # -*- coding: utf-8 -*- |
| |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
| # implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| __author__ = ( |
| "Shrinithi R <shrinithi.r@tataelxsi.co.in>", |
| "Shahithya Y <shahithya.y@tataelxsi.co.in>", |
| ) |
| |
| import logging |
| from osm_lcm.lcm_utils import LcmBase |
| from copy import deepcopy |
| from osm_lcm import odu_workflows |
| from osm_lcm import vim_sdn |
| |
| |
| class ClusterLcm(LcmBase): |
| db_topic = "clusters" |
| |
| def __init__(self, msg, lcm_tasks, config): |
| """ |
| Init, Connect to database, filesystem storage, and messaging |
| :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', |
| :return: None |
| """ |
| |
| self.logger = logging.getLogger("lcm.clusterlcm") |
| self.lcm_tasks = lcm_tasks |
| self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config) |
| self.regist = vim_sdn.K8sClusterLcm(msg, self.lcm_tasks, config) |
| |
| super().__init__(msg, self.logger) |
| |
| async def create(self, content, order_id): |
| self.logger.info("cluster Create Enter") |
| |
| workflow_name = self.odu.launch_workflow("create_cluster", content) |
| self.logger.info("workflow_name is :{}".format(workflow_name)) |
| |
| workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name) |
| self.logger.info( |
| "workflow_status is :{} and workflow_msg is :{}".format( |
| workflow_status, workflow_msg |
| ) |
| ) |
| if workflow_status: |
| content["state"] = "CREATED" |
| content["resourceState"] = "IN_PROGRESS.GIT_SYNCED" |
| else: |
| content["state"] = "FAILED_CREATION" |
| content["resourceState"] = "ERROR" |
| # has to call update_operation_history return content |
| content = self.update_operation_history(content, workflow_status, None) |
| self.db.set_one("clusters", {"_id": content["_id"]}, content) |
| |
| if workflow_status: |
| resource_status, resource_msg = self.odu.check_resource_status( |
| "create_cluster", content |
| ) |
| self.logger.info( |
| "resource_status is :{} and resource_msg is :{}".format( |
| resource_status, resource_msg |
| ) |
| ) |
| if resource_status: |
| content["resourceState"] = "READY" |
| else: |
| content["resourceState"] = "ERROR" |
| |
| content["operatingState"] = "IDLE" |
| content = self.update_operation_history( |
| content, workflow_status, resource_status |
| ) |
| self.db.set_one("clusters", {"_id": content["_id"]}, content) |
| self.profile_state(content, workflow_status, resource_status) |
| return |
| |
| def profile_state(self, content, workflow_status, resource_status): |
| profiles = [ |
| "infra_controller_profiles", |
| "infra_config_profiles", |
| "app_profiles", |
| "resource_profiles", |
| ] |
| profiles_collection = { |
| "infra_controller_profiles": "k8sinfra_controller", |
| "infra_config_profiles": "k8sinfra_config", |
| "app_profiles": "k8sapp", |
| "resource_profiles": "k8sresource", |
| } |
| for profile_type in profiles: |
| profile_id = content[profile_type] |
| self.logger.info("profile id is : {}".format(profile_id)) |
| db_collection = profiles_collection[profile_type] |
| self.logger.info("the db_collection is :{}".format(db_collection)) |
| db_profile = self.db.get_one(db_collection, {"_id": profile_id}) |
| self.logger.info("the db_profile is :{}".format(db_profile)) |
| db_profile["state"] = content["state"] |
| db_profile["resourceState"] = content["resourceState"] |
| db_profile["operatingState"] = content["operatingState"] |
| db_profile = self.update_operation_history( |
| db_profile, workflow_status, resource_status |
| ) |
| self.logger.info("the db_profile is :{}".format(db_profile)) |
| self.db.set_one(db_collection, {"_id": db_profile["_id"]}, db_profile) |
| |
| async def delete(self, content, order_id): |
| self.logger.info("cluster delete Enter") |
| items = self.db.get_one("clusters", {"_id": content["_id"]}) |
| |
| workflow_name = self.odu.launch_workflow("delete_cluster", content) |
| self.logger.info("workflow_name is :{}".format(workflow_name)) |
| |
| workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name) |
| self.logger.info( |
| "workflow_status is :{} and workflow_msg is :{}".format( |
| workflow_status, workflow_msg |
| ) |
| ) |
| if workflow_status: |
| items["state"] = "DELETED" |
| items["resourceState"] = "IN_PROGRESS.GIT_SYNCED" |
| else: |
| items["state"] = "FAILED_DELETION" |
| items["resourceState"] = "ERROR" |
| # has to call update_operation_history return content |
| items = self.update_operation_history(items, workflow_status, None) |
| self.db.set_one("clusters", {"_id": content["_id"]}, items) |
| |
| if workflow_status: |
| resource_status, resource_msg = self.odu.check_resource_status( |
| "delete_cluster", content |
| ) |
| self.logger.info( |
| "resource_status is :{} and resource_msg is :{}".format( |
| resource_status, resource_msg |
| ) |
| ) |
| if resource_status: |
| items["resourceState"] = "READY" |
| else: |
| items["resourceState"] = "ERROR" |
| |
| items["operatingState"] = "IDLE" |
| items = self.update_operation_history(items, workflow_status, resource_status) |
| self.db.set_one("clusters", {"_id": content["_id"]}, items) |
| |
| # To delete it from dB |
| if items["state"] == "DELETED": |
| self.delete_cluster(content, order_id) |
| return |
| |
| def delete_cluster(self, content, order_id): |
| item_content = self.db.get_one("clusters", {"_id": content["_id"]}) |
| self.logger.info("1_the item_content is : {}".format(item_content)) |
| |
| self.logger.info("it is getting into if item_content state") |
| # detach profiles |
| update_dict = None |
| profiles_to_detach = [ |
| "infra_controller_profiles", |
| "infra_config_profiles", |
| "app_profiles", |
| "resource_profiles", |
| ] |
| profiles_collection = { |
| "infra_controller_profiles": "k8sinfra_controller", |
| "infra_config_profiles": "k8sinfra_config", |
| "app_profiles": "k8sapp", |
| "resource_profiles": "k8sresource", |
| } |
| for profile_type in profiles_to_detach: |
| if item_content.get(profile_type): |
| self.logger.info("the profile_type is :{}".format(profile_type)) |
| profile_ids = item_content[profile_type] |
| self.logger.info("the profile_ids is :{}".format(profile_ids)) |
| profile_ids_copy = deepcopy(profile_ids) |
| self.logger.info("the profile_ids_copy is :{}".format(profile_ids_copy)) |
| for profile_id in profile_ids_copy: |
| self.logger.info("the profile_id is :{}".format(profile_id)) |
| db_collection = profiles_collection[profile_type] |
| self.logger.info("the db_collection is :{}".format(db_collection)) |
| db_profile = self.db.get_one(db_collection, {"_id": profile_id}) |
| self.logger.info("the db_profile is :{}".format(db_profile)) |
| self.logger.info( |
| "the item_content name is :{}".format(item_content["name"]) |
| ) |
| self.logger.info( |
| "the db_profile name is :{}".format(db_profile["name"]) |
| ) |
| if item_content["name"] == db_profile["name"]: |
| self.logger.info("it is getting into if default") |
| self.db.del_one(db_collection, {"_id": profile_id}) |
| else: |
| self.logger.info("it is getting into else non default") |
| profile_ids.remove(profile_id) |
| update_dict = {profile_type: profile_ids} |
| self.logger.info(f"the update dict is :{update_dict}") |
| self.db.set_one( |
| "clusters", {"_id": content["_id"]}, update_dict |
| ) |
| self.db.del_one("clusters", {"_id": item_content["_id"]}) |
| self.logger.info("the id is :{}".format(content["_id"])) |
| |
| async def add(self, content, order_id): |
| self.logger.info("profile attach Enter") |
| db_cluster = self.db.get_one("clusters", {"_id": content["_id"]}) |
| profile_type = content["profile_type"] |
| self.logger.info("profile type is : {}".format(profile_type)) |
| profile_id = content["profile_id"] |
| self.logger.info("profile id is : {}".format(profile_id)) |
| |
| workflow_name = self.odu.launch_workflow("attach_profile_to_cluster", content) |
| self.logger.info("workflow_name is :{}".format(workflow_name)) |
| |
| workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name) |
| self.logger.info( |
| "workflow_status is :{} and workflow_msg is :{}".format( |
| workflow_status, workflow_msg |
| ) |
| ) |
| if workflow_status: |
| db_cluster["resourceState"] = "IN_PROGRESS.GIT_SYNCED" |
| else: |
| db_cluster["resourceState"] = "ERROR" |
| # has to call update_operation_history return content |
| db_cluster = self.update_operation_history(db_cluster, workflow_status, None) |
| self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster) |
| |
| if workflow_status: |
| resource_status, resource_msg = self.odu.check_resource_status( |
| "attach_profile_to_cluster", content |
| ) |
| self.logger.info( |
| "resource_status is :{} and resource_msg is :{}".format( |
| resource_status, resource_msg |
| ) |
| ) |
| if resource_status: |
| db_cluster["resourceState"] = "READY" |
| else: |
| db_cluster["resourceState"] = "ERROR" |
| |
| db_cluster["operatingState"] = "IDLE" |
| db_cluster = self.update_operation_history( |
| db_cluster, workflow_status, resource_status |
| ) |
| profiles_collection = { |
| "infra_controller_profiles": "k8sinfra_controller", |
| "infra_config_profiles": "k8sinfra_config", |
| "app_profiles": "k8sapp", |
| "resource_profiles": "k8sresource", |
| } |
| db_collection = profiles_collection[profile_type] |
| self.logger.info("db_collection is : {}".format(db_collection)) |
| profile_list = db_cluster[profile_type] |
| self.logger.info("profile list is : {}".format(profile_list)) |
| if resource_status: |
| self.logger.info("it is getting into resource status true") |
| profile_list.append(profile_id) |
| self.logger.info("profile list is : {}".format(profile_list)) |
| db_cluster[profile_type] = profile_list |
| self.logger.info("db cluster is : {}".format(db_cluster)) |
| # update_dict = {item: profile_list} |
| # self.logger.info("the update_dict is :{}".format(update_dict)) |
| # self.db.set_one(self.topic, filter_q, update_dict) |
| self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster) |
| |
| return |
| |
| async def remove(self, content, order_id): |
| self.logger.info("profile dettach Enter") |
| db_cluster = self.db.get_one("clusters", {"_id": content["_id"]}) |
| profile_type = content["profile_type"] |
| self.logger.info("profile type is : {}".format(profile_type)) |
| profile_id = content["profile_id"] |
| self.logger.info("profile id is : {}".format(profile_id)) |
| |
| workflow_name = self.odu.launch_workflow("detach_profile_from_cluster", content) |
| self.logger.info("workflow_name is :{}".format(workflow_name)) |
| |
| workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name) |
| self.logger.info( |
| "workflow_status is :{} and workflow_msg is :{}".format( |
| workflow_status, workflow_msg |
| ) |
| ) |
| if workflow_status: |
| db_cluster["resourceState"] = "IN_PROGRESS.GIT_SYNCED" |
| else: |
| db_cluster["resourceState"] = "ERROR" |
| # has to call update_operation_history return content |
| db_cluster = self.update_operation_history(db_cluster, workflow_status, None) |
| self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster) |
| |
| if workflow_status: |
| resource_status, resource_msg = self.odu.check_resource_status( |
| "detach_profile_from_cluster", content |
| ) |
| self.logger.info( |
| "resource_status is :{} and resource_msg is :{}".format( |
| resource_status, resource_msg |
| ) |
| ) |
| if resource_status: |
| db_cluster["resourceState"] = "READY" |
| else: |
| db_cluster["resourceState"] = "ERROR" |
| |
| db_cluster["operatingState"] = "IDLE" |
| db_cluster = self.update_operation_history( |
| db_cluster, workflow_status, resource_status |
| ) |
| profiles_collection = { |
| "infra_controller_profiles": "k8sinfra_controller", |
| "infra_config_profiles": "k8sinfra_config", |
| "app_profiles": "k8sapp", |
| "resource_profiles": "k8sresource", |
| } |
| db_collection = profiles_collection[profile_type] |
| self.logger.info("db_collection is : {}".format(db_collection)) |
| profile_list = db_cluster[profile_type] |
| self.logger.info("profile list is : {}".format(profile_list)) |
| if resource_status: |
| self.logger.info("it is getting into resource status true") |
| profile_list.remove(profile_id) |
| self.logger.info("profile list is : {}".format(profile_list)) |
| db_cluster[profile_type] = profile_list |
| self.logger.info("db cluster is : {}".format(db_cluster)) |
| # update_dict = {item: profile_list} |
| # self.logger.info("the update_dict is :{}".format(update_dict)) |
| # self.db.set_one(self.topic, filter_q, update_dict) |
| self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster) |
| |
| return |
| |
| async def register(self, content, order_id): |
| self.logger.info("cluster register enter") |
| |
| workflow_name = self.odu.launch_workflow("register_cluster", content) |
| self.logger.info("workflow_name is :{}".format(workflow_name)) |
| |
| workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name) |
| self.logger.info( |
| "workflow_status is :{} and workflow_msg is :{}".format( |
| workflow_status, workflow_msg |
| ) |
| ) |
| if workflow_status: |
| content["state"] = "CREATED" |
| content["resourceState"] = "IN_PROGRESS.GIT_SYNCED" |
| else: |
| content["state"] = "FAILED_CREATION" |
| content["resourceState"] = "ERROR" |
| # has to call update_operation_history return content |
| content = self.update_operation_history(content, workflow_status, None) |
| self.db.set_one("clusters", {"_id": content["_id"]}, content) |
| |
| if workflow_status: |
| resource_status, resource_msg = self.odu.check_resource_status( |
| "register_cluster", content |
| ) |
| self.logger.info( |
| "resource_status is :{} and resource_msg is :{}".format( |
| resource_status, resource_msg |
| ) |
| ) |
| if resource_status: |
| content["resourceState"] = "READY" |
| else: |
| content["resourceState"] = "ERROR" |
| |
| content["operatingState"] = "IDLE" |
| content = self.update_operation_history( |
| content, workflow_status, resource_status |
| ) |
| self.db.set_one("clusters", {"_id": content["_id"]}, content) |
| self.profile_state(content, workflow_status, resource_status) |
| return |
| |
| async def deregister(self, content, order_id): |
| self.logger.info("cluster deregister enter") |
| |
| items = self.db.get_one("clusters", {"_id": content["_id"]}) |
| self.logger.info("the items is : {}".format(items)) |
| |
| workflow_name = self.odu.launch_workflow("deregister_cluster", content) |
| self.logger.info("workflow_name is :{}".format(workflow_name)) |
| |
| workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name) |
| self.logger.info( |
| "workflow_status is :{} and workflow_msg is :{}".format( |
| workflow_status, workflow_msg |
| ) |
| ) |
| if workflow_status: |
| items["state"] = "DELETED" |
| items["resourceState"] = "IN_PROGRESS.GIT_SYNCED" |
| else: |
| items["state"] = "FAILED_DELETION" |
| items["resourceState"] = "ERROR" |
| # has to call update_operation_history return content |
| items = self.update_operation_history(items, workflow_status, None) |
| self.db.set_one("clusters", {"_id": content["_id"]}, items) |
| |
| if workflow_status: |
| resource_status, resource_msg = self.odu.check_resource_status( |
| "deregister_cluster", content |
| ) |
| self.logger.info( |
| "resource_status is :{} and resource_msg is :{}".format( |
| resource_status, resource_msg |
| ) |
| ) |
| if resource_status: |
| items["resourceState"] = "READY" |
| else: |
| items["resourceState"] = "ERROR" |
| |
| items["operatingState"] = "IDLE" |
| items = self.update_operation_history(items, workflow_status, resource_status) |
| self.db.set_one("clusters", {"_id": content["_id"]}, items) |
| |
| # To delete it from dB |
| if items["state"] == "DELETED": |
| self.db.del_one("clusters", {"_id": items["_id"]}) |
| return |
| |
| |
| class K8sAppLcm(LcmBase): |
| def __init__(self, msg, lcm_tasks, config): |
| """ |
| Init, Connect to database, filesystem storage, and messaging |
| :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', |
| :return: None |
| """ |
| |
| self.logger = logging.getLogger("lcm.clusterlcm") |
| self.lcm_tasks = lcm_tasks |
| self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config) |
| |
| super().__init__(msg, self.logger) |
| |
| async def create(self, content, order_id): |
| self.logger.info("App Create Enter") |
| |
| workflow_name = self.odu.launch_workflow("create_profile", content) |
| self.logger.info("workflow_name is :{}".format(workflow_name)) |
| |
| workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name) |
| self.logger.info( |
| "workflow_status is :{} and workflow_msg is :{}".format( |
| workflow_status, workflow_msg |
| ) |
| ) |
| if workflow_status: |
| content["state"] = "CREATED" |
| content["resourceState"] = "IN_PROGRESS.GIT_SYNCED" |
| else: |
| content["state"] = "FAILED_CREATION" |
| content["resourceState"] = "ERROR" |
| # has to call update_operation_history return content |
| content = self.update_operation_history(content, workflow_status, None) |
| self.db.set_one("k8sapp", {"_id": content["_id"]}, content) |
| |
| if workflow_status: |
| resource_status, resource_msg = self.odu.check_resource_status( |
| "create_profile", content |
| ) |
| self.logger.info( |
| "resource_status is :{} and resource_msg is :{}".format( |
| resource_status, resource_msg |
| ) |
| ) |
| if resource_status: |
| content["resourceState"] = "READY" |
| else: |
| content["resourceState"] = "ERROR" |
| |
| content["operatingState"] = "IDLE" |
| content = self.update_operation_history( |
| content, workflow_status, resource_status |
| ) |
| self.db.set_one("k8sapp", {"_id": content["_id"]}, content) |
| |
| return |
| |
| async def delete(self, content, order_id): |
| self.logger.info("App delete Enter") |
| items = self.db.get_one("k8sapp", {"_id": content["_id"]}) |
| |
| workflow_name = self.odu.launch_workflow("delete_profile", content) |
| self.logger.info("workflow_name is :{}".format(workflow_name)) |
| |
| workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name) |
| self.logger.info( |
| "workflow_status is :{} and workflow_msg is :{}".format( |
| workflow_status, workflow_msg |
| ) |
| ) |
| if workflow_status: |
| items["state"] = "DELETED" |
| items["resourceState"] = "IN_PROGRESS.GIT_SYNCED" |
| else: |
| items["state"] = "FAILED_DELETION" |
| items["resourceState"] = "ERROR" |
| # has to call update_operation_history return content |
| items = self.update_operation_history(items, workflow_status, None) |
| self.db.set_one("k8sapp", {"_id": content["_id"]}, content) |
| |
| if workflow_status: |
| resource_status, resource_msg = self.odu.check_resource_status( |
| "delete_profile", content |
| ) |
| self.logger.info( |
| "resource_status is :{} and resource_msg is :{}".format( |
| resource_status, resource_msg |
| ) |
| ) |
| if resource_status: |
| items["resourceState"] = "READY" |
| else: |
| items["resourceState"] = "ERROR" |
| |
| items["operatingState"] = "IDLE" |
| items = self.update_operation_history(items, workflow_status, resource_status) |
| self.db.set_one("k8sapp", {"_id": content["_id"]}, items) |
| |
| # To delete it from dB |
| if items["state"] == "DELETED": |
| self.db.del_one("k8sapp", {"_id": content["_id"]}) |
| return |
| |
| |
| class K8sResourceLcm(LcmBase): |
| def __init__(self, msg, lcm_tasks, config): |
| """ |
| Init, Connect to database, filesystem storage, and messaging |
| :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', |
| :return: None |
| """ |
| |
| self.logger = logging.getLogger("lcm.clusterlcm") |
| self.lcm_tasks = lcm_tasks |
| self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config) |
| |
| super().__init__(msg, self.logger) |
| |
| async def create(self, content, order_id): |
| self.logger.info("Resource Create Enter") |
| |
| workflow_name = self.odu.launch_workflow("create_profile", content) |
| self.logger.info("workflow_name is :{}".format(workflow_name)) |
| |
| workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name) |
| self.logger.info( |
| "workflow_status is :{} and workflow_msg is :{}".format( |
| workflow_status, workflow_msg |
| ) |
| ) |
| if workflow_status: |
| content["state"] = "CREATED" |
| content["resourceState"] = "IN_PROGRESS.GIT_SYNCED" |
| else: |
| content["state"] = "FAILED_CREATION" |
| content["resourceState"] = "ERROR" |
| # has to call update_operation_history return content |
| content = self.update_operation_history(content, workflow_status, None) |
| self.db.set_one("k8sresource", {"_id": content["_id"]}, content) |
| |
| if workflow_status: |
| resource_status, resource_msg = self.odu.check_resource_status( |
| "create_profile", content |
| ) |
| self.logger.info( |
| "resource_status is :{} and resource_msg is :{}".format( |
| resource_status, resource_msg |
| ) |
| ) |
| if resource_status: |
| content["resourceState"] = "READY" |
| else: |
| content["resourceState"] = "ERROR" |
| |
| content["operatingState"] = "IDLE" |
| content = self.update_operation_history( |
| content, workflow_status, resource_status |
| ) |
| self.db.set_one("k8sresource", {"_id": content["_id"]}, content) |
| |
| return |
| |
| async def delete(self, content, order_id): |
| self.logger.info("Resource delete Enter") |
| items = self.db.get_one("k8sresource", {"_id": content["_id"]}) |
| |
| workflow_name = self.odu.launch_workflow("delete_profile", content) |
| self.logger.info("workflow_name is :{}".format(workflow_name)) |
| |
| workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name) |
| self.logger.info( |
| "workflow_status is :{} and workflow_msg is :{}".format( |
| workflow_status, workflow_msg |
| ) |
| ) |
| if workflow_status: |
| items["state"] = "DELETED" |
| items["resourceState"] = "IN_PROGRESS.GIT_SYNCED" |
| else: |
| items["state"] = "FAILED_DELETION" |
| items["resourceState"] = "ERROR" |
| # has to call update_operation_history return content |
| items = self.update_operation_history(items, workflow_status, None) |
| self.db.set_one("k8sresource", {"_id": content["_id"]}, items) |
| |
| if workflow_status: |
| resource_status, resource_msg = self.odu.check_resource_status( |
| "delete_profile", content |
| ) |
| self.logger.info( |
| "resource_status is :{} and resource_msg is :{}".format( |
| resource_status, resource_msg |
| ) |
| ) |
| if resource_status: |
| items["resourceState"] = "READY" |
| else: |
| items["resourceState"] = "ERROR" |
| |
| items["operatingState"] = "IDLE" |
| items = self.update_operation_history(items, workflow_status, resource_status) |
| self.db.set_one("k8sresource", {"_id": content["_id"]}, items) |
| |
| # To delete it from dB |
| if items["state"] == "DELETED": |
| self.db.del_one("k8sresource", {"_id": content["_id"]}) |
| return |
| |
| |
| class K8sInfraControllerLcm(LcmBase): |
| def __init__(self, msg, lcm_tasks, config): |
| """ |
| Init, Connect to database, filesystem storage, and messaging |
| :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', |
| :return: None |
| """ |
| |
| self.logger = logging.getLogger("lcm.clusterlcm") |
| self.lcm_tasks = lcm_tasks |
| self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config) |
| |
| super().__init__(msg, self.logger) |
| |
| async def create(self, content, order_id): |
| self.logger.info("Infra controller Create Enter") |
| |
| workflow_name = self.odu.launch_workflow("create_profile", content) |
| self.logger.info("workflow_name is :{}".format(workflow_name)) |
| |
| workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name) |
| self.logger.info( |
| "workflow_status is :{} and workflow_msg is :{}".format( |
| workflow_status, workflow_msg |
| ) |
| ) |
| if workflow_status: |
| content["state"] = "CREATED" |
| content["resourceState"] = "IN_PROGRESS.GIT_SYNCED" |
| else: |
| content["state"] = "FAILED_CREATION" |
| content["resourceState"] = "ERROR" |
| # has to call update_operation_history return content |
| content = self.update_operation_history(content, workflow_status, None) |
| self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, content) |
| |
| if workflow_status: |
| resource_status, resource_msg = self.odu.check_resource_status( |
| "create_profile", content |
| ) |
| self.logger.info( |
| "resource_status is :{} and resource_msg is :{}".format( |
| resource_status, resource_msg |
| ) |
| ) |
| if resource_status: |
| content["resourceState"] = "READY" |
| else: |
| content["resourceState"] = "ERROR" |
| |
| content["operatingState"] = "IDLE" |
| content = self.update_operation_history( |
| content, workflow_status, resource_status |
| ) |
| self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, content) |
| |
| return |
| |
| async def delete(self, content, order_id): |
| self.logger.info("Infra controller delete Enter") |
| items = self.db.get_one("k8sinfra_controller", {"_id": content["_id"]}) |
| |
| workflow_name = self.odu.launch_workflow("delete_profile", content) |
| self.logger.info("workflow_name is :{}".format(workflow_name)) |
| |
| workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name) |
| self.logger.info( |
| "workflow_status is :{} and workflow_msg is :{}".format( |
| workflow_status, workflow_msg |
| ) |
| ) |
| if workflow_status: |
| items["state"] = "DELETED" |
| items["resourceState"] = "IN_PROGRESS.GIT_SYNCED" |
| else: |
| items["state"] = "FAILED_DELETION" |
| items["resourceState"] = "ERROR" |
| # has to call update_operation_history return content |
| items = self.update_operation_history(items, workflow_status, None) |
| self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, items) |
| |
| if workflow_status: |
| resource_status, resource_msg = self.odu.check_resource_status( |
| "delete_profile", content |
| ) |
| self.logger.info( |
| "resource_status is :{} and resource_msg is :{}".format( |
| resource_status, resource_msg |
| ) |
| ) |
| if resource_status: |
| items["resourceState"] = "READY" |
| else: |
| items["resourceState"] = "ERROR" |
| |
| items["operatingState"] = "IDLE" |
| items = self.update_operation_history(items, workflow_status, resource_status) |
| self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, items) |
| |
| # To delete it from dB |
| if items["state"] == "DELETED": |
| self.db.del_one("k8sinfra_controller", {"_id": content["_id"]}) |
| return |
| |
| |
| class K8sInfraConfigLcm(LcmBase): |
| def __init__(self, msg, lcm_tasks, config): |
| """ |
| Init, Connect to database, filesystem storage, and messaging |
| :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', |
| :return: None |
| """ |
| |
| self.logger = logging.getLogger("lcm.clusterlcm") |
| self.lcm_tasks = lcm_tasks |
| self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config) |
| |
| super().__init__(msg, self.logger) |
| |
| async def create(self, content, order_id): |
| self.logger.info("Infra config Create Enter") |
| |
| workflow_name = self.odu.launch_workflow("create_profile", content) |
| self.logger.info("workflow_name is :{}".format(workflow_name)) |
| |
| workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name) |
| self.logger.info( |
| "workflow_status is :{} and workflow_msg is :{}".format( |
| workflow_status, workflow_msg |
| ) |
| ) |
| if workflow_status: |
| content["state"] = "CREATED" |
| content["resourceState"] = "IN_PROGRESS.GIT_SYNCED" |
| else: |
| content["state"] = "FAILED_CREATION" |
| content["resourceState"] = "ERROR" |
| # has to call update_operation_history return content |
| content = self.update_operation_history(content, workflow_status, None) |
| self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, content) |
| |
| if workflow_status: |
| resource_status, resource_msg = self.odu.check_resource_status( |
| "create_profile", content |
| ) |
| self.logger.info( |
| "resource_status is :{} and resource_msg is :{}".format( |
| resource_status, resource_msg |
| ) |
| ) |
| if resource_status: |
| content["resourceState"] = "READY" |
| else: |
| content["resourceState"] = "ERROR" |
| |
| content["operatingState"] = "IDLE" |
| content = self.update_operation_history( |
| content, workflow_status, resource_status |
| ) |
| self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, content) |
| |
| return |
| |
| async def delete(self, content, order_id): |
| self.logger.info("Infra config delete Enter") |
| |
| workflow_name = self.odu.launch_workflow("delete_profile", content) |
| self.logger.info("workflow_name is :{}".format(workflow_name)) |
| items = self.db.get_one("k8sinfra_config", {"_id": content["_id"]}) |
| |
| workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name) |
| self.logger.info( |
| "workflow_status is :{} and workflow_msg is :{}".format( |
| workflow_status, workflow_msg |
| ) |
| ) |
| if workflow_status: |
| items["state"] = "DELETED" |
| items["resourceState"] = "IN_PROGRESS.GIT_SYNCED" |
| else: |
| items["state"] = "FAILED_DELETION" |
| items["resourceState"] = "ERROR" |
| # has to call update_operation_history return content |
| items = self.update_operation_history(items, workflow_status, None) |
| self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, items) |
| |
| resource_status, resource_msg = self.odu.check_resource_status( |
| "delete_profile", content |
| ) |
| self.logger.info( |
| "resource_status is :{} and resource_msg is :{}".format( |
| resource_status, resource_msg |
| ) |
| ) |
| if resource_status: |
| items["resourceState"] = "READY" |
| else: |
| items["resourceState"] = "ERROR" |
| |
| items["operatingState"] = "IDLE" |
| items = self.update_operation_history(items, workflow_status, resource_status) |
| self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, items) |
| |
| # To delete it from dB |
| if items["state"] == "DELETED": |
| self.db.del_one("k8sinfra_config", {"_id": content["_id"]}) |
| return |