309fc1f474953f60621c031a8cf024dca23fa6b1
# Copyright 2021 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
16 from unittest
import TestCase
17 from unittest
.mock
import Mock
, patch
, MagicMock
20 from osm_common
import msgbase
21 from osm_common
.dbbase
import DbException
22 from osm_lcm
.vim_sdn
import K8sClusterLcm
, VcaLcm
class AsyncMock(MagicMock):
    """MagicMock variant whose call is a coroutine.

    Lets a mock stand in for an ``async def`` callable: calling an instance
    yields a coroutine object that must be awaited (or run on an event loop)
    to obtain the configured result.
    """

    async def __call__(self, *args, **kwargs):
        """Delegate to ``MagicMock.__call__`` from inside a coroutine.

        Because this is a coroutine function, the call is only recorded on
        the mock (and ``return_value`` / ``side_effect`` only applied) once
        the returned coroutine is actually executed.

        Args:
            *args: positional arguments forwarded to the underlying mock.
            **kwargs: keyword arguments forwarded to the underlying mock.

        Returns:
            Whatever the underlying ``MagicMock`` call returns.
        """
        return super().__call__(*args, **kwargs)
30 class TestVcaLcm(TestCase
):
@patch("osm_lcm.lcm_utils.Database")
@patch("osm_lcm.lcm_utils.Filesystem")
def setUp(self, mock_filesystem, mock_database):
    """Create the ``VcaLcm`` under test with mocked collaborators.

    ``Database`` and ``Filesystem`` in ``osm_lcm.lcm_utils`` are patched only
    while this method runs, i.e. during ``VcaLcm`` construction; the injected
    mock arguments themselves are not used here. After construction the
    instance's ``db`` and ``fs`` attributes are replaced with plain ``Mock``
    objects so each test can configure and inspect them.
    """
    loop = asyncio.get_event_loop()
    msg = Mock(msgbase.MsgBase())
    lcm_tasks = Mock()
    config = {"database": {"driver": "mongo"}}

    # Expose the fixtures on the test case before building the object.
    self.loop = loop
    self.msg = msg
    self.lcm_tasks = lcm_tasks
    self.config = config

    vca_lcm = VcaLcm(msg, lcm_tasks, config, loop)
    self.vca_lcm = vca_lcm
    vca_lcm.db = Mock()
    vca_lcm.fs = Mock()
42 def test_vca_lcm_create(self
):
43 vca_content
= {"op_id": "order-id", "_id": "id"}
48 "schema_version": "1.11",
51 self
.lcm_tasks
.lock_HA
.return_value
= True
52 self
.vca_lcm
.db
.get_one
.return_value
= db_vca
53 self
.vca_lcm
.n2vc
.validate_vca
= AsyncMock()
54 self
.vca_lcm
.update_db_2
= Mock()
56 self
.loop
.run_until_complete(self
.vca_lcm
.create(vca_content
, order_id
))
58 self
.lcm_tasks
.lock_HA
.assert_called_with("vca", "create", "order-id")
59 self
.vca_lcm
.db
.encrypt_decrypt_fields
.assert_called_with(
63 schema_version
="1.11",
66 self
.vca_lcm
.update_db_2
.assert_called_with(
70 "_admin.operationalState": "ENABLED",
71 "_admin.detailed-status": "Connectivity: ok",
74 self
.lcm_tasks
.unlock_HA
.assert_called_with(
78 operationState
="COMPLETED",
79 detailed_status
="VCA validated",
81 self
.lcm_tasks
.remove
.assert_called_with("vca", "id", "order-id")
83 def test_vca_lcm_create_exception(self
):
84 vca_content
= {"op_id": "order-id", "_id": "id"}
89 "schema_version": "1.11",
92 self
.lcm_tasks
.lock_HA
.return_value
= True
93 self
.vca_lcm
.db
.get_one
.return_value
= db_vca
94 self
.vca_lcm
.n2vc
.validate_vca
= AsyncMock()
95 self
.vca_lcm
.n2vc
.validate_vca
.side_effect
= Exception("failed")
96 self
.vca_lcm
.update_db_2
= Mock()
97 self
.vca_lcm
.update_db_2
.side_effect
= DbException("failed")
98 self
.loop
.run_until_complete(self
.vca_lcm
.create(vca_content
, order_id
))
100 self
.lcm_tasks
.lock_HA
.assert_called_with("vca", "create", "order-id")
101 self
.vca_lcm
.db
.encrypt_decrypt_fields
.assert_called_with(
104 ["secret", "cacert"],
105 schema_version
="1.11",
108 self
.vca_lcm
.update_db_2
.assert_called_with(
112 "_admin.operationalState": "ERROR",
113 "_admin.detailed-status": "Failed with exception: failed",
116 self
.lcm_tasks
.unlock_HA
.assert_not_called()
117 self
.lcm_tasks
.remove
.assert_called_with("vca", "id", "order-id")
119 def test_vca_lcm_delete(self
):
120 vca_content
= {"op_id": "order-id", "_id": "id"}
121 order_id
= "order-id"
122 self
.lcm_tasks
.lock_HA
.return_value
= True
123 self
.vca_lcm
.update_db_2
= Mock()
125 self
.loop
.run_until_complete(self
.vca_lcm
.delete(vca_content
, order_id
))
127 self
.lcm_tasks
.lock_HA
.assert_called_with("vca", "delete", "order-id")
128 self
.vca_lcm
.db
.del_one
.assert_called_with("vca", {"_id": "id"})
129 self
.vca_lcm
.update_db_2
.assert_called_with("vca", "id", None)
130 self
.lcm_tasks
.unlock_HA
.assert_called_with(
134 operationState
="COMPLETED",
135 detailed_status
="deleted",
137 self
.lcm_tasks
.remove
.assert_called_with("vca", "id", "order-id")
139 def test_vca_lcm_delete_exception(self
):
140 vca_content
= {"op_id": "order-id", "_id": "id"}
141 order_id
= "order-id"
142 self
.lcm_tasks
.lock_HA
.return_value
= True
143 self
.vca_lcm
.update_db_2
= Mock()
144 self
.vca_lcm
.db
.del_one
.side_effect
= Exception("failed deleting")
145 self
.vca_lcm
.update_db_2
.side_effect
= DbException("failed")
147 self
.loop
.run_until_complete(self
.vca_lcm
.delete(vca_content
, order_id
))
149 self
.lcm_tasks
.lock_HA
.assert_called_with("vca", "delete", "order-id")
150 self
.vca_lcm
.db
.del_one
.assert_called_with("vca", {"_id": "id"})
151 self
.vca_lcm
.update_db_2
.assert_called_with(
155 "_admin.operationalState": "ERROR",
156 "_admin.detailed-status": "Failed with exception: failed deleting",
# ``Mock`` auto-creates any attribute, so ``unlock_HA.not_called()`` merely
# invoked a throwaway child mock and verified nothing. Use the real
# assertion, as the create-exception test in this file does.
self.lcm_tasks.unlock_HA.assert_not_called()
160 self
.lcm_tasks
.remove
.assert_called_with("vca", "id", "order-id")
163 class TestK8SClusterLcm(TestCase
):
164 @patch("osm_lcm.vim_sdn.K8sHelmConnector")
165 @patch("osm_lcm.vim_sdn.K8sHelm3Connector")
166 @patch("osm_lcm.vim_sdn.K8sJujuConnector")
167 @patch("osm_lcm.lcm_utils.Database")
168 @patch("osm_lcm.lcm_utils.Filesystem")
177 self
.loop
= asyncio
.get_event_loop()
178 self
.msg
= Mock(msgbase
.MsgBase())
179 self
.lcm_tasks
= Mock()
180 self
.config
= {"database": {"driver": "mongo"}}
183 "helmpath": "/usr/local/bin/helm",
184 "helm3path": "/usr/local/bin/helm3",
185 "kubectlpath": "/usr/bin/kubectl",
188 self
.k8scluster_lcm
= K8sClusterLcm(
189 self
.msg
, self
.lcm_tasks
, self
.vca_config
, self
.loop
191 self
.k8scluster_lcm
.db
= Mock()
192 self
.k8scluster_lcm
.fs
= Mock()
194 def test_k8scluster_edit(self
):
195 k8scluster_content
= {"op_id": "op-id", "_id": "id"}
196 order_id
= "order-id"
197 self
.lcm_tasks
.lock_HA
.return_value
= True
198 self
.loop
.run_until_complete(
199 self
.k8scluster_lcm
.edit(k8scluster_content
, order_id
)
201 self
.lcm_tasks
.unlock_HA
.assert_called_with(
205 operationState
="COMPLETED",
206 detailed_status
="Not implemented",
208 self
.lcm_tasks
.remove
.assert_called_with("k8scluster", "id", order_id
)
def test_k8scluster_edit_lock_false(self):
    """``edit`` must not unlock or remove the task when ``lock_HA`` is False."""
    self.lcm_tasks.lock_HA.return_value = False

    content = {"op_id": "op-id", "_id": "id"}
    edit_coro = self.k8scluster_lcm.edit(content, "order-id")
    self.loop.run_until_complete(edit_coro)

    # Neither follow-up call on lcm_tasks may have happened.
    for untouched in (self.lcm_tasks.unlock_HA, self.lcm_tasks.remove):
        untouched.assert_not_called()
220 def test_k8scluster_edit_no_opid(self
):
221 k8scluster_content
= {"_id": "id"}
222 order_id
= "order-id"
223 self
.lcm_tasks
.lock_HA
.return_value
= True
224 self
.loop
.run_until_complete(
225 self
.k8scluster_lcm
.edit(k8scluster_content
, order_id
)
227 self
.lcm_tasks
.unlock_HA
.assert_called_with(
231 operationState
="COMPLETED",
232 detailed_status
="Not implemented",
234 self
.lcm_tasks
.remove
.assert_called_with("k8scluster", "id", order_id
)
236 def test_k8scluster_edit_no_orderid(self
):
237 k8scluster_content
= {"op_id": "op-id", "_id": "id"}
239 self
.lcm_tasks
.lock_HA
.return_value
= True
240 self
.loop
.run_until_complete(
241 self
.k8scluster_lcm
.edit(k8scluster_content
, order_id
)
243 self
.lcm_tasks
.unlock_HA
.assert_called_with(
247 operationState
="COMPLETED",
248 detailed_status
="Not implemented",
250 self
.lcm_tasks
.remove
.assert_called_with("k8scluster", "id", order_id
)