VimUpdateWorkflow,
)
from osm_lcm.temporal.vdu_workflows import VduInstantiateWorkflow
-from osm_lcm.temporal.vnf_workflows import VnfInstantiateWorkflow
+from osm_lcm.temporal.vnf_workflows import VnfInstantiateWorkflow, VnfPrepareWorkflow
from osm_lcm.temporal.vnf_activities import (
VnfDbActivity,
VnfOperations,
VimUpdateWorkflow,
VduInstantiateWorkflow,
VnfInstantiateWorkflow,
+ VnfPrepareWorkflow,
]
activities = [
ns_data_activity_instance.get_model_info,
- ns_data_activity_instance.prepare_vnf_records,
ns_data_activity_instance.update_ns_state,
ns_operation_instance.check_ns_instantiate_finished,
ns_operation_instance.deploy_ns,
vnf_data_activity_instance.change_vnf_state,
vnf_data_activity_instance.change_vnf_instantiation_state,
vnf_send_notifications_instance.send_notification_for_vnf,
+ vnf_data_activity_instance.set_vnf_model,
]
# Check if we are running under a debugger
ACTIVITY_CHECK_NS_INSTANTIATION_FINISHED,
ACTIVITY_DEPLOY_NS,
ACTIVITY_GET_MODEL_INFO,
- ACTIVITY_PREPARE_VNF_RECORDS,
ACTIVITY_UPDATE_NS_STATE,
)
from osm_common.dataclasses.temporal_dataclasses import (
self.db = db
self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}")
- @activity.defn(name=ACTIVITY_PREPARE_VNF_RECORDS)
- async def prepare_vnf_records(
- self, ns_instantiate_input: NsInstantiateInput
- ) -> None:
- """Prepare VNFs to be deployed: Add namespace to the VNFr.
-
- Collaborators:
- DB Write: vnfrs
-
- Raises (Retryable):
- DbException If the target DB record does not exist or DB is not reachable.
-
- Activity Lifecycle:
- This activity will not report a heartbeat due to its
- short-running nature.
-
- As this is a direct DB update, it is not recommended to have
- any specific retry policy
-
- """
- vnfrs = self.db.get_list("vnfrs", {"nsr-id-ref": ns_instantiate_input.ns_uuid})
- for vnfr in vnfrs:
- self._prepare_vnf_record(vnfr)
-
@activity.defn(name=ACTIVITY_GET_MODEL_INFO)
async def get_model_info(
self, ns_instantiate_input: NsInstantiateInput
model_name = self._get_namespace(ns_uuid, vim_uuid)
return ModelInfo(vim_uuid, model_name)
- def _get_namespace(self, ns_id: str, vim_id: str) -> str:
+ @staticmethod
+ def _get_namespace(ns_id: str, vim_id: str) -> str:
"""The NS namespace is the combination if the NS ID and the VIM ID."""
return ns_id[-12:] + "-" + vim_id[-12:]
- def _prepare_vnf_record(self, vnfr: dict) -> None:
- """Add namespace to the VNFr."""
- ns_id = vnfr["nsr-id-ref"]
- vim_id = vnfr["vim-account-id"]
- namespace = self._get_namespace(ns_id, vim_id)
- update_namespace = {"namespace": namespace}
- self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, update_namespace)
-
@activity.defn(name=ACTIVITY_UPDATE_NS_STATE)
async def update_ns_state(self, data: UpdateNsStateInput) -> None:
"""
ACTIVITY_SEND_NOTIFICATION_FOR_VNF,
ACTIVITY_GET_TASK_QUEUE,
VIM_TYPE_TASK_QUEUE_MAPPINGS,
+ ACTIVITY_SET_VNF_MODEL,
)
from osm_common.dataclasses.temporal_dataclasses import (
ChangeVnfInstantiationStateInput,
ChangeVnfStateInput,
GetTaskQueueInput,
GetTaskQueueOutput,
+ VnfInstantiateInput,
)
f"VNF {vnf_instantiation_state_input.vnfr_uuid} state is updated to {vnf_instantiation_state_input.state}."
)
+ @activity.defn(name=ACTIVITY_SET_VNF_MODEL)
+ async def set_vnf_model(self, set_vnf_model_input: VnfInstantiateInput) -> None:
+ """Updates the model name of VNF in VNFR.
+
+ Collaborators:
+ DB Access Object
+
+ Raises (retryable):
+ DbException: If DB access or update fails, the collection or DB record ID does not exist.
+
+ Activity Lifecycle:
+ This activity should complete relatively quickly (less than a
+ second). However, it would be reasonable to wait up to 10
+ seconds.
+
+ This activity will not report a heartbeat due to its
+ short-running nature.
+
+ It is not necessary to implement a back-off strategy for this
+ activity, the operation is idempotent.
+
+ """
+ update_namespace = {"namespace": set_vnf_model_input.model_name}
+ self.db.set_one(
+ "vnfrs", {"_id": set_vnf_model_input.vnfr_uuid}, update_namespace
+ )
+ self.logger.debug(
+ f"VNF {set_vnf_model_input.vnfr_uuid} model name is updated to {set_vnf_model_input.model_name}."
+ )
+
class VnfSendNotifications:
"""Perform Notification operations."""
ChangeVnfInstantiationStateInput,
ChangeVnfStateInput,
GetTaskQueueInput,
- PrepareVnfInput,
)
from osm_common.temporal_constants import (
ACTIVITY_SEND_NOTIFICATION_FOR_VNF,
ACTIVITY_CHANGE_VNF_STATE,
ACTIVITY_GET_TASK_QUEUE,
+ ACTIVITY_SET_VNF_MODEL,
LCM_TASK_QUEUE,
WORKFLOW_VDU_INSTANTIATE,
WORKFLOW_VNF_INSTANTIATE,
await workflow.execute_child_workflow(
workflow=WORKFLOW_VNF_PREPARE,
- arg=PrepareVnfInput(input.vnfr_uuid),
+ arg=input,
task_queue=vnf_task_queue,
id=f"{WORKFLOW_VNF_PREPARE}-{input.vnfr_uuid}",
)
@workflow.defn(name=WORKFLOW_VNF_PREPARE, sandboxed=_SANDBOXED)
-class PrepareVnfWorkflow:
+class VnfPrepareWorkflow:
"""Prepare a VNF.
Workflow Identifier:
ID when invoking this workflow.
"""
+ def __init__(self):
+ self.logger = logging.getLogger(f"lcm.wfl.{self.__class__.__name__}")
+
@workflow.run
- async def run(self, input: PrepareVnfInput) -> None:
- # TODO: Set the model here OSM-991
- pass
+ async def run(self, wf_input: VnfInstantiateInput) -> None:
+ try:
+ await workflow.execute_activity(
+ activity=ACTIVITY_SET_VNF_MODEL,
+ arg=wf_input,
+ activity_id=f"{ACTIVITY_SET_VNF_MODEL}-{wf_input.vnfr_uuid}",
+ task_queue=LCM_TASK_QUEUE,
+ schedule_to_close_timeout=default_schedule_to_close_timeout,
+ retry_policy=retry_policy,
+ )
+ except Exception as e:
+ self.logger.error(f"{WORKFLOW_VNF_PREPARE} failed with {str(e)}")
+ raise e
--- /dev/null
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import asynctest
+from osm_common.dataclasses.temporal_dataclasses import VnfInstantiateInput
+from osm_common.dbbase import DbException
+from osm_lcm.temporal.vnf_activities import VnfDbActivity
+from temporalio.testing import ActivityEnvironment
+from unittest.mock import Mock
+
+
+vnfr_uuid = "d08d2da5-2120-476c-8538-deaeb4e88b3e"
+model_name = "a-model-name"
+vnf_instantiate_input = VnfInstantiateInput(vnfr_uuid=vnfr_uuid, model_name=model_name)
+
+
+class TestVnfDbActivity(asynctest.TestCase):
+ def setUp(self):
+ self.db = Mock()
+ self.env = ActivityEnvironment()
+ self.vnf_db_activity = VnfDbActivity(self.db)
+
+ async def test_set_vnf_model(self):
+ await self.env.run(self.vnf_db_activity.set_vnf_model, vnf_instantiate_input)
+ self.db.set_one.assert_called_with(
+ "vnfrs", {"_id": vnfr_uuid}, {"namespace": model_name}
+ )
+
+ async def test_db_raises_exception(self):
+ self.db.set_one.side_effect = DbException("not found")
+ with self.assertRaises(DbException):
+ await self.env.run(
+ self.vnf_db_activity.set_vnf_model, vnf_instantiate_input
+ )
--- /dev/null
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import asynctest
+import logging
+from osm_common.dataclasses.temporal_dataclasses import VnfInstantiateInput
+from osm_common.temporal_constants import (
+ LCM_TASK_QUEUE,
+ ACTIVITY_SET_VNF_MODEL,
+ WORKFLOW_VNF_PREPARE,
+)
+from osm_lcm.temporal.vnf_workflows import VnfPrepareWorkflow
+from temporalio import activity
+from temporalio.testing import WorkflowEnvironment
+from temporalio.worker import Worker
+
+wf_input = VnfInstantiateInput(
+ vnfr_uuid="86b53d92-4f5a-402e-8ac2-585ec6b64698",
+ model_name="a-model-name",
+)
+
+
+@activity.defn(name=ACTIVITY_SET_VNF_MODEL)
+async def set_vnf_model_mocked(set_vnf_model_input: VnfInstantiateInput) -> None:
+ logging.debug(
+ f"VNF {set_vnf_model_input.vnfr_uuid} model name is updated to {set_vnf_model_input.model_name}."
+ )
+
+
+class TestVnfPrepareWorkflow(asynctest.TestCase):
+ async def setUp(self):
+ self.env = await WorkflowEnvironment.start_time_skipping()
+ self.client = self.env.client
+
+ async def test_vnf_prepare_workflow(self):
+ async with self.env:
+ async with Worker(
+ self.client,
+ task_queue=LCM_TASK_QUEUE,
+ workflows=[VnfPrepareWorkflow],
+ activities=[set_vnf_model_mocked],
+ ):
+ await self.client.execute_workflow(
+ VnfPrepareWorkflow.run,
+ arg=wf_input,
+ id=f"{WORKFLOW_VNF_PREPARE}-{wf_input.vnfr_uuid}",
+ task_queue=LCM_TASK_QUEUE,
+ )