1 ########################################################################
2 # Copyright ETSI Contributors and Others.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
19 from osm_common
.temporal_task_queues
.task_queues_mappings
import LCM_TASK_QUEUE
20 from osm_common
.temporal
.workflows
.vim
import (
23 from osm_common
.temporal
.workflows
.ns
import NsInstantiateWorkflow
24 from osm_nbi
.temporal
.nbi_temporal
import NbiTemporal
27 @asynctest.mock
.patch("osm_common.wftemporal.WFTemporal.start_workflow")
28 class TestNbiTemporal(asynctest
.TestCase
):
30 "id": "a388d577-c3cc-4c7c-9405-8db66c524e07",
31 "_id": "a388d577-c3cc-4c7c-9405-8db66c524e07",
32 "operationState": "PROCESSING",
33 "nsInstanceId": "3d43a41f-794c-4b9f-83e2-e22be11208b1",
34 "lcmOperationType": "instantiate",
35 "startTime": 1683639339.1609678,
36 "isAutomaticInvocation": False,
38 "nsdId": "17dfb64e-34cd-4385-80c5-bc01d25eacf2",
40 "nsDescription": "default description",
41 "vimAccountId": "29ca8632-06a6-4bf2-a222-893e20a18b55",
42 "nsr_id": "3d43a41f-794c-4b9f-83e2-e22be11208b1",
43 "lcmOperationType": "instantiate",
44 "nsInstanceId": "3d43a41f-794c-4b9f-83e2-e22be11208b1",
46 "isCancelPending": False,
48 vim_id
= "f9c16d22-ea2c-45d3-b4c7-8aae8c0f7ada"
50 vim_operation_content
= {
54 "vim_url": "172.21.249.67:17070",
56 "vim_password": "mqK0X8DPs6cmTwpW7hrsjQ==",
57 "vim_tenant_name": "null",
59 "paas_provider": "juju",
61 "cloud_credentials": "microk8s",
62 "authorized_keys": "id_rsa.pub",
63 "ca_cert_content": "-----BEGIN CERTIFICATE-----\nMIIETQRzQ=\n-----END CERTIFICATE-----\n",
66 "created": 1683639881.2804828,
67 "modified": 1683639881.2804828,
68 "current_operation": None,
71 "schema_version": "1.11",
72 "op_id": "{}:{}".format(vim_id
, op_id
),
def setUp(self) -> None:
    """Create the NbiTemporal instance that each test method drives."""
    nbi_temporal = NbiTemporal()
    self.temporal = nbi_temporal
78 def test_start_vim_workflow(self
, mock_start_workflow
):
# Calls start_vim_workflow with `action` and the VIM fixture, then checks
# that the patched start_workflow was invoked exactly once on
# LCM_TASK_QUEUE with VimCreateWorkflow's name and an Input built from
# vim_id and op_id.
# NOTE(review): `action` is not assigned anywhere in the visible text and
# the assert call below is not visibly closed -- this extract looks
# truncated; confirm against the original file before relying on it.
80 self
.temporal
.start_vim_workflow(action
, self
.vim_operation_content
)
81 mock_start_workflow
.assert_called_once_with(
82 task_queue
=LCM_TASK_QUEUE
,
83 workflow_name
=VimCreateWorkflow
.__name
__,
84 workflow_data
=VimCreateWorkflow
.Input(self
.vim_id
, self
.op_id
),
def test_start_vim_workflow_invalid_action(self, mock_start_workflow):
    """Passing "some_action" must raise KeyError and leave start_workflow uncalled."""
    unknown_action = "some_action"
    # Callable form of assertRaises: the call is made by the assertion
    # helper itself, with the same arguments as a direct invocation.
    self.assertRaises(
        KeyError,
        self.temporal.start_vim_workflow,
        unknown_action,
        self.vim_operation_content,
    )
    mock_start_workflow.assert_not_called()
def test_start_ns_workflow(self, mock_start_workflow):
    """Check the start_workflow call produced by start_ns_workflow.

    The nslcmop fixture is handed to start_ns_workflow; the patched
    start_workflow must then have been called exactly once with the LCM
    task queue, NsInstantiateWorkflow's name, an Input wrapping the same
    nslcmop, and the fixture's "nsInstanceId" as the id.
    """
    self.temporal.start_ns_workflow(self.nslcmop)
    mock_start_workflow.assert_called_once_with(
        task_queue=LCM_TASK_QUEUE,
        workflow_name=NsInstantiateWorkflow.__name__,
        workflow_data=NsInstantiateWorkflow.Input(nslcmop=self.nslcmop),
        id=self.nslcmop["nsInstanceId"],
    )
def test_start_ns_workflow_invalid_operation(self, mock_start_workflow):
    """An nslcmop with "lcmOperationType" set to "some_operation" must raise KeyError.

    The patched start_workflow must not have been called afterwards.
    """
    # setUp only assigns `temporal`, so `self.nslcmop` resolves to a fixture
    # shared by every test. Writing into it in place would leak the bogus
    # operation type into whichever test runs next, so a modified copy is
    # used instead (a shallow copy is enough: only a top-level key changes).
    invalid_nslcmop = dict(self.nslcmop, lcmOperationType="some_operation")
    with self.assertRaises(KeyError):
        self.temporal.start_ns_workflow(invalid_nslcmop)
    mock_start_workflow.assert_not_called()