From: rshri Date: Fri, 5 Jul 2024 15:11:55 +0000 (+0000) Subject: Feature 11023 - 11026 : Advanced cluster management X-Git-Tag: release-v16.0-start~3 X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=refs%2Fchanges%2F75%2F14475%2F9;p=osm%2FLCM.git Feature 11023 - 11026 : Advanced cluster management Change-Id: Ibd042e6151048fd6b633e2acc29bc9d97e460fd2 Signed-off-by: rshri --- diff --git a/osm_lcm/k8s.py b/osm_lcm/k8s.py new file mode 100644 index 0000000..4272cfd --- /dev/null +++ b/osm_lcm/k8s.py @@ -0,0 +1,857 @@ +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +__author__ = ( + "Shrinithi R ", + "Shahithya Y ", +) + +import logging +from osm_lcm.lcm_utils import LcmBase +from copy import deepcopy +from osm_lcm import odu_workflows +from osm_lcm import vim_sdn + + +class ClusterLcm(LcmBase): + db_topic = "clusters" + + def __init__(self, msg, lcm_tasks, config): + """ + Init, Connect to database, filesystem storage, and messaging + :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', + :return: None + """ + + self.logger = logging.getLogger("lcm.clusterlcm") + self.lcm_tasks = lcm_tasks + self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config) + self.regist = vim_sdn.K8sClusterLcm(msg, self.lcm_tasks, config) + + super().__init__(msg, self.logger) + + async def create(self, content, order_id): + self.logger.info("cluster Create Enter") + + workflow_name = self.odu.launch_workflow("create_cluster", content) + self.logger.info("workflow_name is :{}".format(workflow_name)) + + workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name) + self.logger.info( + "workflow_status is :{} and workflow_msg is :{}".format( + workflow_status, workflow_msg + ) + ) + if workflow_status: + content["state"] = "CREATED" + content["resourceState"] = "IN_PROGRESS.GIT_SYNCED" + else: + content["state"] = "FAILED_CREATION" + content["resourceState"] = "ERROR" + # has to call update_operation_history return content + content = self.update_operation_history(content, workflow_status, None) + self.db.set_one("clusters", {"_id": content["_id"]}, content) + + if workflow_status: + resource_status, resource_msg = self.odu.check_resource_status( + "create_cluster", content + ) + self.logger.info( + "resource_status is :{} and resource_msg is :{}".format( + resource_status, resource_msg + ) + ) + if resource_status: + content["resourceState"] = "READY" + else: + content["resourceState"] = "ERROR" + + content["operatingState"] = "IDLE" + content = self.update_operation_history( + content, workflow_status, resource_status + ) + self.db.set_one("clusters", {"_id": content["_id"]}, content) + self.profile_state(content, workflow_status, resource_status) + return + + def profile_state(self, content, workflow_status, resource_status): + profiles = [ + "infra_controller_profiles", + "infra_config_profiles", + "app_profiles", + "resource_profiles", + ] + profiles_collection = { + "infra_controller_profiles": "k8sinfra_controller", + "infra_config_profiles": "k8sinfra_config", + "app_profiles": "k8sapp", + "resource_profiles": "k8sresource", + } + for profile_type in profiles: + profile_id = content[profile_type] + self.logger.info("profile id is : {}".format(profile_id)) + db_collection = profiles_collection[profile_type] + self.logger.info("the db_collection is :{}".format(db_collection)) + db_profile = self.db.get_one(db_collection, {"_id": profile_id}) + self.logger.info("the db_profile is :{}".format(db_profile)) + db_profile["state"] = content["state"] + db_profile["resourceState"] = content["resourceState"] + db_profile["operatingState"] = content["operatingState"] + db_profile = self.update_operation_history( + db_profile, workflow_status, resource_status + ) + self.logger.info("the db_profile is :{}".format(db_profile)) + self.db.set_one(db_collection, {"_id": db_profile["_id"]}, db_profile) + + async def delete(self, content, order_id): + self.logger.info("cluster delete Enter") + items = self.db.get_one("clusters", {"_id": content["_id"]}) + + workflow_name = self.odu.launch_workflow("delete_cluster", content) + self.logger.info("workflow_name is :{}".format(workflow_name)) + + workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name) + self.logger.info( + "workflow_status is :{} and workflow_msg is :{}".format( + workflow_status, workflow_msg + ) + ) + if workflow_status: + items["state"] = "DELETED" + items["resourceState"] = "IN_PROGRESS.GIT_SYNCED" + else: + items["state"] = "FAILED_DELETION" + items["resourceState"] = "ERROR" + # has to call update_operation_history return content + items = self.update_operation_history(items, workflow_status, None) + self.db.set_one("clusters", {"_id": content["_id"]}, items) + + if workflow_status: + resource_status, resource_msg = self.odu.check_resource_status( + "delete_cluster", content + ) + self.logger.info( + "resource_status is :{} and resource_msg is :{}".format( + resource_status, resource_msg + ) + ) + if resource_status: + items["resourceState"] = "READY" + else: + items["resourceState"] = "ERROR" + + items["operatingState"] = "IDLE" + items = self.update_operation_history(items, workflow_status, resource_status) + self.db.set_one("clusters", {"_id": content["_id"]}, items) + + # To delete it from dB + if items["state"] == "DELETED": + self.delete_cluster(content, order_id) + return + + def delete_cluster(self, content, order_id): + item_content = self.db.get_one("clusters", {"_id": content["_id"]}) + self.logger.info("1_the item_content is : {}".format(item_content)) + + self.logger.info("it is getting into if item_content state") + # detach profiles + update_dict = None + profiles_to_detach = [ + "infra_controller_profiles", + "infra_config_profiles", + "app_profiles", + "resource_profiles", + ] + profiles_collection = { + "infra_controller_profiles": "k8sinfra_controller", + "infra_config_profiles": "k8sinfra_config", + "app_profiles": "k8sapp", + "resource_profiles": "k8sresource", + } + for profile_type in profiles_to_detach: + if item_content.get(profile_type): + self.logger.info("the profile_type is :{}".format(profile_type)) + profile_ids = item_content[profile_type] + self.logger.info("the profile_ids is :{}".format(profile_ids)) + profile_ids_copy = deepcopy(profile_ids) + self.logger.info("the profile_ids_copy is :{}".format(profile_ids_copy)) + for profile_id in profile_ids_copy: + self.logger.info("the profile_id is :{}".format(profile_id)) + db_collection = profiles_collection[profile_type] + self.logger.info("the db_collection is :{}".format(db_collection)) + db_profile = self.db.get_one(db_collection, {"_id": profile_id}) + self.logger.info("the db_profile is :{}".format(db_profile)) + self.logger.info( + "the item_content name is :{}".format(item_content["name"]) + ) + self.logger.info( + "the db_profile name is :{}".format(db_profile["name"]) + ) + if item_content["name"] == db_profile["name"]: + self.logger.info("it is getting into if default") + self.db.del_one(db_collection, {"_id": profile_id}) + else: + self.logger.info("it is getting into else non default") + profile_ids.remove(profile_id) + update_dict = {profile_type: profile_ids} + self.logger.info(f"the update dict is :{update_dict}") + self.db.set_one( + "clusters", {"_id": content["_id"]}, update_dict + ) + self.db.del_one("clusters", {"_id": item_content["_id"]}) + self.logger.info("the id is :{}".format(content["_id"])) + + async def add(self, content, order_id): + self.logger.info("profile attach Enter") + db_cluster = self.db.get_one("clusters", {"_id": content["_id"]}) + profile_type = content["profile_type"] + self.logger.info("profile type is : {}".format(profile_type)) + profile_id = content["profile_id"] + self.logger.info("profile id is : {}".format(profile_id)) + + workflow_name = self.odu.launch_workflow("attach_profile_to_cluster", content) + self.logger.info("workflow_name is :{}".format(workflow_name)) + + workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name) + self.logger.info( + "workflow_status is :{} and workflow_msg is :{}".format( + workflow_status, workflow_msg + ) + ) + if workflow_status: + db_cluster["resourceState"] = "IN_PROGRESS.GIT_SYNCED" + else: + db_cluster["resourceState"] = "ERROR" + # has to call update_operation_history return content + db_cluster = self.update_operation_history(db_cluster, workflow_status, None) + self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster) + + if workflow_status: + resource_status, resource_msg = self.odu.check_resource_status( + "attach_profile_to_cluster", content + ) + self.logger.info( + "resource_status is :{} and resource_msg is :{}".format( + resource_status, resource_msg + ) + ) + if resource_status: + db_cluster["resourceState"] = "READY" + else: + db_cluster["resourceState"] = "ERROR" + + db_cluster["operatingState"] = "IDLE" + db_cluster = self.update_operation_history( + db_cluster, workflow_status, resource_status + ) + profiles_collection = { + "infra_controller_profiles": "k8sinfra_controller", + "infra_config_profiles": "k8sinfra_config", + "app_profiles": "k8sapp", + "resource_profiles": "k8sresource", + } + db_collection = profiles_collection[profile_type] + self.logger.info("db_collection is : {}".format(db_collection)) + profile_list = db_cluster[profile_type] + self.logger.info("profile list is : {}".format(profile_list)) + if resource_status: + self.logger.info("it is getting into resource status true") + profile_list.append(profile_id) + self.logger.info("profile list is : {}".format(profile_list)) + db_cluster[profile_type] = profile_list + self.logger.info("db cluster is : {}".format(db_cluster)) + # update_dict = {item: profile_list} + # self.logger.info("the update_dict is :{}".format(update_dict)) + # self.db.set_one(self.topic, filter_q, update_dict) + self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster) + + return + + async def remove(self, content, order_id): + self.logger.info("profile dettach Enter") + db_cluster = self.db.get_one("clusters", {"_id": content["_id"]}) + profile_type = content["profile_type"] + self.logger.info("profile type is : {}".format(profile_type)) + profile_id = content["profile_id"] + self.logger.info("profile id is : {}".format(profile_id)) + + workflow_name = self.odu.launch_workflow("detach_profile_from_cluster", content) + self.logger.info("workflow_name is :{}".format(workflow_name)) + + workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name) + self.logger.info( + "workflow_status is :{} and workflow_msg is :{}".format( + workflow_status, workflow_msg + ) + ) + if workflow_status: + db_cluster["resourceState"] = "IN_PROGRESS.GIT_SYNCED" + else: + db_cluster["resourceState"] = "ERROR" + # has to call update_operation_history return content + db_cluster = self.update_operation_history(db_cluster, workflow_status, None) + self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster) + + if workflow_status: + resource_status, resource_msg = self.odu.check_resource_status( + "detach_profile_from_cluster", content + ) + self.logger.info( + "resource_status is :{} and resource_msg is :{}".format( + resource_status, resource_msg + ) + ) + if resource_status: + db_cluster["resourceState"] = "READY" + else: + db_cluster["resourceState"] = "ERROR" + + db_cluster["operatingState"] = "IDLE" + db_cluster = self.update_operation_history( + db_cluster, workflow_status, resource_status + ) + profiles_collection = { + "infra_controller_profiles": "k8sinfra_controller", + "infra_config_profiles": "k8sinfra_config", + "app_profiles": "k8sapp", + "resource_profiles": "k8sresource", + } + db_collection = profiles_collection[profile_type] + self.logger.info("db_collection is : {}".format(db_collection)) + profile_list = db_cluster[profile_type] + self.logger.info("profile list is : {}".format(profile_list)) + if resource_status: + self.logger.info("it is getting into resource status true") + profile_list.remove(profile_id) + self.logger.info("profile list is : {}".format(profile_list)) + db_cluster[profile_type] = profile_list + self.logger.info("db cluster is : {}".format(db_cluster)) + # update_dict = {item: profile_list} + # self.logger.info("the update_dict is :{}".format(update_dict)) + # self.db.set_one(self.topic, filter_q, update_dict) + self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster) + + return + + async def register(self, content, order_id): + self.logger.info("cluster register enter") + + workflow_name = self.odu.launch_workflow("register_cluster", content) + self.logger.info("workflow_name is :{}".format(workflow_name)) + + workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name) + self.logger.info( + "workflow_status is :{} and workflow_msg is :{}".format( + workflow_status, workflow_msg + ) + ) + if workflow_status: + content["state"] = "CREATED" + content["resourceState"] = "IN_PROGRESS.GIT_SYNCED" + else: + content["state"] = "FAILED_CREATION" + content["resourceState"] = "ERROR" + # has to call update_operation_history return content + content = self.update_operation_history(content, workflow_status, None) + self.db.set_one("clusters", {"_id": content["_id"]}, content) + + if workflow_status: + resource_status, resource_msg = self.odu.check_resource_status( + "register_cluster", content + ) + self.logger.info( + "resource_status is :{} and resource_msg is :{}".format( + resource_status, resource_msg + ) + ) + if resource_status: + content["resourceState"] = "READY" + else: + content["resourceState"] = "ERROR" + + content["operatingState"] = "IDLE" + content = self.update_operation_history( + content, workflow_status, resource_status + ) + self.db.set_one("clusters", {"_id": content["_id"]}, content) + self.profile_state(content, workflow_status, resource_status) + return + + async def deregister(self, content, order_id): + self.logger.info("cluster deregister enter") + + items = self.db.get_one("clusters", {"_id": content["_id"]}) + self.logger.info("the items is : {}".format(items)) + + workflow_name = self.odu.launch_workflow("deregister_cluster", content) + self.logger.info("workflow_name is :{}".format(workflow_name)) + + workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name) + self.logger.info( + "workflow_status is :{} and workflow_msg is :{}".format( + workflow_status, workflow_msg + ) + ) + if workflow_status: + items["state"] = "DELETED" + items["resourceState"] = "IN_PROGRESS.GIT_SYNCED" + else: + items["state"] = "FAILED_DELETION" + items["resourceState"] = "ERROR" + # has to call update_operation_history return content + items = self.update_operation_history(items, workflow_status, None) + self.db.set_one("clusters", {"_id": content["_id"]}, items) + + if workflow_status: + resource_status, resource_msg = self.odu.check_resource_status( + "deregister_cluster", content + ) + self.logger.info( + "resource_status is :{} and resource_msg is :{}".format( + resource_status, resource_msg + ) + ) + if resource_status: + items["resourceState"] = "READY" + else: + items["resourceState"] = "ERROR" + + items["operatingState"] = "IDLE" + items = self.update_operation_history(items, workflow_status, resource_status) + self.db.set_one("clusters", {"_id": content["_id"]}, items) + + # To delete it from dB + if items["state"] == "DELETED": + self.db.del_one("clusters", {"_id": items["_id"]}) + return + + +class K8sAppLcm(LcmBase): + def __init__(self, msg, lcm_tasks, config): + """ + Init, Connect to database, filesystem storage, and messaging + :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', + :return: None + """ + + self.logger = logging.getLogger("lcm.clusterlcm") + self.lcm_tasks = lcm_tasks + self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config) + + super().__init__(msg, self.logger) + + async def create(self, content, order_id): + self.logger.info("App Create Enter") + + workflow_name = self.odu.launch_workflow("create_profile", content) + self.logger.info("workflow_name is :{}".format(workflow_name)) + + workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name) + self.logger.info( + "workflow_status is :{} and workflow_msg is :{}".format( + workflow_status, workflow_msg + ) + ) + if workflow_status: + content["state"] = "CREATED" + content["resourceState"] = "IN_PROGRESS.GIT_SYNCED" + else: + content["state"] = "FAILED_CREATION" + content["resourceState"] = "ERROR" + # has to call update_operation_history return content + content = self.update_operation_history(content, workflow_status, None) + self.db.set_one("k8sapp", {"_id": content["_id"]}, content) + + if workflow_status: + resource_status, resource_msg = self.odu.check_resource_status( + "create_profile", content + ) + self.logger.info( + "resource_status is :{} and resource_msg is :{}".format( + resource_status, resource_msg + ) + ) + if resource_status: + content["resourceState"] = "READY" + else: + content["resourceState"] = "ERROR" + + content["operatingState"] = "IDLE" + content = self.update_operation_history( + content, workflow_status, resource_status + ) + self.db.set_one("k8sapp", {"_id": content["_id"]}, content) + + return + + async def delete(self, content, order_id): + self.logger.info("App delete Enter") + items = self.db.get_one("k8sapp", {"_id": content["_id"]}) + + workflow_name = self.odu.launch_workflow("delete_profile", content) + self.logger.info("workflow_name is :{}".format(workflow_name)) + + workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name) + self.logger.info( + "workflow_status is :{} and workflow_msg is :{}".format( + workflow_status, workflow_msg + ) + ) + if workflow_status: + items["state"] = "DELETED" + items["resourceState"] = "IN_PROGRESS.GIT_SYNCED" + else: + items["state"] = "FAILED_DELETION" + items["resourceState"] = "ERROR" + # has to call update_operation_history return content + items = self.update_operation_history(items, workflow_status, None) + self.db.set_one("k8sapp", {"_id": content["_id"]}, content) + + if workflow_status: + resource_status, resource_msg = self.odu.check_resource_status( + "delete_profile", content + ) + self.logger.info( + "resource_status is :{} and resource_msg is :{}".format( + resource_status, resource_msg + ) + ) + if resource_status: + items["resourceState"] = "READY" + else: + items["resourceState"] = "ERROR" + + items["operatingState"] = "IDLE" + items = self.update_operation_history(items, workflow_status, resource_status) + self.db.set_one("k8sapp", {"_id": content["_id"]}, items) + + # To delete it from dB + if items["state"] == "DELETED": + self.db.del_one("k8sapp", {"_id": content["_id"]}) + return + + +class K8sResourceLcm(LcmBase): + def __init__(self, msg, lcm_tasks, config): + """ + Init, Connect to database, filesystem storage, and messaging + :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', + :return: None + """ + + self.logger = logging.getLogger("lcm.clusterlcm") + self.lcm_tasks = lcm_tasks + self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config) + + super().__init__(msg, self.logger) + + async def create(self, content, order_id): + self.logger.info("Resource Create Enter") + + workflow_name = self.odu.launch_workflow("create_profile", content) + self.logger.info("workflow_name is :{}".format(workflow_name)) + + workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name) + self.logger.info( + "workflow_status is :{} and workflow_msg is :{}".format( + workflow_status, workflow_msg + ) + ) + if workflow_status: + content["state"] = "CREATED" + content["resourceState"] = "IN_PROGRESS.GIT_SYNCED" + else: + content["state"] = "FAILED_CREATION" + content["resourceState"] = "ERROR" + # has to call update_operation_history return content + content = self.update_operation_history(content, workflow_status, None) + self.db.set_one("k8sresource", {"_id": content["_id"]}, content) + + if workflow_status: + resource_status, resource_msg = self.odu.check_resource_status( + "create_profile", content + ) + self.logger.info( + "resource_status is :{} and resource_msg is :{}".format( + resource_status, resource_msg + ) + ) + if resource_status: + content["resourceState"] = "READY" + else: + content["resourceState"] = "ERROR" + + content["operatingState"] = "IDLE" + content = self.update_operation_history( + content, workflow_status, resource_status + ) + self.db.set_one("k8sresource", {"_id": content["_id"]}, content) + + return + + async def delete(self, content, order_id): + self.logger.info("Resource delete Enter") + items = self.db.get_one("k8sresource", {"_id": content["_id"]}) + + workflow_name = self.odu.launch_workflow("delete_profile", content) + self.logger.info("workflow_name is :{}".format(workflow_name)) + + workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name) + self.logger.info( + "workflow_status is :{} and workflow_msg is :{}".format( + workflow_status, workflow_msg + ) + ) + if workflow_status: + items["state"] = "DELETED" + items["resourceState"] = "IN_PROGRESS.GIT_SYNCED" + else: + items["state"] = "FAILED_DELETION" + items["resourceState"] = "ERROR" + # has to call update_operation_history return content + items = self.update_operation_history(items, workflow_status, None) + self.db.set_one("k8sresource", {"_id": content["_id"]}, items) + + if workflow_status: + resource_status, resource_msg = self.odu.check_resource_status( + "delete_profile", content + ) + self.logger.info( + "resource_status is :{} and resource_msg is :{}".format( + resource_status, resource_msg + ) + ) + if resource_status: + items["resourceState"] = "READY" + else: + items["resourceState"] = "ERROR" + + items["operatingState"] = "IDLE" + items = self.update_operation_history(items, workflow_status, resource_status) + self.db.set_one("k8sresource", {"_id": content["_id"]}, items) + + # To delete it from dB + if items["state"] == "DELETED": + self.db.del_one("k8sresource", {"_id": content["_id"]}) + return + + +class K8sInfraControllerLcm(LcmBase): + def __init__(self, msg, lcm_tasks, config): + """ + Init, Connect to database, filesystem storage, and messaging + :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', + :return: None + """ + + self.logger = logging.getLogger("lcm.clusterlcm") + self.lcm_tasks = lcm_tasks + self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config) + + super().__init__(msg, self.logger) + + async def create(self, content, order_id): + self.logger.info("Infra controller Create Enter") + + workflow_name = self.odu.launch_workflow("create_profile", content) + self.logger.info("workflow_name is :{}".format(workflow_name)) + + workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name) + self.logger.info( + "workflow_status is :{} and workflow_msg is :{}".format( + workflow_status, workflow_msg + ) + ) + if workflow_status: + content["state"] = "CREATED" + content["resourceState"] = "IN_PROGRESS.GIT_SYNCED" + else: + content["state"] = "FAILED_CREATION" + content["resourceState"] = "ERROR" + # has to call update_operation_history return content + content = self.update_operation_history(content, workflow_status, None) + self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, content) + + if workflow_status: + resource_status, resource_msg = self.odu.check_resource_status( + "create_profile", content + ) + self.logger.info( + "resource_status is :{} and resource_msg is :{}".format( + resource_status, resource_msg + ) + ) + if resource_status: + content["resourceState"] = "READY" + else: + content["resourceState"] = "ERROR" + + content["operatingState"] = "IDLE" + content = self.update_operation_history( + content, workflow_status, resource_status + ) + self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, content) + + return + + async def delete(self, content, order_id): + self.logger.info("Infra controller delete Enter") + items = self.db.get_one("k8sinfra_controller", {"_id": content["_id"]}) + + workflow_name = self.odu.launch_workflow("delete_profile", content) + self.logger.info("workflow_name is :{}".format(workflow_name)) + + workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name) + self.logger.info( + "workflow_status is :{} and workflow_msg is :{}".format( + workflow_status, workflow_msg + ) + ) + if workflow_status: + items["state"] = "DELETED" + items["resourceState"] = "IN_PROGRESS.GIT_SYNCED" + else: + items["state"] = "FAILED_DELETION" + items["resourceState"] = "ERROR" + # has to call update_operation_history return content + items = self.update_operation_history(items, workflow_status, None) + self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, items) + + if workflow_status: + resource_status, resource_msg = self.odu.check_resource_status( + "delete_profile", content + ) + self.logger.info( + "resource_status is :{} and resource_msg is :{}".format( + resource_status, resource_msg + ) + ) + if resource_status: + items["resourceState"] = "READY" + else: + items["resourceState"] = "ERROR" + + items["operatingState"] = "IDLE" + items = self.update_operation_history(items, workflow_status, resource_status) + self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, items) + + # To delete it from dB + if items["state"] == "DELETED": + self.db.del_one("k8sinfra_controller", {"_id": content["_id"]}) + return + + +class K8sInfraConfigLcm(LcmBase): + def __init__(self, msg, lcm_tasks, config): + """ + Init, Connect to database, filesystem storage, and messaging + :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', + :return: None + """ + + self.logger = logging.getLogger("lcm.clusterlcm") + self.lcm_tasks = lcm_tasks + self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config) + + super().__init__(msg, self.logger) + + async def create(self, content, order_id): + self.logger.info("Infra config Create Enter") + + workflow_name = self.odu.launch_workflow("create_profile", content) + self.logger.info("workflow_name is :{}".format(workflow_name)) + + workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name) + self.logger.info( + "workflow_status is :{} and workflow_msg is :{}".format( + workflow_status, workflow_msg + ) + ) + if workflow_status: + content["state"] = "CREATED" + content["resourceState"] = "IN_PROGRESS.GIT_SYNCED" + else: + content["state"] = "FAILED_CREATION" + content["resourceState"] = "ERROR" + # has to call update_operation_history return content + content = self.update_operation_history(content, workflow_status, None) + self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, content) + + if workflow_status: + resource_status, resource_msg = self.odu.check_resource_status( + "create_profile", content + ) + self.logger.info( + "resource_status is :{} and resource_msg is :{}".format( + resource_status, resource_msg + ) + ) + if resource_status: + content["resourceState"] = "READY" + else: + content["resourceState"] = "ERROR" + + content["operatingState"] = "IDLE" + content = self.update_operation_history( + content, workflow_status, resource_status + ) + self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, content) + + return + + async def delete(self, content, order_id): + self.logger.info("Infra config delete Enter") + + workflow_name = self.odu.launch_workflow("delete_profile", content) + self.logger.info("workflow_name is :{}".format(workflow_name)) + items = self.db.get_one("k8sinfra_config", {"_id": content["_id"]}) + + workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name) + self.logger.info( + "workflow_status is :{} and workflow_msg is :{}".format( + workflow_status, workflow_msg + ) + ) + if workflow_status: + items["state"] = "DELETED" + items["resourceState"] = "IN_PROGRESS.GIT_SYNCED" + else: + items["state"] = "FAILED_DELETION" + items["resourceState"] = "ERROR" + # has to call update_operation_history return content + items = self.update_operation_history(items, workflow_status, None) + self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, items) + + resource_status, resource_msg = self.odu.check_resource_status( + "delete_profile", content + ) + self.logger.info( + "resource_status is :{} and resource_msg is :{}".format( + resource_status, resource_msg + ) + ) + if resource_status: + items["resourceState"] = "READY" + else: + items["resourceState"] = "ERROR" + + items["operatingState"] = "IDLE" + items = self.update_operation_history(items, workflow_status, resource_status) + self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, items) + + # To delete it from dB + if items["state"] == "DELETED": + self.db.del_one("k8sinfra_config", {"_id": content["_id"]}) + return diff --git a/osm_lcm/lcm.py b/osm_lcm/lcm.py index 1c81da1..9d711ea 100644 --- a/osm_lcm/lcm.py +++ b/osm_lcm/lcm.py @@ -30,7 +30,7 @@ import getopt import sys from random import SystemRandom -from osm_lcm import ns, vim_sdn, netslice +from osm_lcm import ns, vim_sdn, netslice, k8s from osm_lcm.ng_ro import NgRoException, NgRoClient from osm_lcm.ROclient import ROClient, ROClientException @@ -100,7 +100,21 @@ class Lcm: self.netslice ) = ( self.vim - ) = self.wim = self.sdn = self.k8scluster = self.vca = self.k8srepo = None + ) = ( + self.wim + ) = ( + self.sdn + ) = ( + self.k8scluster + ) = ( + self.vca + ) = ( + self.k8srepo + ) = ( + self.cluster + ) = ( + self.k8s_app + ) = self.k8s_resource = self.k8s_infra_controller = self.k8s_infra_config = None # logging log_format_simple = ( @@ -193,6 +207,12 @@ class Lcm: # contains created tasks/futures to be able to cancel self.lcm_tasks = TaskRegistry(self.worker_id, self.logger) + self.logger.info( + "Worker_id: {} main_config: {} lcm tasks: {}".format( + self.worker_id, self.main_config, self.lcm_tasks + ) + ) + async def check_RO_version(self): tries = 14 last_error = None @@ -297,7 +317,11 @@ class Lcm: async def kafka_read_callback(self, topic, command, params): order_id = 1 - + self.logger.info( + "Topic: {} command: {} params: {} order ID: {}".format( + topic, command, params, order_id + ) + ) if topic != "admin" and command != "ping": self.logger.debug( "Task kafka_read receives {} {}: {}".format(topic, command, params) @@ -305,6 +329,11 @@ class Lcm: self.consecutive_errors = 0 self.first_start = False order_id += 1 + self.logger.info( + "Consecutive error: {} First start: {}".format( + self.consecutive_errors, self.first_start + ) + ) if command == "exit": raise LcmExceptionExit elif command.startswith("#"): @@ -432,9 +461,15 @@ class Lcm: elif topic == "ns": if command == "instantiate": # self.logger.debug("Deploying NS {}".format(nsr_id)) + self.logger.info("NS instantiate") nslcmop = params nslcmop_id = nslcmop["_id"] nsr_id = nslcmop["nsInstanceId"] + self.logger.info( + "NsLCMOP: {} NsLCMOP_ID:{} nsr_id: {}".format( + nslcmop, nslcmop_id, nsr_id + ) + ) task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id)) self.lcm_tasks.register( "ns", nsr_id, nslcmop_id, "ns_instantiate", task @@ -531,6 +566,7 @@ class Lcm: "actioned", "updated", "migrated", + "verticalscaled", ): # "scaled-cooldown-time" return @@ -592,8 +628,13 @@ class Lcm: return elif topic == "vim_account": vim_id = params["_id"] + self.logger.info("Vim_ID: {}".format(vim_id)) if command in ("create", "created"): + self.logger.info("Command : {}".format(command)) + # "self.logger.info("Main config: {}".format(main_config))" + # "self.logger.info("Main config RO: {}".format(main_config.RO.ng))"" if not self.main_config.RO.ng: + self.logger.info("Vim create") task = asyncio.ensure_future(self.vim.create(params, order_id)) self.lcm_tasks.register( "vim_account", vim_id, order_id, "vim_create", task @@ -667,6 +708,161 @@ class Lcm: return elif command == "deleted": return # TODO cleaning of task just in case should be done + elif topic == "cluster": + if command == "create" or command == "created": + operation_id = params["operation_id"] + cluster_id = params["_id"] + db_cluster = self.db.get_one("clusters", {"_id": cluster_id}) + # cluster_id = params.get("_id") + self.logger.debug("operation_id = {}".format(operation_id)) + self.logger.debug("cluster_id = {}".format(cluster_id)) + self.logger.debug("cluster_db = {}".format(db_cluster)) + task = asyncio.ensure_future(self.cluster.create(params, order_id)) + self.lcm_tasks.register( + "cluster", cluster_id, order_id, "cluster_create", task + ) + return + elif command == "delete" or command == "deleted": + # cluster_id = params.get("_id") + operation_id = params["operation_id"] + cluster_id = params["_id"] + db_cluster = self.db.get_one("clusters", {"_id": cluster_id}) + task = asyncio.ensure_future(self.cluster.delete(params, order_id)) + self.lcm_tasks.register( + "cluster", cluster_id, order_id, "cluster_delete", task + ) + return + elif command == "add" or command == "added": + add_id = params.get("_id") + task = asyncio.ensure_future(self.cluster.add(params, order_id)) + self.lcm_tasks.register( + "cluster", add_id, order_id, "profile_add", task + ) + return + elif command == "remove" or command == "removed": + remove_id = params.get("_id") + task = asyncio.ensure_future(self.cluster.remove(params, order_id)) + self.lcm_tasks.register( + "cluster", remove_id, order_id, "profile_remove", task + ) + return + elif command == "register" or command == "registered": + cluster_id = params.get("_id") + task = asyncio.ensure_future(self.cluster.register(params, order_id)) + self.lcm_tasks.register( + "cluster", cluster_id, order_id, "cluster_register", task + ) + return + elif command == "deregister" or command == "deregistered": + cluster_id = params.get("_id") + task = asyncio.ensure_future(self.cluster.deregister(params, order_id)) + self.lcm_tasks.register( + "cluster", cluster_id, order_id, "cluster_deregister", task + ) + return + elif topic == "k8s_app": + if command == "profile_create" or command == "profile_created": + k8s_app_id = params.get("_id") + self.logger.debug("k8s_app_id = {}".format(k8s_app_id)) + task = asyncio.ensure_future(self.k8s_app.create(params, order_id)) + self.lcm_tasks.register( + "k8s_app", k8s_app_id, order_id, "k8s_app_create", task + ) + return + elif command == "delete" or command == "deleted": + k8s_app_id = params.get("_id") + task = asyncio.ensure_future(self.k8s_app.delete(params, order_id)) + self.lcm_tasks.register( + "k8s_app", k8s_app_id, order_id, "k8s_app_delete", task + ) + return + elif topic == "k8s_resource": + if command == "profile_create" or command == "profile_created": + k8s_resource_id = params.get("_id") + self.logger.debug("k8s_resource_id = {}".format(k8s_resource_id)) + task = asyncio.ensure_future(self.k8s_resource.create(params, order_id)) + self.lcm_tasks.register( + "k8s_resource", + k8s_resource_id, + order_id, + "k8s_resource_create", + task, + ) + return + elif command == "delete" or command == "deleted": + k8s_resource_id = params.get("_id") + task = asyncio.ensure_future(self.k8s_resource.delete(params, order_id)) + self.lcm_tasks.register( + "k8s_resource", + k8s_resource_id, + order_id, + "k8s_resource_delete", + task, + ) + return + + elif topic == "k8s_infra_controller": + if command == "profile_create" or command == "profile_created": + k8s_infra_controller_id = params.get("_id") + self.logger.debug( + "k8s_infra_controller_id = {}".format(k8s_infra_controller_id) + ) + task = asyncio.ensure_future( + self.k8s_infra_controller.create(params, order_id) + ) + self.lcm_tasks.register( + "k8s_infra_controller", + k8s_infra_controller_id, + order_id, + "k8s_infra_controller_create", + task, + ) + return + elif command == "delete" or command == "deleted": + k8s_infra_controller_id = params.get("_id") + task = asyncio.ensure_future( + self.k8s_infra_controller.delete(params, order_id) + ) + self.lcm_tasks.register( + "k8s_infra_controller", + k8s_infra_controller_id, + order_id, + "k8s_infra_controller_delete", + task, + ) + return + + elif topic == "k8s_infra_config": + if command == "profile_create" or command == "profile_created": + k8s_infra_config_id = params.get("_id") + self.logger.debug( + "k8s_infra_config_id = {}".format(k8s_infra_config_id) + ) + task = asyncio.ensure_future( + self.k8s_infra_config.create(params, order_id) + ) + self.lcm_tasks.register( + "k8s_infra_config", + k8s_infra_config_id, + order_id, + "k8s_infra_config_create", + task, + ) + return + elif command == "delete" or command == "deleted": + k8s_infra_config_id = params.get("_id") + task = asyncio.ensure_future( + self.k8s_infra_config.delete(params, order_id) + ) + self.lcm_tasks.register( + "k8s_infra_config", + k8s_infra_config_id, + order_id, + "k8s_infra_config_delete", + task, + ) + return + self.logger.critical("unknown topic {} and command '{}'".format(topic, command)) async def kafka_read(self): @@ -675,6 +871,11 @@ class Lcm: ) self.consecutive_errors = 0 self.first_start = True + self.logger.info( + "Consecutive errors: {} first start: {}".format( + self.consecutive_errors, self.first_start + ) + ) while self.consecutive_errors < 10: try: topics = ( @@ -688,6 +889,16 @@ class Lcm: "k8srepo", "pla", "nslcmops", + "cluster", + "k8s_app", + "k8s_resource", + "k8s_infra_controller", + "k8s_infra_config", + ) + self.logger.info( + "Consecutive errors: {} first start: {}".format( + self.consecutive_errors, self.first_start + ) ) topics_admin = ("admin",) await asyncio.gather( @@ -729,6 +940,7 @@ class Lcm: await asyncio.gather(self.kafka_read(), self.kafka_ping()) async def start(self): + self.logger.info("Start LCM") # check RO version await self.check_RO_version() @@ -747,6 +959,27 @@ class Lcm: self.k8srepo = vim_sdn.K8sRepoLcm( self.msg, self.lcm_tasks, self.main_config.to_dict() ) + self.cluster = k8s.ClusterLcm( + self.msg, self.lcm_tasks, self.main_config.to_dict() + ) + self.k8s_app = k8s.K8sAppLcm( + self.msg, self.lcm_tasks, self.main_config.to_dict() + ) + self.k8s_resource = k8s.K8sResourceLcm( + self.msg, self.lcm_tasks, self.main_config.to_dict() + ) + self.k8s_infra_controller = k8s.K8sInfraControllerLcm( + self.msg, self.lcm_tasks, self.main_config.to_dict() + ) + self.k8s_infra_config = k8s.K8sInfraConfigLcm( + self.msg, self.lcm_tasks, self.main_config.to_dict() + ) + + self.logger.info( + "Msg: {} lcm tasks: {} main config: {}".format( + self.msg, self.lcm_tasks, self.main_config + ) + ) await self.kafka_read_ping() diff --git a/osm_lcm/lcm_utils.py b/osm_lcm/lcm_utils.py index af460d2..d1b310b 100644 --- a/osm_lcm/lcm_utils.py +++ b/osm_lcm/lcm_utils.py @@ -221,11 +221,47 @@ class LcmBase: return now = time() _desc["_admin.modified"] = now + self.logger.info("Desc: {} Item: {} _id: {}".format(_desc, item, _id)) self.db.set_one(item, {"_id": _id}, _desc) _desc.clear() # except DbException as e: # self.logger.error("Updating {} _id={} with '{}'. Error: {}".format(item, _id, _desc, e)) + def update_operation_history( + self, content, workflow_status=None, resource_status=None + ): + self.logger.info("Update Operation History in LcmBase") + self.logger.info( + "Content: {} Workflow Status: {} Resource Status: {}".format( + content, workflow_status, resource_status + ) + ) + + op_id = content["current_operation"] + self.logger.info("OP_id: {}".format(op_id)) + length = 0 + for op in content["operationHistory"]: + self.logger.info("Operations: {}".format(op)) + if op["op_id"] == op_id: + self.logger.info("Length: {}".format(length)) + now = time() + if workflow_status: + content["operationHistory"][length]["workflowState"] = "COMPLETED" + else: + content["operationHistory"][length]["workflowState"] = "ERROR" + + if resource_status: + content["operationHistory"][length]["resourceState"] = "READY" + else: + content["operationHistory"][length]["resourceState"] = "NOT_READY" + + content["operationHistory"][length]["endDate"] = now + break + length += 1 + self.logger.info("content: {}".format(content)) + + return content + @staticmethod def calculate_charm_hash(zipped_file): """Calculate the hash of charm files which ends with .charm @@ -489,7 +525,19 @@ class TaskRegistry(LcmBase): # NS/NSI: "services" VIM/WIM/SDN: "accounts" topic_service_list = ["ns", "nsi"] - topic_account_list = ["vim", "wim", "sdn", "k8scluster", "vca", "k8srepo"] + topic_account_list = [ + "vim", + "wim", + "sdn", + "k8scluster", + "vca", + "k8srepo", + "cluster", + "k8s_app", + "k8s_resource", + "k8s_infra_controller", + "k8s_infra_config", + ] # Map topic to InstanceID topic2instid_dict = {"ns": "nsInstanceId", "nsi": "netsliceInstanceId"} @@ -504,6 +552,11 @@ class TaskRegistry(LcmBase): "k8scluster": "k8sclusters", "vca": "vca", "k8srepo": "k8srepos", + "cluster": "k8sclusters", + "k8s_app": "k8sapp", + "k8s_resource": "k8sresource", + "k8s_infra_controller": "k8sinfra_controller", + "k8s_infra_config": "k8sinfra_config", } def __init__(self, worker_id=None, logger=None): @@ -516,10 +569,17 @@ class TaskRegistry(LcmBase): "k8scluster": {}, "vca": {}, "k8srepo": {}, + "cluster": {}, + "k8s_app": {}, + "k8s_resource": {}, + "k8s_infra_controller": {}, + "k8s_infra_config": {}, + "odu": {}, } self.worker_id = worker_id self.db = Database().instance.db self.logger = logger + # self.logger.info("Task registry: {}".format(self.task_registry)) def register(self, topic, _id, op_id, task_name, task): """ @@ -531,12 +591,18 @@ class TaskRegistry(LcmBase): :param task: Task class :return: none """ + self.logger.info( + "topic : {}, _id:{}, op_id:{}, taskname:{}, task:{}".format( + topic, _id, op_id, task_name, task + ) + ) if _id not in self.task_registry[topic]: self.task_registry[topic][_id] = OrderedDict() if op_id not in self.task_registry[topic][_id]: self.task_registry[topic][_id][op_id] = {task_name: task} else: self.task_registry[topic][_id][op_id][task_name] = task + self.logger.info("Task resgistry: {}".format(self.task_registry)) # print("registering task", topic, _id, op_id, task_name, task) def remove(self, topic, _id, op_id, task_name=None): @@ -713,12 +779,16 @@ class TaskRegistry(LcmBase): """ # Backward compatibility for VIM/WIM/SDN/k8scluster without op_id + self.logger.info("Lock_HA") if self._is_account_type_HA(topic) and op_id is None: return True # Try to lock this task db_table_name = self.topic2dbtable_dict[topic] q_filter, update_dict = self._get_dbparams_for_lock_HA(topic, op_type, op_id) + self.logger.info( + "db table name: {} update dict: {}".format(db_table_name, update_dict) + ) db_lock_task = self.db.set_one( db_table_name, q_filter=q_filter, @@ -759,7 +829,7 @@ class TaskRegistry(LcmBase): :param op_id: Account ID + ':' + Operation Index :return: nothing """ - + self.logger.info("Unlock HA") # Backward compatibility if not self._is_account_type_HA(topic) or not op_id: return @@ -767,7 +837,7 @@ class TaskRegistry(LcmBase): # Get Account ID and Operation Index account_id, op_index = self._get_account_and_op_HA(op_id) db_table_name = self.topic2dbtable_dict[topic] - + self.logger.info("db_table_name: {}".format(db_table_name)) # If this is a 'delete' operation, the account may have been deleted (SUCCESS) or may still exist (FAILED) # If the account exist, register the HA task. # Update DB for HA tasks @@ -778,6 +848,7 @@ class TaskRegistry(LcmBase): "_admin.operations.{}.worker".format(op_index): None, "_admin.current_operation": None, } + self.logger.info("Update dict: {}".format(update_dict)) self.db.set_one( db_table_name, q_filter=q_filter, @@ -834,6 +905,7 @@ class TaskRegistry(LcmBase): step = "Waiting for {} related tasks to be completed.".format( new_num_related_tasks ) + self.logger.info("{}".format(step)) update_dict = {} q_filter = {"_id": _id} # NS/NSI diff --git a/osm_lcm/odu_workflows.py b/osm_lcm/odu_workflows.py new file mode 100644 index 0000000..273bf01 --- /dev/null +++ b/osm_lcm/odu_workflows.py @@ -0,0 +1,48 @@ +#!/usr/bin/python3 +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + + +import logging +from osm_lcm.lcm_utils import LcmBase + + +class OduWorkflow(LcmBase): + def __init__(self, msg, lcm_tasks, config): + """ + Init, Connect to database, filesystem storage, and messaging + :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', + :return: None + """ + + self.logger = logging.getLogger("lcm.odu") + self.lcm_tasks = lcm_tasks + self.logger.info("Msg: {} lcm_tasks: {} ".format(msg, lcm_tasks)) + + super().__init__(msg, self.logger) + + def launch_workflow(self, key, content): + self.logger.info( + f"Workflow is getting into launch. Key: {key}. Content: {content}" + ) + return f"workflow-{key}-{content['_id']}" + + def check_workflow_status(self, workflow_name): + self.logger.info(f"Check workflow status {workflow_name}") + return True, "OK" + + def check_resource_status(self, key, content): + self.logger.info(f"Check resource status {key}: {content}") + return True, "OK"