blob: 592c5232dc9f80aed0b726f9bf9c66c59a5092b7 [file] [log] [blame]
#######################################################################################
# Copyright ETSI Contributors and Others.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
16
17
18import asynctest
Gulsum Atici36365402023-04-07 00:07:07 +030019from asyncio.exceptions import CancelledError
20from copy import deepcopy
21from osm_common.dataclasses.temporal_dataclasses import (
Gulsum Aticia1898b42023-04-25 15:48:10 +030022 GetVimCloudInput,
Gulsum Atici36365402023-04-07 00:07:07 +030023 GetVnfDetailsInput,
24 VnfInstantiateInput,
25)
Dario Faccin39301762023-04-17 16:56:37 +020026from osm_common.dbbase import DbException
Gulsum Atici36365402023-04-07 00:07:07 +030027from osm_lcm.temporal.vnf_activities import VnfDbActivity, VnfOperations
Dario Faccin39301762023-04-17 16:56:37 +020028from temporalio.testing import ActivityEnvironment
29from unittest.mock import Mock
30
# Cloud name placed in the sample VIM account configuration below.
cloud = "microk8s"

# Input shared by the VnfDbActivity tests: the VNF record to update and the
# model name that is expected to be written for it.
vnfr_uuid = "d08d2da5-2120-476c-8538-deaeb4e88b3e"
model_name = "a-model-name"
vnf_instantiate_input = VnfInstantiateInput(
    vnfr_uuid=vnfr_uuid,
    model_name=model_name,
)

# Sample VNF record; the tests feed it to the mocked ``db.get_one``.
sample_vnfr = {
    "_id": "9f472177-95c0-4335-b357-5cdc17a79965",
    "id": "9f472177-95c0-4335-b357-5cdc17a79965",
    "nsr-id-ref": "dcf4c922-5a73-41bf-a6ca-870c22d6336c",
    "vnfd-ref": "jar_vnfd_scalable",
    "vnfd-id": "f1b38eac-190c-485d-9a74-c6e169c929d8",
    "vim-account-id": "9b0bedc3-ea8e-42fd-acc9-dd03f4dee73c",
}

# Sample VNF descriptor returned after the VNF record in the details tests.
sample_vnfd = {
    "_id": "97784f19-d254-4252-946c-cf92d85443ca",
    "id": "sol006-vnf",
    "provider": "Canonical",
    "product-name": "test-vnf",
    "software-version": "1.0",
}

# Sample VIM account record; ``config["cloud"]`` is the value the
# get_vim_cloud tests compare against.
sample_vim_record = {
    "_id": "a64f7c6c-bc27-4ec8-b664-5500a3324eca",
    "name": "juju",
    "vim_type": "paas",
    "vim_url": "192.168.1.100:17070",
    "vim_user": "admin",
    "vim_password": "c16gylWEepEREN6vWw==",
    "config": {
        "paas_provider": "juju",
        "cloud": cloud,
        "cloud_credentials": "microk8s",
        "authorized_keys": "$HOME/.local/share/juju/ssh/juju_id_rsa.pub",
        "ca_cert_content": "-----BEGIN-----",
    },
}
Dario Faccin39301762023-04-17 16:56:37 +020065
66
class TestVnfDbActivity(asynctest.TestCase):
    """Tests for ``VnfDbActivity.set_vnf_model`` using a mocked database."""

    def setUp(self):
        """Create a fresh activity environment and a mock-backed activity."""
        self.env = ActivityEnvironment()
        self.db = Mock()
        self.vnf_db_activity = VnfDbActivity(self.db)

    async def test_set_vnf_model(self):
        """The activity writes the model name as the VNF record namespace."""
        activity = self.vnf_db_activity.set_vnf_model
        await self.env.run(activity, vnf_instantiate_input)

        expected_filter = {"_id": vnfr_uuid}
        expected_update = {"namespace": model_name}
        self.db.set_one.assert_called_with("vnfrs", expected_filter, expected_update)

    async def test_db_raises_exception(self):
        """A ``DbException`` raised by ``db.set_one`` reaches the caller."""
        self.db.set_one.side_effect = DbException("not found")
        activity = self.vnf_db_activity.set_vnf_model
        with self.assertRaises(DbException):
            await self.env.run(activity, vnf_instantiate_input)
Gulsum Atici36365402023-04-07 00:07:07 +030085
86
class TestVnfDetails(asynctest.TestCase):
    """Tests for ``VnfOperations.get_vnf_details`` using a mocked database."""

    async def setUp(self):
        """Create the mocked database, the operations object and the environment."""
        self.db = Mock()
        self.vnf_operations_instance = VnfOperations(self.db)
        self.env = ActivityEnvironment()

    async def _get_vnf_details(self):
        """Run ``get_vnf_details`` for the sample VNF record and return its result."""
        return await self.env.run(
            self.vnf_operations_instance.get_vnf_details,
            GetVnfDetailsInput(vnfr_uuid=sample_vnfr["id"]),
        )

    async def test_activity_succeeded(self):
        """The result carries the VNF record and descriptor read from the database."""
        self.db.get_one.side_effect = [sample_vnfr, sample_vnfd]
        activity_result = await self._get_vnf_details()
        self.assertEqual(activity_result.vnfd, sample_vnfd)
        self.assertEqual(activity_result.vnfr, sample_vnfr)

    async def test_activity_failed_db_exception(self):
        """A ``DbException`` from ``db.get_one`` propagates with its message."""
        self.db.get_one.side_effect = DbException("Can not connect to Database.")
        # The awaited call raises, so there is no result to inspect; only the
        # exception itself is checked after the context manager exits.
        with self.assertRaises(DbException) as err:
            await self._get_vnf_details()
        self.assertEqual(
            str(err.exception), "database exception Can not connect to Database."
        )

    async def test_activity_failed_key_error(self):
        """A VNF record without ``vnfd-id`` makes the activity raise ``KeyError``."""
        # Work on a copy so the shared module-level sample stays intact.
        vnfr = deepcopy(sample_vnfr)
        vnfr.pop("vnfd-id")
        self.db.get_one.side_effect = [vnfr, sample_vnfd]
        with self.assertRaises(KeyError) as err:
            await self._get_vnf_details()
        self.assertEqual(str(err.exception.args[0]), "vnfd-id")

    async def test_activity_cancelled(self):
        """Running in an already-cancelled environment raises ``CancelledError``."""
        # NOTE(review): this sets a private attribute of ActivityEnvironment;
        # confirm whether the public ``cancel()`` can be used before ``run``.
        self.env._cancelled = True
        self.db.get_one.side_effect = [sample_vnfr, sample_vnfd]
        with self.assertRaises(CancelledError):
            await self._get_vnf_details()
135
136
class TestGetVimCloud(asynctest.TestCase):
    """Tests for ``VnfOperations.get_vim_cloud`` using a mocked database."""

    async def setUp(self):
        """Create the mocked database, the operations object and the environment."""
        self.db = Mock()
        self.vnf_operations_instance = VnfOperations(self.db)
        self.env = ActivityEnvironment()

    async def _get_vim_cloud(self):
        """Run ``get_vim_cloud`` for the sample VNF record and return its result."""
        return await self.env.run(
            self.vnf_operations_instance.get_vim_cloud,
            GetVimCloudInput(vnfr_uuid=sample_vnfr["id"]),
        )

    async def test_activity_succeeded(self):
        """The result exposes the cloud name stored in the VIM record config."""
        self.db.get_one.side_effect = [sample_vnfr, sample_vim_record]
        activity_result = await self._get_vim_cloud()
        self.assertEqual(activity_result.cloud, cloud)

    async def test_activity_vim_record_without_cloud(self):
        """A VIM config without a ``cloud`` key yields an empty cloud name."""
        # Work on a copy so the shared module-level sample stays intact.
        vim_record = deepcopy(sample_vim_record)
        vim_record["config"].pop("cloud")
        self.db.get_one.side_effect = [sample_vnfr, vim_record]
        activity_result = await self._get_vim_cloud()
        self.assertEqual(activity_result.cloud, "")

    async def test_activity_failed_db_exception(self):
        """A ``DbException`` from ``db.get_one`` propagates with its message."""
        self.db.get_one.side_effect = DbException("Can not connect to Database.")
        # The awaited call raises, so there is no result to inspect; only the
        # exception itself is checked after the context manager exits.
        with self.assertRaises(DbException) as err:
            await self._get_vim_cloud()
        self.assertEqual(
            str(err.exception), "database exception Can not connect to Database."
        )

    async def test_activity_failed_key_error(self):
        """A VIM record without ``config`` makes the activity raise ``KeyError``."""
        vim_record = deepcopy(sample_vim_record)
        vim_record.pop("config")
        self.db.get_one.side_effect = [sample_vnfr, vim_record]
        with self.assertRaises(KeyError) as err:
            await self._get_vim_cloud()
        self.assertEqual(str(err.exception.args[0]), "config")

    async def test_activity_cancelled(self):
        """Running in an already-cancelled environment raises ``CancelledError``."""
        # NOTE(review): this sets a private attribute of ActivityEnvironment;
        # confirm whether the public ``cancel()`` can be used before ``run``.
        self.env._cancelled = True
        self.db.get_one.side_effect = [sample_vnfr, sample_vim_record]
        with self.assertRaises(CancelledError):
            await self._get_vim_cloud()
194
195
# Run the test cases in this module when it is executed as a script.
if __name__ == "__main__":
    asynctest.main()