From: Patricia Reinoso Date: Wed, 8 Mar 2023 17:13:56 +0000 (+0000) Subject: Add VIM create Workflow (exceptions) X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=refs%2Fchanges%2F98%2F12998%2F22;p=osm%2FLCM.git Add VIM create Workflow (exceptions) Change-Id: I4cbf9d1d5c3679ddd946fe4200fe9cd9c188c147 Signed-off-by: Patricia Reinoso Signed-off-by: Mark Beierl --- diff --git a/osm_lcm/nglcm.py b/osm_lcm/nglcm.py index ee758cb..6c6f196 100644 --- a/osm_lcm/nglcm.py +++ b/osm_lcm/nglcm.py @@ -24,11 +24,17 @@ import sys import yaml from osm_common.dbbase import DbException +from osm_common.temporal_constants import LCM_TASK_QUEUE from osm_lcm.data_utils.database.database import Database from osm_lcm.data_utils.lcm_config import LcmCfg from osm_lcm.lcm_utils import LcmException from os import path -from temporalio import workflow +from osm_lcm.temporal.vim_activities import VimDbActivity, JujuPaasConnector +from osm_lcm.temporal.vim_workflows import ( + VimCreateWorkflow, + VimDeleteWorkflow, + VimUpdateWorkflow, +) from temporalio.client import Client from temporalio.worker import Worker @@ -114,37 +120,32 @@ class NGLcm: self.logger.critical("starting osm/nglcm") async def start(self): - # do some temporal stuff here temporal_api = ( f"{self.main_config.temporal.host}:{str(self.main_config.temporal.port)}" ) - self.logger.info(f"Attempting to register with Temporal at {temporal_api}") client = await Client.connect(temporal_api) - - task_queue = "lcm-task-queue" - workflows = [ - Heartbeat, + data_activity_instance = VimDbActivity(self.db) + paas_connector_instance = JujuPaasConnector(self.db) + + workflows = [VimCreateWorkflow, VimDeleteWorkflow, VimUpdateWorkflow] + activities = [ + data_activity_instance.update_vim_operation_state, + data_activity_instance.update_vim_state, + data_activity_instance.delete_vim_record, + paas_connector_instance.test_vim_connectivity, ] - activities = [] worker = Worker( - client, task_queue=task_queue, workflows=workflows, activities=activities + client, + task_queue=LCM_TASK_QUEUE, + workflows=workflows, + activities=activities, ) - self.logger.info(f"Registered for queue {task_queue}") - self.logger.info(f"Registered workflows {workflows}") - self.logger.info(f"Registered activites {activities}") - + self.logger.info("Starting LCM temporal worker") await worker.run() -@workflow.defn -class Heartbeat: - @workflow.run - async def run(self) -> str: - return "alive" - - if __name__ == "__main__": try: opts, args = getopt.getopt( diff --git a/osm_lcm/temporal/__init__.py b/osm_lcm/temporal/__init__.py new file mode 100644 index 0000000..d36bef6 --- /dev/null +++ b/osm_lcm/temporal/__init__.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/osm_lcm/temporal/vim_activities.py b/osm_lcm/temporal/vim_activities.py new file mode 100644 index 0000000..badeae5 --- /dev/null +++ b/osm_lcm/temporal/vim_activities.py @@ -0,0 +1,213 @@ +####################################################################################### +# Copyright ETSI Contributors and Others. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from temporalio import activity +from time import time +from n2vc.temporal_libjuju import ConnectionInfo, Libjuju +from osm_common.temporal_constants import ( + ACTIVITY_DELETE_VIM, + ACTIVITY_TEST_VIM_CONNECTIVITY, + ACTIVITY_UPDATE_VIM_OPERATION_STATE, + ACTIVITY_UPDATE_VIM_STATE, +) +from osm_common.dataclasses.temporal_dataclasses import ( + DeleteVimInput, + TestVimConnectivityInput, + UpdateVimOperationStateInput, + UpdateVimStateInput, +) + +activity.logger = logging.getLogger("lcm.temporal.vim_activities") + + +class JujuPaasConnector: + """Handles Juju Controller operations. + + Args: + db (object): Data Access Object + """ + + def __init__(self, db): + self.db = db + + def _decrypt_password(self, vim_content: dict) -> str: + """Decrypt a password. + vim_content (dict): VIM details as a dictionary + + Returns: + plain text password (str) + """ + return self.db.decrypt( + vim_content["vim_password"], + schema_version=vim_content["schema_version"], + salt=vim_content["_id"], + ) + + def _get_connection_info(self, vim_id: str) -> ConnectionInfo: + """Get VIM details from database using vim_id and returns + the Connection Info to connect Juju Controller. + + Args: + vim_id (str): VIM ID + + Returns: + ConnectionInfo (object) + """ + vim_content = self.db.get_one("vim_accounts", {"_id": vim_id}) + endpoint = vim_content["vim_url"] + username = vim_content["vim_user"] + vim_config = vim_content["config"] + cacert = vim_config["ca_cert_content"] + cloud_name = vim_config["cloud"] + cloud_credentials = vim_config["cloud_credentials"] + password = self._decrypt_password(vim_content) + return ConnectionInfo( + endpoint, username, password, cacert, cloud_name, cloud_credentials + ) + + @activity.defn(name=ACTIVITY_TEST_VIM_CONNECTIVITY) + async def test_vim_connectivity( + self, test_connectivity_input: TestVimConnectivityInput + ) -> None: + """Validates the credentials by attempting to connect to the given Juju Controller. + + Collaborators: + DB Read: vim_accounts + Juju Controller: Connect only + + Raises (Retryable): + ApplicationError If any of password, cacert, cloud_credentials is invalid + or Juju controller is not reachable + + Activity Lifecycle: + This activity should complete relatively quickly (in a few seconds). + However, it would be reasonable to wait more than 72 seconds (network timeout) + incase there are network issues. + + This activity will not report a heartbeat due to its + short-running nature. + + It is recommended, although not necessary to implement a + back-off strategy for this activity, as it will naturally block + and wait on each connection attempt. + """ + vim_id = test_connectivity_input.vim_uuid + controller = None + try: + connection_info = self._get_connection_info(vim_id) + libjuju = Libjuju(connection_info) + controller = await libjuju.get_controller() + message = f"Connection to juju controller succeeded for {vim_id}" + activity.logger.info(message) + finally: + await libjuju.disconnect_controller(controller) + + +class VimDbActivity: + """Perform Database operations for VIM accounts. + + Args: + db (object): Data Access Object + """ + + def __init__(self, db): + self.db = db + + @activity.defn(name=ACTIVITY_UPDATE_VIM_STATE) + async def update_vim_state(self, data: UpdateVimStateInput) -> None: + """ + Changes the state of the VIM itself. Should be either + ENABLED or ERROR, however this activity does not validate + the state as no validation was done in OSM previously. + + Collaborators: + DB Write: vim_accounts + + Raises (Retryable): + DbException If the target DB record does not exist or DB is not reachable. + + Activity Lifecycle: + This activity will not report a heartbeat due to its + short-running nature. + + As this is a direct DB update, it is not recommended to have + any specific retry policy + """ + update_vim_state = { + "_admin.operationalState": data.operational_state.name, + "_admin.detailed-status": data.message, + "_admin.modified": time(), + } + + self.db.set_one("vim_accounts", {"_id": data.vim_uuid}, update_vim_state) + activity.logger.debug( + f"Updated VIM {data.vim_uuid} to {data.operational_state.name}" + ) + + @activity.defn(name=ACTIVITY_UPDATE_VIM_OPERATION_STATE) + async def update_vim_operation_state( + self, data: UpdateVimOperationStateInput + ) -> None: + """ + Changes the state of a VIM operation task. Should be done to + indicate progress, or completion of the task itself. + + Collaborators: + DB Write: vim_accounts + + Raises (Retryable): + DbException If the target DB record does not exist or DB is not reachable. + + Activity Lifecycle: + This activity will not report a heartbeat due to its + short-running nature. + + As this is a direct DB update, it is not recommended to have + any specific retry policy + """ + update_operation_state = { + f"_admin.operations.{format(data.op_id)}.operationState": data.op_state.name, + f"_admin.operations.{format(data.op_id)}.detailed-status": data.message, + "_admin.current_operation": None, + } + + self.db.set_one("vim_accounts", {"_id": data.vim_uuid}, update_operation_state) + activity.logger.debug( + f"Updated VIM {data.vim_uuid} OP ID {data.op_id} to {data.op_state.name}" + ) + + @activity.defn(name=ACTIVITY_DELETE_VIM) + async def delete_vim_record(self, data: DeleteVimInput) -> None: + """ + Deletes the VIM record from the database. + + Collaborators: + DB Delete: vim_accounts + + Raises (Retryable): + DbException If the target DB record does not exist or DB is not reachable. + + Activity Lifecycle: + This activity will not report a heartbeat due to its + short-running nature. + + As this is a direct DB update, it is not recommended to have + any specific retry policy + """ + + self.db.del_one("vim_accounts", {"_id": data.vim_uuid}) + activity.logger.debug(f"Removed VIM {data.vim_uuid}") diff --git a/osm_lcm/temporal/vim_workflows.py b/osm_lcm/temporal/vim_workflows.py new file mode 100644 index 0000000..33ab565 --- /dev/null +++ b/osm_lcm/temporal/vim_workflows.py @@ -0,0 +1,134 @@ +####################################################################################### +# Copyright ETSI Contributors and Others. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import timedelta +import logging +from temporalio import workflow +from temporalio.common import RetryPolicy +from temporalio.exceptions import ActivityError + +from osm_common.dataclasses.temporal_dataclasses import ( + DeleteVimInput, + TestVimConnectivityInput, + UpdateVimOperationStateInput, + UpdateVimStateInput, + VimOperationInput, + VimState, + VimOperationState, +) +from osm_common.temporal_constants import ( + ACTIVITY_TEST_VIM_CONNECTIVITY, + ACTIVITY_UPDATE_VIM_OPERATION_STATE, + ACTIVITY_UPDATE_VIM_STATE, + ACTIVITY_DELETE_VIM, + LCM_TASK_QUEUE, + WORKFLOW_VIM_CREATE, + WORKFLOW_VIM_UPDATE, + WORKFLOW_VIM_DELETE, +) + +_SANDBOXED = False +retry_policy = RetryPolicy(maximum_attempts=3) +default_schedule_to_close_timeout = timedelta(minutes=10) + +workflow.logger = logging.getLogger("lcm.temporal.vim_workflows") + + +@workflow.defn(name=WORKFLOW_VIM_CREATE, sandboxed=_SANDBOXED) +class VimCreateWorkflow: + """Updates VIM account state by validating the VIM connectivity.""" + + @workflow.run + async def run(self, input: VimOperationInput) -> None: + vim_state = UpdateVimStateInput(input.vim_uuid, VimState.ENABLED, "Done") + op_state = UpdateVimOperationStateInput( + input.vim_uuid, input.op_id, VimOperationState.COMPLETED, "Done" + ) + try: + await workflow.execute_activity( + activity=ACTIVITY_TEST_VIM_CONNECTIVITY, + arg=TestVimConnectivityInput(input.vim_uuid), + activity_id=f"{ACTIVITY_TEST_VIM_CONNECTIVITY}-{input.vim_uuid}", + task_queue=LCM_TASK_QUEUE, + schedule_to_close_timeout=default_schedule_to_close_timeout, + retry_policy=retry_policy, + ) + + except ActivityError as e: + vim_state = UpdateVimStateInput( + input.vim_uuid, VimState.ERROR, e.cause.message + ) + op_state = UpdateVimOperationStateInput( + input.vim_uuid, input.op_id, VimOperationState.FAILED, e.cause.message + ) + + workflow.logger.error( + f"{WORKFLOW_VIM_CREATE} failed with {str(e.cause.message)}" + ) + raise e + + except Exception as e: + vim_state = UpdateVimStateInput(input.vim_uuid, VimState.ERROR, str(e)) + op_state = UpdateVimOperationStateInput( + input.vim_uuid, input.op_id, VimOperationState.FAILED, str(e) + ) + + workflow.logger.error(f"{WORKFLOW_VIM_CREATE} failed with {str(e)}") + raise e + + finally: + await workflow.execute_activity( + activity=ACTIVITY_UPDATE_VIM_STATE, + arg=vim_state, + activity_id=f"{ACTIVITY_UPDATE_VIM_STATE}-{input.vim_uuid}", + task_queue=LCM_TASK_QUEUE, + schedule_to_close_timeout=default_schedule_to_close_timeout, + retry_policy=retry_policy, + ) + + await workflow.execute_activity( + activity=ACTIVITY_UPDATE_VIM_OPERATION_STATE, + arg=op_state, + activity_id=f"{ACTIVITY_UPDATE_VIM_OPERATION_STATE}-{input.vim_uuid}", + task_queue=LCM_TASK_QUEUE, + schedule_to_close_timeout=default_schedule_to_close_timeout, + retry_policy=retry_policy, + ) + + +@workflow.defn(name=WORKFLOW_VIM_UPDATE, sandboxed=_SANDBOXED) +class VimUpdateWorkflow(VimCreateWorkflow): + """Updates VIM account state by validating the VIM connectivity.""" + + @workflow.run + async def run(self, input: VimOperationInput) -> None: + await super().run(input) + + +@workflow.defn(name=WORKFLOW_VIM_DELETE, sandboxed=_SANDBOXED) +class VimDeleteWorkflow: + """Deletes VIM accounts.""" + + @workflow.run + async def run(self, input: VimOperationInput) -> None: + await workflow.execute_activity( + activity=ACTIVITY_DELETE_VIM, + arg=DeleteVimInput(input.vim_uuid), + activity_id=f"{ACTIVITY_UPDATE_VIM_STATE}-{input.vim_uuid}", + task_queue=LCM_TASK_QUEUE, + schedule_to_close_timeout=default_schedule_to_close_timeout, + retry_policy=retry_policy, + )