--- /dev/null
+# -*- coding: utf-8 -*-
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+__author__ = (
+ "Shrinithi R <shrinithi.r@tataelxsi.co.in>",
+ "Shahithya Y <shahithya.y@tataelxsi.co.in>",
+)
+
+import logging
+from osm_lcm.lcm_utils import LcmBase
+from copy import deepcopy
+from osm_lcm import odu_workflows
+from osm_lcm import vim_sdn
+
+
+class ClusterLcm(LcmBase):
+ db_topic = "clusters"
+
+ def __init__(self, msg, lcm_tasks, config):
+ """
+ Init, Connect to database, filesystem storage, and messaging
+ :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
+ :return: None
+ """
+
+ self.logger = logging.getLogger("lcm.clusterlcm")
+ self.lcm_tasks = lcm_tasks
+ self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)
+ self.regist = vim_sdn.K8sClusterLcm(msg, self.lcm_tasks, config)
+
+ super().__init__(msg, self.logger)
+
+ async def create(self, content, order_id):
+ self.logger.info("cluster Create Enter")
+
+ workflow_name = self.odu.launch_workflow("create_cluster", content)
+ self.logger.info("workflow_name is :{}".format(workflow_name))
+
+ workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name)
+ self.logger.info(
+ "workflow_status is :{} and workflow_msg is :{}".format(
+ workflow_status, workflow_msg
+ )
+ )
+ if workflow_status:
+ content["state"] = "CREATED"
+ content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
+ else:
+ content["state"] = "FAILED_CREATION"
+ content["resourceState"] = "ERROR"
+ # has to call update_operation_history return content
+ content = self.update_operation_history(content, workflow_status, None)
+ self.db.set_one("clusters", {"_id": content["_id"]}, content)
+
+ if workflow_status:
+ resource_status, resource_msg = self.odu.check_resource_status(
+ "create_cluster", content
+ )
+ self.logger.info(
+ "resource_status is :{} and resource_msg is :{}".format(
+ resource_status, resource_msg
+ )
+ )
+ if resource_status:
+ content["resourceState"] = "READY"
+ else:
+ content["resourceState"] = "ERROR"
+
+ content["operatingState"] = "IDLE"
+ content = self.update_operation_history(
+ content, workflow_status, resource_status
+ )
+ self.db.set_one("clusters", {"_id": content["_id"]}, content)
+ self.profile_state(content, workflow_status, resource_status)
+ return
+
+ def profile_state(self, content, workflow_status, resource_status):
+ profiles = [
+ "infra_controller_profiles",
+ "infra_config_profiles",
+ "app_profiles",
+ "resource_profiles",
+ ]
+ profiles_collection = {
+ "infra_controller_profiles": "k8sinfra_controller",
+ "infra_config_profiles": "k8sinfra_config",
+ "app_profiles": "k8sapp",
+ "resource_profiles": "k8sresource",
+ }
+ for profile_type in profiles:
+ profile_id = content[profile_type]
+ self.logger.info("profile id is : {}".format(profile_id))
+ db_collection = profiles_collection[profile_type]
+ self.logger.info("the db_collection is :{}".format(db_collection))
+ db_profile = self.db.get_one(db_collection, {"_id": profile_id})
+ self.logger.info("the db_profile is :{}".format(db_profile))
+ db_profile["state"] = content["state"]
+ db_profile["resourceState"] = content["resourceState"]
+ db_profile["operatingState"] = content["operatingState"]
+ db_profile = self.update_operation_history(
+ db_profile, workflow_status, resource_status
+ )
+ self.logger.info("the db_profile is :{}".format(db_profile))
+ self.db.set_one(db_collection, {"_id": db_profile["_id"]}, db_profile)
+
+ async def delete(self, content, order_id):
+ self.logger.info("cluster delete Enter")
+ items = self.db.get_one("clusters", {"_id": content["_id"]})
+
+ workflow_name = self.odu.launch_workflow("delete_cluster", content)
+ self.logger.info("workflow_name is :{}".format(workflow_name))
+
+ workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name)
+ self.logger.info(
+ "workflow_status is :{} and workflow_msg is :{}".format(
+ workflow_status, workflow_msg
+ )
+ )
+ if workflow_status:
+ items["state"] = "DELETED"
+ items["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
+ else:
+ items["state"] = "FAILED_DELETION"
+ items["resourceState"] = "ERROR"
+ # has to call update_operation_history return content
+ items = self.update_operation_history(items, workflow_status, None)
+ self.db.set_one("clusters", {"_id": content["_id"]}, items)
+
+ if workflow_status:
+ resource_status, resource_msg = self.odu.check_resource_status(
+ "delete_cluster", content
+ )
+ self.logger.info(
+ "resource_status is :{} and resource_msg is :{}".format(
+ resource_status, resource_msg
+ )
+ )
+ if resource_status:
+ items["resourceState"] = "READY"
+ else:
+ items["resourceState"] = "ERROR"
+
+ items["operatingState"] = "IDLE"
+ items = self.update_operation_history(items, workflow_status, resource_status)
+ self.db.set_one("clusters", {"_id": content["_id"]}, items)
+
+ # To delete it from dB
+ if items["state"] == "DELETED":
+ self.delete_cluster(content, order_id)
+ return
+
+ def delete_cluster(self, content, order_id):
+ item_content = self.db.get_one("clusters", {"_id": content["_id"]})
+ self.logger.info("1_the item_content is : {}".format(item_content))
+
+ self.logger.info("it is getting into if item_content state")
+ # detach profiles
+ update_dict = None
+ profiles_to_detach = [
+ "infra_controller_profiles",
+ "infra_config_profiles",
+ "app_profiles",
+ "resource_profiles",
+ ]
+ profiles_collection = {
+ "infra_controller_profiles": "k8sinfra_controller",
+ "infra_config_profiles": "k8sinfra_config",
+ "app_profiles": "k8sapp",
+ "resource_profiles": "k8sresource",
+ }
+ for profile_type in profiles_to_detach:
+ if item_content.get(profile_type):
+ self.logger.info("the profile_type is :{}".format(profile_type))
+ profile_ids = item_content[profile_type]
+ self.logger.info("the profile_ids is :{}".format(profile_ids))
+ profile_ids_copy = deepcopy(profile_ids)
+ self.logger.info("the profile_ids_copy is :{}".format(profile_ids_copy))
+ for profile_id in profile_ids_copy:
+ self.logger.info("the profile_id is :{}".format(profile_id))
+ db_collection = profiles_collection[profile_type]
+ self.logger.info("the db_collection is :{}".format(db_collection))
+ db_profile = self.db.get_one(db_collection, {"_id": profile_id})
+ self.logger.info("the db_profile is :{}".format(db_profile))
+ self.logger.info(
+ "the item_content name is :{}".format(item_content["name"])
+ )
+ self.logger.info(
+ "the db_profile name is :{}".format(db_profile["name"])
+ )
+ if item_content["name"] == db_profile["name"]:
+ self.logger.info("it is getting into if default")
+ self.db.del_one(db_collection, {"_id": profile_id})
+ else:
+ self.logger.info("it is getting into else non default")
+ profile_ids.remove(profile_id)
+ update_dict = {profile_type: profile_ids}
+ self.logger.info(f"the update dict is :{update_dict}")
+ self.db.set_one(
+ "clusters", {"_id": content["_id"]}, update_dict
+ )
+ self.db.del_one("clusters", {"_id": item_content["_id"]})
+ self.logger.info("the id is :{}".format(content["_id"]))
+
+ async def add(self, content, order_id):
+ self.logger.info("profile attach Enter")
+ db_cluster = self.db.get_one("clusters", {"_id": content["_id"]})
+ profile_type = content["profile_type"]
+ self.logger.info("profile type is : {}".format(profile_type))
+ profile_id = content["profile_id"]
+ self.logger.info("profile id is : {}".format(profile_id))
+
+ workflow_name = self.odu.launch_workflow("attach_profile_to_cluster", content)
+ self.logger.info("workflow_name is :{}".format(workflow_name))
+
+ workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name)
+ self.logger.info(
+ "workflow_status is :{} and workflow_msg is :{}".format(
+ workflow_status, workflow_msg
+ )
+ )
+ if workflow_status:
+ db_cluster["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
+ else:
+ db_cluster["resourceState"] = "ERROR"
+ # has to call update_operation_history return content
+ db_cluster = self.update_operation_history(db_cluster, workflow_status, None)
+ self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
+
+ if workflow_status:
+ resource_status, resource_msg = self.odu.check_resource_status(
+ "attach_profile_to_cluster", content
+ )
+ self.logger.info(
+ "resource_status is :{} and resource_msg is :{}".format(
+ resource_status, resource_msg
+ )
+ )
+ if resource_status:
+ db_cluster["resourceState"] = "READY"
+ else:
+ db_cluster["resourceState"] = "ERROR"
+
+ db_cluster["operatingState"] = "IDLE"
+ db_cluster = self.update_operation_history(
+ db_cluster, workflow_status, resource_status
+ )
+ profiles_collection = {
+ "infra_controller_profiles": "k8sinfra_controller",
+ "infra_config_profiles": "k8sinfra_config",
+ "app_profiles": "k8sapp",
+ "resource_profiles": "k8sresource",
+ }
+ db_collection = profiles_collection[profile_type]
+ self.logger.info("db_collection is : {}".format(db_collection))
+ profile_list = db_cluster[profile_type]
+ self.logger.info("profile list is : {}".format(profile_list))
+ if resource_status:
+ self.logger.info("it is getting into resource status true")
+ profile_list.append(profile_id)
+ self.logger.info("profile list is : {}".format(profile_list))
+ db_cluster[profile_type] = profile_list
+ self.logger.info("db cluster is : {}".format(db_cluster))
+ # update_dict = {item: profile_list}
+ # self.logger.info("the update_dict is :{}".format(update_dict))
+ # self.db.set_one(self.topic, filter_q, update_dict)
+ self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
+
+ return
+
+ async def remove(self, content, order_id):
+ self.logger.info("profile dettach Enter")
+ db_cluster = self.db.get_one("clusters", {"_id": content["_id"]})
+ profile_type = content["profile_type"]
+ self.logger.info("profile type is : {}".format(profile_type))
+ profile_id = content["profile_id"]
+ self.logger.info("profile id is : {}".format(profile_id))
+
+ workflow_name = self.odu.launch_workflow("detach_profile_from_cluster", content)
+ self.logger.info("workflow_name is :{}".format(workflow_name))
+
+ workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name)
+ self.logger.info(
+ "workflow_status is :{} and workflow_msg is :{}".format(
+ workflow_status, workflow_msg
+ )
+ )
+ if workflow_status:
+ db_cluster["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
+ else:
+ db_cluster["resourceState"] = "ERROR"
+ # has to call update_operation_history return content
+ db_cluster = self.update_operation_history(db_cluster, workflow_status, None)
+ self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
+
+ if workflow_status:
+ resource_status, resource_msg = self.odu.check_resource_status(
+ "detach_profile_from_cluster", content
+ )
+ self.logger.info(
+ "resource_status is :{} and resource_msg is :{}".format(
+ resource_status, resource_msg
+ )
+ )
+ if resource_status:
+ db_cluster["resourceState"] = "READY"
+ else:
+ db_cluster["resourceState"] = "ERROR"
+
+ db_cluster["operatingState"] = "IDLE"
+ db_cluster = self.update_operation_history(
+ db_cluster, workflow_status, resource_status
+ )
+ profiles_collection = {
+ "infra_controller_profiles": "k8sinfra_controller",
+ "infra_config_profiles": "k8sinfra_config",
+ "app_profiles": "k8sapp",
+ "resource_profiles": "k8sresource",
+ }
+ db_collection = profiles_collection[profile_type]
+ self.logger.info("db_collection is : {}".format(db_collection))
+ profile_list = db_cluster[profile_type]
+ self.logger.info("profile list is : {}".format(profile_list))
+ if resource_status:
+ self.logger.info("it is getting into resource status true")
+ profile_list.remove(profile_id)
+ self.logger.info("profile list is : {}".format(profile_list))
+ db_cluster[profile_type] = profile_list
+ self.logger.info("db cluster is : {}".format(db_cluster))
+ # update_dict = {item: profile_list}
+ # self.logger.info("the update_dict is :{}".format(update_dict))
+ # self.db.set_one(self.topic, filter_q, update_dict)
+ self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
+
+ return
+
+ async def register(self, content, order_id):
+ self.logger.info("cluster register enter")
+
+ workflow_name = self.odu.launch_workflow("register_cluster", content)
+ self.logger.info("workflow_name is :{}".format(workflow_name))
+
+ workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name)
+ self.logger.info(
+ "workflow_status is :{} and workflow_msg is :{}".format(
+ workflow_status, workflow_msg
+ )
+ )
+ if workflow_status:
+ content["state"] = "CREATED"
+ content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
+ else:
+ content["state"] = "FAILED_CREATION"
+ content["resourceState"] = "ERROR"
+ # has to call update_operation_history return content
+ content = self.update_operation_history(content, workflow_status, None)
+ self.db.set_one("clusters", {"_id": content["_id"]}, content)
+
+ if workflow_status:
+ resource_status, resource_msg = self.odu.check_resource_status(
+ "register_cluster", content
+ )
+ self.logger.info(
+ "resource_status is :{} and resource_msg is :{}".format(
+ resource_status, resource_msg
+ )
+ )
+ if resource_status:
+ content["resourceState"] = "READY"
+ else:
+ content["resourceState"] = "ERROR"
+
+ content["operatingState"] = "IDLE"
+ content = self.update_operation_history(
+ content, workflow_status, resource_status
+ )
+ self.db.set_one("clusters", {"_id": content["_id"]}, content)
+ self.profile_state(content, workflow_status, resource_status)
+ return
+
+ async def deregister(self, content, order_id):
+ self.logger.info("cluster deregister enter")
+
+ items = self.db.get_one("clusters", {"_id": content["_id"]})
+ self.logger.info("the items is : {}".format(items))
+
+ workflow_name = self.odu.launch_workflow("deregister_cluster", content)
+ self.logger.info("workflow_name is :{}".format(workflow_name))
+
+ workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name)
+ self.logger.info(
+ "workflow_status is :{} and workflow_msg is :{}".format(
+ workflow_status, workflow_msg
+ )
+ )
+ if workflow_status:
+ items["state"] = "DELETED"
+ items["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
+ else:
+ items["state"] = "FAILED_DELETION"
+ items["resourceState"] = "ERROR"
+ # has to call update_operation_history return content
+ items = self.update_operation_history(items, workflow_status, None)
+ self.db.set_one("clusters", {"_id": content["_id"]}, items)
+
+ if workflow_status:
+ resource_status, resource_msg = self.odu.check_resource_status(
+ "deregister_cluster", content
+ )
+ self.logger.info(
+ "resource_status is :{} and resource_msg is :{}".format(
+ resource_status, resource_msg
+ )
+ )
+ if resource_status:
+ items["resourceState"] = "READY"
+ else:
+ items["resourceState"] = "ERROR"
+
+ items["operatingState"] = "IDLE"
+ items = self.update_operation_history(items, workflow_status, resource_status)
+ self.db.set_one("clusters", {"_id": content["_id"]}, items)
+
+ # To delete it from dB
+ if items["state"] == "DELETED":
+ self.db.del_one("clusters", {"_id": items["_id"]})
+ return
+
+
+class K8sAppLcm(LcmBase):
+ def __init__(self, msg, lcm_tasks, config):
+ """
+ Init, Connect to database, filesystem storage, and messaging
+ :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
+ :return: None
+ """
+
+ self.logger = logging.getLogger("lcm.clusterlcm")
+ self.lcm_tasks = lcm_tasks
+ self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)
+
+ super().__init__(msg, self.logger)
+
+ async def create(self, content, order_id):
+ self.logger.info("App Create Enter")
+
+ workflow_name = self.odu.launch_workflow("create_profile", content)
+ self.logger.info("workflow_name is :{}".format(workflow_name))
+
+ workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name)
+ self.logger.info(
+ "workflow_status is :{} and workflow_msg is :{}".format(
+ workflow_status, workflow_msg
+ )
+ )
+ if workflow_status:
+ content["state"] = "CREATED"
+ content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
+ else:
+ content["state"] = "FAILED_CREATION"
+ content["resourceState"] = "ERROR"
+ # has to call update_operation_history return content
+ content = self.update_operation_history(content, workflow_status, None)
+ self.db.set_one("k8sapp", {"_id": content["_id"]}, content)
+
+ if workflow_status:
+ resource_status, resource_msg = self.odu.check_resource_status(
+ "create_profile", content
+ )
+ self.logger.info(
+ "resource_status is :{} and resource_msg is :{}".format(
+ resource_status, resource_msg
+ )
+ )
+ if resource_status:
+ content["resourceState"] = "READY"
+ else:
+ content["resourceState"] = "ERROR"
+
+ content["operatingState"] = "IDLE"
+ content = self.update_operation_history(
+ content, workflow_status, resource_status
+ )
+ self.db.set_one("k8sapp", {"_id": content["_id"]}, content)
+
+ return
+
+ async def delete(self, content, order_id):
+ self.logger.info("App delete Enter")
+ items = self.db.get_one("k8sapp", {"_id": content["_id"]})
+
+ workflow_name = self.odu.launch_workflow("delete_profile", content)
+ self.logger.info("workflow_name is :{}".format(workflow_name))
+
+ workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name)
+ self.logger.info(
+ "workflow_status is :{} and workflow_msg is :{}".format(
+ workflow_status, workflow_msg
+ )
+ )
+ if workflow_status:
+ items["state"] = "DELETED"
+ items["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
+ else:
+ items["state"] = "FAILED_DELETION"
+ items["resourceState"] = "ERROR"
+ # has to call update_operation_history return content
+ items = self.update_operation_history(items, workflow_status, None)
+ self.db.set_one("k8sapp", {"_id": content["_id"]}, content)
+
+ if workflow_status:
+ resource_status, resource_msg = self.odu.check_resource_status(
+ "delete_profile", content
+ )
+ self.logger.info(
+ "resource_status is :{} and resource_msg is :{}".format(
+ resource_status, resource_msg
+ )
+ )
+ if resource_status:
+ items["resourceState"] = "READY"
+ else:
+ items["resourceState"] = "ERROR"
+
+ items["operatingState"] = "IDLE"
+ items = self.update_operation_history(items, workflow_status, resource_status)
+ self.db.set_one("k8sapp", {"_id": content["_id"]}, items)
+
+ # To delete it from dB
+ if items["state"] == "DELETED":
+ self.db.del_one("k8sapp", {"_id": content["_id"]})
+ return
+
+
+class K8sResourceLcm(LcmBase):
+ def __init__(self, msg, lcm_tasks, config):
+ """
+ Init, Connect to database, filesystem storage, and messaging
+ :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
+ :return: None
+ """
+
+ self.logger = logging.getLogger("lcm.clusterlcm")
+ self.lcm_tasks = lcm_tasks
+ self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)
+
+ super().__init__(msg, self.logger)
+
+ async def create(self, content, order_id):
+ self.logger.info("Resource Create Enter")
+
+ workflow_name = self.odu.launch_workflow("create_profile", content)
+ self.logger.info("workflow_name is :{}".format(workflow_name))
+
+ workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name)
+ self.logger.info(
+ "workflow_status is :{} and workflow_msg is :{}".format(
+ workflow_status, workflow_msg
+ )
+ )
+ if workflow_status:
+ content["state"] = "CREATED"
+ content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
+ else:
+ content["state"] = "FAILED_CREATION"
+ content["resourceState"] = "ERROR"
+ # has to call update_operation_history return content
+ content = self.update_operation_history(content, workflow_status, None)
+ self.db.set_one("k8sresource", {"_id": content["_id"]}, content)
+
+ if workflow_status:
+ resource_status, resource_msg = self.odu.check_resource_status(
+ "create_profile", content
+ )
+ self.logger.info(
+ "resource_status is :{} and resource_msg is :{}".format(
+ resource_status, resource_msg
+ )
+ )
+ if resource_status:
+ content["resourceState"] = "READY"
+ else:
+ content["resourceState"] = "ERROR"
+
+ content["operatingState"] = "IDLE"
+ content = self.update_operation_history(
+ content, workflow_status, resource_status
+ )
+ self.db.set_one("k8sresource", {"_id": content["_id"]}, content)
+
+ return
+
+ async def delete(self, content, order_id):
+ self.logger.info("Resource delete Enter")
+ items = self.db.get_one("k8sresource", {"_id": content["_id"]})
+
+ workflow_name = self.odu.launch_workflow("delete_profile", content)
+ self.logger.info("workflow_name is :{}".format(workflow_name))
+
+ workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name)
+ self.logger.info(
+ "workflow_status is :{} and workflow_msg is :{}".format(
+ workflow_status, workflow_msg
+ )
+ )
+ if workflow_status:
+ items["state"] = "DELETED"
+ items["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
+ else:
+ items["state"] = "FAILED_DELETION"
+ items["resourceState"] = "ERROR"
+ # has to call update_operation_history return content
+ items = self.update_operation_history(items, workflow_status, None)
+ self.db.set_one("k8sresource", {"_id": content["_id"]}, items)
+
+ if workflow_status:
+ resource_status, resource_msg = self.odu.check_resource_status(
+ "delete_profile", content
+ )
+ self.logger.info(
+ "resource_status is :{} and resource_msg is :{}".format(
+ resource_status, resource_msg
+ )
+ )
+ if resource_status:
+ items["resourceState"] = "READY"
+ else:
+ items["resourceState"] = "ERROR"
+
+ items["operatingState"] = "IDLE"
+ items = self.update_operation_history(items, workflow_status, resource_status)
+ self.db.set_one("k8sresource", {"_id": content["_id"]}, items)
+
+ # To delete it from dB
+ if items["state"] == "DELETED":
+ self.db.del_one("k8sresource", {"_id": content["_id"]})
+ return
+
+
+class K8sInfraControllerLcm(LcmBase):
+ def __init__(self, msg, lcm_tasks, config):
+ """
+ Init, Connect to database, filesystem storage, and messaging
+ :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
+ :return: None
+ """
+
+ self.logger = logging.getLogger("lcm.clusterlcm")
+ self.lcm_tasks = lcm_tasks
+ self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)
+
+ super().__init__(msg, self.logger)
+
+ async def create(self, content, order_id):
+ self.logger.info("Infra controller Create Enter")
+
+ workflow_name = self.odu.launch_workflow("create_profile", content)
+ self.logger.info("workflow_name is :{}".format(workflow_name))
+
+ workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name)
+ self.logger.info(
+ "workflow_status is :{} and workflow_msg is :{}".format(
+ workflow_status, workflow_msg
+ )
+ )
+ if workflow_status:
+ content["state"] = "CREATED"
+ content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
+ else:
+ content["state"] = "FAILED_CREATION"
+ content["resourceState"] = "ERROR"
+ # has to call update_operation_history return content
+ content = self.update_operation_history(content, workflow_status, None)
+ self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, content)
+
+ if workflow_status:
+ resource_status, resource_msg = self.odu.check_resource_status(
+ "create_profile", content
+ )
+ self.logger.info(
+ "resource_status is :{} and resource_msg is :{}".format(
+ resource_status, resource_msg
+ )
+ )
+ if resource_status:
+ content["resourceState"] = "READY"
+ else:
+ content["resourceState"] = "ERROR"
+
+ content["operatingState"] = "IDLE"
+ content = self.update_operation_history(
+ content, workflow_status, resource_status
+ )
+ self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, content)
+
+ return
+
+ async def delete(self, content, order_id):
+ self.logger.info("Infra controller delete Enter")
+ items = self.db.get_one("k8sinfra_controller", {"_id": content["_id"]})
+
+ workflow_name = self.odu.launch_workflow("delete_profile", content)
+ self.logger.info("workflow_name is :{}".format(workflow_name))
+
+ workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name)
+ self.logger.info(
+ "workflow_status is :{} and workflow_msg is :{}".format(
+ workflow_status, workflow_msg
+ )
+ )
+ if workflow_status:
+ items["state"] = "DELETED"
+ items["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
+ else:
+ items["state"] = "FAILED_DELETION"
+ items["resourceState"] = "ERROR"
+ # has to call update_operation_history return content
+ items = self.update_operation_history(items, workflow_status, None)
+ self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, items)
+
+ if workflow_status:
+ resource_status, resource_msg = self.odu.check_resource_status(
+ "delete_profile", content
+ )
+ self.logger.info(
+ "resource_status is :{} and resource_msg is :{}".format(
+ resource_status, resource_msg
+ )
+ )
+ if resource_status:
+ items["resourceState"] = "READY"
+ else:
+ items["resourceState"] = "ERROR"
+
+ items["operatingState"] = "IDLE"
+ items = self.update_operation_history(items, workflow_status, resource_status)
+ self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, items)
+
+ # To delete it from dB
+ if items["state"] == "DELETED":
+ self.db.del_one("k8sinfra_controller", {"_id": content["_id"]})
+ return
+
+
+class K8sInfraConfigLcm(LcmBase):
+ def __init__(self, msg, lcm_tasks, config):
+ """
+ Init, Connect to database, filesystem storage, and messaging
+ :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
+ :return: None
+ """
+
+ self.logger = logging.getLogger("lcm.clusterlcm")
+ self.lcm_tasks = lcm_tasks
+ self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)
+
+ super().__init__(msg, self.logger)
+
+ async def create(self, content, order_id):
+ self.logger.info("Infra config Create Enter")
+
+ workflow_name = self.odu.launch_workflow("create_profile", content)
+ self.logger.info("workflow_name is :{}".format(workflow_name))
+
+ workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name)
+ self.logger.info(
+ "workflow_status is :{} and workflow_msg is :{}".format(
+ workflow_status, workflow_msg
+ )
+ )
+ if workflow_status:
+ content["state"] = "CREATED"
+ content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
+ else:
+ content["state"] = "FAILED_CREATION"
+ content["resourceState"] = "ERROR"
+ # has to call update_operation_history return content
+ content = self.update_operation_history(content, workflow_status, None)
+ self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, content)
+
+ if workflow_status:
+ resource_status, resource_msg = self.odu.check_resource_status(
+ "create_profile", content
+ )
+ self.logger.info(
+ "resource_status is :{} and resource_msg is :{}".format(
+ resource_status, resource_msg
+ )
+ )
+ if resource_status:
+ content["resourceState"] = "READY"
+ else:
+ content["resourceState"] = "ERROR"
+
+ content["operatingState"] = "IDLE"
+ content = self.update_operation_history(
+ content, workflow_status, resource_status
+ )
+ self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, content)
+
+ return
+
+ async def delete(self, content, order_id):
+ self.logger.info("Infra config delete Enter")
+
+ workflow_name = self.odu.launch_workflow("delete_profile", content)
+ self.logger.info("workflow_name is :{}".format(workflow_name))
+ items = self.db.get_one("k8sinfra_config", {"_id": content["_id"]})
+
+ workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name)
+ self.logger.info(
+ "workflow_status is :{} and workflow_msg is :{}".format(
+ workflow_status, workflow_msg
+ )
+ )
+ if workflow_status:
+ items["state"] = "DELETED"
+ items["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
+ else:
+ items["state"] = "FAILED_DELETION"
+ items["resourceState"] = "ERROR"
+ # has to call update_operation_history return content
+ items = self.update_operation_history(items, workflow_status, None)
+ self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, items)
+
+ resource_status, resource_msg = self.odu.check_resource_status(
+ "delete_profile", content
+ )
+ self.logger.info(
+ "resource_status is :{} and resource_msg is :{}".format(
+ resource_status, resource_msg
+ )
+ )
+ if resource_status:
+ items["resourceState"] = "READY"
+ else:
+ items["resourceState"] = "ERROR"
+
+ items["operatingState"] = "IDLE"
+ items = self.update_operation_history(items, workflow_status, resource_status)
+ self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, items)
+
+ # To delete it from dB
+ if items["state"] == "DELETED":
+ self.db.del_one("k8sinfra_config", {"_id": content["_id"]})
+ return
import sys
from random import SystemRandom
-from osm_lcm import ns, vim_sdn, netslice
+from osm_lcm import ns, vim_sdn, netslice, k8s
from osm_lcm.ng_ro import NgRoException, NgRoClient
from osm_lcm.ROclient import ROClient, ROClientException
self.netslice
) = (
self.vim
- ) = self.wim = self.sdn = self.k8scluster = self.vca = self.k8srepo = None
+ ) = (
+ self.wim
+ ) = (
+ self.sdn
+ ) = (
+ self.k8scluster
+ ) = (
+ self.vca
+ ) = (
+ self.k8srepo
+ ) = (
+ self.cluster
+ ) = (
+ self.k8s_app
+ ) = self.k8s_resource = self.k8s_infra_controller = self.k8s_infra_config = None
# logging
log_format_simple = (
# contains created tasks/futures to be able to cancel
self.lcm_tasks = TaskRegistry(self.worker_id, self.logger)
+ self.logger.info(
+ "Worker_id: {} main_config: {} lcm tasks: {}".format(
+ self.worker_id, self.main_config, self.lcm_tasks
+ )
+ )
+
async def check_RO_version(self):
tries = 14
last_error = None
async def kafka_read_callback(self, topic, command, params):
order_id = 1
-
+ self.logger.info(
+ "Topic: {} command: {} params: {} order ID: {}".format(
+ topic, command, params, order_id
+ )
+ )
if topic != "admin" and command != "ping":
self.logger.debug(
"Task kafka_read receives {} {}: {}".format(topic, command, params)
self.consecutive_errors = 0
self.first_start = False
order_id += 1
+ self.logger.info(
+ "Consecutive error: {} First start: {}".format(
+ self.consecutive_errors, self.first_start
+ )
+ )
if command == "exit":
raise LcmExceptionExit
elif command.startswith("#"):
elif topic == "ns":
if command == "instantiate":
# self.logger.debug("Deploying NS {}".format(nsr_id))
+ self.logger.info("NS instantiate")
nslcmop = params
nslcmop_id = nslcmop["_id"]
nsr_id = nslcmop["nsInstanceId"]
+ self.logger.info(
+ "NsLCMOP: {} NsLCMOP_ID:{} nsr_id: {}".format(
+ nslcmop, nslcmop_id, nsr_id
+ )
+ )
task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
self.lcm_tasks.register(
"ns", nsr_id, nslcmop_id, "ns_instantiate", task
"actioned",
"updated",
"migrated",
+ "verticalscaled",
): # "scaled-cooldown-time"
return
return
elif topic == "vim_account":
vim_id = params["_id"]
+ self.logger.info("Vim_ID: {}".format(vim_id))
if command in ("create", "created"):
+ self.logger.info("Command : {}".format(command))
+ # "self.logger.info("Main config: {}".format(main_config))"
+ # "self.logger.info("Main config RO: {}".format(main_config.RO.ng))""
if not self.main_config.RO.ng:
+ self.logger.info("Vim create")
task = asyncio.ensure_future(self.vim.create(params, order_id))
self.lcm_tasks.register(
"vim_account", vim_id, order_id, "vim_create", task
return
elif command == "deleted":
return # TODO cleaning of task just in case should be done
+ elif topic == "cluster":
+ if command == "create" or command == "created":
+ operation_id = params["operation_id"]
+ cluster_id = params["_id"]
+ db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
+ # cluster_id = params.get("_id")
+ self.logger.debug("operation_id = {}".format(operation_id))
+ self.logger.debug("cluster_id = {}".format(cluster_id))
+ self.logger.debug("cluster_db = {}".format(db_cluster))
+ task = asyncio.ensure_future(self.cluster.create(params, order_id))
+ self.lcm_tasks.register(
+ "cluster", cluster_id, order_id, "cluster_create", task
+ )
+ return
+ elif command == "delete" or command == "deleted":
+ # cluster_id = params.get("_id")
+ operation_id = params["operation_id"]
+ cluster_id = params["_id"]
+ db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
+ task = asyncio.ensure_future(self.cluster.delete(params, order_id))
+ self.lcm_tasks.register(
+ "cluster", cluster_id, order_id, "cluster_delete", task
+ )
+ return
+ elif command == "add" or command == "added":
+ add_id = params.get("_id")
+ task = asyncio.ensure_future(self.cluster.add(params, order_id))
+ self.lcm_tasks.register(
+ "cluster", add_id, order_id, "profile_add", task
+ )
+ return
+ elif command == "remove" or command == "removed":
+ remove_id = params.get("_id")
+ task = asyncio.ensure_future(self.cluster.remove(params, order_id))
+ self.lcm_tasks.register(
+ "cluster", remove_id, order_id, "profile_remove", task
+ )
+ return
+ elif command == "register" or command == "registered":
+ cluster_id = params.get("_id")
+ task = asyncio.ensure_future(self.cluster.register(params, order_id))
+ self.lcm_tasks.register(
+ "cluster", cluster_id, order_id, "cluster_register", task
+ )
+ return
+ elif command == "deregister" or command == "deregistered":
+ cluster_id = params.get("_id")
+ task = asyncio.ensure_future(self.cluster.deregister(params, order_id))
+ self.lcm_tasks.register(
+ "cluster", cluster_id, order_id, "cluster_deregister", task
+ )
+ return
+ elif topic == "k8s_app":
+ if command == "profile_create" or command == "profile_created":
+ k8s_app_id = params.get("_id")
+ self.logger.debug("k8s_app_id = {}".format(k8s_app_id))
+ task = asyncio.ensure_future(self.k8s_app.create(params, order_id))
+ self.lcm_tasks.register(
+ "k8s_app", k8s_app_id, order_id, "k8s_app_create", task
+ )
+ return
+ elif command == "delete" or command == "deleted":
+ k8s_app_id = params.get("_id")
+ task = asyncio.ensure_future(self.k8s_app.delete(params, order_id))
+ self.lcm_tasks.register(
+ "k8s_app", k8s_app_id, order_id, "k8s_app_delete", task
+ )
+ return
+ elif topic == "k8s_resource":
+ if command == "profile_create" or command == "profile_created":
+ k8s_resource_id = params.get("_id")
+ self.logger.debug("k8s_resource_id = {}".format(k8s_resource_id))
+ task = asyncio.ensure_future(self.k8s_resource.create(params, order_id))
+ self.lcm_tasks.register(
+ "k8s_resource",
+ k8s_resource_id,
+ order_id,
+ "k8s_resource_create",
+ task,
+ )
+ return
+ elif command == "delete" or command == "deleted":
+ k8s_resource_id = params.get("_id")
+ task = asyncio.ensure_future(self.k8s_resource.delete(params, order_id))
+ self.lcm_tasks.register(
+ "k8s_resource",
+ k8s_resource_id,
+ order_id,
+ "k8s_resource_delete",
+ task,
+ )
+ return
+
+ elif topic == "k8s_infra_controller":
+ if command == "profile_create" or command == "profile_created":
+ k8s_infra_controller_id = params.get("_id")
+ self.logger.debug(
+ "k8s_infra_controller_id = {}".format(k8s_infra_controller_id)
+ )
+ task = asyncio.ensure_future(
+ self.k8s_infra_controller.create(params, order_id)
+ )
+ self.lcm_tasks.register(
+ "k8s_infra_controller",
+ k8s_infra_controller_id,
+ order_id,
+ "k8s_infra_controller_create",
+ task,
+ )
+ return
+ elif command == "delete" or command == "deleted":
+ k8s_infra_controller_id = params.get("_id")
+ task = asyncio.ensure_future(
+ self.k8s_infra_controller.delete(params, order_id)
+ )
+ self.lcm_tasks.register(
+ "k8s_infra_controller",
+ k8s_infra_controller_id,
+ order_id,
+ "k8s_infra_controller_delete",
+ task,
+ )
+ return
+
+ elif topic == "k8s_infra_config":
+ if command == "profile_create" or command == "profile_created":
+ k8s_infra_config_id = params.get("_id")
+ self.logger.debug(
+ "k8s_infra_config_id = {}".format(k8s_infra_config_id)
+ )
+ task = asyncio.ensure_future(
+ self.k8s_infra_config.create(params, order_id)
+ )
+ self.lcm_tasks.register(
+ "k8s_infra_config",
+ k8s_infra_config_id,
+ order_id,
+ "k8s_infra_config_create",
+ task,
+ )
+ return
+ elif command == "delete" or command == "deleted":
+ k8s_infra_config_id = params.get("_id")
+ task = asyncio.ensure_future(
+ self.k8s_infra_config.delete(params, order_id)
+ )
+ self.lcm_tasks.register(
+ "k8s_infra_config",
+ k8s_infra_config_id,
+ order_id,
+ "k8s_infra_config_delete",
+ task,
+ )
+ return
+
self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
async def kafka_read(self):
)
self.consecutive_errors = 0
self.first_start = True
+ self.logger.info(
+ "Consecutive errors: {} first start: {}".format(
+ self.consecutive_errors, self.first_start
+ )
+ )
while self.consecutive_errors < 10:
try:
topics = (
"k8srepo",
"pla",
"nslcmops",
+ "cluster",
+ "k8s_app",
+ "k8s_resource",
+ "k8s_infra_controller",
+ "k8s_infra_config",
+ )
+ self.logger.info(
+ "Consecutive errors: {} first start: {}".format(
+ self.consecutive_errors, self.first_start
+ )
)
topics_admin = ("admin",)
await asyncio.gather(
await asyncio.gather(self.kafka_read(), self.kafka_ping())
async def start(self):
+ self.logger.info("Start LCM")
# check RO version
await self.check_RO_version()
self.k8srepo = vim_sdn.K8sRepoLcm(
self.msg, self.lcm_tasks, self.main_config.to_dict()
)
+ self.cluster = k8s.ClusterLcm(
+ self.msg, self.lcm_tasks, self.main_config.to_dict()
+ )
+ self.k8s_app = k8s.K8sAppLcm(
+ self.msg, self.lcm_tasks, self.main_config.to_dict()
+ )
+ self.k8s_resource = k8s.K8sResourceLcm(
+ self.msg, self.lcm_tasks, self.main_config.to_dict()
+ )
+ self.k8s_infra_controller = k8s.K8sInfraControllerLcm(
+ self.msg, self.lcm_tasks, self.main_config.to_dict()
+ )
+ self.k8s_infra_config = k8s.K8sInfraConfigLcm(
+ self.msg, self.lcm_tasks, self.main_config.to_dict()
+ )
+
+ self.logger.info(
+ "Msg: {} lcm tasks: {} main config: {}".format(
+ self.msg, self.lcm_tasks, self.main_config
+ )
+ )
await self.kafka_read_ping()