import traceback
from abc import ABC, abstractmethod
from datetime import timedelta
+from temporalio import workflow
+from temporalio.common import RetryPolicy
+from temporalio.exceptions import ActivityError, ChildWorkflowError
from osm_common.dataclasses.temporal_dataclasses import (
LcmOperationState,
ACTIVITY_UPDATE_LCM_OPERATION_STATE,
WORKFLOW_NSLCM_NO_OP,
)
-from temporalio import workflow
-from temporalio.common import RetryPolicy
-from temporalio.exceptions import ActivityError
class LcmOperationWorkflow(ABC):
await self.update_operation_state(LcmOperationState.PROCESSING)
try:
await self.workflow(input=input)
+
except ActivityError as e:
- # TODO: Deteremine the best content for Activity Errors OSM-994
- self.logger.exception(e)
+ err_details = str(e.cause.with_traceback(e.__traceback__))
+ self.logger.error(err_details)
await self.update_operation_state(
LcmOperationState.FAILED,
error_message=str(e.cause.message),
- detailed_status=str(e.cause.with_traceback(e.__traceback__)),
+ detailed_status=err_details,
)
raise e
+
+ except ChildWorkflowError as e:
+ err_details = str(e.cause.cause.cause.with_traceback(e.cause.__traceback__))
+ self.logger.error(err_details)
+ await self.update_operation_state(
+ LcmOperationState.FAILED,
+ error_message=str(e.cause.cause.message),
+ detailed_status=err_details,
+ )
+ raise e
+
except Exception as e:
- self.logger.exception(e)
- self.update_operation_state(
+ err_details = str(traceback.format_exc())
+ self.logger.error(err_details)
+ await self.update_operation_state(
LcmOperationState.FAILED,
error_message=str(e),
- detailed_status=traceback.format_exc(),
+ detailed_status=err_details,
)
raise e
+
await self.update_operation_state(LcmOperationState.COMPLETED)
async def update_operation_state(
# limitations under the License.
import asyncio
+from temporalio import workflow
+from temporalio.converter import value_to_type
+from temporalio.exceptions import ActivityError, ChildWorkflowError
+import traceback
from osm_common.dataclasses.temporal_dataclasses import (
GetVnfRecordIdsInput,
WORKFLOW_NS_INSTANTIATE,
WORKFLOW_VNF_INSTANTIATE,
)
-from temporalio import workflow
-from temporalio.converter import value_to_type
-from temporalio.exceptions import ActivityError
-
from osm_lcm.temporal.lcm_workflows import LcmOperationWorkflow
)
except ActivityError as e:
- await self.update_ns_state(ns_uuid, NsState.INSTANTIATED, e.cause.message)
- self.logger.error(
- f"{WORKFLOW_NS_INSTANTIATE} failed with {str(e.cause.message)}"
- )
+ err_details = str(e.cause.with_traceback(e.__traceback__))
+ await self.update_ns_state(ns_uuid, NsState.INSTANTIATED, err_details)
+ self.logger.error(f"{WORKFLOW_NS_INSTANTIATE} failed with {err_details}")
+ raise e
+
+ except ChildWorkflowError as e:
+ err_details = str(e.cause.cause.cause.with_traceback(e.cause.__traceback__))
+ await self.update_ns_state(ns_uuid, NsState.INSTANTIATED, err_details)
+ self.logger.error(f"{WORKFLOW_NS_INSTANTIATE} failed with {err_details}")
raise e
except Exception as e:
- await self.update_ns_state(ns_uuid, NsState.INSTANTIATED, str(e))
- self.logger.error(f"{WORKFLOW_NS_INSTANTIATE} failed with {str(e)}")
+ err_details = str(traceback.format_exc())
+ await self.update_ns_state(ns_uuid, NsState.INSTANTIATED, err_details)
+ self.logger.error(f"{WORKFLOW_NS_INSTANTIATE} failed with {err_details}")
raise e
+
await self.update_ns_state(ns_uuid, NsState.INSTANTIATED, "Done")
@staticmethod
import logging
from temporalio import workflow
from temporalio.common import RetryPolicy
+from temporalio.exceptions import ActivityError
+import traceback
from osm_common.dataclasses.temporal_dataclasses import (
VduInstantiateInput,
self.logger.info(f"VDU `{input.charm_info.app_name}` is ready")
+ except ActivityError as e:
+ err_details = str(e.cause.with_traceback(e.__traceback__))
+ self.logger.error(f"{WORKFLOW_VDU_INSTANTIATE} failed with {err_details}")
+ raise e
+
except Exception as e:
- self.logger.error(f"{WORKFLOW_VDU_INSTANTIATE} failed with {str(e)}")
+ err_details = str(traceback.format_exc())
+ self.logger.error(f"{WORKFLOW_VDU_INSTANTIATE} failed with {err_details}")
raise e
from temporalio import workflow
from temporalio.common import RetryPolicy
from temporalio.exceptions import ActivityError
+import traceback
from osm_common.dataclasses.temporal_dataclasses import (
DeleteVimInput,
retry_policy = RetryPolicy(maximum_attempts=3)
default_schedule_to_close_timeout = timedelta(minutes=10)
-workflow.logger = logging.getLogger("lcm.temporal.vim_workflows")
-
@workflow.defn(name=WORKFLOW_VIM_CREATE, sandboxed=_SANDBOXED)
class VimCreateWorkflow:
"""Updates VIM account state by validating the VIM connectivity."""
+ def __init__(self):
+ self.logger = logging.getLogger(f"lcm.wfl.{self.__class__.__name__}")
+
@workflow.run
async def run(self, input: VimOperationInput) -> None:
vim_state = UpdateVimStateInput(input.vim_uuid, VimState.ENABLED, "Done")
)
except ActivityError as e:
+ error_details = str(e.cause.with_traceback(e.__traceback__))
vim_state = UpdateVimStateInput(
- input.vim_uuid, VimState.ERROR, e.cause.message
+ input.vim_uuid, VimState.ERROR, str(e.cause.message)
)
op_state = UpdateVimOperationStateInput(
- input.vim_uuid, input.op_id, VimOperationState.FAILED, e.cause.message
- )
-
- workflow.logger.error(
- f"{WORKFLOW_VIM_CREATE} failed with {str(e.cause.message)}"
+ input.vim_uuid,
+ input.op_id,
+ VimOperationState.FAILED,
+ error_details,
)
+ await self.update_states(op_state, vim_state)
+ self.logger.error(f"{WORKFLOW_VIM_CREATE} failed with {error_details}")
raise e
except Exception as e:
+ error_details = str(traceback.format_exc())
vim_state = UpdateVimStateInput(input.vim_uuid, VimState.ERROR, str(e))
op_state = UpdateVimOperationStateInput(
- input.vim_uuid, input.op_id, VimOperationState.FAILED, str(e)
+ input.vim_uuid,
+ input.op_id,
+ VimOperationState.FAILED,
+ error_details,
)
-
- workflow.logger.error(f"{WORKFLOW_VIM_CREATE} failed with {str(e)}")
+ await self.update_states(op_state, vim_state)
+ self.logger.error(f"{WORKFLOW_VIM_CREATE} failed with {error_details}")
raise e
- finally:
- await workflow.execute_activity(
- activity=ACTIVITY_UPDATE_VIM_STATE,
- arg=vim_state,
- activity_id=f"{ACTIVITY_UPDATE_VIM_STATE}-{input.vim_uuid}",
- task_queue=LCM_TASK_QUEUE,
- schedule_to_close_timeout=default_schedule_to_close_timeout,
- retry_policy=retry_policy,
- )
+ async def update_states(
+ self,
+ op_state: UpdateVimOperationStateInput,
+ vim_state: UpdateVimStateInput,
+ ):
+ await workflow.execute_activity(
+ activity=ACTIVITY_UPDATE_VIM_STATE,
+ arg=vim_state,
+ activity_id=f"{ACTIVITY_UPDATE_VIM_STATE}-{vim_state.vim_uuid}",
+ task_queue=LCM_TASK_QUEUE,
+ schedule_to_close_timeout=default_schedule_to_close_timeout,
+ retry_policy=retry_policy,
+ )
- await workflow.execute_activity(
- activity=ACTIVITY_UPDATE_VIM_OPERATION_STATE,
- arg=op_state,
- activity_id=f"{ACTIVITY_UPDATE_VIM_OPERATION_STATE}-{input.vim_uuid}",
- task_queue=LCM_TASK_QUEUE,
- schedule_to_close_timeout=default_schedule_to_close_timeout,
- retry_policy=retry_policy,
- )
+ await workflow.execute_activity(
+ activity=ACTIVITY_UPDATE_VIM_OPERATION_STATE,
+ arg=op_state,
+ activity_id=f"{ACTIVITY_UPDATE_VIM_OPERATION_STATE}-{op_state.vim_uuid}",
+ task_queue=LCM_TASK_QUEUE,
+ schedule_to_close_timeout=default_schedule_to_close_timeout,
+ retry_policy=retry_policy,
+ )
@workflow.defn(name=WORKFLOW_VIM_UPDATE, sandboxed=_SANDBOXED)
activity, the operation is idempotent.
"""
- update_vnf_state = {"vnfState": vnf_state_input.state}
+ update_vnf_state = {"vnfState": vnf_state_input.state.name}
self.db.set_one("vnfrs", {"_id": vnf_state_input.vnfr_uuid}, update_vnf_state)
self.logger.debug(
- f"VNF {vnf_state_input.vnfr_uuid} state is updated to {vnf_state_input.state}."
+ f"VNF {vnf_state_input.vnfr_uuid} state is updated to {vnf_state_input.state.name}."
)
@activity.defn(name=ACTIVITY_CHANGE_VNF_INSTANTIATION_STATE)
"""
update_vnf_instantiation_state = {
- "instantiationState": vnf_instantiation_state_input.state
+ "instantiationState": vnf_instantiation_state_input.state.name
}
self.db.set_one(
"vnfrs",
update_vnf_instantiation_state,
)
self.logger.debug(
- f"VNF {vnf_instantiation_state_input.vnfr_uuid} state is updated to {vnf_instantiation_state_input.state}."
+ f"VNF {vnf_instantiation_state_input.vnfr_uuid} state is updated to {vnf_instantiation_state_input.state.name}."
)
@activity.defn(name=ACTIVITY_SET_VNF_MODEL)
from temporalio import workflow
from temporalio.converter import value_to_type
from temporalio.common import RetryPolicy
+from temporalio.exceptions import ActivityError, ChildWorkflowError
+import traceback
from osm_common.dataclasses.temporal_dataclasses import (
- VnfInstantiationState,
- VnfState,
- VnfInstantiateInput,
- VduInstantiateInput,
ChangeVnfInstantiationStateInput,
ChangeVnfStateInput,
GetTaskQueueInput,
GetTaskQueueOutput,
GetVnfDetailsInput,
GetVnfDetailsOutput,
+ VduComputeConstraints,
+ VduInstantiateInput,
+ VnfInstantiateInput,
+ VnfInstantiationState,
+ VnfState,
)
from osm_common.temporal_constants import (
),
)
+ except ActivityError as e:
+ err_details = str(e.cause.with_traceback(e.__traceback__))
+ await self.update_states(
+ vnf_instantiation_state=vnf_instantiation_state,
+ vnf_state=vnf_state,
+ )
+ self.logger.error(f"{WORKFLOW_VNF_INSTANTIATE} failed with {err_details}")
+ raise e
+
+ except ChildWorkflowError as e:
+ err_details = str(e.cause.cause.with_traceback(e.cause.__traceback__))
+ await self.update_states(
+ vnf_instantiation_state=vnf_instantiation_state,
+ vnf_state=vnf_state,
+ )
+ self.logger.error(f"{WORKFLOW_VNF_INSTANTIATE} failed with {err_details}")
+ raise e
+
except Exception as e:
- workflow.logger.error(f"{WORKFLOW_VNF_INSTANTIATE} failed with {str(e)}")
+ err_details = str(traceback.format_exc())
await self.update_states(
vnf_instantiation_state=vnf_instantiation_state,
vnf_state=vnf_state,
)
+ self.logger.error(f"{WORKFLOW_VNF_INSTANTIATE} failed with {err_details}")
raise e
@staticmethod
vim_id = vnfr.get("vim-account-id")
sw_image_descs = vnfd.get("sw-image-desc")
vdu_info = CharmInfoUtils.get_charm_info(vdu, sw_image_descs)
- vdu_instantiate_input = VduInstantiateInput(vim_id, model_name, vdu_info, None)
+ compute_constraints = VduComputeConstraints(cores=0, mem=0)
+ vdu_instantiate_input = VduInstantiateInput(
+ vim_id, model_name, vdu_info, compute_constraints, "cloud"
+ )
vdu_instantiate_workflow_id = (
vdu_instantiate_input.model_name
+ "-"
schedule_to_close_timeout=default_schedule_to_close_timeout,
retry_policy=retry_policy,
)
+ except ActivityError as e:
+ err_details = str(e.cause.with_traceback(e.__traceback__))
+ self.logger.error(f"{WORKFLOW_VNF_PREPARE} failed with {err_details}")
+ raise e
+
except Exception as e:
- self.logger.error(f"{WORKFLOW_VNF_PREPARE} failed with {str(e)}")
+ err_details = str(traceback.format_exc())
+ self.logger.error(f"{WORKFLOW_VNF_PREPARE} failed with {err_details}")
raise e
channel = "latest"
entity_url = "ch:my-charm"
charm_info = CharmInfo(app_name, channel, entity_url)
- constraints = VduComputeConstraints(1, 2, "")
+ constraints = VduComputeConstraints(1, 2)
vdu_instantiate_input = VduInstantiateInput(
- vim_content["_id"], namespace, charm_info, constraints
+ vim_content["_id"], namespace, charm_info, constraints, cloud=""
)
async def test_deploy_charm_nominal_case(self):
channel = "latest"
entity_url = "ch:my-charm"
cloud = "microk8s"
- constraints = VduComputeConstraints(mem=1, cores=1, cloud=cloud)
+ constraints = VduComputeConstraints(mem=1, cores=1)
charm_info = CharmInfo(app_name, channel, entity_url)
vdu_instantiate_input = VduInstantiateInput(
- vim_id, namespace, charm_info, constraints
+ vim_id, namespace, charm_info, constraints, cloud
)
worflow_id = namespace + "-" + app_name
charm_info=CharmInfo(
app_name="my-app", channel="latest", entity_url="my-url"
),
- constraints=VduComputeConstraints(cores=1, mem=1, cloud=cloud),
+ constraints=VduComputeConstraints(cores=1, mem=1),
+ cloud=cloud,
),
"vdu_instantiate_workflow_id",
)
-r{toxinidir}/requirements-test.txt
pylint
commands =
- pylint -E osm_lcm --extension-pkg-whitelist=pydantic # issue with pydantic (https://github.com/pydantic/pydantic/issues/1961)
+ pylint -E osm_lcm --extension-pkg-whitelist=pydantic --disable=E1101 # issue with pydantic (https://github.com/pydantic/pydantic/issues/1961)
#######################################################################################