Add VIM create Workflow (exceptions) 98/12998/22
authorPatricia Reinoso <patricia.reinoso@canonical.com>
Wed, 8 Mar 2023 17:13:56 +0000 (17:13 +0000)
committerMark Beierl <mark.beierl@canonical.com>
Wed, 29 Mar 2023 17:43:14 +0000 (17:43 +0000)
Change-Id: I4cbf9d1d5c3679ddd946fe4200fe9cd9c188c147
Signed-off-by: Patricia Reinoso <patricia.reinoso@canonical.com>
Signed-off-by: Mark Beierl <mark.beierl@canonical.com>
osm_lcm/nglcm.py
osm_lcm/temporal/__init__.py [new file with mode: 0644]
osm_lcm/temporal/vim_activities.py [new file with mode: 0644]
osm_lcm/temporal/vim_workflows.py [new file with mode: 0644]

index ee758cb..6c6f196 100644 (file)
@@ -24,11 +24,17 @@ import sys
 import yaml
 
 from osm_common.dbbase import DbException
+from osm_common.temporal_constants import LCM_TASK_QUEUE
 from osm_lcm.data_utils.database.database import Database
 from osm_lcm.data_utils.lcm_config import LcmCfg
 from osm_lcm.lcm_utils import LcmException
 from os import path
-from temporalio import workflow
+from osm_lcm.temporal.vim_activities import VimDbActivity, JujuPaasConnector
+from osm_lcm.temporal.vim_workflows import (
+    VimCreateWorkflow,
+    VimDeleteWorkflow,
+    VimUpdateWorkflow,
+)
 from temporalio.client import Client
 from temporalio.worker import Worker
 
@@ -114,37 +120,32 @@ class NGLcm:
         self.logger.critical("starting osm/nglcm")
 
     async def start(self):
-        # do some temporal stuff here
         temporal_api = (
             f"{self.main_config.temporal.host}:{str(self.main_config.temporal.port)}"
         )
-        self.logger.info(f"Attempting to register with Temporal at {temporal_api}")
         client = await Client.connect(temporal_api)
-
-        task_queue = "lcm-task-queue"
-        workflows = [
-            Heartbeat,
+        data_activity_instance = VimDbActivity(self.db)
+        paas_connector_instance = JujuPaasConnector(self.db)
+
+        workflows = [VimCreateWorkflow, VimDeleteWorkflow, VimUpdateWorkflow]
+        activities = [
+            data_activity_instance.update_vim_operation_state,
+            data_activity_instance.update_vim_state,
+            data_activity_instance.delete_vim_record,
+            paas_connector_instance.test_vim_connectivity,
         ]
-        activities = []
 
         worker = Worker(
-            client, task_queue=task_queue, workflows=workflows, activities=activities
+            client,
+            task_queue=LCM_TASK_QUEUE,
+            workflows=workflows,
+            activities=activities,
         )
 
-        self.logger.info(f"Registered for queue {task_queue}")
-        self.logger.info(f"Registered workflows {workflows}")
-        self.logger.info(f"Registered activites {activities}")
-
+        self.logger.info("Starting LCM temporal worker")
         await worker.run()
 
 
-@workflow.defn
-class Heartbeat:
-    @workflow.run
-    async def run(self) -> str:
-        return "alive"
-
-
 if __name__ == "__main__":
     try:
         opts, args = getopt.getopt(
diff --git a/osm_lcm/temporal/__init__.py b/osm_lcm/temporal/__init__.py
new file mode 100644 (file)
index 0000000..d36bef6
--- /dev/null
@@ -0,0 +1,14 @@
+# -*- coding: utf-8 -*-
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/osm_lcm/temporal/vim_activities.py b/osm_lcm/temporal/vim_activities.py
new file mode 100644 (file)
index 0000000..badeae5
--- /dev/null
@@ -0,0 +1,213 @@
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from temporalio import activity
+from time import time
+from n2vc.temporal_libjuju import ConnectionInfo, Libjuju
+from osm_common.temporal_constants import (
+    ACTIVITY_DELETE_VIM,
+    ACTIVITY_TEST_VIM_CONNECTIVITY,
+    ACTIVITY_UPDATE_VIM_OPERATION_STATE,
+    ACTIVITY_UPDATE_VIM_STATE,
+)
+from osm_common.dataclasses.temporal_dataclasses import (
+    DeleteVimInput,
+    TestVimConnectivityInput,
+    UpdateVimOperationStateInput,
+    UpdateVimStateInput,
+)
+
+activity.logger = logging.getLogger("lcm.temporal.vim_activities")
+
+
+class JujuPaasConnector:
+    """Handles Juju Controller operations.
+
+    Args:
+        db  (object):       Data Access Object
+    """
+
+    def __init__(self, db):
+        self.db = db
+
+    def _decrypt_password(self, vim_content: dict) -> str:
+        """Decrypt a password.
+            vim_content     (dict):     VIM details as a dictionary
+
+        Returns:
+            plain text password (str)
+        """
+        return self.db.decrypt(
+            vim_content["vim_password"],
+            schema_version=vim_content["schema_version"],
+            salt=vim_content["_id"],
+        )
+
+    def _get_connection_info(self, vim_id: str) -> ConnectionInfo:
+        """Get VIM details from database using vim_id and returns
+         the Connection Info to connect Juju Controller.
+
+        Args:
+            vim_id  (str):      VIM ID
+
+        Returns:
+            ConnectionInfo  (object)
+        """
+        vim_content = self.db.get_one("vim_accounts", {"_id": vim_id})
+        endpoint = vim_content["vim_url"]
+        username = vim_content["vim_user"]
+        vim_config = vim_content["config"]
+        cacert = vim_config["ca_cert_content"]
+        cloud_name = vim_config["cloud"]
+        cloud_credentials = vim_config["cloud_credentials"]
+        password = self._decrypt_password(vim_content)
+        return ConnectionInfo(
+            endpoint, username, password, cacert, cloud_name, cloud_credentials
+        )
+
+    @activity.defn(name=ACTIVITY_TEST_VIM_CONNECTIVITY)
+    async def test_vim_connectivity(
+        self, test_connectivity_input: TestVimConnectivityInput
+    ) -> None:
+        """Validates the credentials by attempting to connect to the given Juju Controller.
+
+        Collaborators:
+            DB Read:            vim_accounts
+            Juju Controller:    Connect only
+
+        Raises  (Retryable):
+            ApplicationError    If any of password, cacert, cloud_credentials is invalid
+                                or Juju controller is not reachable
+
+        Activity Lifecycle:
+            This activity should complete relatively quickly (in a few seconds).
+            However, it would be reasonable to wait more than 72 seconds (network timeout)
+            incase there are network issues.
+
+            This activity will not report a heartbeat due to its
+            short-running nature.
+
+            It is recommended, although not necessary to implement a
+            back-off strategy for this activity, as it will naturally block
+            and wait on each connection attempt.
+        """
+        vim_id = test_connectivity_input.vim_uuid
+        controller = None
+        try:
+            connection_info = self._get_connection_info(vim_id)
+            libjuju = Libjuju(connection_info)
+            controller = await libjuju.get_controller()
+            message = f"Connection to juju controller succeeded for {vim_id}"
+            activity.logger.info(message)
+        finally:
+            await libjuju.disconnect_controller(controller)
+
+
+class VimDbActivity:
+    """Perform Database operations for VIM accounts.
+
+    Args:
+        db  (object):       Data Access Object
+    """
+
+    def __init__(self, db):
+        self.db = db
+
+    @activity.defn(name=ACTIVITY_UPDATE_VIM_STATE)
+    async def update_vim_state(self, data: UpdateVimStateInput) -> None:
+        """
+        Changes the state of the VIM itself.  Should be either
+        ENABLED or ERROR, however this activity does not validate
+        the state as no validation was done in OSM previously.
+
+        Collaborators:
+            DB Write:           vim_accounts
+
+        Raises  (Retryable):
+            DbException         If the target DB record does not exist or DB is not reachable.
+
+        Activity Lifecycle:
+            This activity will not report a heartbeat due to its
+            short-running nature.
+
+            As this is a direct DB update, it is not recommended to have
+            any specific retry policy
+        """
+        update_vim_state = {
+            "_admin.operationalState": data.operational_state.name,
+            "_admin.detailed-status": data.message,
+            "_admin.modified": time(),
+        }
+
+        self.db.set_one("vim_accounts", {"_id": data.vim_uuid}, update_vim_state)
+        activity.logger.debug(
+            f"Updated VIM {data.vim_uuid} to {data.operational_state.name}"
+        )
+
+    @activity.defn(name=ACTIVITY_UPDATE_VIM_OPERATION_STATE)
+    async def update_vim_operation_state(
+        self, data: UpdateVimOperationStateInput
+    ) -> None:
+        """
+        Changes the state of a VIM operation task.  Should be done to
+        indicate progress, or completion of the task itself.
+
+        Collaborators:
+            DB Write:           vim_accounts
+
+        Raises  (Retryable):
+            DbException         If the target DB record does not exist or DB is not reachable.
+
+        Activity Lifecycle:
+            This activity will not report a heartbeat due to its
+            short-running nature.
+
+            As this is a direct DB update, it is not recommended to have
+            any specific retry policy
+        """
+        update_operation_state = {
+            f"_admin.operations.{format(data.op_id)}.operationState": data.op_state.name,
+            f"_admin.operations.{format(data.op_id)}.detailed-status": data.message,
+            "_admin.current_operation": None,
+        }
+
+        self.db.set_one("vim_accounts", {"_id": data.vim_uuid}, update_operation_state)
+        activity.logger.debug(
+            f"Updated VIM {data.vim_uuid} OP ID {data.op_id} to {data.op_state.name}"
+        )
+
+    @activity.defn(name=ACTIVITY_DELETE_VIM)
+    async def delete_vim_record(self, data: DeleteVimInput) -> None:
+        """
+        Deletes the VIM record from the database.
+
+        Collaborators:
+            DB Delete:           vim_accounts
+
+        Raises  (Retryable):
+            DbException         If the target DB record does not exist or DB is not reachable.
+
+        Activity Lifecycle:
+            This activity will not report a heartbeat due to its
+            short-running nature.
+
+            As this is a direct DB update, it is not recommended to have
+            any specific retry policy
+        """
+
+        self.db.del_one("vim_accounts", {"_id": data.vim_uuid})
+        activity.logger.debug(f"Removed VIM {data.vim_uuid}")
diff --git a/osm_lcm/temporal/vim_workflows.py b/osm_lcm/temporal/vim_workflows.py
new file mode 100644 (file)
index 0000000..33ab565
--- /dev/null
@@ -0,0 +1,134 @@
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from datetime import timedelta
+import logging
+from temporalio import workflow
+from temporalio.common import RetryPolicy
+from temporalio.exceptions import ActivityError
+
+from osm_common.dataclasses.temporal_dataclasses import (
+    DeleteVimInput,
+    TestVimConnectivityInput,
+    UpdateVimOperationStateInput,
+    UpdateVimStateInput,
+    VimOperationInput,
+    VimState,
+    VimOperationState,
+)
+from osm_common.temporal_constants import (
+    ACTIVITY_TEST_VIM_CONNECTIVITY,
+    ACTIVITY_UPDATE_VIM_OPERATION_STATE,
+    ACTIVITY_UPDATE_VIM_STATE,
+    ACTIVITY_DELETE_VIM,
+    LCM_TASK_QUEUE,
+    WORKFLOW_VIM_CREATE,
+    WORKFLOW_VIM_UPDATE,
+    WORKFLOW_VIM_DELETE,
+)
+
+_SANDBOXED = False
+retry_policy = RetryPolicy(maximum_attempts=3)
+default_schedule_to_close_timeout = timedelta(minutes=10)
+
+workflow.logger = logging.getLogger("lcm.temporal.vim_workflows")
+
+
+@workflow.defn(name=WORKFLOW_VIM_CREATE, sandboxed=_SANDBOXED)
+class VimCreateWorkflow:
+    """Updates VIM account state by validating the VIM connectivity."""
+
+    @workflow.run
+    async def run(self, input: VimOperationInput) -> None:
+        vim_state = UpdateVimStateInput(input.vim_uuid, VimState.ENABLED, "Done")
+        op_state = UpdateVimOperationStateInput(
+            input.vim_uuid, input.op_id, VimOperationState.COMPLETED, "Done"
+        )
+        try:
+            await workflow.execute_activity(
+                activity=ACTIVITY_TEST_VIM_CONNECTIVITY,
+                arg=TestVimConnectivityInput(input.vim_uuid),
+                activity_id=f"{ACTIVITY_TEST_VIM_CONNECTIVITY}-{input.vim_uuid}",
+                task_queue=LCM_TASK_QUEUE,
+                schedule_to_close_timeout=default_schedule_to_close_timeout,
+                retry_policy=retry_policy,
+            )
+
+        except ActivityError as e:
+            vim_state = UpdateVimStateInput(
+                input.vim_uuid, VimState.ERROR, e.cause.message
+            )
+            op_state = UpdateVimOperationStateInput(
+                input.vim_uuid, input.op_id, VimOperationState.FAILED, e.cause.message
+            )
+
+            workflow.logger.error(
+                f"{WORKFLOW_VIM_CREATE} failed with {str(e.cause.message)}"
+            )
+            raise e
+
+        except Exception as e:
+            vim_state = UpdateVimStateInput(input.vim_uuid, VimState.ERROR, str(e))
+            op_state = UpdateVimOperationStateInput(
+                input.vim_uuid, input.op_id, VimOperationState.FAILED, str(e)
+            )
+
+            workflow.logger.error(f"{WORKFLOW_VIM_CREATE} failed with {str(e)}")
+            raise e
+
+        finally:
+            await workflow.execute_activity(
+                activity=ACTIVITY_UPDATE_VIM_STATE,
+                arg=vim_state,
+                activity_id=f"{ACTIVITY_UPDATE_VIM_STATE}-{input.vim_uuid}",
+                task_queue=LCM_TASK_QUEUE,
+                schedule_to_close_timeout=default_schedule_to_close_timeout,
+                retry_policy=retry_policy,
+            )
+
+            await workflow.execute_activity(
+                activity=ACTIVITY_UPDATE_VIM_OPERATION_STATE,
+                arg=op_state,
+                activity_id=f"{ACTIVITY_UPDATE_VIM_OPERATION_STATE}-{input.vim_uuid}",
+                task_queue=LCM_TASK_QUEUE,
+                schedule_to_close_timeout=default_schedule_to_close_timeout,
+                retry_policy=retry_policy,
+            )
+
+
+@workflow.defn(name=WORKFLOW_VIM_UPDATE, sandboxed=_SANDBOXED)
+class VimUpdateWorkflow(VimCreateWorkflow):
+    """Updates VIM account state by validating the VIM connectivity."""
+
+    @workflow.run
+    async def run(self, input: VimOperationInput) -> None:
+        await super().run(input)
+
+
+@workflow.defn(name=WORKFLOW_VIM_DELETE, sandboxed=_SANDBOXED)
+class VimDeleteWorkflow:
+    """Deletes VIM accounts."""
+
+    @workflow.run
+    async def run(self, input: VimOperationInput) -> None:
+        await workflow.execute_activity(
+            activity=ACTIVITY_DELETE_VIM,
+            arg=DeleteVimInput(input.vim_uuid),
+            activity_id=f"{ACTIVITY_UPDATE_VIM_STATE}-{input.vim_uuid}",
+            task_queue=LCM_TASK_QUEUE,
+            schedule_to_close_timeout=default_schedule_to_close_timeout,
+            retry_policy=retry_policy,
+        )