)
from osm_lcm.temporal.vdu_workflows import VduInstantiateWorkflow
from osm_lcm.temporal.vnf_workflows import VnfInstantiateWorkflow
-from osm_lcm.temporal.vnf_activities import VnfDbActivity, VnfOperations
+from osm_lcm.temporal.vnf_activities import (
+ VnfDbActivity,
+ VnfOperations,
+ VnfSendNotifications,
+)
from osm_lcm.temporal.ns_workflows import NsInstantiateWorkflow
from osm_lcm.temporal.ns_activities import NsOperations, NsDbActivity
from temporalio.client import Client
vim_data_activity_instance = VimDbActivity(self.db)
vnf_data_activity_instance = VnfDbActivity(self.db)
vnf_operation_instance = VnfOperations(self.db)
+ vnf_send_notifications_instance = VnfSendNotifications()
workflows = [
NsInstantiateWorkflow,
vim_data_activity_instance.update_vim_state,
vim_data_activity_instance.delete_vim_record,
vnf_operation_instance.get_task_queue,
- vnf_data_activity_instance.change_nf_state,
- vnf_data_activity_instance.change_nf_instantiation_state,
- vnf_data_activity_instance.change_nf_notification_state,
+ vnf_data_activity_instance.change_vnf_state,
+ vnf_data_activity_instance.change_vnf_instantiation_state,
+ vnf_send_notifications_instance.send_notification_for_vnf,
]
# Check if we are running under a debugger
import logging
from temporalio import activity
from osm_common.temporal_constants import (
- ACTIVITY_CHANGE_NF_STATE,
- ACTIVITY_CHANGE_NF_INSTANTIATION_STATE,
- ACTIVITY_CHANGE_NF_NOTIFICATION_STATE,
+ ACTIVITY_CHANGE_VNF_STATE,
+ ACTIVITY_CHANGE_VNF_INSTANTIATION_STATE,
+ ACTIVITY_SEND_NOTIFICATION_FOR_VNF,
ACTIVITY_GET_TASK_QUEUE,
VIM_TYPE_TASK_QUEUE_MAPPINGS,
)
from osm_common.dataclasses.temporal_dataclasses import (
- ChangeNFInstantiationStateInput,
- ChangeNFStateInput,
+ ChangeVnfInstantiationStateInput,
+ ChangeVnfStateInput,
GetTaskQueueInput,
GetTaskQueueOutput,
)
self.db = db
self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}")
- @activity.defn(name=ACTIVITY_CHANGE_NF_STATE)
- async def change_nf_state(self, nf_state_input: ChangeNFStateInput) -> None:
+ @activity.defn(name=ACTIVITY_CHANGE_VNF_STATE)
+ async def change_vnf_state(self, vnf_state_input: ChangeVnfStateInput) -> None:
"""Updates the VNF State in VNFR.
Collaborators:
activity, the operation is idempotent.
"""
- update_nf_state = {"vnfState": nf_state_input.nf_state}
- self.db.set_one("vnfrs", {"_id": nf_state_input.vnfr_uuid}, update_nf_state)
+ update_vnf_state = {"vnfState": vnf_state_input.state}
+ self.db.set_one("vnfrs", {"_id": vnf_state_input.vnfr_uuid}, update_vnf_state)
self.logger.debug(
- f"VNF {nf_state_input.vnfr_uuid} state is updated to {nf_state_input.nf_state}."
+ f"VNF {vnf_state_input.vnfr_uuid} state is updated to {vnf_state_input.state}."
)
- @activity.defn(name=ACTIVITY_CHANGE_NF_INSTANTIATION_STATE)
- async def change_nf_instantiation_state(
- self, nf_instantiation_state_input: ChangeNFInstantiationStateInput
+ @activity.defn(name=ACTIVITY_CHANGE_VNF_INSTANTIATION_STATE)
+ async def change_vnf_instantiation_state(
+ self, vnf_instantiation_state_input: ChangeVnfInstantiationStateInput
) -> None:
"""Updates the VNF Instantiation State in VNFR.
activity, the operation is idempotent.
"""
- update_nf_instantiation_state = {
- "vnfState": nf_instantiation_state_input.nf_instantiation_state
+ update_vnf_instantiation_state = {
+ "vnfState": vnf_instantiation_state_input.state
}
self.db.set_one(
"vnfrs",
- {"_id": nf_instantiation_state_input.vnfr_uuid},
- update_nf_instantiation_state,
+ {"_id": vnf_instantiation_state_input.vnfr_uuid},
+ update_vnf_instantiation_state,
)
self.logger.debug(
- f"VNF {nf_instantiation_state_input.vnfr_uuid} state is updated to {nf_instantiation_state_input.nf_instantiation_state}."
+ f"VNF {vnf_instantiation_state_input.vnfr_uuid} state is updated to {vnf_instantiation_state_input.state}."
)
- @activity.defn(name=ACTIVITY_CHANGE_NF_NOTIFICATION_STATE)
- async def change_nf_notification_state(self) -> None:
+
+class VnfSendNotifications:
+ """Perform Notification operations."""
+
+ def __init__(self):
+ self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}")
+
+ @activity.defn(name=ACTIVITY_SEND_NOTIFICATION_FOR_VNF)
+ async def send_notification_for_vnf(self) -> None:
"""If VNF LCM operation state changes, send notification updates.
This activity does nothing.
"""
- pass
+ self.logger.debug("Send notification for VNF not implemented.")
VnfState,
VnfInstantiateInput,
VduInstantiateInput,
- ChangeNFInstantiationStateInput,
- ChangeNFStateInput,
+ ChangeVnfInstantiationStateInput,
+ ChangeVnfStateInput,
GetTaskQueueInput,
PrepareVnfInput,
)
from osm_common.temporal_constants import (
- ACTIVITY_CHANGE_NF_INSTANTIATION_STATE,
- ACTIVITY_CHANGE_NF_NOTIFICATION_STATE,
- ACTIVITY_CHANGE_NF_STATE,
+ ACTIVITY_CHANGE_VNF_INSTANTIATION_STATE,
+ ACTIVITY_SEND_NOTIFICATION_FOR_VNF,
+ ACTIVITY_CHANGE_VNF_STATE,
ACTIVITY_GET_TASK_QUEUE,
LCM_TASK_QUEUE,
WORKFLOW_VDU_INSTANTIATE,
def __init__(self):
self.logger = logging.getLogger(f"lcm.wfl.{self.__class__.__name__}")
- self.nf_instantiation_state = ChangeNFInstantiationStateInput(
- None, VnfInstantiationState.NOT_INSTANTIATED
+ self.vnf_instantiation_state = ChangeVnfInstantiationStateInput(
+ "", VnfInstantiationState.NOT_INSTANTIATED
)
- self.nf_state = ChangeNFStateInput(None, VnfState.STOPPED)
+ self.vnf_state = ChangeVnfStateInput("", VnfState.STOPPED)
@workflow.run
async def run(self, input: VnfInstantiateInput) -> None:
try:
- self.nf_state.vnfr_uuid = (
- self.nf_instantiation_state.vnfr_uuid
+ self.vnf_state.vnfr_uuid = (
+ self.vnf_instantiation_state.vnfr_uuid
) = input.vnfr_uuid
- await self.update_nf_instantiation_state()
- await self.update_nf_state()
+ await self.update_states()
vnf_task_queue = await workflow.execute_activity(
activity=ACTIVITY_GET_TASK_QUEUE,
- arg=GetTaskQueueInput(input.vnf_uuid),
- activity_id=f"{ACTIVITY_GET_TASK_QUEUE}-{input.vnf_uuid}",
+ arg=GetTaskQueueInput(input.vnfr_uuid),
+ activity_id=f"{ACTIVITY_GET_TASK_QUEUE}-{input.vnfr_uuid}",
task_queue=LCM_TASK_QUEUE,
schedule_to_close_timeout=default_schedule_to_close_timeout,
retry_policy=retry_policy,
await workflow.execute_child_workflow(
workflow=WORKFLOW_VNF_PREPARE,
- arg=PrepareVnfInput(input.vnf_uuid),
+ arg=PrepareVnfInput(input.vnfr_uuid),
task_queue=vnf_task_queue,
- id=f"{WORKFLOW_VNF_PREPARE}-{input.vnf_uuid}",
+ id=f"{WORKFLOW_VNF_PREPARE}-{input.vnfr_uuid}",
)
await self.instantiate_vdus(input.vnfr_uuid, vnf_task_queue)
- self.nf_instantiation_state.state = VnfInstantiationState.INSTANTIATED
- self.nf_state.state = VnfState.STARTED
+ self.vnf_instantiation_state.state = VnfInstantiationState.INSTANTIATED
+ self.vnf_state.state = VnfState.STARTED
except Exception as e:
workflow.logger.error(f"{WORKFLOW_VNF_INSTANTIATE} failed with {str(e)}")
- self.nf_instantiation_state.state = VnfInstantiationState.NOT_INSTANTIATED
- self.nf_state.state = VnfState.STOPPED
+ self.vnf_instantiation_state.state = VnfInstantiationState.NOT_INSTANTIATED
+ self.vnf_state.state = VnfState.STOPPED
raise e
finally:
- await self.update_nf_instantiation_state()
- await self.update_nf_state()
- await self.update_nf_notification_state()
+ await self.update_states()
- async def update_nf_state(self):
+ async def update_vnf_state(self):
await workflow.execute_activity(
- activity=ACTIVITY_CHANGE_NF_STATE,
- arg=self.nf_state,
- activity_id=f"{ACTIVITY_CHANGE_NF_STATE}-{self.nf_state.vnfr_uuid}",
+ activity=ACTIVITY_CHANGE_VNF_STATE,
+ arg=self.vnf_state,
+ activity_id=f"{ACTIVITY_CHANGE_VNF_STATE}-{self.vnf_state.vnfr_uuid}",
task_queue=LCM_TASK_QUEUE,
schedule_to_close_timeout=default_schedule_to_close_timeout,
retry_policy=retry_policy,
)
- async def update_nf_instantiation_state(self):
+ async def update_vnf_instantiation_state(self):
await workflow.execute_activity(
- activity=ACTIVITY_CHANGE_NF_INSTANTIATION_STATE,
- arg=self.nf_instantiation_state,
- activity_id=f"{ACTIVITY_CHANGE_NF_INSTANTIATION_STATE}-{self.nf_instantiation_state.vnfr_uuid}",
+ activity=ACTIVITY_CHANGE_VNF_INSTANTIATION_STATE,
+ arg=self.vnf_instantiation_state,
+ activity_id=f"{ACTIVITY_CHANGE_VNF_INSTANTIATION_STATE}-{self.vnf_instantiation_state.vnfr_uuid}",
task_queue=LCM_TASK_QUEUE,
schedule_to_close_timeout=default_schedule_to_close_timeout,
retry_policy=retry_policy,
)
@staticmethod
- async def update_nf_notification_state():
+ async def send_notification_for_vnf():
await workflow.execute_activity(
- activity=ACTIVITY_CHANGE_NF_NOTIFICATION_STATE,
+ activity=ACTIVITY_SEND_NOTIFICATION_FOR_VNF,
arg=None,
- activity_id=f"{ACTIVITY_CHANGE_NF_NOTIFICATION_STATE}",
+ activity_id=f"{ACTIVITY_SEND_NOTIFICATION_FOR_VNF}",
task_queue=LCM_TASK_QUEUE,
schedule_to_close_timeout=default_schedule_to_close_timeout,
retry_policy=retry_policy,
async def instantiate_vdus(self, vnfr_uuid, vnf_task_queue):
# TODO: see OSMENG-989
- vnfr = {"uuid", vnfr_uuid} # self.db.get_one("vnfrs", {"_id": vnfr_uuid})
+ vnfr = {"uuid": vnfr_uuid} # self.db.get_one("vnfrs", {"_id": vnfr_uuid})
vnfd = {} # self.db.get_one("vnfds", {"_id": vnfr["vnfd-id"]})
for vdu in vnfd.get("vdu"):
(
)
return vdu_instantiate_input, vdu_instantiate_workflow_id
+ async def update_states(self):
+ await self.update_vnf_instantiation_state()
+ await self.update_vnf_state()
+ await self.send_notification_for_vnf()
+
@workflow.defn(name=WORKFLOW_VNF_PREPARE, sandboxed=_SANDBOXED)
class PrepareVnfWorkflow: