1 ########################################################################
2 # Copyright ETSI Contributors and Others.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
19 from osm_common
.dataclasses
.temporal_dataclasses
import (
23 from osm_common
.temporal_constants
import (
26 WORKFLOW_NS_INSTANTIATE
,
28 from osm_nbi
.temporal
.nbi_temporal
import NbiTemporal
31 @asynctest.mock
.patch("osm_common.wftemporal.WFTemporal.start_workflow")
32 class TestNbiTemporal(asynctest
.TestCase
):
34 "id": "a388d577-c3cc-4c7c-9405-8db66c524e07",
35 "_id": "a388d577-c3cc-4c7c-9405-8db66c524e07",
36 "operationState": "PROCESSING",
37 "nsInstanceId": "3d43a41f-794c-4b9f-83e2-e22be11208b1",
38 "lcmOperationType": "instantiate",
39 "startTime": 1683639339.1609678,
40 "isAutomaticInvocation": False,
42 "nsdId": "17dfb64e-34cd-4385-80c5-bc01d25eacf2",
44 "nsDescription": "default description",
45 "vimAccountId": "29ca8632-06a6-4bf2-a222-893e20a18b55",
46 "nsr_id": "3d43a41f-794c-4b9f-83e2-e22be11208b1",
47 "lcmOperationType": "instantiate",
48 "nsInstanceId": "3d43a41f-794c-4b9f-83e2-e22be11208b1",
50 "isCancelPending": False,
52 vim_id
= "f9c16d22-ea2c-45d3-b4c7-8aae8c0f7ada"
54 vim_operation_content
= {
58 "vim_url": "172.21.249.67:17070",
60 "vim_password": "mqK0X8DPs6cmTwpW7hrsjQ==",
61 "vim_tenant_name": "null",
63 "paas_provider": "juju",
65 "cloud_credentials": "microk8s",
66 "authorized_keys": "id_rsa.pub",
67 "ca_cert_content": "-----BEGIN CERTIFICATE-----\nMIIETQRzQ=\n-----END CERTIFICATE-----\n",
70 "created": 1683639881.2804828,
71 "modified": 1683639881.2804828,
72 "current_operation": None,
75 "schema_version": "1.11",
76 "op_id": "{}:{}".format(vim_id
, op_id
),
79 def setUp(self
) -> None:
80 self
.temporal
= NbiTemporal()
82 def test_start_vim_workflow(self
, mock_start_workflow
):
84 self
.temporal
.start_vim_workflow(action
, self
.vim_operation_content
)
85 mock_start_workflow
.assert_called_once_with(
86 task_queue
=LCM_TASK_QUEUE
,
87 workflow_name
=WORKFLOW_VIM_CREATE
,
88 workflow_data
=VimOperationInput(self
.vim_id
, self
.op_id
),
92 def test_start_vim_workflow_invalid_action(self
, mock_start_workflow
):
93 action
= "some_action"
94 with self
.assertRaises(KeyError):
95 self
.temporal
.start_vim_workflow(action
, self
.vim_operation_content
)
96 mock_start_workflow
.assert_not_called()
98 def test_start_ns_workflow(self
, mock_start_workflow
):
99 self
.temporal
.start_ns_workflow(self
.nslcmop
)
100 mock_start_workflow
.assert_called_once_with(
101 task_queue
=LCM_TASK_QUEUE
,
102 workflow_name
=WORKFLOW_NS_INSTANTIATE
,
103 workflow_data
=NsLcmOperationInput(nslcmop
=self
.nslcmop
),
104 id=self
.nslcmop
["nsInstanceId"],
107 def test_start_ns_workflow_invalid_operation(self
, mock_start_workflow
):
108 self
.nslcmop
["lcmOperationType"] = "some_operation"
109 with self
.assertRaises(KeyError):
110 self
.temporal
.start_ns_workflow(self
.nslcmop
)
111 mock_start_workflow
.assert_not_called()