Feature 11023 - 11026 : Advanced cluster management 75/14475/9
author rshri <shrinithi.r@tataelxsi.co.in>
Fri, 5 Jul 2024 15:11:55 +0000 (15:11 +0000)
committer rshri <shrinithi.r@tataelxsi.co.in>
Fri, 16 Aug 2024 10:35:54 +0000 (10:35 +0000)
Change-Id: Ibd042e6151048fd6b633e2acc29bc9d97e460fd2
Signed-off-by: rshri <shrinithi.r@tataelxsi.co.in>
osm_lcm/k8s.py [new file with mode: 0644]
osm_lcm/lcm.py
osm_lcm/lcm_utils.py
osm_lcm/odu_workflows.py [new file with mode: 0644]

diff --git a/osm_lcm/k8s.py b/osm_lcm/k8s.py
new file mode 100644 (file)
index 0000000..4272cfd
--- /dev/null
@@ -0,0 +1,857 @@
+# -*- coding: utf-8 -*-
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+__author__ = (
+    "Shrinithi R <shrinithi.r@tataelxsi.co.in>",
+    "Shahithya Y <shahithya.y@tataelxsi.co.in>",
+)
+
+import logging
+from osm_lcm.lcm_utils import LcmBase
+from copy import deepcopy
+from osm_lcm import odu_workflows
+from osm_lcm import vim_sdn
+
+
+class ClusterLcm(LcmBase):
+    db_topic = "clusters"
+
+    def __init__(self, msg, lcm_tasks, config):
+        """
+        Init, Connect to database, filesystem storage, and messaging
+        :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
+        :return: None
+        """
+
+        self.logger = logging.getLogger("lcm.clusterlcm")
+        self.lcm_tasks = lcm_tasks
+        self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)
+        self.regist = vim_sdn.K8sClusterLcm(msg, self.lcm_tasks, config)
+
+        super().__init__(msg, self.logger)
+
+    async def create(self, content, order_id):
+        self.logger.info("cluster Create Enter")
+
+        workflow_name = self.odu.launch_workflow("create_cluster", content)
+        self.logger.info("workflow_name is :{}".format(workflow_name))
+
+        workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name)
+        self.logger.info(
+            "workflow_status is :{} and workflow_msg is :{}".format(
+                workflow_status, workflow_msg
+            )
+        )
+        if workflow_status:
+            content["state"] = "CREATED"
+            content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
+        else:
+            content["state"] = "FAILED_CREATION"
+            content["resourceState"] = "ERROR"
+        # Record the workflow outcome in the operation history and persist it
+        content = self.update_operation_history(content, workflow_status, None)
+        self.db.set_one("clusters", {"_id": content["_id"]}, content)
+
+        resource_status = False  # default when the workflow itself fails
+        if workflow_status:
+            resource_status, resource_msg = self.odu.check_resource_status(
+                "create_cluster", content
+            )
+            self.logger.info(
+                "resource_status is :{} and resource_msg is :{}".format(
+                    resource_status, resource_msg
+                )
+            )
+            if resource_status:
+                content["resourceState"] = "READY"
+            else:
+                content["resourceState"] = "ERROR"
+
+        content["operatingState"] = "IDLE"
+        content = self.update_operation_history(
+            content, workflow_status, resource_status
+        )
+        self.db.set_one("clusters", {"_id": content["_id"]}, content)
+        self.profile_state(content, workflow_status, resource_status)
+        return
+
+    def profile_state(self, content, workflow_status, resource_status):
+        profiles = [
+            "infra_controller_profiles",
+            "infra_config_profiles",
+            "app_profiles",
+            "resource_profiles",
+        ]
+        profiles_collection = {
+            "infra_controller_profiles": "k8sinfra_controller",
+            "infra_config_profiles": "k8sinfra_config",
+            "app_profiles": "k8sapp",
+            "resource_profiles": "k8sresource",
+        }
+        for profile_type in profiles:
+            profile_id = content[profile_type]
+            self.logger.info("profile id is : {}".format(profile_id))
+            db_collection = profiles_collection[profile_type]
+            self.logger.info("the db_collection is :{}".format(db_collection))
+            db_profile = self.db.get_one(db_collection, {"_id": profile_id})
+            self.logger.info("the db_profile is :{}".format(db_profile))
+            db_profile["state"] = content["state"]
+            db_profile["resourceState"] = content["resourceState"]
+            db_profile["operatingState"] = content["operatingState"]
+            db_profile = self.update_operation_history(
+                db_profile, workflow_status, resource_status
+            )
+            self.logger.info("the db_profile is :{}".format(db_profile))
+            self.db.set_one(db_collection, {"_id": db_profile["_id"]}, db_profile)
+
+    async def delete(self, content, order_id):
+        self.logger.info("cluster delete Enter")
+        items = self.db.get_one("clusters", {"_id": content["_id"]})
+
+        workflow_name = self.odu.launch_workflow("delete_cluster", content)
+        self.logger.info("workflow_name is :{}".format(workflow_name))
+
+        workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name)
+        self.logger.info(
+            "workflow_status is :{} and workflow_msg is :{}".format(
+                workflow_status, workflow_msg
+            )
+        )
+        if workflow_status:
+            items["state"] = "DELETED"
+            items["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
+        else:
+            items["state"] = "FAILED_DELETION"
+            items["resourceState"] = "ERROR"
+        # Record the workflow outcome in the operation history and persist it
+        items = self.update_operation_history(items, workflow_status, None)
+        self.db.set_one("clusters", {"_id": content["_id"]}, items)
+
+        resource_status = False  # default when the workflow itself fails
+        if workflow_status:
+            resource_status, resource_msg = self.odu.check_resource_status(
+                "delete_cluster", content
+            )
+            self.logger.info(
+                "resource_status is :{} and resource_msg is :{}".format(
+                    resource_status, resource_msg
+                )
+            )
+            if resource_status:
+                items["resourceState"] = "READY"
+            else:
+                items["resourceState"] = "ERROR"
+
+        items["operatingState"] = "IDLE"
+        items = self.update_operation_history(items, workflow_status, resource_status)
+        self.db.set_one("clusters", {"_id": content["_id"]}, items)
+
+        # Remove the cluster and its default profiles from the DB once the state is DELETED
+        if items["state"] == "DELETED":
+            self.delete_cluster(content, order_id)
+        return
+
+    def delete_cluster(self, content, order_id):
+        item_content = self.db.get_one("clusters", {"_id": content["_id"]})
+        self.logger.info("the item_content is : {}".format(item_content))
+
+        # detach profiles from the cluster before removing it
+        update_dict = None
+        profiles_to_detach = [
+            "infra_controller_profiles",
+            "infra_config_profiles",
+            "app_profiles",
+            "resource_profiles",
+        ]
+        profiles_collection = {
+            "infra_controller_profiles": "k8sinfra_controller",
+            "infra_config_profiles": "k8sinfra_config",
+            "app_profiles": "k8sapp",
+            "resource_profiles": "k8sresource",
+        }
+        for profile_type in profiles_to_detach:
+            if item_content.get(profile_type):
+                self.logger.info("the profile_type is :{}".format(profile_type))
+                profile_ids = item_content[profile_type]
+                self.logger.info("the profile_ids is :{}".format(profile_ids))
+                profile_ids_copy = deepcopy(profile_ids)
+                self.logger.info("the profile_ids_copy is :{}".format(profile_ids_copy))
+                for profile_id in profile_ids_copy:
+                    self.logger.info("the profile_id is :{}".format(profile_id))
+                    db_collection = profiles_collection[profile_type]
+                    self.logger.info("the db_collection is :{}".format(db_collection))
+                    db_profile = self.db.get_one(db_collection, {"_id": profile_id})
+                    self.logger.info("the db_profile is :{}".format(db_profile))
+                    self.logger.info(
+                        "the item_content name is :{}".format(item_content["name"])
+                    )
+                    self.logger.info(
+                        "the db_profile name is :{}".format(db_profile["name"])
+                    )
+                    if item_content["name"] == db_profile["name"]:
+                        self.logger.info("deleting default profile {}".format(profile_id))
+                        self.db.del_one(db_collection, {"_id": profile_id})
+                    else:
+                        self.logger.info("detaching non-default profile {}".format(profile_id))
+                        profile_ids.remove(profile_id)
+                        update_dict = {profile_type: profile_ids}
+                        self.logger.info(f"the update dict is :{update_dict}")
+                        self.db.set_one(
+                            "clusters", {"_id": content["_id"]}, update_dict
+                        )
+        self.db.del_one("clusters", {"_id": item_content["_id"]})
+        self.logger.info("the id is :{}".format(content["_id"]))
+
+    async def add(self, content, order_id):
+        self.logger.info("profile attach Enter")
+        db_cluster = self.db.get_one("clusters", {"_id": content["_id"]})
+        profile_type = content["profile_type"]
+        self.logger.info("profile type is : {}".format(profile_type))
+        profile_id = content["profile_id"]
+        self.logger.info("profile id is : {}".format(profile_id))
+
+        workflow_name = self.odu.launch_workflow("attach_profile_to_cluster", content)
+        self.logger.info("workflow_name is :{}".format(workflow_name))
+
+        workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name)
+        self.logger.info(
+            "workflow_status is :{} and workflow_msg is :{}".format(
+                workflow_status, workflow_msg
+            )
+        )
+        if workflow_status:
+            db_cluster["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
+        else:
+            db_cluster["resourceState"] = "ERROR"
+        # Record the workflow outcome in the operation history and persist it
+        db_cluster = self.update_operation_history(db_cluster, workflow_status, None)
+        self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
+
+        resource_status = False  # default when the workflow itself fails
+        if workflow_status:
+            resource_status, resource_msg = self.odu.check_resource_status(
+                "attach_profile_to_cluster", content
+            )
+            self.logger.info(
+                "resource_status is :{} and resource_msg is :{}".format(
+                    resource_status, resource_msg
+                )
+            )
+            if resource_status:
+                db_cluster["resourceState"] = "READY"
+            else:
+                db_cluster["resourceState"] = "ERROR"
+
+        db_cluster["operatingState"] = "IDLE"
+        db_cluster = self.update_operation_history(
+            db_cluster, workflow_status, resource_status
+        )
+        profiles_collection = {
+            "infra_controller_profiles": "k8sinfra_controller",
+            "infra_config_profiles": "k8sinfra_config",
+            "app_profiles": "k8sapp",
+            "resource_profiles": "k8sresource",
+        }
+        db_collection = profiles_collection[profile_type]
+        self.logger.info("db_collection is : {}".format(db_collection))
+        profile_list = db_cluster[profile_type]
+        self.logger.info("profile list is : {}".format(profile_list))
+        if resource_status:
+            self.logger.info("attaching profile {} to the cluster record".format(profile_id))
+            profile_list.append(profile_id)
+            self.logger.info("profile list is : {}".format(profile_list))
+            db_cluster[profile_type] = profile_list
+            self.logger.info("db cluster is : {}".format(db_cluster))
+        # update_dict = {item: profile_list}
+        # self.logger.info("the update_dict is :{}".format(update_dict))
+        # self.db.set_one(self.topic, filter_q, update_dict)
+        self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
+
+        return
+
+    async def remove(self, content, order_id):
+        self.logger.info("profile detach Enter")
+        db_cluster = self.db.get_one("clusters", {"_id": content["_id"]})
+        profile_type = content["profile_type"]
+        self.logger.info("profile type is : {}".format(profile_type))
+        profile_id = content["profile_id"]
+        self.logger.info("profile id is : {}".format(profile_id))
+
+        workflow_name = self.odu.launch_workflow("detach_profile_from_cluster", content)
+        self.logger.info("workflow_name is :{}".format(workflow_name))
+
+        workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name)
+        self.logger.info(
+            "workflow_status is :{} and workflow_msg is :{}".format(
+                workflow_status, workflow_msg
+            )
+        )
+        if workflow_status:
+            db_cluster["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
+        else:
+            db_cluster["resourceState"] = "ERROR"
+        # Record the workflow outcome in the operation history and persist it
+        db_cluster = self.update_operation_history(db_cluster, workflow_status, None)
+        self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
+
+        resource_status = False  # default when the workflow itself fails
+        if workflow_status:
+            resource_status, resource_msg = self.odu.check_resource_status(
+                "detach_profile_from_cluster", content
+            )
+            self.logger.info(
+                "resource_status is :{} and resource_msg is :{}".format(
+                    resource_status, resource_msg
+                )
+            )
+            if resource_status:
+                db_cluster["resourceState"] = "READY"
+            else:
+                db_cluster["resourceState"] = "ERROR"
+
+        db_cluster["operatingState"] = "IDLE"
+        db_cluster = self.update_operation_history(
+            db_cluster, workflow_status, resource_status
+        )
+        profiles_collection = {
+            "infra_controller_profiles": "k8sinfra_controller",
+            "infra_config_profiles": "k8sinfra_config",
+            "app_profiles": "k8sapp",
+            "resource_profiles": "k8sresource",
+        }
+        db_collection = profiles_collection[profile_type]
+        self.logger.info("db_collection is : {}".format(db_collection))
+        profile_list = db_cluster[profile_type]
+        self.logger.info("profile list is : {}".format(profile_list))
+        if resource_status:
+            self.logger.info("detaching profile {} from the cluster record".format(profile_id))
+            profile_list.remove(profile_id)
+            self.logger.info("profile list is : {}".format(profile_list))
+            db_cluster[profile_type] = profile_list
+            self.logger.info("db cluster is : {}".format(db_cluster))
+        # update_dict = {item: profile_list}
+        # self.logger.info("the update_dict is :{}".format(update_dict))
+        # self.db.set_one(self.topic, filter_q, update_dict)
+        self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
+
+        return
+
+    async def register(self, content, order_id):
+        self.logger.info("cluster register enter")
+
+        workflow_name = self.odu.launch_workflow("register_cluster", content)
+        self.logger.info("workflow_name is :{}".format(workflow_name))
+
+        workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name)
+        self.logger.info(
+            "workflow_status is :{} and workflow_msg is :{}".format(
+                workflow_status, workflow_msg
+            )
+        )
+        if workflow_status:
+            content["state"] = "CREATED"
+            content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
+        else:
+            content["state"] = "FAILED_CREATION"
+            content["resourceState"] = "ERROR"
+        # Record the workflow outcome in the operation history and persist it
+        content = self.update_operation_history(content, workflow_status, None)
+        self.db.set_one("clusters", {"_id": content["_id"]}, content)
+
+        resource_status = False  # default when the workflow itself fails
+        if workflow_status:
+            resource_status, resource_msg = self.odu.check_resource_status(
+                "register_cluster", content
+            )
+            self.logger.info(
+                "resource_status is :{} and resource_msg is :{}".format(
+                    resource_status, resource_msg
+                )
+            )
+            if resource_status:
+                content["resourceState"] = "READY"
+            else:
+                content["resourceState"] = "ERROR"
+
+        content["operatingState"] = "IDLE"
+        content = self.update_operation_history(
+            content, workflow_status, resource_status
+        )
+        self.db.set_one("clusters", {"_id": content["_id"]}, content)
+        self.profile_state(content, workflow_status, resource_status)
+        return
+
+    async def deregister(self, content, order_id):
+        self.logger.info("cluster deregister enter")
+
+        items = self.db.get_one("clusters", {"_id": content["_id"]})
+        self.logger.info("the items is : {}".format(items))
+
+        workflow_name = self.odu.launch_workflow("deregister_cluster", content)
+        self.logger.info("workflow_name is :{}".format(workflow_name))
+
+        workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name)
+        self.logger.info(
+            "workflow_status is :{} and workflow_msg is :{}".format(
+                workflow_status, workflow_msg
+            )
+        )
+        if workflow_status:
+            items["state"] = "DELETED"
+            items["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
+        else:
+            items["state"] = "FAILED_DELETION"
+            items["resourceState"] = "ERROR"
+        # Record the workflow outcome in the operation history and persist it
+        items = self.update_operation_history(items, workflow_status, None)
+        self.db.set_one("clusters", {"_id": content["_id"]}, items)
+
+        resource_status = False  # default when the workflow itself fails
+        if workflow_status:
+            resource_status, resource_msg = self.odu.check_resource_status(
+                "deregister_cluster", content
+            )
+            self.logger.info(
+                "resource_status is :{} and resource_msg is :{}".format(
+                    resource_status, resource_msg
+                )
+            )
+            if resource_status:
+                items["resourceState"] = "READY"
+            else:
+                items["resourceState"] = "ERROR"
+
+        items["operatingState"] = "IDLE"
+        items = self.update_operation_history(items, workflow_status, resource_status)
+        self.db.set_one("clusters", {"_id": content["_id"]}, items)
+
+        # Remove the record from the DB once the state is DELETED
+        if items["state"] == "DELETED":
+            self.db.del_one("clusters", {"_id": items["_id"]})
+        return
+
+
+class K8sAppLcm(LcmBase):
+    def __init__(self, msg, lcm_tasks, config):
+        """
+        Init, Connect to database, filesystem storage, and messaging
+        :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
+        :return: None
+        """
+
+        self.logger = logging.getLogger("lcm.clusterlcm")
+        self.lcm_tasks = lcm_tasks
+        self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)
+
+        super().__init__(msg, self.logger)
+
+    async def create(self, content, order_id):
+        self.logger.info("App Create Enter")
+
+        workflow_name = self.odu.launch_workflow("create_profile", content)
+        self.logger.info("workflow_name is :{}".format(workflow_name))
+
+        workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name)
+        self.logger.info(
+            "workflow_status is :{} and workflow_msg is :{}".format(
+                workflow_status, workflow_msg
+            )
+        )
+        if workflow_status:
+            content["state"] = "CREATED"
+            content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
+        else:
+            content["state"] = "FAILED_CREATION"
+            content["resourceState"] = "ERROR"
+        # Record the workflow outcome in the operation history and persist it
+        content = self.update_operation_history(content, workflow_status, None)
+        self.db.set_one("k8sapp", {"_id": content["_id"]}, content)
+
+        resource_status = False  # default when the workflow itself fails
+        if workflow_status:
+            resource_status, resource_msg = self.odu.check_resource_status(
+                "create_profile", content
+            )
+            self.logger.info(
+                "resource_status is :{} and resource_msg is :{}".format(
+                    resource_status, resource_msg
+                )
+            )
+            if resource_status:
+                content["resourceState"] = "READY"
+            else:
+                content["resourceState"] = "ERROR"
+
+        content["operatingState"] = "IDLE"
+        content = self.update_operation_history(
+            content, workflow_status, resource_status
+        )
+        self.db.set_one("k8sapp", {"_id": content["_id"]}, content)
+
+        return
+
+    async def delete(self, content, order_id):
+        self.logger.info("App delete Enter")
+        items = self.db.get_one("k8sapp", {"_id": content["_id"]})
+
+        workflow_name = self.odu.launch_workflow("delete_profile", content)
+        self.logger.info("workflow_name is :{}".format(workflow_name))
+
+        workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name)
+        self.logger.info(
+            "workflow_status is :{} and workflow_msg is :{}".format(
+                workflow_status, workflow_msg
+            )
+        )
+        if workflow_status:
+            items["state"] = "DELETED"
+            items["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
+        else:
+            items["state"] = "FAILED_DELETION"
+            items["resourceState"] = "ERROR"
+        # Record the workflow outcome in the operation history and persist it
+        items = self.update_operation_history(items, workflow_status, None)
+        self.db.set_one("k8sapp", {"_id": content["_id"]}, items)
+
+        resource_status = False  # default when the workflow itself fails
+        if workflow_status:
+            resource_status, resource_msg = self.odu.check_resource_status(
+                "delete_profile", content
+            )
+            self.logger.info(
+                "resource_status is :{} and resource_msg is :{}".format(
+                    resource_status, resource_msg
+                )
+            )
+            if resource_status:
+                items["resourceState"] = "READY"
+            else:
+                items["resourceState"] = "ERROR"
+
+        items["operatingState"] = "IDLE"
+        items = self.update_operation_history(items, workflow_status, resource_status)
+        self.db.set_one("k8sapp", {"_id": content["_id"]}, items)
+
+        # Remove the record from the DB once the state is DELETED
+        if items["state"] == "DELETED":
+            self.db.del_one("k8sapp", {"_id": content["_id"]})
+        return
+
+
+class K8sResourceLcm(LcmBase):
+    def __init__(self, msg, lcm_tasks, config):
+        """
+        Init, Connect to database, filesystem storage, and messaging
+        :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
+        :return: None
+        """
+
+        self.logger = logging.getLogger("lcm.clusterlcm")
+        self.lcm_tasks = lcm_tasks
+        self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)
+
+        super().__init__(msg, self.logger)
+
+    async def create(self, content, order_id):
+        self.logger.info("Resource Create Enter")
+
+        workflow_name = self.odu.launch_workflow("create_profile", content)
+        self.logger.info("workflow_name is :{}".format(workflow_name))
+
+        workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name)
+        self.logger.info(
+            "workflow_status is :{} and workflow_msg is :{}".format(
+                workflow_status, workflow_msg
+            )
+        )
+        if workflow_status:
+            content["state"] = "CREATED"
+            content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
+        else:
+            content["state"] = "FAILED_CREATION"
+            content["resourceState"] = "ERROR"
+        # Record the workflow outcome in the operation history and persist it
+        content = self.update_operation_history(content, workflow_status, None)
+        self.db.set_one("k8sresource", {"_id": content["_id"]}, content)
+
+        resource_status = False  # default when the workflow itself fails
+        if workflow_status:
+            resource_status, resource_msg = self.odu.check_resource_status(
+                "create_profile", content
+            )
+            self.logger.info(
+                "resource_status is :{} and resource_msg is :{}".format(
+                    resource_status, resource_msg
+                )
+            )
+            if resource_status:
+                content["resourceState"] = "READY"
+            else:
+                content["resourceState"] = "ERROR"
+
+        content["operatingState"] = "IDLE"
+        content = self.update_operation_history(
+            content, workflow_status, resource_status
+        )
+        self.db.set_one("k8sresource", {"_id": content["_id"]}, content)
+
+        return
+
+    async def delete(self, content, order_id):
+        self.logger.info("Resource delete Enter")
+        items = self.db.get_one("k8sresource", {"_id": content["_id"]})
+
+        workflow_name = self.odu.launch_workflow("delete_profile", content)
+        self.logger.info("workflow_name is :{}".format(workflow_name))
+
+        workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name)
+        self.logger.info(
+            "workflow_status is :{} and workflow_msg is :{}".format(
+                workflow_status, workflow_msg
+            )
+        )
+        if workflow_status:
+            items["state"] = "DELETED"
+            items["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
+        else:
+            items["state"] = "FAILED_DELETION"
+            items["resourceState"] = "ERROR"
+        # Record the workflow outcome in the operation history and persist it
+        items = self.update_operation_history(items, workflow_status, None)
+        self.db.set_one("k8sresource", {"_id": content["_id"]}, items)
+
+        resource_status = False  # default when the workflow itself fails
+        if workflow_status:
+            resource_status, resource_msg = self.odu.check_resource_status(
+                "delete_profile", content
+            )
+            self.logger.info(
+                "resource_status is :{} and resource_msg is :{}".format(
+                    resource_status, resource_msg
+                )
+            )
+            if resource_status:
+                items["resourceState"] = "READY"
+            else:
+                items["resourceState"] = "ERROR"
+
+        items["operatingState"] = "IDLE"
+        items = self.update_operation_history(items, workflow_status, resource_status)
+        self.db.set_one("k8sresource", {"_id": content["_id"]}, items)
+
+        # Remove the record from the DB once the state is DELETED
+        if items["state"] == "DELETED":
+            self.db.del_one("k8sresource", {"_id": content["_id"]})
+        return
+
+
+class K8sInfraControllerLcm(LcmBase):
+    def __init__(self, msg, lcm_tasks, config):
+        """
+        Init, Connect to database, filesystem storage, and messaging
+        :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
+        :return: None
+        """
+
+        self.logger = logging.getLogger("lcm.clusterlcm")
+        self.lcm_tasks = lcm_tasks
+        self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)
+
+        super().__init__(msg, self.logger)
+
+    async def create(self, content, order_id):
+        self.logger.info("Infra controller Create Enter")
+
+        workflow_name = self.odu.launch_workflow("create_profile", content)
+        self.logger.info("workflow_name is :{}".format(workflow_name))
+
+        workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name)
+        self.logger.info(
+            "workflow_status is :{} and workflow_msg is :{}".format(
+                workflow_status, workflow_msg
+            )
+        )
+        if workflow_status:
+            content["state"] = "CREATED"
+            content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
+        else:
+            content["state"] = "FAILED_CREATION"
+            content["resourceState"] = "ERROR"
+        # Record the workflow outcome in the operation history and persist it
+        content = self.update_operation_history(content, workflow_status, None)
+        self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, content)
+
+        resource_status = False  # default when the workflow itself fails
+        if workflow_status:
+            resource_status, resource_msg = self.odu.check_resource_status(
+                "create_profile", content
+            )
+            self.logger.info(
+                "resource_status is :{} and resource_msg is :{}".format(
+                    resource_status, resource_msg
+                )
+            )
+            if resource_status:
+                content["resourceState"] = "READY"
+            else:
+                content["resourceState"] = "ERROR"
+
+        content["operatingState"] = "IDLE"
+        content = self.update_operation_history(
+            content, workflow_status, resource_status
+        )
+        self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, content)
+
+        return
+
+    async def delete(self, content, order_id):
+        self.logger.info("Infra controller delete Enter")
+        items = self.db.get_one("k8sinfra_controller", {"_id": content["_id"]})
+
+        workflow_name = self.odu.launch_workflow("delete_profile", content)
+        self.logger.info("workflow_name is :{}".format(workflow_name))
+
+        workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name)
+        self.logger.info(
+            "workflow_status is :{} and workflow_msg is :{}".format(
+                workflow_status, workflow_msg
+            )
+        )
+        if workflow_status:
+            items["state"] = "DELETED"
+            items["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
+        else:
+            items["state"] = "FAILED_DELETION"
+            items["resourceState"] = "ERROR"
+        # Record the workflow outcome in the operation history and persist it
+        items = self.update_operation_history(items, workflow_status, None)
+        self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, items)
+
+        resource_status = False  # default when the workflow itself fails
+        if workflow_status:
+            resource_status, resource_msg = self.odu.check_resource_status(
+                "delete_profile", content
+            )
+            self.logger.info(
+                "resource_status is :{} and resource_msg is :{}".format(
+                    resource_status, resource_msg
+                )
+            )
+            if resource_status:
+                items["resourceState"] = "READY"
+            else:
+                items["resourceState"] = "ERROR"
+
+        items["operatingState"] = "IDLE"
+        items = self.update_operation_history(items, workflow_status, resource_status)
+        self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, items)
+
+        # Remove the record from the DB once the state is DELETED
+        if items["state"] == "DELETED":
+            self.db.del_one("k8sinfra_controller", {"_id": content["_id"]})
+        return
+
+
+class K8sInfraConfigLcm(LcmBase):
+    def __init__(self, msg, lcm_tasks, config):
+        """
+        Init, Connect to database, filesystem storage, and messaging
+        :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
+        :return: None
+        """
+
+        self.logger = logging.getLogger("lcm.clusterlcm")
+        self.lcm_tasks = lcm_tasks
+        self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)
+
+        super().__init__(msg, self.logger)
+
+    async def create(self, content, order_id):
+        self.logger.info("Infra config Create Enter")
+
+        workflow_name = self.odu.launch_workflow("create_profile", content)
+        self.logger.info("workflow_name is :{}".format(workflow_name))
+
+        workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name)
+        self.logger.info(
+            "workflow_status is :{} and workflow_msg is :{}".format(
+                workflow_status, workflow_msg
+            )
+        )
+        if workflow_status:
+            content["state"] = "CREATED"
+            content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
+        else:
+            content["state"] = "FAILED_CREATION"
+            content["resourceState"] = "ERROR"
+        # Record the workflow outcome in the operation history and persist it
+        content = self.update_operation_history(content, workflow_status, None)
+        self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, content)
+
+        resource_status = False  # default when the workflow itself fails
+        if workflow_status:
+            resource_status, resource_msg = self.odu.check_resource_status(
+                "create_profile", content
+            )
+            self.logger.info(
+                "resource_status is :{} and resource_msg is :{}".format(
+                    resource_status, resource_msg
+                )
+            )
+            if resource_status:
+                content["resourceState"] = "READY"
+            else:
+                content["resourceState"] = "ERROR"
+
+        content["operatingState"] = "IDLE"
+        content = self.update_operation_history(
+            content, workflow_status, resource_status
+        )
+        self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, content)
+
+        return
+
+    async def delete(self, content, order_id):
+        self.logger.info("Infra config delete Enter")
+
+        workflow_name = self.odu.launch_workflow("delete_profile", content)
+        self.logger.info("workflow_name is :{}".format(workflow_name))
+        items = self.db.get_one("k8sinfra_config", {"_id": content["_id"]})
+
+        workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name)
+        self.logger.info(
+            "workflow_status is :{} and workflow_msg is :{}".format(
+                workflow_status, workflow_msg
+            )
+        )
+        if workflow_status:
+            items["state"] = "DELETED"
+            items["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
+        else:
+            items["state"] = "FAILED_DELETION"
+            items["resourceState"] = "ERROR"
+        # Record the workflow outcome in the operation history and persist it
+        items = self.update_operation_history(items, workflow_status, None)
+        self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, items)
+
+        resource_status = False  # default when the workflow itself fails
+        if workflow_status:
+            resource_status, resource_msg = self.odu.check_resource_status(
+                "delete_profile", content
+            )
+            self.logger.info(
+                "resource_status is :{} and resource_msg is :{}".format(
+                    resource_status, resource_msg
+                )
+            )
+            if resource_status:
+                items["resourceState"] = "READY"
+            else:
+                items["resourceState"] = "ERROR"
+
+        items["operatingState"] = "IDLE"
+        items = self.update_operation_history(items, workflow_status, resource_status)
+        self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, items)
+
+        # Remove the record from the DB once the state is DELETED
+        if items["state"] == "DELETED":
+            self.db.del_one("k8sinfra_config", {"_id": content["_id"]})
+        return
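
Every class in the new osm_lcm/k8s.py repeats the same three-step flow: launch an ODU workflow, record the workflow outcome on the item and in its operation history, and, only if the workflow succeeded, check the resource status and record that as well before marking the item IDLE. The sketch below is illustrative only and not part of the patch: FakeDb and FakeOdu are hypothetical stand-ins for the real common database and the OduWorkflow client, and the operation-history bookkeeping is omitted for brevity.

    # Minimal sketch of the create/delete flow shared by the k8s.py LCM classes.
    # FakeDb and FakeOdu are hypothetical stand-ins, not OSM APIs.
    class FakeDb:
        def __init__(self):
            self.collections = {}

        def set_one(self, collection, q_filter, update):
            # keep the latest version of the document, keyed by its _id
            self.collections.setdefault(collection, {})[q_filter["_id"]] = dict(update)


    class FakeOdu:
        def launch_workflow(self, key, content):
            return "workflow-{}-{}".format(key, content["_id"])

        def check_workflow_status(self, workflow_name):
            return True, "OK"

        def check_resource_status(self, key, content):
            return True, "OK"


    def create_flow(db, odu, content):
        # 1) launch the workflow and persist its outcome
        workflow_name = odu.launch_workflow("create_cluster", content)
        workflow_status, _ = odu.check_workflow_status(workflow_name)
        content["state"] = "CREATED" if workflow_status else "FAILED_CREATION"
        content["resourceState"] = "IN_PROGRESS.GIT_SYNCED" if workflow_status else "ERROR"
        db.set_one("clusters", {"_id": content["_id"]}, content)

        # 2) only a successful workflow is followed by a resource check
        resource_status = False
        if workflow_status:
            resource_status, _ = odu.check_resource_status("create_cluster", content)
            content["resourceState"] = "READY" if resource_status else "ERROR"

        # 3) the item always ends up IDLE and is persisted again
        content["operatingState"] = "IDLE"
        db.set_one("clusters", {"_id": content["_id"]}, content)
        return content


    print(create_flow(FakeDb(), FakeOdu(), {"_id": "cluster-1"}))
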
diff --git a/osm_lcm/lcm.py b/osm_lcm/lcm.py
index 1c81da1..9d711ea 100644 (file)
@@ -30,7 +30,7 @@ import getopt
 import sys
 from random import SystemRandom
 
-from osm_lcm import ns, vim_sdn, netslice
+from osm_lcm import ns, vim_sdn, netslice, k8s
 from osm_lcm.ng_ro import NgRoException, NgRoClient
 from osm_lcm.ROclient import ROClient, ROClientException
 
@@ -100,7 +100,21 @@ class Lcm:
             self.netslice
         ) = (
             self.vim
-        ) = self.wim = self.sdn = self.k8scluster = self.vca = self.k8srepo = None
+        ) = (
+            self.wim
+        ) = (
+            self.sdn
+        ) = (
+            self.k8scluster
+        ) = (
+            self.vca
+        ) = (
+            self.k8srepo
+        ) = (
+            self.cluster
+        ) = (
+            self.k8s_app
+        ) = self.k8s_resource = self.k8s_infra_controller = self.k8s_infra_config = None
 
         # logging
         log_format_simple = (
@@ -193,6 +207,12 @@ class Lcm:
         # contains created tasks/futures to be able to cancel
         self.lcm_tasks = TaskRegistry(self.worker_id, self.logger)
 
+        self.logger.info(
+            "Worker_id: {} main_config: {} lcm tasks: {}".format(
+                self.worker_id, self.main_config, self.lcm_tasks
+            )
+        )
+
     async def check_RO_version(self):
         tries = 14
         last_error = None
@@ -297,7 +317,11 @@ class Lcm:
 
     async def kafka_read_callback(self, topic, command, params):
         order_id = 1
-
+        self.logger.info(
+            "Topic: {} command: {} params: {} order ID: {}".format(
+                topic, command, params, order_id
+            )
+        )
         if topic != "admin" and command != "ping":
             self.logger.debug(
                 "Task kafka_read receives {} {}: {}".format(topic, command, params)
@@ -305,6 +329,11 @@ class Lcm:
         self.consecutive_errors = 0
         self.first_start = False
         order_id += 1
+        self.logger.info(
+            "Consecutive error: {} First start: {}".format(
+                self.consecutive_errors, self.first_start
+            )
+        )
         if command == "exit":
             raise LcmExceptionExit
         elif command.startswith("#"):
@@ -432,9 +461,15 @@ class Lcm:
         elif topic == "ns":
             if command == "instantiate":
                 # self.logger.debug("Deploying NS {}".format(nsr_id))
+                self.logger.info("NS instantiate")
                 nslcmop = params
                 nslcmop_id = nslcmop["_id"]
                 nsr_id = nslcmop["nsInstanceId"]
+                self.logger.info(
+                    "NsLCMOP: {} NsLCMOP_ID:{} nsr_id: {}".format(
+                        nslcmop, nslcmop_id, nsr_id
+                    )
+                )
                 task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
                 self.lcm_tasks.register(
                     "ns", nsr_id, nslcmop_id, "ns_instantiate", task
@@ -531,6 +566,7 @@ class Lcm:
                 "actioned",
                 "updated",
                 "migrated",
+                "verticalscaled",
             ):  # "scaled-cooldown-time"
                 return
 
@@ -592,8 +628,13 @@ class Lcm:
                 return
         elif topic == "vim_account":
             vim_id = params["_id"]
+            self.logger.info("Vim_ID: {}".format(vim_id))
             if command in ("create", "created"):
+                self.logger.info("Command : {}".format(command))
+                # self.logger.info("Main config: {}".format(self.main_config))
+                # self.logger.info("Main config RO: {}".format(self.main_config.RO.ng))
                 if not self.main_config.RO.ng:
+                    self.logger.info("Vim create")
                     task = asyncio.ensure_future(self.vim.create(params, order_id))
                     self.lcm_tasks.register(
                         "vim_account", vim_id, order_id, "vim_create", task
@@ -667,6 +708,161 @@ class Lcm:
                 return
             elif command == "deleted":
                 return  # TODO cleaning of task just in case should be done
+        elif topic == "cluster":
+            if command == "create" or command == "created":
+                operation_id = params["operation_id"]
+                cluster_id = params["_id"]
+                db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
+                # cluster_id = params.get("_id")
+                self.logger.debug("operation_id = {}".format(operation_id))
+                self.logger.debug("cluster_id = {}".format(cluster_id))
+                self.logger.debug("cluster_db = {}".format(db_cluster))
+                task = asyncio.ensure_future(self.cluster.create(params, order_id))
+                self.lcm_tasks.register(
+                    "cluster", cluster_id, order_id, "cluster_create", task
+                )
+                return
+            elif command == "delete" or command == "deleted":
+                # cluster_id = params.get("_id")
+                operation_id = params["operation_id"]
+                cluster_id = params["_id"]
+                db_cluster = self.db.get_one("clusters", {"_id": cluster_id})
+                task = asyncio.ensure_future(self.cluster.delete(params, order_id))
+                self.lcm_tasks.register(
+                    "cluster", cluster_id, order_id, "cluster_delete", task
+                )
+                return
+            elif command == "add" or command == "added":
+                add_id = params.get("_id")
+                task = asyncio.ensure_future(self.cluster.add(params, order_id))
+                self.lcm_tasks.register(
+                    "cluster", add_id, order_id, "profile_add", task
+                )
+                return
+            elif command == "remove" or command == "removed":
+                remove_id = params.get("_id")
+                task = asyncio.ensure_future(self.cluster.remove(params, order_id))
+                self.lcm_tasks.register(
+                    "cluster", remove_id, order_id, "profile_remove", task
+                )
+                return
+            elif command == "register" or command == "registered":
+                cluster_id = params.get("_id")
+                task = asyncio.ensure_future(self.cluster.register(params, order_id))
+                self.lcm_tasks.register(
+                    "cluster", cluster_id, order_id, "cluster_register", task
+                )
+                return
+            elif command == "deregister" or command == "deregistered":
+                cluster_id = params.get("_id")
+                task = asyncio.ensure_future(self.cluster.deregister(params, order_id))
+                self.lcm_tasks.register(
+                    "cluster", cluster_id, order_id, "cluster_deregister", task
+                )
+                return
+        elif topic == "k8s_app":
+            if command == "profile_create" or command == "profile_created":
+                k8s_app_id = params.get("_id")
+                self.logger.debug("k8s_app_id = {}".format(k8s_app_id))
+                task = asyncio.ensure_future(self.k8s_app.create(params, order_id))
+                self.lcm_tasks.register(
+                    "k8s_app", k8s_app_id, order_id, "k8s_app_create", task
+                )
+                return
+            elif command == "delete" or command == "deleted":
+                k8s_app_id = params.get("_id")
+                task = asyncio.ensure_future(self.k8s_app.delete(params, order_id))
+                self.lcm_tasks.register(
+                    "k8s_app", k8s_app_id, order_id, "k8s_app_delete", task
+                )
+                return
+        elif topic == "k8s_resource":
+            if command == "profile_create" or command == "profile_created":
+                k8s_resource_id = params.get("_id")
+                self.logger.debug("k8s_resource_id = {}".format(k8s_resource_id))
+                task = asyncio.ensure_future(self.k8s_resource.create(params, order_id))
+                self.lcm_tasks.register(
+                    "k8s_resource",
+                    k8s_resource_id,
+                    order_id,
+                    "k8s_resource_create",
+                    task,
+                )
+                return
+            elif command == "delete" or command == "deleted":
+                k8s_resource_id = params.get("_id")
+                task = asyncio.ensure_future(self.k8s_resource.delete(params, order_id))
+                self.lcm_tasks.register(
+                    "k8s_resource",
+                    k8s_resource_id,
+                    order_id,
+                    "k8s_resource_delete",
+                    task,
+                )
+                return
+
+        elif topic == "k8s_infra_controller":
+            if command == "profile_create" or command == "profile_created":
+                k8s_infra_controller_id = params.get("_id")
+                self.logger.debug(
+                    "k8s_infra_controller_id = {}".format(k8s_infra_controller_id)
+                )
+                task = asyncio.ensure_future(
+                    self.k8s_infra_controller.create(params, order_id)
+                )
+                self.lcm_tasks.register(
+                    "k8s_infra_controller",
+                    k8s_infra_controller_id,
+                    order_id,
+                    "k8s_infra_controller_create",
+                    task,
+                )
+                return
+            elif command == "delete" or command == "deleted":
+                k8s_infra_controller_id = params.get("_id")
+                task = asyncio.ensure_future(
+                    self.k8s_infra_controller.delete(params, order_id)
+                )
+                self.lcm_tasks.register(
+                    "k8s_infra_controller",
+                    k8s_infra_controller_id,
+                    order_id,
+                    "k8s_infra_controller_delete",
+                    task,
+                )
+                return
+
+        elif topic == "k8s_infra_config":
+            if command == "profile_create" or command == "profile_created":
+                k8s_infra_config_id = params.get("_id")
+                self.logger.debug(
+                    "k8s_infra_config_id = {}".format(k8s_infra_config_id)
+                )
+                task = asyncio.ensure_future(
+                    self.k8s_infra_config.create(params, order_id)
+                )
+                self.lcm_tasks.register(
+                    "k8s_infra_config",
+                    k8s_infra_config_id,
+                    order_id,
+                    "k8s_infra_config_create",
+                    task,
+                )
+                return
+            elif command == "delete" or command == "deleted":
+                k8s_infra_config_id = params.get("_id")
+                task = asyncio.ensure_future(
+                    self.k8s_infra_config.delete(params, order_id)
+                )
+                self.lcm_tasks.register(
+                    "k8s_infra_config",
+                    k8s_infra_config_id,
+                    order_id,
+                    "k8s_infra_config_delete",
+                    task,
+                )
+                return
+
         self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
 
     async def kafka_read(self):
@@ -675,6 +871,11 @@ class Lcm:
         )
         self.consecutive_errors = 0
         self.first_start = True
+        self.logger.info(
+            "Consecutive errors: {} first start: {}".format(
+                self.consecutive_errors, self.first_start
+            )
+        )
         while self.consecutive_errors < 10:
             try:
                 topics = (
@@ -688,6 +889,16 @@ class Lcm:
                     "k8srepo",
                     "pla",
                     "nslcmops",
+                    "cluster",
+                    "k8s_app",
+                    "k8s_resource",
+                    "k8s_infra_controller",
+                    "k8s_infra_config",
+                )
+                self.logger.info(
+                    "Consecutive errors: {} first start: {}".format(
+                        self.consecutive_errors, self.first_start
+                    )
                 )
                 topics_admin = ("admin",)
                 await asyncio.gather(
@@ -729,6 +940,7 @@ class Lcm:
         await asyncio.gather(self.kafka_read(), self.kafka_ping())
 
     async def start(self):
+        self.logger.info("Start LCM")
         # check RO version
         await self.check_RO_version()
 
@@ -747,6 +959,27 @@ class Lcm:
         self.k8srepo = vim_sdn.K8sRepoLcm(
             self.msg, self.lcm_tasks, self.main_config.to_dict()
         )
+        self.cluster = k8s.ClusterLcm(
+            self.msg, self.lcm_tasks, self.main_config.to_dict()
+        )
+        self.k8s_app = k8s.K8sAppLcm(
+            self.msg, self.lcm_tasks, self.main_config.to_dict()
+        )
+        self.k8s_resource = k8s.K8sResourceLcm(
+            self.msg, self.lcm_tasks, self.main_config.to_dict()
+        )
+        self.k8s_infra_controller = k8s.K8sInfraControllerLcm(
+            self.msg, self.lcm_tasks, self.main_config.to_dict()
+        )
+        self.k8s_infra_config = k8s.K8sInfraConfigLcm(
+            self.msg, self.lcm_tasks, self.main_config.to_dict()
+        )
+
+        self.logger.info(
+            "Msg: {} lcm tasks: {} main config: {}".format(
+                self.msg, self.lcm_tasks, self.main_config
+            )
+        )
 
         await self.kafka_read_ping()
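
The new topic branches in kafka_read_callback only read a handful of keys from params before registering the task: _id on every topic, plus operation_id on the cluster create and delete commands. The snippet below sketches what such payloads could look like; the ids and values are illustrative, not taken from the patch.

    # Illustrative payloads for the new topics dispatched by
    # kafka_read_callback(topic, command, params); ids are examples only.
    cluster_create_params = {
        "_id": "0123456789abcdef01234567",  # cluster document id in the "clusters" collection
        "operation_id": "op-0",             # read and logged by the cluster create/delete branches
    }
    k8s_app_profile_create_params = {
        "_id": "76543210fedcba9876543210",  # profile document id in the "k8sapp" collection
    }

    # Dispatch would then look like this, assuming an initialized Lcm instance "lcm":
    #     await lcm.kafka_read_callback("cluster", "create", cluster_create_params)
    #     await lcm.kafka_read_callback("k8s_app", "profile_create", k8s_app_profile_create_params)
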
 
diff --git a/osm_lcm/lcm_utils.py b/osm_lcm/lcm_utils.py
index af460d2..d1b310b 100644 (file)
@@ -221,11 +221,47 @@ class LcmBase:
             return
         now = time()
         _desc["_admin.modified"] = now
+        self.logger.info("Desc: {} Item: {} _id: {}".format(_desc, item, _id))
         self.db.set_one(item, {"_id": _id}, _desc)
         _desc.clear()
         # except DbException as e:
         #     self.logger.error("Updating {} _id={} with '{}'. Error: {}".format(item, _id, _desc, e))
 
+    def update_operation_history(
+        self, content, workflow_status=None, resource_status=None
+    ):
+        self.logger.info("Update Operation History in LcmBase")
+        self.logger.info(
+            "Content: {} Workflow Status: {} Resource Status: {}".format(
+                content, workflow_status, resource_status
+            )
+        )
+
+        op_id = content["current_operation"]
+        self.logger.info("OP_id: {}".format(op_id))
+        for index, op in enumerate(content["operationHistory"]):
+            self.logger.info("Operations: {}".format(op))
+            if op["op_id"] == op_id:
+                self.logger.info("Index: {}".format(index))
+                now = time()
+                if workflow_status:
+                    content["operationHistory"][index]["workflowState"] = "COMPLETED"
+                else:
+                    content["operationHistory"][index]["workflowState"] = "ERROR"
+
+                if resource_status:
+                    content["operationHistory"][index]["resourceState"] = "READY"
+                else:
+                    content["operationHistory"][index]["resourceState"] = "NOT_READY"
+
+                content["operationHistory"][index]["endDate"] = now
+                break
+        self.logger.info("content: {}".format(content))
+
+        return content
+
     @staticmethod
     def calculate_charm_hash(zipped_file):
         """Calculate the hash of charm files which ends with .charm
@@ -489,7 +525,19 @@ class TaskRegistry(LcmBase):
 
     # NS/NSI: "services" VIM/WIM/SDN: "accounts"
     topic_service_list = ["ns", "nsi"]
-    topic_account_list = ["vim", "wim", "sdn", "k8scluster", "vca", "k8srepo"]
+    topic_account_list = [
+        "vim",
+        "wim",
+        "sdn",
+        "k8scluster",
+        "vca",
+        "k8srepo",
+        "cluster",
+        "k8s_app",
+        "k8s_resource",
+        "k8s_infra_controller",
+        "k8s_infra_config",
+    ]
 
     # Map topic to InstanceID
     topic2instid_dict = {"ns": "nsInstanceId", "nsi": "netsliceInstanceId"}
@@ -504,6 +552,11 @@ class TaskRegistry(LcmBase):
         "k8scluster": "k8sclusters",
         "vca": "vca",
         "k8srepo": "k8srepos",
+        "cluster": "k8sclusters",
+        "k8s_app": "k8sapp",
+        "k8s_resource": "k8sresource",
+        "k8s_infra_controller": "k8sinfra_controller",
+        "k8s_infra_config": "k8sinfra_config",
     }
 
     def __init__(self, worker_id=None, logger=None):
@@ -516,10 +569,17 @@ class TaskRegistry(LcmBase):
             "k8scluster": {},
             "vca": {},
             "k8srepo": {},
+            "cluster": {},
+            "k8s_app": {},
+            "k8s_resource": {},
+            "k8s_infra_controller": {},
+            "k8s_infra_config": {},
+            "odu": {},
         }
         self.worker_id = worker_id
         self.db = Database().instance.db
         self.logger = logger
+        # self.logger.info("Task registry: {}".format(self.task_registry))
 
     def register(self, topic, _id, op_id, task_name, task):
         """
@@ -531,12 +591,18 @@ class TaskRegistry(LcmBase):
         :param task: Task class
         :return: none
         """
+        self.logger.info(
+            "topic : {}, _id:{}, op_id:{}, taskname:{}, task:{}".format(
+                topic, _id, op_id, task_name, task
+            )
+        )
         if _id not in self.task_registry[topic]:
             self.task_registry[topic][_id] = OrderedDict()
         if op_id not in self.task_registry[topic][_id]:
             self.task_registry[topic][_id][op_id] = {task_name: task}
         else:
             self.task_registry[topic][_id][op_id][task_name] = task
+        self.logger.info("Task registry: {}".format(self.task_registry))
         # print("registering task", topic, _id, op_id, task_name, task)
 
     def remove(self, topic, _id, op_id, task_name=None):
@@ -713,12 +779,16 @@ class TaskRegistry(LcmBase):
         """
 
         # Backward compatibility for VIM/WIM/SDN/k8scluster without op_id
+        self.logger.info("Lock_HA")
         if self._is_account_type_HA(topic) and op_id is None:
             return True
 
         # Try to lock this task
         db_table_name = self.topic2dbtable_dict[topic]
         q_filter, update_dict = self._get_dbparams_for_lock_HA(topic, op_type, op_id)
+        self.logger.info(
+            "db table name: {} update dict: {}".format(db_table_name, update_dict)
+        )
         db_lock_task = self.db.set_one(
             db_table_name,
             q_filter=q_filter,
@@ -759,7 +829,7 @@ class TaskRegistry(LcmBase):
         :param op_id: Account ID + ':' + Operation Index
         :return: nothing
         """
-
+        self.logger.info("Unlock HA")
         # Backward compatibility
         if not self._is_account_type_HA(topic) or not op_id:
             return
@@ -767,7 +837,7 @@ class TaskRegistry(LcmBase):
         # Get Account ID and Operation Index
         account_id, op_index = self._get_account_and_op_HA(op_id)
         db_table_name = self.topic2dbtable_dict[topic]
-
+        self.logger.info("db_table_name: {}".format(db_table_name))
         # If this is a 'delete' operation, the account may have been deleted (SUCCESS) or may still exist (FAILED)
         # If the account exist, register the HA task.
         # Update DB for HA tasks
@@ -778,6 +848,7 @@ class TaskRegistry(LcmBase):
             "_admin.operations.{}.worker".format(op_index): None,
             "_admin.current_operation": None,
         }
+        self.logger.info("Update dict: {}".format(update_dict))
         self.db.set_one(
             db_table_name,
             q_filter=q_filter,
@@ -834,6 +905,7 @@ class TaskRegistry(LcmBase):
                 step = "Waiting for {} related tasks to be completed.".format(
                     new_num_related_tasks
                 )
+                self.logger.info("{}".format(step))
                 update_dict = {}
                 q_filter = {"_id": _id}
                 # NS/NSI
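
The update_operation_history helper added above walks content["operationHistory"] for the entry whose op_id matches content["current_operation"] and stamps workflowState, resourceState and endDate on it. The sketch below shows the document shape it expects; only the keys the method reads or writes come from the code, while the startDate field and the initial state values are assumptions.

    from time import time

    # Illustrative shape of a document passed to LcmBase.update_operation_history();
    # the concrete values are examples only.
    cluster_doc = {
        "_id": "0123456789abcdef01234567",
        "current_operation": "op-0",
        "operationHistory": [
            {
                "op_id": "op-0",
                "workflowState": "PROCESSING",   # becomes COMPLETED or ERROR
                "resourceState": "IN_PROGRESS",  # becomes READY or NOT_READY
                "startDate": time(),             # assumed field; not read by the method
                # "endDate" is added when the matching entry is updated
            }
        ],
    }
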
diff --git a/osm_lcm/odu_workflows.py b/osm_lcm/odu_workflows.py
new file mode 100644 (file)
index 0000000..273bf01
--- /dev/null
@@ -0,0 +1,48 @@
+#!/usr/bin/python3
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+
+import logging
+from osm_lcm.lcm_utils import LcmBase
+
+
+class OduWorkflow(LcmBase):
+    def __init__(self, msg, lcm_tasks, config):
+        """
+        Init, Connect to database, filesystem storage, and messaging
+        :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
+        :return: None
+        """
+
+        self.logger = logging.getLogger("lcm.odu")
+        self.lcm_tasks = lcm_tasks
+        self.logger.info("Msg: {} lcm_tasks: {} ".format(msg, lcm_tasks))
+
+        super().__init__(msg, self.logger)
+
+    def launch_workflow(self, key, content):
+        # Placeholder: no real workflow is submitted yet; only a deterministic
+        # workflow name is derived from the operation key and the item id.
+        self.logger.info(f"Launching workflow. Key: {key}. Content: {content}")
+        return f"workflow-{key}-{content['_id']}"
+
+    def check_workflow_status(self, workflow_name):
+        # Placeholder: always reports the workflow as successfully completed.
+        self.logger.info(f"Check workflow status {workflow_name}")
+        return True, "OK"
+
+    def check_resource_status(self, key, content):
+        # Placeholder: always reports the resource as ready.
+        self.logger.info(f"Check resource status {key}: {content}")
+        return True, "OK"