VimUpdateWorkflow,
)
from osm_lcm.temporal.vdu_workflows import VduInstantiateWorkflow
+from osm_lcm.temporal.vnf_workflows import VnfInstantiateWorkflow
+from osm_lcm.temporal.vnf_activities import VnfDbActivity, VnfOperations
from temporalio.client import Client
from temporalio.worker import Worker
vim_data_activity_instance = VimDbActivity(self.db)
paas_connector_instance = JujuPaasConnector(self.db)
nslcm_activity_instance = NsLcmActivity(self.db)
+ vnf_operation_instance = VnfOperations(self.db)
+ vnf_data_activity_instance = VnfDbActivity(self.db)
workflows = [
NsNoOpWorkflow,
VimDeleteWorkflow,
VimUpdateWorkflow,
VduInstantiateWorkflow,
+ VnfInstantiateWorkflow,
]
activities = [
vim_data_activity_instance.update_vim_operation_state,
paas_connector_instance.create_model_if_doesnt_exist,
paas_connector_instance.deploy_charm,
paas_connector_instance.check_charm_status,
+ vnf_operation_instance.get_task_queue,
+ vnf_data_activity_instance.change_nf_state,
+ vnf_data_activity_instance.change_nf_instantiation_state,
+ vnf_data_activity_instance.change_nf_notification_state,
]
# Check if we are running under a debugger
TestVimConnectivityInput,
CreateModelInput,
VduInstantiateInput,
+ CharmInfo,
)
and wait on each connection attempt.
"""
pass
+
+
+class CharmInfoUtils:
+ @staticmethod
+ def get_charm_info(vdu: dict, sw_image_descs: list) -> CharmInfo:
+ """Extract the charm info of a VDU.
+ Args:
+ vdu (dict): contains the charm information.
+ sw_image_descs (list): list of images in the VNF.
+
+ Returns:
+ CharmInfo (object)
+ """
+ app_name = vdu.get("id")
+ channel = CharmInfoUtils._get_property_value(
+ "channel", vdu.get("configurable-properties")
+ )
+ entity_url = CharmInfoUtils._get_entity_url(
+ vdu.get("sw-image-desc"), sw_image_descs
+ )
+ return CharmInfo(app_name, channel, entity_url)
+
+ @staticmethod
+ def _get_property_value(wanted_property: str, configurable_properties: list) -> str:
+ """Extract the value corresponding to a given key-value pairs in
+ vdu.configurable-properties
+
+ Args:
+ wanted_property (str): property used as key.
+ configurable_properties (list): list of configurable-properties in VDU.
+
+ Returns:
+ value of the wanted property (str)
+ """
+ filtered_property = next(
+ filter(
+ lambda property: property.get("key") == wanted_property,
+ configurable_properties,
+ ),
+ None,
+ )
+ return filtered_property.get("value") if filtered_property else None
+
+ @staticmethod
+ def _get_entity_url(sw_image_desc_id: str, sw_image_descs: list) -> str:
+ """Extract the image field for a given image_id
+ Args:
+ sw_image_desc_id (str): ID of the image used by a VDU.
+ sw_image_descs (list): information of available images in the VNF.
+
+ Returns:
+ image of the sw_image_desc_id (str)
+ """
+ filtered_image = next(
+ filter(lambda image: image.get("id") == sw_image_desc_id, sw_image_descs),
+ None,
+ )
+ return filtered_image.get("image") if filtered_image else None
from temporalio.common import RetryPolicy
from osm_common.dataclasses.temporal_dataclasses import VduInstantiateInput
+
from osm_common.temporal_constants import (
- WORKFLOW_VDU_INSTANTIATE,
ACTIVITY_DEPLOY_CHARM,
ACTIVITY_CHECK_CHARM_STATUS,
LCM_TASK_QUEUE,
+ WORKFLOW_VDU_INSTANTIATE,
)
_SANDBOXED = False
--- /dev/null
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from temporalio import activity
+from osm_common.temporal_constants import (
+ ACTIVITY_CHANGE_NF_STATE,
+ ACTIVITY_CHANGE_NF_INSTANTIATION_STATE,
+ ACTIVITY_CHANGE_NF_NOTIFICATION_STATE,
+ ACTIVITY_GET_TASK_QUEUE,
+ vim_type_task_queue_mappings,
+)
+from osm_common.dataclasses.temporal_dataclasses import (
+ ChangeNFInstantiationStateInput,
+ ChangeNFStateInput,
+ GetTaskQueueInput,
+ GetTaskQueueOutput,
+)
+
+
+class VnfOperations:
+ def __init__(self, db):
+ self.db = db
+ self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}")
+
+ @activity.defn(name=ACTIVITY_GET_TASK_QUEUE)
+ async def get_task_queue(
+ self, get_task_queue_input: GetTaskQueueInput
+ ) -> GetTaskQueueOutput:
+ """Finds the appropriate task queue according to VIM type of VNF.
+
+ Collaborators:
+ DB Access Object
+
+ Raises (retryable):
+ DbException: If DB read operations fail
+
+ Activity Lifecycle:
+ This activity should complete relatively quickly (less than a
+ second). However, it would be reasonable to wait up to 10
+ seconds.
+
+ This activity will not report a heartbeat due to its
+ short-running nature.
+
+ It is not necessary to implement a back-off strategy for this
+ activity, the operation is idempotent.
+
+ """
+ vnfrs = self.db.get_list("vnfrs", {"_id": get_task_queue_input.vnfr_uuid})
+ vim_record = self.db.get_list("vim_accounts", {"_id": vnfrs["vim-account-id"]})
+ task_queue = vim_type_task_queue_mappings[vim_record["vim-type"]]
+ self.logger.debug(f"Got the task queue {task_queue} for VNF operations.")
+ return GetTaskQueueOutput(task_queue)
+
+
+class VnfDbActivity:
+
+ """Perform Database operations for NS accounts.
+
+ Args:
+ db (object): Data Access Object
+ """
+
+ def __init__(self, db):
+ self.db = db
+ self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}")
+
+ @activity.defn(name=ACTIVITY_CHANGE_NF_STATE)
+ async def change_nf_state(self, nf_state_input: ChangeNFStateInput) -> None:
+ """Updates the VNF State in VNFR.
+
+ Collaborators:
+ DB Access Object
+
+ Raises (retryable):
+ DbException: If DB access/update fails, the collection or DB record ID does not exist.
+
+ Activity Lifecycle:
+ This activity should complete relatively quickly (less than a
+ second). However, it would be reasonable to wait up to 10
+ seconds.
+
+ This activity will not report a heartbeat due to its
+ short-running nature.
+
+ It is not necessary to implement a back-off strategy for this
+ activity, the operation is idempotent.
+
+ """
+ update_nf_state = {"vnfState": nf_state_input.nf_state}
+ self.db.set_one("vnfrs", {"_id": nf_state_input.vnfr_uuid}, update_nf_state)
+ self.logger.debug(
+ f"VNF {nf_state_input.vnfr_uuid} state is updated to {nf_state_input.nf_state}."
+ )
+
+ @activity.defn(name=ACTIVITY_CHANGE_NF_INSTANTIATION_STATE)
+ async def change_nf_instantiation_state(
+ self, nf_instantiation_state_input: ChangeNFInstantiationStateInput
+ ) -> None:
+ """Updates the VNF Instantiation State in VNFR.
+
+ Collaborators:
+ DB Access Object
+
+ Raises (retryable):
+ DbException: If DB access or update fails
+
+ Activity Lifecycle:
+ This activity should complete relatively quickly (less than a
+ second). However, it would be reasonable to wait up to 10
+ seconds.
+
+ This activity will not report a heartbeat due to its
+ short-running nature.
+
+ It is not necessary to implement a back-off strategy for this
+ activity, the operation is idempotent.
+
+ """
+ update_nf_instantiation_state = {
+ "vnfState": nf_instantiation_state_input.nf_instantiation_state
+ }
+ self.db.set_one(
+ "vnfrs",
+ {"_id": nf_instantiation_state_input.vnfr_uuid},
+ update_nf_instantiation_state,
+ )
+ self.logger.debug(
+ f"VNF {nf_instantiation_state_input.vnfr_uuid} state is updated to {nf_instantiation_state_input.nf_instantiation_state}."
+ )
+
+ @activity.defn(name=ACTIVITY_CHANGE_NF_NOTIFICATION_STATE)
+ async def change_nf_notification_state(self) -> None:
+ """If VNF LCM operation state changes, send notification updates.
+
+ This activity does nothing.
+
+ """
+ pass
--- /dev/null
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from datetime import timedelta
+import logging
+from temporalio import workflow
+from temporalio.common import RetryPolicy
+
+from osm_common.dataclasses.temporal_dataclasses import (
+ VnfInstantiationState,
+ VnfState,
+ VnfInstantiateInput,
+ VduInstantiateInput,
+ ChangeNFInstantiationStateInput,
+ ChangeNFStateInput,
+ GetTaskQueueInput,
+ PrepareVnfInput,
+)
+
+from osm_common.temporal_constants import (
+ ACTIVITY_CHANGE_NF_INSTANTIATION_STATE,
+ ACTIVITY_CHANGE_NF_NOTIFICATION_STATE,
+ ACTIVITY_CHANGE_NF_STATE,
+ ACTIVITY_GET_TASK_QUEUE,
+ LCM_TASK_QUEUE,
+ WORKFLOW_VDU_INSTANTIATE,
+ WORKFLOW_VNF_INSTANTIATE,
+ WORKFLOW_VNF_PREPARE,
+)
+from osm_lcm.temporal.juju_paas_activities import CharmInfoUtils
+
+_SANDBOXED = False
+retry_policy = RetryPolicy(maximum_attempts=3)
+default_schedule_to_close_timeout = timedelta(minutes=10)
+
+
+@workflow.defn(name=WORKFLOW_VNF_INSTANTIATE, sandboxed=_SANDBOXED)
+class VnfInstantiateWorkflow:
+ """Instantiate a VNF.
+
+ Workflow Identifier:
+ It is recommended that the ID for the VNF is referred as a workflow
+ ID when invoking this workflow.
+ """
+
+ def __init__(self):
+ self.logger = logging.getLogger(f"lcm.wfl.{self.__class__.__name__}")
+ self.nf_instantiation_state = ChangeNFInstantiationStateInput(
+ None, VnfInstantiationState.NOT_INSTANTIATED
+ )
+ self.nf_state = ChangeNFStateInput(None, VnfState.STOPPED)
+
+ @workflow.run
+ async def run(self, input: VnfInstantiateInput) -> None:
+ try:
+ self.nf_state.vnfr_uuid = (
+ self.nf_instantiation_state.vnfr_uuid
+ ) = input.vnfr_uuid
+
+ await self.update_nf_instantiation_state()
+ await self.update_nf_state()
+ vnf_task_queue = await workflow.execute_activity(
+ activity=ACTIVITY_GET_TASK_QUEUE,
+ arg=GetTaskQueueInput(input.vnf_uuid),
+ activity_id=f"{ACTIVITY_GET_TASK_QUEUE}-{input.vnf_uuid}",
+ task_queue=LCM_TASK_QUEUE,
+ schedule_to_close_timeout=default_schedule_to_close_timeout,
+ retry_policy=retry_policy,
+ )
+
+ await workflow.execute_child_workflow(
+ workflow=WORKFLOW_VNF_PREPARE,
+ arg=PrepareVnfInput(input.vnf_uuid),
+ task_queue=vnf_task_queue,
+ id=f"{WORKFLOW_VNF_PREPARE}-{input.vnf_uuid}",
+ )
+
+ await self.instantiate_vdus(input.vnfr_uuid, vnf_task_queue)
+
+ self.nf_instantiation_state.state = VnfInstantiationState.INSTANTIATED
+ self.nf_state.state = VnfState.STARTED
+
+ except Exception as e:
+ workflow.logger.error(f"{WORKFLOW_VNF_INSTANTIATE} failed with {str(e)}")
+ self.nf_instantiation_state.state = VnfInstantiationState.NOT_INSTANTIATED
+ self.nf_state.state = VnfState.STOPPED
+ raise e
+
+ finally:
+ await self.update_nf_instantiation_state()
+ await self.update_nf_state()
+ await self.update_nf_notification_state()
+
+ async def update_nf_state(self):
+ await workflow.execute_activity(
+ activity=ACTIVITY_CHANGE_NF_STATE,
+ arg=self.nf_state,
+ activity_id=f"{ACTIVITY_CHANGE_NF_STATE}-{self.nf_state.vnfr_uuid}",
+ task_queue=LCM_TASK_QUEUE,
+ schedule_to_close_timeout=default_schedule_to_close_timeout,
+ retry_policy=retry_policy,
+ )
+
+ async def update_nf_instantiation_state(self):
+ await workflow.execute_activity(
+ activity=ACTIVITY_CHANGE_NF_INSTANTIATION_STATE,
+ arg=self.nf_instantiation_state,
+ activity_id=f"{ACTIVITY_CHANGE_NF_INSTANTIATION_STATE}-{self.nf_instantiation_state.vnfr_uuid}",
+ task_queue=LCM_TASK_QUEUE,
+ schedule_to_close_timeout=default_schedule_to_close_timeout,
+ retry_policy=retry_policy,
+ )
+
+ @staticmethod
+ async def update_nf_notification_state():
+ await workflow.execute_activity(
+ activity=ACTIVITY_CHANGE_NF_NOTIFICATION_STATE,
+ arg=None,
+ activity_id=f"{ACTIVITY_CHANGE_NF_NOTIFICATION_STATE}",
+ task_queue=LCM_TASK_QUEUE,
+ schedule_to_close_timeout=default_schedule_to_close_timeout,
+ retry_policy=retry_policy,
+ )
+
+ async def instantiate_vdus(self, vnfr_uuid, vnf_task_queue):
+ # TODO: see OSMENG-989
+
+ vnfr = {"uuid", vnfr_uuid} # self.db.get_one("vnfrs", {"_id": vnfr_uuid})
+ vnfd = {} # self.db.get_one("vnfds", {"_id": vnfr["vnfd-id"]})
+ for vdu in vnfd.get("vdu"):
+ (
+ vdu_instantiate_input,
+ vdu_instantiate_workflow_id,
+ ) = VnfInstantiateWorkflow.get_vdu_instantiate_input(vnfr, vnfd, vdu)
+ await workflow.execute_child_workflow(
+ workflow=WORKFLOW_VDU_INSTANTIATE,
+ arg=vdu_instantiate_input,
+ task_queue=vnf_task_queue,
+ id=vdu_instantiate_workflow_id,
+ )
+
+ @staticmethod
+ def get_vdu_instantiate_input(vnfr: dict, vnfd: dict, vdu: dict):
+ model_name = vnfr.get("namespace")
+ vim_id = vnfr.get("vim-account-id")
+ sw_image_descs = vnfd.get("sw-image-desc")
+ vdu_info = CharmInfoUtils.get_charm_info(vdu, sw_image_descs)
+ vdu_instantiate_input = VduInstantiateInput(vim_id, model_name, vdu_info)
+ vdu_instantiate_workflow_id = (
+ vdu_instantiate_input.model_name + vdu_instantiate_input.charm_info.app_name
+ )
+ return vdu_instantiate_input, vdu_instantiate_workflow_id
+
+
+@workflow.defn(name=WORKFLOW_VNF_PREPARE, sandboxed=_SANDBOXED)
+class PrepareVnfWorkflow:
+ """Prepare a VNF.
+
+ Workflow Identifier:
+ It is recommended that the ID for the VNF is referred as a workflow
+ ID when invoking this workflow.
+ """
+
+ @workflow.run
+ async def run(self, input: PrepareVnfInput) -> None:
+ pass