From: Dario Faccin Date: Fri, 14 Apr 2023 08:19:07 +0000 (+0200) Subject: OSMENG-988: Create ACTIVITY_SEND_NOTIFICATION_FOR_NF X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=refs%2Fchanges%2F99%2F13199%2F3;p=osm%2FLCM.git OSMENG-988: Create ACTIVITY_SEND_NOTIFICATION_FOR_NF Change-Id: I3238f9c2486c052caa614079c5c680b90d7f6f3c Signed-off-by: Dario Faccin --- diff --git a/osm_lcm/nglcm.py b/osm_lcm/nglcm.py index 6627bc8..781a5bd 100644 --- a/osm_lcm/nglcm.py +++ b/osm_lcm/nglcm.py @@ -41,7 +41,11 @@ from osm_lcm.temporal.vim_workflows import ( ) from osm_lcm.temporal.vdu_workflows import VduInstantiateWorkflow from osm_lcm.temporal.vnf_workflows import VnfInstantiateWorkflow -from osm_lcm.temporal.vnf_activities import VnfDbActivity, VnfOperations +from osm_lcm.temporal.vnf_activities import ( + VnfDbActivity, + VnfOperations, + VnfSendNotifications, +) from osm_lcm.temporal.ns_workflows import NsInstantiateWorkflow from osm_lcm.temporal.ns_activities import NsOperations, NsDbActivity from temporalio.client import Client @@ -141,6 +145,7 @@ class NGLcm: vim_data_activity_instance = VimDbActivity(self.db) vnf_data_activity_instance = VnfDbActivity(self.db) vnf_operation_instance = VnfOperations(self.db) + vnf_send_notifications_instance = VnfSendNotifications() workflows = [ NsInstantiateWorkflow, @@ -167,9 +172,9 @@ class NGLcm: vim_data_activity_instance.update_vim_state, vim_data_activity_instance.delete_vim_record, vnf_operation_instance.get_task_queue, - vnf_data_activity_instance.change_nf_state, - vnf_data_activity_instance.change_nf_instantiation_state, - vnf_data_activity_instance.change_nf_notification_state, + vnf_data_activity_instance.change_vnf_state, + vnf_data_activity_instance.change_vnf_instantiation_state, + vnf_send_notifications_instance.send_notification_for_vnf, ] # Check if we are running under a debugger diff --git a/osm_lcm/temporal/vnf_activities.py b/osm_lcm/temporal/vnf_activities.py index 2602eeb..3e2155b 100644 --- a/osm_lcm/temporal/vnf_activities.py +++ b/osm_lcm/temporal/vnf_activities.py @@ -17,15 +17,15 @@ import logging from temporalio import activity from osm_common.temporal_constants import ( - ACTIVITY_CHANGE_NF_STATE, - ACTIVITY_CHANGE_NF_INSTANTIATION_STATE, - ACTIVITY_CHANGE_NF_NOTIFICATION_STATE, + ACTIVITY_CHANGE_VNF_STATE, + ACTIVITY_CHANGE_VNF_INSTANTIATION_STATE, + ACTIVITY_SEND_NOTIFICATION_FOR_VNF, ACTIVITY_GET_TASK_QUEUE, VIM_TYPE_TASK_QUEUE_MAPPINGS, ) from osm_common.dataclasses.temporal_dataclasses import ( - ChangeNFInstantiationStateInput, - ChangeNFStateInput, + ChangeVnfInstantiationStateInput, + ChangeVnfStateInput, GetTaskQueueInput, GetTaskQueueOutput, ) @@ -79,8 +79,8 @@ class VnfDbActivity: self.db = db self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}") - @activity.defn(name=ACTIVITY_CHANGE_NF_STATE) - async def change_nf_state(self, nf_state_input: ChangeNFStateInput) -> None: + @activity.defn(name=ACTIVITY_CHANGE_VNF_STATE) + async def change_vnf_state(self, vnf_state_input: ChangeVnfStateInput) -> None: """Updates the VNF State in VNFR. Collaborators: @@ -101,15 +101,15 @@ class VnfDbActivity: activity, the operation is idempotent. """ - update_nf_state = {"vnfState": nf_state_input.nf_state} - self.db.set_one("vnfrs", {"_id": nf_state_input.vnfr_uuid}, update_nf_state) + update_vnf_state = {"vnfState": vnf_state_input.state} + self.db.set_one("vnfrs", {"_id": vnf_state_input.vnfr_uuid}, update_vnf_state) self.logger.debug( - f"VNF {nf_state_input.vnfr_uuid} state is updated to {nf_state_input.nf_state}." + f"VNF {vnf_state_input.vnfr_uuid} state is updated to {vnf_state_input.state}." ) - @activity.defn(name=ACTIVITY_CHANGE_NF_INSTANTIATION_STATE) - async def change_nf_instantiation_state( - self, nf_instantiation_state_input: ChangeNFInstantiationStateInput + @activity.defn(name=ACTIVITY_CHANGE_VNF_INSTANTIATION_STATE) + async def change_vnf_instantiation_state( + self, vnf_instantiation_state_input: ChangeVnfInstantiationStateInput ) -> None: """Updates the VNF Instantiation State in VNFR. @@ -131,23 +131,30 @@ class VnfDbActivity: activity, the operation is idempotent. """ - update_nf_instantiation_state = { - "vnfState": nf_instantiation_state_input.nf_instantiation_state + update_vnf_instantiation_state = { + "vnfState": vnf_instantiation_state_input.state } self.db.set_one( "vnfrs", - {"_id": nf_instantiation_state_input.vnfr_uuid}, - update_nf_instantiation_state, + {"_id": vnf_instantiation_state_input.vnfr_uuid}, + update_vnf_instantiation_state, ) self.logger.debug( - f"VNF {nf_instantiation_state_input.vnfr_uuid} state is updated to {nf_instantiation_state_input.nf_instantiation_state}." + f"VNF {vnf_instantiation_state_input.vnfr_uuid} state is updated to {vnf_instantiation_state_input.state}." ) - @activity.defn(name=ACTIVITY_CHANGE_NF_NOTIFICATION_STATE) - async def change_nf_notification_state(self) -> None: + +class VnfSendNotifications: + """Perform Notification operations.""" + + def __init__(self): + self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}") + + @activity.defn(name=ACTIVITY_SEND_NOTIFICATION_FOR_VNF) + async def send_notification_for_vnf(self) -> None: """If VNF LCM operation state changes, send notification updates. This activity does nothing. """ - pass + self.logger.debug("Send notification for VNF not implemented.") diff --git a/osm_lcm/temporal/vnf_workflows.py b/osm_lcm/temporal/vnf_workflows.py index dc2023c..4521889 100644 --- a/osm_lcm/temporal/vnf_workflows.py +++ b/osm_lcm/temporal/vnf_workflows.py @@ -24,16 +24,16 @@ from osm_common.dataclasses.temporal_dataclasses import ( VnfState, VnfInstantiateInput, VduInstantiateInput, - ChangeNFInstantiationStateInput, - ChangeNFStateInput, + ChangeVnfInstantiationStateInput, + ChangeVnfStateInput, GetTaskQueueInput, PrepareVnfInput, ) from osm_common.temporal_constants import ( - ACTIVITY_CHANGE_NF_INSTANTIATION_STATE, - ACTIVITY_CHANGE_NF_NOTIFICATION_STATE, - ACTIVITY_CHANGE_NF_STATE, + ACTIVITY_CHANGE_VNF_INSTANTIATION_STATE, + ACTIVITY_SEND_NOTIFICATION_FOR_VNF, + ACTIVITY_CHANGE_VNF_STATE, ACTIVITY_GET_TASK_QUEUE, LCM_TASK_QUEUE, WORKFLOW_VDU_INSTANTIATE, @@ -58,24 +58,23 @@ class VnfInstantiateWorkflow: def __init__(self): self.logger = logging.getLogger(f"lcm.wfl.{self.__class__.__name__}") - self.nf_instantiation_state = ChangeNFInstantiationStateInput( - None, VnfInstantiationState.NOT_INSTANTIATED + self.vnf_instantiation_state = ChangeVnfInstantiationStateInput( + "", VnfInstantiationState.NOT_INSTANTIATED ) - self.nf_state = ChangeNFStateInput(None, VnfState.STOPPED) + self.vnf_state = ChangeVnfStateInput("", VnfState.STOPPED) @workflow.run async def run(self, input: VnfInstantiateInput) -> None: try: - self.nf_state.vnfr_uuid = ( - self.nf_instantiation_state.vnfr_uuid + self.vnf_state.vnfr_uuid = ( + self.vnf_instantiation_state.vnfr_uuid ) = input.vnfr_uuid - await self.update_nf_instantiation_state() - await self.update_nf_state() + await self.update_states() vnf_task_queue = await workflow.execute_activity( activity=ACTIVITY_GET_TASK_QUEUE, - arg=GetTaskQueueInput(input.vnf_uuid), - activity_id=f"{ACTIVITY_GET_TASK_QUEUE}-{input.vnf_uuid}", + arg=GetTaskQueueInput(input.vnfr_uuid), + activity_id=f"{ACTIVITY_GET_TASK_QUEUE}-{input.vnfr_uuid}", task_queue=LCM_TASK_QUEUE, schedule_to_close_timeout=default_schedule_to_close_timeout, retry_policy=retry_policy, @@ -83,53 +82,51 @@ class VnfInstantiateWorkflow: await workflow.execute_child_workflow( workflow=WORKFLOW_VNF_PREPARE, - arg=PrepareVnfInput(input.vnf_uuid), + arg=PrepareVnfInput(input.vnfr_uuid), task_queue=vnf_task_queue, - id=f"{WORKFLOW_VNF_PREPARE}-{input.vnf_uuid}", + id=f"{WORKFLOW_VNF_PREPARE}-{input.vnfr_uuid}", ) await self.instantiate_vdus(input.vnfr_uuid, vnf_task_queue) - self.nf_instantiation_state.state = VnfInstantiationState.INSTANTIATED - self.nf_state.state = VnfState.STARTED + self.vnf_instantiation_state.state = VnfInstantiationState.INSTANTIATED + self.vnf_state.state = VnfState.STARTED except Exception as e: workflow.logger.error(f"{WORKFLOW_VNF_INSTANTIATE} failed with {str(e)}") - self.nf_instantiation_state.state = VnfInstantiationState.NOT_INSTANTIATED - self.nf_state.state = VnfState.STOPPED + self.vnf_instantiation_state.state = VnfInstantiationState.NOT_INSTANTIATED + self.vnf_state.state = VnfState.STOPPED raise e finally: - await self.update_nf_instantiation_state() - await self.update_nf_state() - await self.update_nf_notification_state() + await self.update_states() - async def update_nf_state(self): + async def update_vnf_state(self): await workflow.execute_activity( - activity=ACTIVITY_CHANGE_NF_STATE, - arg=self.nf_state, - activity_id=f"{ACTIVITY_CHANGE_NF_STATE}-{self.nf_state.vnfr_uuid}", + activity=ACTIVITY_CHANGE_VNF_STATE, + arg=self.vnf_state, + activity_id=f"{ACTIVITY_CHANGE_VNF_STATE}-{self.vnf_state.vnfr_uuid}", task_queue=LCM_TASK_QUEUE, schedule_to_close_timeout=default_schedule_to_close_timeout, retry_policy=retry_policy, ) - async def update_nf_instantiation_state(self): + async def update_vnf_instantiation_state(self): await workflow.execute_activity( - activity=ACTIVITY_CHANGE_NF_INSTANTIATION_STATE, - arg=self.nf_instantiation_state, - activity_id=f"{ACTIVITY_CHANGE_NF_INSTANTIATION_STATE}-{self.nf_instantiation_state.vnfr_uuid}", + activity=ACTIVITY_CHANGE_VNF_INSTANTIATION_STATE, + arg=self.vnf_instantiation_state, + activity_id=f"{ACTIVITY_CHANGE_VNF_INSTANTIATION_STATE}-{self.vnf_instantiation_state.vnfr_uuid}", task_queue=LCM_TASK_QUEUE, schedule_to_close_timeout=default_schedule_to_close_timeout, retry_policy=retry_policy, ) @staticmethod - async def update_nf_notification_state(): + async def send_notification_for_vnf(): await workflow.execute_activity( - activity=ACTIVITY_CHANGE_NF_NOTIFICATION_STATE, + activity=ACTIVITY_SEND_NOTIFICATION_FOR_VNF, arg=None, - activity_id=f"{ACTIVITY_CHANGE_NF_NOTIFICATION_STATE}", + activity_id=f"{ACTIVITY_SEND_NOTIFICATION_FOR_VNF}", task_queue=LCM_TASK_QUEUE, schedule_to_close_timeout=default_schedule_to_close_timeout, retry_policy=retry_policy, @@ -138,7 +135,7 @@ class VnfInstantiateWorkflow: async def instantiate_vdus(self, vnfr_uuid, vnf_task_queue): # TODO: see OSMENG-989 - vnfr = {"uuid", vnfr_uuid} # self.db.get_one("vnfrs", {"_id": vnfr_uuid}) + vnfr = {"uuid": vnfr_uuid} # self.db.get_one("vnfrs", {"_id": vnfr_uuid}) vnfd = {} # self.db.get_one("vnfds", {"_id": vnfr["vnfd-id"]}) for vdu in vnfd.get("vdu"): ( @@ -164,6 +161,11 @@ class VnfInstantiateWorkflow: ) return vdu_instantiate_input, vdu_instantiate_workflow_id + async def update_states(self): + await self.update_vnf_instantiation_state() + await self.update_vnf_state() + await self.send_notification_for_vnf() + @workflow.defn(name=WORKFLOW_VNF_PREPARE, sandboxed=_SANDBOXED) class PrepareVnfWorkflow: