from osm_lcm.temporal.vdu_workflows import VduInstantiateWorkflow
from osm_lcm.temporal.vnf_workflows import VnfInstantiateWorkflow
from osm_lcm.temporal.vnf_activities import VnfDbActivity, VnfOperations
+from osm_lcm.temporal.ns_workflows import NsInstantiateWorkflow
+from osm_lcm.temporal.ns_activities import NsOperations, NsDbActivity
from temporalio.client import Client
from temporalio.worker import Worker
f"{self.main_config.temporal.host}:{str(self.main_config.temporal.port)}"
)
client = await Client.connect(temporal_api)
- vim_data_activity_instance = VimDbActivity(self.db)
- paas_connector_instance = JujuPaasConnector(self.db)
+
+ ns_operation_instance = NsOperations(self.db)
+ ns_data_activity_instance = NsDbActivity(self.db)
nslcm_activity_instance = NsLcmActivity(self.db)
- vnf_operation_instance = VnfOperations(self.db)
+ paas_connector_instance = JujuPaasConnector(self.db)
+ vim_data_activity_instance = VimDbActivity(self.db)
vnf_data_activity_instance = VnfDbActivity(self.db)
+ vnf_operation_instance = VnfOperations(self.db)
workflows = [
+ NsInstantiateWorkflow,
NsNoOpWorkflow,
VimCreateWorkflow,
VimDeleteWorkflow,
VnfInstantiateWorkflow,
]
activities = [
- vim_data_activity_instance.update_vim_operation_state,
- vim_data_activity_instance.update_vim_state,
- vim_data_activity_instance.delete_vim_record,
+ ns_data_activity_instance.prepare_vnf_records,
+ ns_data_activity_instance.update_ns_state,
+ ns_operation_instance.check_ns_instantiate_finished,
+ ns_operation_instance.deploy_ns,
nslcm_activity_instance.update_ns_lcm_operation_state,
nslcm_activity_instance.no_op,
- paas_connector_instance.test_vim_connectivity,
paas_connector_instance.create_model_if_doesnt_exist,
paas_connector_instance.deploy_charm,
paas_connector_instance.check_charm_status,
+ paas_connector_instance.test_vim_connectivity,
+ vim_data_activity_instance.update_vim_operation_state,
+ vim_data_activity_instance.update_vim_state,
+ vim_data_activity_instance.delete_vim_record,
vnf_operation_instance.get_task_queue,
vnf_data_activity_instance.change_nf_state,
vnf_data_activity_instance.change_nf_instantiation_state,
async def create_model_if_doesnt_exist(
self, create_model_input: CreateModelInput
) -> None:
+ # TODO: OSM-991
"""Connects to Juju Controller. Create a new model if model_name does not exist
Collaborators:
back-off strategy for this activity, as it will naturally block
and wait on each connection attempt.
"""
+ # TODO: Implement OSM-986
pass
def __init__(self, db):
self.db = db
self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}")
- self.count = 0
@activity.defn(name=ACTIVITY_NSLCM_NO_OP)
async def no_op(self, input: NsLcmOperationInput) -> None:
self.op_state.op_state = LcmOperationState.COMPLETED
except ActivityError as e:
+ # TODO: Deteremine the best content for Activity Errors OSM-994
self.logger.exception(e)
self.op_state.op_state = LcmOperationState.FAILED
self.op_state.detailed_status = str(e.cause.with_traceback(e.__traceback__))
--- /dev/null
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from time import time
+from temporalio import activity
+from osm_common.temporal_constants import (
+ ACTIVITY_UPDATE_NS_STATE,
+ ACTIVITY_CHECK_NS_INSTANTIATION_FINISHED,
+ ACTIVITY_PREPARE_VNF_RECORDS,
+ ACTIVITY_DEPLOY_NS,
+)
+from osm_common.dataclasses.temporal_dataclasses import (
+ NsInstantiateInput,
+ UpdateNsStateInput,
+ VduInstantiateInput,
+)
+from osm_lcm.temporal.juju_paas_activities import CharmInfoUtils
+
+
+class NsOperations:
+ def __init__(self, db):
+ self.db = db
+ self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}")
+
+ # TODO: Change to a workflow OSM-990
+ @activity.defn(name=ACTIVITY_DEPLOY_NS)
+ async def deploy_ns(self, ns_instantiate_input: NsInstantiateInput) -> None:
+ vnfrs = self.db.get_list("vnfrs", {"nsr-id-ref": ns_instantiate_input.ns_uuid})
+ for vnfr in vnfrs:
+ await self.deploy_vnf(vnfr)
+
+ async def deploy_vnf(self, vnfr: dict):
+ vim_id = vnfr.get("vim-account-id")
+ model_name = vnfr.get("namespace")
+ vnfd_id = vnfr["vnfd-id"]
+ vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
+ sw_image_descs = vnfd.get("sw-image-desc")
+ for vdu in vnfd.get("vdu"):
+ await self.deploy_vdu(vdu, sw_image_descs, vim_id, model_name)
+
+ async def deploy_vdu(
+ self, vdu: dict, sw_image_descs: str, vim_id: str, model_name: str
+ ) -> None:
+ vdu_info = CharmInfoUtils.get_charm_info(vdu, sw_image_descs)
+ vdu_instantiate_input = VduInstantiateInput(vim_id, model_name, vdu_info)
+ workflow_id = (
+ vdu_instantiate_input.model_name + vdu_instantiate_input.charm_info.app_name
+ )
+ self.logger.info(f"TODO: Start VDU Workflow {workflow_id}")
+
+ @activity.defn(name=ACTIVITY_CHECK_NS_INSTANTIATION_FINISHED)
+ async def check_ns_instantiate_finished(
+ self, ns_instantiate: NsInstantiateInput
+ ) -> None:
+ # TODO: Implement OSM-993
+ pass
+
+
+class NsDbActivity:
+
+ """Perform Database operations for NS accounts.
+
+ Args:
+ db (object): Data Access Object
+ """
+
+ def __init__(self, db):
+ self.db = db
+ self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}")
+
+ @activity.defn(name=ACTIVITY_PREPARE_VNF_RECORDS)
+ async def prepare_vnf_records(
+ self, ns_instantiate_input: NsInstantiateInput
+ ) -> None:
+ """Prepare VNFs to be deployed: Add namespace to the VNFr.
+
+ Collaborators:
+ DB Write: vnfrs
+
+ Raises (Retryable):
+ DbException If the target DB record does not exist or DB is not reachable.
+
+ Activity Lifecycle:
+ This activity will not report a heartbeat due to its
+ short-running nature.
+
+ As this is a direct DB update, it is not recommended to have
+ any specific retry policy
+
+ """
+ vnfrs = self.db.get_list("vnfrs", {"nsr-id-ref": ns_instantiate_input.ns_uuid})
+ for vnfr in vnfrs:
+ self._prepare_vnf_record(vnfr)
+
+ def _get_namespace(self, ns_id: str, vim_id: str) -> str:
+ """The NS namespace is the combination if the NS ID and the VIM ID."""
+ return ns_id[-12:] + "-" + vim_id[-12:]
+
+ def _prepare_vnf_record(self, vnfr: dict) -> None:
+ """Add namespace to the VNFr."""
+ ns_id = vnfr["nsr-id-ref"]
+ vim_id = vnfr["vim-account-id"]
+ namespace = self._get_namespace(ns_id, vim_id)
+ update_namespace = {"namespace": namespace}
+ self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, update_namespace)
+
+ @activity.defn(name=ACTIVITY_UPDATE_NS_STATE)
+ async def update_ns_state(self, data: UpdateNsStateInput) -> None:
+ """
+ Changes the state of the NS itself.
+
+ Collaborators:
+ DB Write: nsrs
+
+ Raises (Retryable):
+ DbException If the target DB record does not exist or DB is not reachable.
+
+ Activity Lifecycle:
+ This activity will not report a heartbeat due to its
+ short-running nature.
+
+ As this is a direct DB update, it is not recommended to have
+ any specific retry policy
+ """
+ update_ns_state = {
+ "nsState": data.state.name,
+ # "errorDescription" : data.message,
+ "_admin.nsState": data.state.name,
+ "_admin.detailed-status": data.message,
+ "_admin.modified": time(),
+ }
+ self.db.set_one("nsrs", {"_id": data.ns_uuid}, update_ns_state)
+ self.logger.debug(f"Updated NS {data.ns_uuid} to {data.state.name}")
--- /dev/null
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from temporalio import workflow
+from temporalio.exceptions import ActivityError
+
+from osm_common.dataclasses.temporal_dataclasses import (
+ NsInstantiateInput,
+ NsLcmOperationInput,
+ NsState,
+ UpdateNsStateInput,
+)
+from osm_common.temporal_constants import (
+ WORKFLOW_NS_INSTANTIATE,
+ ACTIVITY_UPDATE_NS_STATE,
+ ACTIVITY_CHECK_NS_INSTANTIATION_FINISHED,
+ ACTIVITY_DEPLOY_NS,
+ LCM_TASK_QUEUE,
+)
+
+from osm_lcm.temporal.lcm_workflows import LcmOperationWorkflow
+
+
+@workflow.defn(name=WORKFLOW_NS_INSTANTIATE, sandboxed=LcmOperationWorkflow._SANDBOXED)
+class NsInstantiateWorkflow(LcmOperationWorkflow):
+ """Instantiate a NS"""
+
+ @workflow.run
+ async def wrap_nslcmop(self, input: NsLcmOperationInput) -> None:
+ await super().wrap_nslcmop(input=input)
+
+ async def workflow(self, input: NsLcmOperationInput) -> None:
+ # Temporary until we align on what is really needed where. For now, this
+ # reduced set looks to be all that is needed.
+ input = NsInstantiateInput(
+ ns_uuid=input.nslcmop["nsInstanceId"], op_id=input.nslcmop["_id"]
+ )
+
+ ns_state = UpdateNsStateInput(input.ns_uuid, NsState.INSTANTIATED, "Done")
+ try:
+ # TODO: Create the model here OSM-991
+
+ # TODO: Change this to a workflow OSM-990
+ await workflow.execute_activity(
+ activity=ACTIVITY_DEPLOY_NS,
+ arg=input,
+ activity_id=f"{ACTIVITY_DEPLOY_NS}-{input.ns_uuid}",
+ task_queue=LCM_TASK_QUEUE,
+ schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout,
+ retry_policy=LcmOperationWorkflow.no_retry_policy,
+ )
+
+ await workflow.execute_activity(
+ activity=ACTIVITY_CHECK_NS_INSTANTIATION_FINISHED,
+ arg=input,
+ activity_id=f"{ACTIVITY_CHECK_NS_INSTANTIATION_FINISHED}-{input.ns_uuid}",
+ task_queue=LCM_TASK_QUEUE,
+ schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout,
+ retry_policy=LcmOperationWorkflow.retry_policy,
+ )
+
+ except ActivityError as e:
+ ns_state = UpdateNsStateInput(input.ns_uuid, NsState.ERROR, e.cause.message)
+ self.logger.error(
+ f"{WORKFLOW_NS_INSTANTIATE} failed with {str(e.cause.message)}"
+ )
+ raise e
+
+ except Exception as e:
+ ns_state = UpdateNsStateInput(input.ns_uuid, NsState.ERROR, str(e))
+ self.logger.error(f"{WORKFLOW_NS_INSTANTIATE} failed with {str(e)}")
+ raise e
+
+ finally:
+ await workflow.execute_activity(
+ activity=ACTIVITY_UPDATE_NS_STATE,
+ arg=ns_state,
+ activity_id=f"{ACTIVITY_UPDATE_NS_STATE}-{input.ns_uuid}",
+ task_queue=LCM_TASK_QUEUE,
+ schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout,
+ retry_policy=LcmOperationWorkflow.retry_policy,
+ )
--- /dev/null
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from unittest import TestCase
+from osm_lcm.temporal.juju_paas_activities import CharmInfoUtils
+from osm_common.dataclasses.temporal_dataclasses import CharmInfo
+import yaml
+
+nsr_id = "ea958ba5-4e58-4405-bf42-6e3be15d4c3a"
+vim_id = "70b47595-fafa-4f63-904b-fc3ada60eebb"
+expected_default_ns_model = "6e3be15d4c3a-fc3ada60eebb"
+
+vdu_nominal = """
+---
+vdu:
+ - id: test-vdu-id
+ name: test-vdu-name
+ int-cpd:
+ - id: internal
+ int-virtual-link-desc: network1
+ - id: mgmt
+ virtual-compute-desc: compute-id
+ virtual-storage-desc:
+ - storage-id
+ sw-image-desc: image-test2
+ configurable-properties:
+ - key: "track"
+ value: "latest"
+ - key: "channel"
+ value: "edge"
+"""
+
+vdu_no_channel = """
+---
+vdu:
+ - id: test-vdu-id
+ name: test-vdu-name
+ int-cpd:
+ - id: internal
+ int-virtual-link-desc: network1
+ - id: mgmt
+ virtual-compute-desc: compute-id
+ virtual-storage-desc:
+ - storage-id
+ sw-image-desc: image-test2
+ configurable-properties:
+ - key: "track"
+ value: "latest"
+ - key: "key"
+ value: "edge"
+"""
+
+vdu_invalid_image = """
+---
+vdu:
+ - id: test-vdu-id
+ name: test-vdu-name
+ int-cpd:
+ - id: internal
+ int-virtual-link-desc: network1
+ - id: mgmt
+ virtual-compute-desc: compute-id
+ virtual-storage-desc:
+ - storage-id
+ sw-image-desc: invalid_image
+ configurable-properties:
+ - key: "track"
+ value: "latest"
+ - key: "key"
+ value: "edge"
+"""
+
+vdu_no_sw_image_desc = """
+---
+vdu:
+ - id: test-vdu-id
+ name: test-vdu-name
+ int-cpd:
+ - id: internal
+ int-virtual-link-desc: network1
+ - id: mgmt
+ virtual-compute-desc: compute-id
+ virtual-storage-desc:
+ - storage-id
+ sw-image-desc: invalid_image
+ configurable-properties:
+ - key: "track"
+ value: "latest"
+ - key: "key"
+ value: "edge"
+"""
+
+sw_image_desc = """
+---
+sw-image-desc:
+ - id: image-test1
+ name: charm-name1
+ image: ch:mysql
+ version: "1.0"
+ - id: image-test2
+ name: charm-name2
+ image: ch:my-charm
+ version: "1.0"
+"""
+
+
+class TestCharmInfoUtils(TestCase):
+ def setUp(self):
+ self.charm_info_utils = CharmInfoUtils()
+
+ def get_loaded_descriptor(self, descriptor):
+ return yaml.load(descriptor, Loader=yaml.Loader)
+
+ def test_get_charm_info_nominal_case(self):
+ vdu_descriptor = self.get_loaded_descriptor(vdu_nominal).get("vdu")
+ sw_image_descs = self.get_loaded_descriptor(sw_image_desc).get("sw-image-desc")
+ result = self.charm_info_utils.get_charm_info(vdu_descriptor[0], sw_image_descs)
+ expected = CharmInfo("test-vdu-id", "edge", "ch:my-charm")
+ self.assertEqual(result, expected)
+
+ def test_get_charm_info_no_channel(self):
+ vdu_descriptor = self.get_loaded_descriptor(vdu_no_channel).get("vdu")
+ sw_image_descs = self.get_loaded_descriptor(sw_image_desc).get("sw-image-desc")
+ result = self.charm_info_utils.get_charm_info(vdu_descriptor[0], sw_image_descs)
+ expected = CharmInfo("test-vdu-id", None, "ch:my-charm")
+ self.assertEqual(result, expected)
+
+ def test_get_charm_info_invalid_image(self):
+ vdu_descriptor = self.get_loaded_descriptor(vdu_invalid_image).get("vdu")
+ sw_image_descs = self.get_loaded_descriptor(sw_image_desc).get("sw-image-desc")
+ result = self.charm_info_utils.get_charm_info(vdu_descriptor[0], sw_image_descs)
+ expected = CharmInfo("test-vdu-id", None, None)
+ self.assertEqual(result, expected)
+
+ def test_get_charm_info_no_sw_image_desc(self):
+ vdu_descriptor = self.get_loaded_descriptor(vdu_no_sw_image_desc).get("vdu")
+ sw_image_descs = self.get_loaded_descriptor(sw_image_desc).get("sw-image-desc")
+ result = self.charm_info_utils.get_charm_info(vdu_descriptor[0], sw_image_descs)
+ expected = CharmInfo("test-vdu-id", None, None)
+ self.assertEqual(result, expected)
+
+ def test_get_charm_info_empty_sw_image_descs(self):
+ vdu_descriptor = self.get_loaded_descriptor(vdu_nominal).get("vdu")
+ sw_image_descs = []
+ result = self.charm_info_utils.get_charm_info(vdu_descriptor[0], sw_image_descs)
+ expected = CharmInfo("test-vdu-id", "edge", None)
+ self.assertEqual(result, expected)
ACTIVITY_CHANGE_NF_INSTANTIATION_STATE,
ACTIVITY_CHANGE_NF_NOTIFICATION_STATE,
ACTIVITY_GET_TASK_QUEUE,
- vim_type_task_queue_mappings,
+ VIM_TYPE_TASK_QUEUE_MAPPINGS,
)
from osm_common.dataclasses.temporal_dataclasses import (
ChangeNFInstantiationStateInput,
"""
vnfrs = self.db.get_list("vnfrs", {"_id": get_task_queue_input.vnfr_uuid})
vim_record = self.db.get_list("vim_accounts", {"_id": vnfrs["vim-account-id"]})
- task_queue = vim_type_task_queue_mappings[vim_record["vim-type"]]
+ task_queue = VIM_TYPE_TASK_QUEUE_MAPPINGS[vim_record["vim-type"]]
self.logger.debug(f"Got the task queue {task_queue} for VNF operations.")
return GetTaskQueueOutput(task_queue)
@workflow.run
async def run(self, input: PrepareVnfInput) -> None:
+ # TODO: Set the model here OSM-991
pass