Code Coverage

Cobertura Coverage Report > osm_nbi.tests >

test_nbi_temporal.py

Trend

Classes100%
 
Lines100%
 
Conditionals100%
 

File Coverage summary

NameClassesLinesConditionals
test_nbi_temporal.py
100%
1/1
100%
30/30
100%
0/0

Coverage Breakdown by Class

NameLinesConditionals
test_nbi_temporal.py
100%
30/30
N/A

Source

osm_nbi/tests/test_nbi_temporal.py
1 ########################################################################
2 # Copyright ETSI Contributors and Others.
3
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #    http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
17
18 1 import asynctest
19 1 from osm_common.temporal_task_queues.task_queues_mappings import LCM_TASK_QUEUE
20 1 from osm_common.temporal.workflows.vim import (
21     VimCreateWorkflow,
22 )
23 1 from osm_common.temporal.workflows.ns import NsInstantiateWorkflow
24 1 from osm_nbi.temporal.nbi_temporal import NbiTemporal
25
26
27 1 @asynctest.mock.patch("osm_common.wftemporal.WFTemporal.start_workflow")
28 1 class TestNbiTemporal(asynctest.TestCase):
29 1     nslcmop = {
30         "id": "a388d577-c3cc-4c7c-9405-8db66c524e07",
31         "_id": "a388d577-c3cc-4c7c-9405-8db66c524e07",
32         "operationState": "PROCESSING",
33         "nsInstanceId": "3d43a41f-794c-4b9f-83e2-e22be11208b1",
34         "lcmOperationType": "instantiate",
35         "startTime": 1683639339.1609678,
36         "isAutomaticInvocation": False,
37         "operationParams": {
38             "nsdId": "17dfb64e-34cd-4385-80c5-bc01d25eacf2",
39             "nsName": "squid",
40             "nsDescription": "default description",
41             "vimAccountId": "29ca8632-06a6-4bf2-a222-893e20a18b55",
42             "nsr_id": "3d43a41f-794c-4b9f-83e2-e22be11208b1",
43             "lcmOperationType": "instantiate",
44             "nsInstanceId": "3d43a41f-794c-4b9f-83e2-e22be11208b1",
45         },
46         "isCancelPending": False,
47     }
48 1     vim_id = "f9c16d22-ea2c-45d3-b4c7-8aae8c0f7ada"
49 1     op_id = "5"
50 1     vim_operation_content = {
51         "name": "juju_paas6",
52         "vim_type": "paas",
53         "description": None,
54         "vim_url": "172.21.249.67:17070",
55         "vim_user": "admin",
56         "vim_password": "mqK0X8DPs6cmTwpW7hrsjQ==",
57         "vim_tenant_name": "null",
58         "config": {
59             "paas_provider": "juju",
60             "cloud": "microk8s",
61             "cloud_credentials": "microk8s",
62             "authorized_keys": "id_rsa.pub",
63             "ca_cert_content": "-----BEGIN CERTIFICATE-----\nMIIETQRzQ=\n-----END CERTIFICATE-----\n",
64         },
65         "_admin": {
66             "created": 1683639881.2804828,
67             "modified": 1683639881.2804828,
68             "current_operation": None,
69         },
70         "_id": vim_id,
71         "schema_version": "1.11",
72         "op_id": "{}:{}".format(vim_id, op_id),
73     }
74
75 1     def setUp(self) -> None:
76 1         self.temporal = NbiTemporal()
77
78 1     def test_start_vim_workflow(self, mock_start_workflow):
79 1         action = "created"
80 1         self.temporal.start_vim_workflow(action, self.vim_operation_content)
81 1         mock_start_workflow.assert_called_once_with(
82             task_queue=LCM_TASK_QUEUE,
83             workflow_name=VimCreateWorkflow.__name__,
84             workflow_data=VimCreateWorkflow.Input(self.vim_id, self.op_id),
85             id=self.vim_id,
86         )
87
88 1     def test_start_vim_workflow_invalid_action(self, mock_start_workflow):
89 1         action = "some_action"
90 1         with self.assertRaises(KeyError):
91 1             self.temporal.start_vim_workflow(action, self.vim_operation_content)
92 1         mock_start_workflow.assert_not_called()
93
94 1     def test_start_ns_workflow(self, mock_start_workflow):
95 1         self.temporal.start_ns_workflow(self.nslcmop)
96 1         mock_start_workflow.assert_called_once_with(
97             task_queue=LCM_TASK_QUEUE,
98             workflow_name=NsInstantiateWorkflow.__name__,
99             workflow_data=NsInstantiateWorkflow.Input(nslcmop=self.nslcmop),
100             id=self.nslcmop["nsInstanceId"],
101         )
102
103 1     def test_start_ns_workflow_invalid_operation(self, mock_start_workflow):
104 1         self.nslcmop["lcmOperationType"] = "some_operation"
105 1         with self.assertRaises(KeyError):
106 1             self.temporal.start_ns_workflow(self.nslcmop)
107 1         mock_start_workflow.assert_not_called()