86e2136dd0ebda582851090dd9933a5b138e8155
[osm/LCM.git] / osm_lcm / tests / test_ns.py
1 #
2 # Licensed under the Apache License, Version 2.0 (the "License"); you may
3 # not use this file except in compliance with the License. You may obtain
4 # a copy of the License at
5 #
6 # http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
10 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
11 # License for the specific language governing permissions and limitations
12 # under the License.
13 #
14 # For those usages not covered by the Apache License, Version 2.0 please
15 # contact: alfonso.tiernosepulveda@telefonica.com
16 ##
17
18
19 import asynctest # pip3 install asynctest --user
20 import asyncio
21 import yaml
22 import copy
23 from os import getenv
24 from osm_lcm import ns
25 from osm_common.msgkafka import MsgKafka
26 from osm_lcm.lcm_utils import TaskRegistry
27 from osm_lcm.ng_ro import NgRoClient
28 from osm_lcm.data_utils.database.database import Database
29 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
30 from osm_lcm.data_utils.vnfd import find_software_version
31 from osm_lcm.lcm_utils import check_juju_bundle_existence, get_charm_artifact_path
32 from osm_lcm.lcm_utils import LcmException
33 from uuid import uuid4
34 from unittest.mock import Mock, patch
35
36 from osm_lcm.tests import test_db_descriptors as descriptors
37
38 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
39
40 """ Perform unittests using asynctest of osm_lcm.ns module
41 It allows, if some testing ENV are supplied, testing without mocking some external libraries for debugging:
42 OSMLCMTEST_NS_PUBKEY: public ssh-key returned by N2VC to inject to VMs
43 OSMLCMTEST_NS_NAME: change name of NS
44 OSMLCMTEST_PACKAGES_PATH: path where the vnf-packages are stored (de-compressed), each one on a 'vnfd_id' folder
45 OSMLCMTEST_NS_IPADDRESS: IP address where emulated VMs are reached. Comma separate list
46 OSMLCMTEST_RO_VIMID: VIM id of RO target vim IP. Obtain it with openmano datcenter-list on RO container
47 OSMLCMTEST_VCA_NOMOCK: Do no mock the VCA, N2VC library, for debugging it
48 OSMLCMTEST_RO_NOMOCK: Do no mock the ROClient library, for debugging it
49 OSMLCMTEST_DB_NOMOCK: Do no mock the database library, for debugging it
50 OSMLCMTEST_FS_NOMOCK: Do no mock the File Storage library, for debugging it
51 OSMLCMTEST_LOGGING_NOMOCK: Do no mock the logging
52 OSMLCM_VCA_XXX: configuration of N2VC
53 OSMLCM_RO_XXX: configuration of RO
54 """
55
56 lcm_config = {
57 "global": {"loglevel": "DEBUG"},
58 "timeout": {},
59 "VCA": { # TODO replace with os.get_env to get other configurations
60 "host": getenv("OSMLCM_VCA_HOST", "vca"),
61 "port": getenv("OSMLCM_VCA_PORT", 17070),
62 "user": getenv("OSMLCM_VCA_USER", "admin"),
63 "secret": getenv("OSMLCM_VCA_SECRET", "vca"),
64 "public_key": getenv("OSMLCM_VCA_PUBKEY", None),
65 "ca_cert": getenv("OSMLCM_VCA_CACERT", None),
66 "apiproxy": getenv("OSMLCM_VCA_APIPROXY", "192.168.1.1"),
67 },
68 "ro_config": {
69 "uri": "http://{}:{}/openmano".format(
70 getenv("OSMLCM_RO_HOST", "ro"), getenv("OSMLCM_RO_PORT", "9090")
71 ),
72 "tenant": getenv("OSMLCM_RO_TENANT", "osm"),
73 "logger_name": "lcm.ROclient",
74 "loglevel": "DEBUG",
75 "ng": True,
76 },
77 }
78
79
80 class TestMyNS(asynctest.TestCase):
81 async def _n2vc_DeployCharms(
82 self,
83 model_name,
84 application_name,
85 vnfd,
86 charm_path,
87 params={},
88 machine_spec={},
89 callback=None,
90 *callback_args
91 ):
92 if callback:
93 for status, message in (
94 ("maintenance", "installing sofwware"),
95 ("active", "Ready!"),
96 ):
97 # call callback after some time
98 asyncio.sleep(5, loop=self.loop)
99 callback(model_name, application_name, status, message, *callback_args)
100
101 @staticmethod
102 def _n2vc_FormatApplicationName(*args):
103 num_calls = 0
104 while True:
105 yield "app_name-{}".format(num_calls)
106 num_calls += 1
107
108 def _n2vc_CreateExecutionEnvironment(
109 self, namespace, reuse_ee_id, db_dict, *args, **kwargs
110 ):
111 k_list = namespace.split(".")
112 ee_id = k_list[1] + "."
113 if len(k_list) >= 2:
114 for k in k_list[2:4]:
115 ee_id += k[:8]
116 else:
117 ee_id += "_NS_"
118 return ee_id, {}
119
120 def _ro_status(self, *args, **kwargs):
121 print("Args > {}".format(args))
122 print("kwargs > {}".format(kwargs))
123 if kwargs.get("delete"):
124 ro_ns_desc = yaml.load(
125 descriptors.ro_delete_action_text, Loader=yaml.Loader
126 )
127 while True:
128 yield ro_ns_desc
129
130 ro_ns_desc = yaml.load(descriptors.ro_ns_text, Loader=yaml.Loader)
131
132 # if ip address provided, replace descriptor
133 ip_addresses = getenv("OSMLCMTEST_NS_IPADDRESS", "")
134 if ip_addresses:
135 ip_addresses_list = ip_addresses.split(",")
136 for vnf in ro_ns_desc["vnfs"]:
137 if not ip_addresses_list:
138 break
139 vnf["ip_address"] = ip_addresses_list[0]
140 for vm in vnf["vms"]:
141 if not ip_addresses_list:
142 break
143 vm["ip_address"] = ip_addresses_list.pop(0)
144
145 while True:
146 yield ro_ns_desc
147 for net in ro_ns_desc["nets"]:
148 if net["status"] != "ACTIVE":
149 net["status"] = "ACTIVE"
150 break
151 else:
152 for vnf in ro_ns_desc["vnfs"]:
153 for vm in vnf["vms"]:
154 if vm["status"] != "ACTIVE":
155 vm["status"] = "ACTIVE"
156 break
157
158 def _ro_deploy(self, *args, **kwargs):
159 return {"action_id": args[1]["action_id"], "nsr_id": args[0], "status": "ok"}
160
161 def _return_uuid(self, *args, **kwargs):
162 return str(uuid4())
163
164 async def setUp(self):
165
166 # Mock DB
167 if not getenv("OSMLCMTEST_DB_NOMOCK"):
168 # Cleanup singleton Database instance
169 Database.instance = None
170
171 self.db = Database({"database": {"driver": "memory"}}).instance.db
172 self.db.create_list(
173 "vnfds", yaml.load(descriptors.db_vnfds_text, Loader=yaml.Loader)
174 )
175 self.db.create_list(
176 "vnfds_revisions",
177 yaml.load(descriptors.db_vnfds_revisions_text, Loader=yaml.Loader),
178 )
179 self.db.create_list(
180 "nsds", yaml.load(descriptors.db_nsds_text, Loader=yaml.Loader)
181 )
182 self.db.create_list(
183 "nsrs", yaml.load(descriptors.db_nsrs_text, Loader=yaml.Loader)
184 )
185 self.db.create_list(
186 "vim_accounts",
187 yaml.load(descriptors.db_vim_accounts_text, Loader=yaml.Loader),
188 )
189 self.db.create_list(
190 "k8sclusters",
191 yaml.load(descriptors.db_k8sclusters_text, Loader=yaml.Loader),
192 )
193 self.db.create_list(
194 "nslcmops", yaml.load(descriptors.db_nslcmops_text, Loader=yaml.Loader)
195 )
196 self.db.create_list(
197 "vnfrs", yaml.load(descriptors.db_vnfrs_text, Loader=yaml.Loader)
198 )
199 self.db_vim_accounts = yaml.load(
200 descriptors.db_vim_accounts_text, Loader=yaml.Loader
201 )
202
203 # Mock kafka
204 self.msg = asynctest.Mock(MsgKafka())
205
206 # Mock filesystem
207 if not getenv("OSMLCMTEST_FS_NOMOCK"):
208 self.fs = asynctest.Mock(
209 Filesystem({"storage": {"driver": "local", "path": "/"}}).instance.fs
210 )
211 self.fs.get_params.return_value = {
212 "path": getenv("OSMLCMTEST_PACKAGES_PATH", "./test/temp/packages")
213 }
214 self.fs.file_open = asynctest.mock_open()
215 # self.fs.file_open.return_value.__enter__.return_value = asynctest.MagicMock() # called on a python "with"
216 # self.fs.file_open.return_value.__enter__.return_value.read.return_value = "" # empty file
217
218 # Mock TaskRegistry
219 self.lcm_tasks = asynctest.Mock(TaskRegistry())
220 self.lcm_tasks.lock_HA.return_value = True
221 self.lcm_tasks.waitfor_related_HA.return_value = None
222 self.lcm_tasks.lookfor_related.return_value = ("", [])
223
224 # Mock VCA - K8s
225 if not getenv("OSMLCMTEST_VCA_K8s_NOMOCK"):
226 ns.K8sJujuConnector = asynctest.MagicMock(ns.K8sJujuConnector)
227 ns.K8sHelmConnector = asynctest.MagicMock(ns.K8sHelmConnector)
228 ns.K8sHelm3Connector = asynctest.MagicMock(ns.K8sHelm3Connector)
229
230 if not getenv("OSMLCMTEST_VCA_NOMOCK"):
231 ns.N2VCJujuConnector = asynctest.MagicMock(ns.N2VCJujuConnector)
232 ns.LCMHelmConn = asynctest.MagicMock(ns.LCMHelmConn)
233
234 # Create NsLCM class
235 self.my_ns = ns.NsLcm(self.msg, self.lcm_tasks, lcm_config, self.loop)
236 self.my_ns.fs = self.fs
237 self.my_ns.db = self.db
238 self.my_ns._wait_dependent_n2vc = asynctest.CoroutineMock()
239
240 # Mock logging
241 if not getenv("OSMLCMTEST_LOGGING_NOMOCK"):
242 self.my_ns.logger = asynctest.Mock(self.my_ns.logger)
243
244 # Mock VCA - N2VC
245 if not getenv("OSMLCMTEST_VCA_NOMOCK"):
246 pub_key = getenv("OSMLCMTEST_NS_PUBKEY", "ssh-rsa test-pub-key t@osm.com")
247 # self.my_ns.n2vc = asynctest.Mock(N2VC())
248 self.my_ns.n2vc.GetPublicKey.return_value = getenv(
249 "OSMLCM_VCA_PUBKEY", "public_key"
250 )
251 # allow several versions of n2vc
252 self.my_ns.n2vc.FormatApplicationName = asynctest.Mock(
253 side_effect=self._n2vc_FormatApplicationName()
254 )
255 self.my_ns.n2vc.DeployCharms = asynctest.CoroutineMock(
256 side_effect=self._n2vc_DeployCharms
257 )
258 self.my_ns.n2vc.create_execution_environment = asynctest.CoroutineMock(
259 side_effect=self._n2vc_CreateExecutionEnvironment
260 )
261 self.my_ns.n2vc.install_configuration_sw = asynctest.CoroutineMock(
262 return_value=pub_key
263 )
264 self.my_ns.n2vc.get_ee_ssh_public__key = asynctest.CoroutineMock(
265 return_value=pub_key
266 )
267 self.my_ns.n2vc.exec_primitive = asynctest.CoroutineMock(
268 side_effect=self._return_uuid
269 )
270 self.my_ns.n2vc.exec_primitive = asynctest.CoroutineMock(
271 side_effect=self._return_uuid
272 )
273 self.my_ns.n2vc.GetPrimitiveStatus = asynctest.CoroutineMock(
274 return_value="completed"
275 )
276 self.my_ns.n2vc.GetPrimitiveOutput = asynctest.CoroutineMock(
277 return_value={"result": "ok", "pubkey": pub_key}
278 )
279 self.my_ns.n2vc.delete_execution_environment = asynctest.CoroutineMock(
280 return_value=None
281 )
282 self.my_ns.n2vc.get_public_key = asynctest.CoroutineMock(
283 return_value=getenv("OSMLCM_VCA_PUBKEY", "public_key")
284 )
285 self.my_ns.n2vc.delete_namespace = asynctest.CoroutineMock(
286 return_value=None
287 )
288
289 # Mock RO
290 if not getenv("OSMLCMTEST_RO_NOMOCK"):
291 self.my_ns.RO = asynctest.Mock(
292 NgRoClient(self.loop, **lcm_config["ro_config"])
293 )
294 # TODO first time should be empty list, following should return a dict
295 # self.my_ns.RO.get_list = asynctest.CoroutineMock(self.my_ns.RO.get_list, return_value=[])
296 self.my_ns.RO.deploy = asynctest.CoroutineMock(
297 self.my_ns.RO.deploy, side_effect=self._ro_deploy
298 )
299 # self.my_ns.RO.status = asynctest.CoroutineMock(self.my_ns.RO.status, side_effect=self._ro_status)
300 # self.my_ns.RO.create_action = asynctest.CoroutineMock(self.my_ns.RO.create_action,
301 # return_value={"vm-id": {"vim_result": 200,
302 # "description": "done"}})
303 self.my_ns.RO.delete = asynctest.CoroutineMock(self.my_ns.RO.delete)
304
305 # @asynctest.fail_on(active_handles=True) # all async tasks must be completed
306 # async def test_instantiate(self):
307 # nsr_id = descriptors.test_ids["TEST-A"]["ns"]
308 # nslcmop_id = descriptors.test_ids["TEST-A"]["instantiate"]
309 # # print("Test instantiate started")
310
311 # # delete deployed information of database
312 # if not getenv("OSMLCMTEST_DB_NOMOCK"):
313 # if self.db.get_list("nsrs")[0]["_admin"].get("deployed"):
314 # del self.db.get_list("nsrs")[0]["_admin"]["deployed"]
315 # for db_vnfr in self.db.get_list("vnfrs"):
316 # db_vnfr.pop("ip_address", None)
317 # for db_vdur in db_vnfr["vdur"]:
318 # db_vdur.pop("ip_address", None)
319 # db_vdur.pop("mac_address", None)
320 # if getenv("OSMLCMTEST_RO_VIMID"):
321 # self.db.get_list("vim_accounts")[0]["_admin"]["deployed"]["RO"] = getenv("OSMLCMTEST_RO_VIMID")
322 # if getenv("OSMLCMTEST_RO_VIMID"):
323 # self.db.get_list("nsrs")[0]["_admin"]["deployed"]["RO"] = getenv("OSMLCMTEST_RO_VIMID")
324
325 # await self.my_ns.instantiate(nsr_id, nslcmop_id)
326
327 # self.msg.aiowrite.assert_called_once_with("ns", "instantiated",
328 # {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
329 # "operationState": "COMPLETED"},
330 # loop=self.loop)
331 # self.lcm_tasks.lock_HA.assert_called_once_with('ns', 'nslcmops', nslcmop_id)
332 # if not getenv("OSMLCMTEST_LOGGING_NOMOCK"):
333 # self.assertTrue(self.my_ns.logger.debug.called, "Debug method not called")
334 # self.my_ns.logger.error.assert_not_called()
335 # self.my_ns.logger.exception().assert_not_called()
336
337 # if not getenv("OSMLCMTEST_DB_NOMOCK"):
338 # self.assertTrue(self.db.set_one.called, "db.set_one not called")
339 # db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
340 # db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
341 # self.assertEqual(db_nsr["_admin"].get("nsState"), "INSTANTIATED", "Not instantiated")
342 # for vnfr in db_vnfrs_list:
343 # self.assertEqual(vnfr["_admin"].get("nsState"), "INSTANTIATED", "Not instantiated")
344
345 # if not getenv("OSMLCMTEST_VCA_NOMOCK"):
346 # # check intial-primitives called
347 # self.assertTrue(self.my_ns.n2vc.exec_primitive.called,
348 # "Exec primitive not called for initial config primitive")
349 # for _call in self.my_ns.n2vc.exec_primitive.call_args_list:
350 # self.assertIn(_call[1]["primitive_name"], ("config", "touch"),
351 # "called exec primitive with a primitive different than config or touch")
352
353 # # TODO add more checks of called methods
354 # # TODO add a terminate
355
356 # async def test_instantiate_ee_list(self):
357 # # Using modern IM where configuration is in the new format of execution_environment_list
358 # ee_descriptor_id = "charm_simple"
359 # non_used_initial_primitive = {
360 # "name": "not_to_be_called",
361 # "seq": 3,
362 # "execution-environment-ref": "not_used_ee"
363 # }
364 # ee_list = [
365 # {
366 # "id": ee_descriptor_id,
367 # "juju": {"charm": "simple"},
368
369 # },
370 # ]
371
372 # self.db.set_one(
373 # "vnfds",
374 # q_filter={"_id": "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77"},
375 # update_dict={"vnf-configuration.0.execution-environment-list": ee_list,
376 # "vnf-configuration.0.initial-config-primitive.0.execution-environment-ref": ee_descriptor_id,
377 # "vnf-configuration.0.initial-config-primitive.1.execution-environment-ref": ee_descriptor_id,
378 # "vnf-configuration.0.initial-config-primitive.2": non_used_initial_primitive,
379 # "vnf-configuration.0.config-primitive.0.execution-environment-ref": ee_descriptor_id,
380 # "vnf-configuration.0.config-primitive.0.execution-environment-primitive": "touch_charm",
381 # },
382 # unset={"vnf-configuration.juju": None})
383 # await self.test_instantiate()
384 # # this will check that the initial-congig-primitive 'not_to_be_called' is not called
385
386 # Test scale() and related methods
387 @asynctest.fail_on(active_handles=True) # all async tasks must be completed
388 async def test_scale(self):
389 # print("Test scale started")
390
391 # TODO: Add more higher-lever tests here, for example:
392 # scale-out/scale-in operations with success/error result
393
394 # Test scale() with missing 'scaleVnfData', should return operationState = 'FAILED'
395 nsr_id = descriptors.test_ids["TEST-A"]["ns"]
396 nslcmop_id = descriptors.test_ids["TEST-A"]["instantiate"]
397 await self.my_ns.scale(nsr_id, nslcmop_id)
398 expected_value = "FAILED"
399 return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get(
400 "operationState"
401 )
402 self.assertEqual(return_value, expected_value)
403 # print("scale_result: {}".format(self.db.get_one("nslcmops", {"_id": nslcmop_id}).get("detailed-status")))
404
405 # Test scale() for native kdu
406 # this also includes testing _scale_kdu()
407 nsr_id = descriptors.test_ids["TEST-NATIVE-KDU"]["ns"]
408 nslcmop_id = descriptors.test_ids["TEST-NATIVE-KDU"]["instantiate"]
409
410 self.my_ns.k8sclusterjuju.scale = asynctest.mock.CoroutineMock()
411 self.my_ns.k8sclusterjuju.exec_primitive = asynctest.mock.CoroutineMock()
412 self.my_ns.k8sclusterjuju.get_scale_count = asynctest.mock.CoroutineMock(
413 return_value=1
414 )
415 await self.my_ns.scale(nsr_id, nslcmop_id)
416 expected_value = "COMPLETED"
417 return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get(
418 "operationState"
419 )
420 self.assertEqual(return_value, expected_value)
421 self.my_ns.k8sclusterjuju.scale.assert_called_once()
422
423 # Test scale() for native kdu with 2 resource
424 nsr_id = descriptors.test_ids["TEST-NATIVE-KDU-2"]["ns"]
425 nslcmop_id = descriptors.test_ids["TEST-NATIVE-KDU-2"]["instantiate"]
426
427 self.my_ns.k8sclusterjuju.get_scale_count.return_value = 2
428 await self.my_ns.scale(nsr_id, nslcmop_id)
429 expected_value = "COMPLETED"
430 return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get(
431 "operationState"
432 )
433 self.assertEqual(return_value, expected_value)
434 self.my_ns.k8sclusterjuju.scale.assert_called()
435
436 async def test_vca_status_refresh(self):
437 nsr_id = descriptors.test_ids["TEST-A"]["ns"]
438 nslcmop_id = descriptors.test_ids["TEST-A"]["instantiate"]
439 await self.my_ns.vca_status_refresh(nsr_id, nslcmop_id)
440 expected_value = dict()
441 return_value = dict()
442 vnf_descriptors = self.db.get_list("vnfds")
443 for i, _ in enumerate(vnf_descriptors):
444 for j, value in enumerate(vnf_descriptors[i]["df"]):
445 if "lcm-operations-configuration" in vnf_descriptors[i]["df"][j]:
446 if (
447 "day1-2"
448 in value["lcm-operations-configuration"][
449 "operate-vnf-op-config"
450 ]
451 ):
452 for k, v in enumerate(
453 value["lcm-operations-configuration"][
454 "operate-vnf-op-config"
455 ]["day1-2"]
456 ):
457 if (
458 v.get("execution-environment-list")
459 and "juju" in v["execution-environment-list"][k]
460 ):
461 expected_value = self.db.get_list("nsrs")[i][
462 "vcaStatus"
463 ]
464 await self.my_ns._on_update_n2vc_db(
465 "nsrs", {"_id": nsr_id}, "_admin.deployed.VCA.0", {}
466 )
467 return_value = self.db.get_list("nsrs")[i]["vcaStatus"]
468 self.assertEqual(return_value, expected_value)
469
470 # Test _retry_or_skip_suboperation()
471 # Expected result:
472 # - if a suboperation's 'operationState' is marked as 'COMPLETED', SUBOPERATION_STATUS_SKIP is expected
473 # - if marked as anything but 'COMPLETED', the suboperation index is expected
474 def test_scale_retry_or_skip_suboperation(self):
475 # Load an alternative 'nslcmops' YAML for this test
476 nslcmop_id = descriptors.test_ids["TEST-A"]["instantiate"]
477 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
478 op_index = 2
479 # Test when 'operationState' is 'COMPLETED'
480 db_nslcmop["_admin"]["operations"][op_index]["operationState"] = "COMPLETED"
481 return_value = self.my_ns._retry_or_skip_suboperation(db_nslcmop, op_index)
482 expected_value = self.my_ns.SUBOPERATION_STATUS_SKIP
483 self.assertEqual(return_value, expected_value)
484 # Test when 'operationState' is not 'COMPLETED'
485 db_nslcmop["_admin"]["operations"][op_index]["operationState"] = None
486 return_value = self.my_ns._retry_or_skip_suboperation(db_nslcmop, op_index)
487 expected_value = op_index
488 self.assertEqual(return_value, expected_value)
489
490 # Test _find_suboperation()
491 # Expected result: index of the found sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if not found
492 def test_scale_find_suboperation(self):
493 # Load an alternative 'nslcmops' YAML for this test
494 nslcmop_id = descriptors.test_ids["TEST-A"]["instantiate"]
495 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
496 # Find this sub-operation
497 op_index = 2
498 vnf_index = db_nslcmop["_admin"]["operations"][op_index]["member_vnf_index"]
499 primitive = db_nslcmop["_admin"]["operations"][op_index]["primitive"]
500 primitive_params = db_nslcmop["_admin"]["operations"][op_index][
501 "primitive_params"
502 ]
503 match = {
504 "member_vnf_index": vnf_index,
505 "primitive": primitive,
506 "primitive_params": primitive_params,
507 }
508 found_op_index = self.my_ns._find_suboperation(db_nslcmop, match)
509 self.assertEqual(found_op_index, op_index)
510 # Test with not-matching params
511 match = {
512 "member_vnf_index": vnf_index,
513 "primitive": "",
514 "primitive_params": primitive_params,
515 }
516 found_op_index = self.my_ns._find_suboperation(db_nslcmop, match)
517 self.assertEqual(found_op_index, self.my_ns.SUBOPERATION_STATUS_NOT_FOUND)
518 # Test with None
519 match = None
520 found_op_index = self.my_ns._find_suboperation(db_nslcmop, match)
521 self.assertEqual(found_op_index, self.my_ns.SUBOPERATION_STATUS_NOT_FOUND)
522
523 # Test _update_suboperation_status()
524 def test_scale_update_suboperation_status(self):
525 self.db.set_one = asynctest.Mock()
526 nslcmop_id = descriptors.test_ids["TEST-A"]["instantiate"]
527 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
528 op_index = 0
529 # Force the initial values to be distinct from the updated ones
530 q_filter = {"_id": db_nslcmop["_id"]}
531 # Test to change 'operationState' and 'detailed-status'
532 operationState = "COMPLETED"
533 detailed_status = "Done"
534 expected_update_dict = {
535 "_admin.operations.0.operationState": operationState,
536 "_admin.operations.0.detailed-status": detailed_status,
537 }
538 self.my_ns._update_suboperation_status(
539 db_nslcmop, op_index, operationState, detailed_status
540 )
541 self.db.set_one.assert_called_once_with(
542 "nslcmops",
543 q_filter=q_filter,
544 update_dict=expected_update_dict,
545 fail_on_empty=False,
546 )
547
548 def test_scale_add_suboperation(self):
549 nslcmop_id = descriptors.test_ids["TEST-A"]["instantiate"]
550 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
551 vnf_index = "1"
552 num_ops_before = len(db_nslcmop.get("_admin", {}).get("operations", [])) - 1
553 vdu_id = None
554 vdu_count_index = None
555 vdu_name = None
556 primitive = "touch"
557 mapped_primitive_params = {
558 "parameter": [
559 {
560 "data-type": "STRING",
561 "name": "filename",
562 "default-value": "<touch_filename2>",
563 }
564 ],
565 "name": "touch",
566 }
567 operationState = "PROCESSING"
568 detailed_status = "In progress"
569 operationType = "PRE-SCALE"
570 # Add a 'pre-scale' suboperation
571 op_index_after = self.my_ns._add_suboperation(
572 db_nslcmop,
573 vnf_index,
574 vdu_id,
575 vdu_count_index,
576 vdu_name,
577 primitive,
578 mapped_primitive_params,
579 operationState,
580 detailed_status,
581 operationType,
582 )
583 self.assertEqual(op_index_after, num_ops_before + 1)
584
585 # Delete all suboperations and add the same operation again
586 del db_nslcmop["_admin"]["operations"]
587 op_index_zero = self.my_ns._add_suboperation(
588 db_nslcmop,
589 vnf_index,
590 vdu_id,
591 vdu_count_index,
592 vdu_name,
593 primitive,
594 mapped_primitive_params,
595 operationState,
596 detailed_status,
597 operationType,
598 )
599 self.assertEqual(op_index_zero, 0)
600
601 # Add a 'RO' suboperation
602 RO_nsr_id = "1234567890"
603 RO_scaling_info = [
604 {
605 "type": "create",
606 "count": 1,
607 "member-vnf-index": "1",
608 "osm_vdu_id": "dataVM",
609 }
610 ]
611 op_index = self.my_ns._add_suboperation(
612 db_nslcmop,
613 vnf_index,
614 vdu_id,
615 vdu_count_index,
616 vdu_name,
617 primitive,
618 mapped_primitive_params,
619 operationState,
620 detailed_status,
621 operationType,
622 RO_nsr_id,
623 RO_scaling_info,
624 )
625 db_RO_nsr_id = db_nslcmop["_admin"]["operations"][op_index]["RO_nsr_id"]
626 self.assertEqual(op_index, 1)
627 self.assertEqual(RO_nsr_id, db_RO_nsr_id)
628
629 # Try to add an invalid suboperation, should return SUBOPERATION_STATUS_NOT_FOUND
630 op_index_invalid = self.my_ns._add_suboperation(
631 None, None, None, None, None, None, None, None, None, None, None
632 )
633 self.assertEqual(op_index_invalid, self.my_ns.SUBOPERATION_STATUS_NOT_FOUND)
634
635 # Test _check_or_add_scale_suboperation() and _check_or_add_scale_suboperation_RO()
636 # check the possible return values:
637 # - SUBOPERATION_STATUS_NEW: This is a new sub-operation
638 # - op_index (non-negative number): This is an existing sub-operation, operationState != 'COMPLETED'
639 # - SUBOPERATION_STATUS_SKIP: This is an existing sub-operation, operationState == 'COMPLETED'
640 def test_scale_check_or_add_scale_suboperation(self):
641 nslcmop_id = descriptors.test_ids["TEST-A"]["instantiate"]
642 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
643 operationType = "PRE-SCALE"
644 vnf_index = "1"
645 primitive = "touch"
646 primitive_params = {
647 "parameter": [
648 {
649 "data-type": "STRING",
650 "name": "filename",
651 "default-value": "<touch_filename2>",
652 }
653 ],
654 "name": "touch",
655 }
656
657 # Delete all sub-operations to be sure this is a new sub-operation
658 del db_nslcmop["_admin"]["operations"]
659
660 # Add a new sub-operation
661 # For new sub-operations, operationState is set to 'PROCESSING' by default
662 op_index_new = self.my_ns._check_or_add_scale_suboperation(
663 db_nslcmop, vnf_index, primitive, primitive_params, operationType
664 )
665 self.assertEqual(op_index_new, self.my_ns.SUBOPERATION_STATUS_NEW)
666
667 # Use the same parameters again to match the already added sub-operation
668 # which has status 'PROCESSING' (!= 'COMPLETED') by default
669 # The expected return value is a non-negative number
670 op_index_existing = self.my_ns._check_or_add_scale_suboperation(
671 db_nslcmop, vnf_index, primitive, primitive_params, operationType
672 )
673 self.assertTrue(op_index_existing >= 0)
674
675 # Change operationState 'manually' for this sub-operation
676 db_nslcmop["_admin"]["operations"][op_index_existing][
677 "operationState"
678 ] = "COMPLETED"
679 # Then use the same parameters again to match the already added sub-operation,
680 # which now has status 'COMPLETED'
681 # The expected return value is SUBOPERATION_STATUS_SKIP
682 op_index_skip = self.my_ns._check_or_add_scale_suboperation(
683 db_nslcmop, vnf_index, primitive, primitive_params, operationType
684 )
685 self.assertEqual(op_index_skip, self.my_ns.SUBOPERATION_STATUS_SKIP)
686
687 # RO sub-operation test:
688 # Repeat tests for the very similar _check_or_add_scale_suboperation_RO(),
689 RO_nsr_id = "1234567890"
690 RO_scaling_info = [
691 {
692 "type": "create",
693 "count": 1,
694 "member-vnf-index": "1",
695 "osm_vdu_id": "dataVM",
696 }
697 ]
698 op_index_new_RO = self.my_ns._check_or_add_scale_suboperation(
699 db_nslcmop, vnf_index, None, None, "SCALE-RO", RO_nsr_id, RO_scaling_info
700 )
701 self.assertEqual(op_index_new_RO, self.my_ns.SUBOPERATION_STATUS_NEW)
702
703 # Use the same parameters again to match the already added RO sub-operation
704 op_index_existing_RO = self.my_ns._check_or_add_scale_suboperation(
705 db_nslcmop, vnf_index, None, None, "SCALE-RO", RO_nsr_id, RO_scaling_info
706 )
707 self.assertTrue(op_index_existing_RO >= 0)
708
709 # Change operationState 'manually' for this RO sub-operation
710 db_nslcmop["_admin"]["operations"][op_index_existing_RO][
711 "operationState"
712 ] = "COMPLETED"
713 # Then use the same parameters again to match the already added sub-operation,
714 # which now has status 'COMPLETED'
715 # The expected return value is SUBOPERATION_STATUS_SKIP
716 op_index_skip_RO = self.my_ns._check_or_add_scale_suboperation(
717 db_nslcmop, vnf_index, None, None, "SCALE-RO", RO_nsr_id, RO_scaling_info
718 )
719 self.assertEqual(op_index_skip_RO, self.my_ns.SUBOPERATION_STATUS_SKIP)
720
721 async def test_deploy_kdus(self):
722 nsr_id = descriptors.test_ids["TEST-KDU"]["ns"]
723 nslcmop_id = descriptors.test_ids["TEST-KDU"]["instantiate"]
724 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
725 db_vnfr = self.db.get_one(
726 "vnfrs", {"nsr-id-ref": nsr_id, "member-vnf-index-ref": "multikdu"}
727 )
728 db_vnfrs = {"multikdu": db_vnfr}
729 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
730 db_vnfds = [db_vnfd]
731 task_register = {}
732 logging_text = "KDU"
733 self.my_ns.k8sclusterhelm3.generate_kdu_instance_name = asynctest.mock.Mock()
734 self.my_ns.k8sclusterhelm3.generate_kdu_instance_name.return_value = "k8s_id"
735 self.my_ns.k8sclusterhelm3.install = asynctest.CoroutineMock()
736 self.my_ns.k8sclusterhelm3.synchronize_repos = asynctest.CoroutineMock(
737 return_value=("", "")
738 )
739 self.my_ns.k8sclusterhelm3.get_services = asynctest.CoroutineMock(
740 return_value=([])
741 )
742 await self.my_ns.deploy_kdus(
743 logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_register
744 )
745 await asyncio.wait(list(task_register.keys()), timeout=100)
746 db_nsr = self.db.get_list("nsrs")[1]
747 self.assertIn(
748 "K8s",
749 db_nsr["_admin"]["deployed"],
750 "K8s entry not created at '_admin.deployed'",
751 )
752 self.assertIsInstance(
753 db_nsr["_admin"]["deployed"]["K8s"], list, "K8s entry is not of type list"
754 )
755 self.assertEqual(
756 len(db_nsr["_admin"]["deployed"]["K8s"]), 2, "K8s entry is not of type list"
757 )
758 k8s_instace_info = {
759 "kdu-instance": "k8s_id",
760 "k8scluster-uuid": "73d96432-d692-40d2-8440-e0c73aee209c",
761 "k8scluster-type": "helm-chart-v3",
762 "kdu-name": "ldap",
763 "member-vnf-index": "multikdu",
764 "namespace": None,
765 "kdu-deployment-name": None,
766 }
767
768 nsr_result = copy.deepcopy(db_nsr["_admin"]["deployed"]["K8s"][0])
769 nsr_kdu_model_result = nsr_result.pop("kdu-model")
770 expected_kdu_model = "stable/openldap:1.2.1"
771 self.assertEqual(nsr_result, k8s_instace_info)
772 self.assertTrue(
773 nsr_kdu_model_result in expected_kdu_model
774 or expected_kdu_model in nsr_kdu_model_result
775 )
776 nsr_result = copy.deepcopy(db_nsr["_admin"]["deployed"]["K8s"][1])
777 nsr_kdu_model_result = nsr_result.pop("kdu-model")
778 k8s_instace_info["kdu-name"] = "mongo"
779 expected_kdu_model = "stable/mongodb"
780 self.assertEqual(nsr_result, k8s_instace_info)
781 self.assertTrue(
782 nsr_kdu_model_result in expected_kdu_model
783 or expected_kdu_model in nsr_kdu_model_result
784 )
785
786 # async def test_instantiate_pdu(self):
787 # nsr_id = descriptors.test_ids["TEST-A"]["ns"]
788 # nslcmop_id = descriptors.test_ids["TEST-A"]["instantiate"]
789 # # Modify vnfd/vnfr to change KDU for PDU. Adding keys that NBI will already set
790 # self.db.set_one("vnfrs", {"nsr-id-ref": nsr_id, "member-vnf-index-ref": "1"},
791 # update_dict={"ip-address": "10.205.1.46",
792 # "vdur.0.pdu-id": "53e1ec21-2464-451e-a8dc-6e311d45b2c8",
793 # "vdur.0.pdu-type": "PDU-TYPE-1",
794 # "vdur.0.ip-address": "10.205.1.46",
795 # },
796 # unset={"vdur.status": None})
797 # self.db.set_one("vnfrs", {"nsr-id-ref": nsr_id, "member-vnf-index-ref": "2"},
798 # update_dict={"ip-address": "10.205.1.47",
799 # "vdur.0.pdu-id": "53e1ec21-2464-451e-a8dc-6e311d45b2c8",
800 # "vdur.0.pdu-type": "PDU-TYPE-1",
801 # "vdur.0.ip-address": "10.205.1.47",
802 # },
803 # unset={"vdur.status": None})
804
805 # await self.my_ns.instantiate(nsr_id, nslcmop_id)
806 # db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
807 # self.assertEqual(db_nsr.get("nsState"), "READY", str(db_nsr.get("errorDescription ")))
808 # self.assertEqual(db_nsr.get("currentOperation"), "IDLE", "currentOperation different than 'IDLE'")
809 # self.assertEqual(db_nsr.get("currentOperationID"), None, "currentOperationID different than None")
810 # self.assertEqual(db_nsr.get("errorDescription "), None, "errorDescription different than None")
811 # self.assertEqual(db_nsr.get("errorDetail"), None, "errorDetail different than None")
812
813 # @asynctest.fail_on(active_handles=True) # all async tasks must be completed
814 # async def test_terminate_without_configuration(self):
815 # nsr_id = descriptors.test_ids["TEST-A"]["ns"]
816 # nslcmop_id = descriptors.test_ids["TEST-A"]["terminate"]
817 # # set instantiation task as completed
818 # self.db.set_list("nslcmops", {"nsInstanceId": nsr_id, "_id.ne": nslcmop_id},
819 # update_dict={"operationState": "COMPLETED"})
820 # self.db.set_one("nsrs", {"_id": nsr_id},
821 # update_dict={"_admin.deployed.VCA.0": None, "_admin.deployed.VCA.1": None})
822
823 # await self.my_ns.terminate(nsr_id, nslcmop_id)
824 # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
825 # self.assertEqual(db_nslcmop.get("operationState"), 'COMPLETED', db_nslcmop.get("detailed-status"))
826 # db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
827 # self.assertEqual(db_nsr.get("nsState"), "NOT_INSTANTIATED", str(db_nsr.get("errorDescription ")))
828 # self.assertEqual(db_nsr["_admin"].get("nsState"), "NOT_INSTANTIATED", str(db_nsr.get("errorDescription ")))
829 # self.assertEqual(db_nsr.get("currentOperation"), "IDLE", "currentOperation different than 'IDLE'")
830 # self.assertEqual(db_nsr.get("currentOperationID"), None, "currentOperationID different than None")
831 # self.assertEqual(db_nsr.get("errorDescription "), None, "errorDescription different than None")
832 # self.assertEqual(db_nsr.get("errorDetail"), None, "errorDetail different than None")
833 # db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
834 # for vnfr in db_vnfrs_list:
835 # self.assertEqual(vnfr["_admin"].get("nsState"), "NOT_INSTANTIATED", "Not instantiated")
836
837 # @asynctest.fail_on(active_handles=True) # all async tasks must be completed
838 # async def test_terminate_primitive(self):
839 # nsr_id = descriptors.test_ids["TEST-A"]["ns"]
840 # nslcmop_id = descriptors.test_ids["TEST-A"]["terminate"]
841 # # set instantiation task as completed
842 # self.db.set_list("nslcmops", {"nsInstanceId": nsr_id, "_id.ne": nslcmop_id},
843 # update_dict={"operationState": "COMPLETED"})
844
845 # # modify vnfd descriptor to include terminate_primitive
846 # terminate_primitive = [{
847 # "name": "touch",
848 # "parameter": [{"name": "filename", "value": "terminate_filename"}],
849 # "seq": '1'
850 # }]
851 # db_vnfr = self.db.get_one("vnfrs", {"nsr-id-ref": nsr_id, "member-vnf-index-ref": "1"})
852 # self.db.set_one("vnfds", {"_id": db_vnfr["vnfd-id"]},
853 # {"vnf-configuration.0.terminate-config-primitive": terminate_primitive})
854
855 # await self.my_ns.terminate(nsr_id, nslcmop_id)
856 # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
857 # self.assertEqual(db_nslcmop.get("operationState"), 'COMPLETED', db_nslcmop.get("detailed-status"))
858 # db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
859 # self.assertEqual(db_nsr.get("nsState"), "NOT_INSTANTIATED", str(db_nsr.get("errorDescription ")))
860 # self.assertEqual(db_nsr["_admin"].get("nsState"), "NOT_INSTANTIATED", str(db_nsr.get("errorDescription ")))
861 # self.assertEqual(db_nsr.get("currentOperation"), "IDLE", "currentOperation different than 'IDLE'")
862 # self.assertEqual(db_nsr.get("currentOperationID"), None, "currentOperationID different than None")
863 # self.assertEqual(db_nsr.get("errorDescription "), None, "errorDescription different than None")
864 # self.assertEqual(db_nsr.get("errorDetail"), None, "errorDetail different than None")
865
866 # Test update method
867
868 async def test_update(self):
869
870 nsr_id = descriptors.test_ids["TEST-A"]["ns"]
871 nslcmop_id = descriptors.test_ids["TEST-A"]["update"]
872 vnfr_id = "6421c7c9-d865-4fb4-9a13-d4275d243e01"
873 vnfd_id = "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77"
874
875 def mock_reset():
876 mock_charm_hash.reset_mock()
877 mock_juju_bundle.reset_mock()
878 fs.sync.reset_mock()
879 mock_charm_upgrade.reset_mock()
880 mock_software_version.reset_mock()
881
882 with self.subTest(
883 i=1,
884 t="Update type: CHANGE_VNFPKG, latest_vnfd revision changed,"
885 "Charm package changed, sw-version is not changed.",
886 ):
887
888 self.db.set_one(
889 "vnfds",
890 q_filter={"_id": vnfd_id},
891 update_dict={"_admin.revision": 3, "kdu": []},
892 )
893
894 self.db.set_one(
895 "vnfds_revisions",
896 q_filter={"_id": vnfd_id + ":1"},
897 update_dict={"_admin.revision": 1, "kdu": []},
898 )
899
900 self.db.set_one(
901 "vnfrs", q_filter={"_id": vnfr_id}, update_dict={"revision": 1}
902 )
903
904 mock_charm_hash = Mock(autospec=True)
905 mock_charm_hash.return_value = True
906
907 mock_juju_bundle = Mock(return_value=None)
908
909 mock_software_version = Mock(autospec=True)
910 mock_software_version.side_effect = ["1.0", "1.0"]
911
912 mock_charm_upgrade = asynctest.Mock(autospec=True)
913 task = asyncio.Future()
914 task.set_result(("COMPLETED", "some_output"))
915 mock_charm_upgrade.return_value = task
916
917 fs = Mock(autospec=True)
918 fs.path.__add__ = Mock()
919 fs.path.side_effect = ["/", "/", "/", "/"]
920 fs.sync.side_effect = [None, None]
921
922 instance = self.my_ns
923
924 expected_operation_state = "COMPLETED"
925 expected_operation_error = ""
926 expected_vnfr_revision = 3
927 expected_ns_state = "INSTANTIATED"
928 expected_ns_operational_state = "running"
929
930 with patch.object(instance, "fs", fs), patch(
931 "osm_lcm.lcm_utils.LcmBase.check_charm_hash_changed", mock_charm_hash
932 ), patch("osm_lcm.ns.NsLcm._ns_charm_upgrade", mock_charm_upgrade), patch(
933 "osm_lcm.data_utils.vnfd.find_software_version", mock_software_version
934 ), patch(
935 "osm_lcm.lcm_utils.check_juju_bundle_existence", mock_juju_bundle
936 ):
937
938 await instance.update(nsr_id, nslcmop_id)
939 return_operation_state = self.db.get_one(
940 "nslcmops", {"_id": nslcmop_id}
941 ).get("operationState")
942 return_operation_error = self.db.get_one(
943 "nslcmops", {"_id": nslcmop_id}
944 ).get("errorMessage")
945 return_ns_operational_state = self.db.get_one(
946 "nsrs", {"_id": nsr_id}
947 ).get("operational-status")
948
949 return_vnfr_revision = self.db.get_one("vnfrs", {"_id": vnfr_id}).get(
950 "revision"
951 )
952
953 return_ns_state = self.db.get_one("nsrs", {"_id": nsr_id}).get(
954 "nsState"
955 )
956
957 mock_charm_hash.assert_called_with(
958 "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77:1/hackfest_3charmed_vnfd/charms/simple",
959 "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77/hackfest_3charmed_vnfd/charms/simple",
960 )
961
962 self.assertEqual(fs.sync.call_count, 2)
963 self.assertEqual(return_ns_state, expected_ns_state)
964 self.assertEqual(return_operation_state, expected_operation_state)
965 self.assertEqual(return_operation_error, expected_operation_error)
966 self.assertEqual(
967 return_ns_operational_state, expected_ns_operational_state
968 )
969 self.assertEqual(return_vnfr_revision, expected_vnfr_revision)
970
971 mock_reset()
972
973 with self.subTest(
974 i=2, t="Update type: CHANGE_VNFPKG, latest_vnfd revision not changed"
975 ):
976
977 self.db.set_one(
978 "vnfds", q_filter={"_id": vnfd_id}, update_dict={"_admin.revision": 1}
979 )
980
981 self.db.set_one(
982 "vnfrs", q_filter={"_id": vnfr_id}, update_dict={"revision": 1}
983 )
984
985 mock_charm_hash = Mock(autospec=True)
986 mock_charm_hash.return_value = True
987
988 mock_juju_bundle = Mock(return_value=None)
989 mock_software_version = Mock(autospec=True)
990
991 mock_charm_upgrade = asynctest.Mock(autospec=True)
992 task = asyncio.Future()
993 task.set_result(("COMPLETED", "some_output"))
994 mock_charm_upgrade.return_value = task
995
996 fs = Mock(autospec=True)
997 fs.path.__add__ = Mock()
998 fs.path.side_effect = ["/", "/", "/", "/"]
999 fs.sync.side_effect = [None, None]
1000
1001 instance = self.my_ns
1002
1003 expected_operation_state = "COMPLETED"
1004 expected_operation_error = ""
1005 expected_vnfr_revision = 1
1006 expected_ns_state = "INSTANTIATED"
1007 expected_ns_operational_state = "running"
1008
1009 with patch.object(instance, "fs", fs), patch(
1010 "osm_lcm.lcm_utils.LcmBase.check_charm_hash_changed", mock_charm_hash
1011 ), patch("osm_lcm.ns.NsLcm._ns_charm_upgrade", mock_charm_upgrade), patch(
1012 "osm_lcm.lcm_utils.check_juju_bundle_existence", mock_juju_bundle
1013 ):
1014
1015 await instance.update(nsr_id, nslcmop_id)
1016
1017 return_operation_state = self.db.get_one(
1018 "nslcmops", {"_id": nslcmop_id}
1019 ).get("operationState")
1020
1021 return_operation_error = self.db.get_one(
1022 "nslcmops", {"_id": nslcmop_id}
1023 ).get("errorMessage")
1024
1025 return_ns_operational_state = self.db.get_one(
1026 "nsrs", {"_id": nsr_id}
1027 ).get("operational-status")
1028
1029 return_ns_state = self.db.get_one("nsrs", {"_id": nsr_id}).get(
1030 "nsState"
1031 )
1032
1033 return_vnfr_revision = self.db.get_one("vnfrs", {"_id": vnfr_id}).get(
1034 "revision"
1035 )
1036
1037 mock_charm_hash.assert_not_called()
1038 mock_software_version.assert_not_called()
1039 mock_juju_bundle.assert_not_called()
1040 mock_charm_upgrade.assert_not_called()
1041 fs.sync.assert_not_called()
1042
1043 self.assertEqual(return_ns_state, expected_ns_state)
1044 self.assertEqual(return_operation_state, expected_operation_state)
1045 self.assertEqual(return_operation_error, expected_operation_error)
1046 self.assertEqual(
1047 return_ns_operational_state, expected_ns_operational_state
1048 )
1049 self.assertEqual(return_vnfr_revision, expected_vnfr_revision)
1050
1051 mock_reset()
1052
1053 with self.subTest(
1054 i=3,
1055 t="Update type: CHANGE_VNFPKG, latest_vnfd revision changed, "
1056 "Charm package is not changed, sw-version is not changed.",
1057 ):
1058
1059 self.db.set_one(
1060 "vnfds", q_filter={"_id": vnfd_id}, update_dict={"_admin.revision": 3}
1061 )
1062
1063 self.db.set_one(
1064 "vnfds_revisions",
1065 q_filter={"_id": vnfd_id + ":1"},
1066 update_dict={"_admin.revision": 1},
1067 )
1068
1069 self.db.set_one(
1070 "vnfrs", q_filter={"_id": vnfr_id}, update_dict={"revision": 1}
1071 )
1072
1073 mock_charm_hash = Mock(autospec=True)
1074 mock_charm_hash.return_value = False
1075
1076 mock_juju_bundle = Mock(return_value=None)
1077
1078 mock_software_version = Mock(autospec=True)
1079
1080 mock_charm_upgrade = asynctest.Mock(autospec=True)
1081 task = asyncio.Future()
1082 task.set_result(("COMPLETED", "some_output"))
1083 mock_charm_upgrade.return_value = task
1084 mock_software_version.side_effect = ["1.0", "1.0"]
1085
1086 fs = Mock(autospec=True)
1087 fs.path.__add__ = Mock()
1088 fs.path.side_effect = ["/", "/", "/", "/"]
1089 fs.sync.side_effect = [None, None]
1090
1091 instance = self.my_ns
1092
1093 expected_operation_state = "COMPLETED"
1094 expected_operation_error = ""
1095 expected_vnfr_revision = 3
1096 expected_ns_state = "INSTANTIATED"
1097 expected_ns_operational_state = "running"
1098
1099 with patch.object(instance, "fs", fs), patch(
1100 "osm_lcm.lcm_utils.LcmBase.check_charm_hash_changed", mock_charm_hash
1101 ), patch("osm_lcm.ns.NsLcm._ns_charm_upgrade", mock_charm_upgrade), patch(
1102 "osm_lcm.lcm_utils.check_juju_bundle_existence", mock_juju_bundle
1103 ):
1104
1105 await instance.update(nsr_id, nslcmop_id)
1106
1107 return_operation_state = self.db.get_one(
1108 "nslcmops", {"_id": nslcmop_id}
1109 ).get("operationState")
1110
1111 return_operation_error = self.db.get_one(
1112 "nslcmops", {"_id": nslcmop_id}
1113 ).get("errorMessage")
1114
1115 return_ns_operational_state = self.db.get_one(
1116 "nsrs", {"_id": nsr_id}
1117 ).get("operational-status")
1118
1119 return_vnfr_revision = self.db.get_one("vnfrs", {"_id": vnfr_id}).get(
1120 "revision"
1121 )
1122
1123 return_ns_state = self.db.get_one("nsrs", {"_id": nsr_id}).get(
1124 "nsState"
1125 )
1126
1127 mock_charm_hash.assert_called_with(
1128 "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77:1/hackfest_3charmed_vnfd/charms/simple",
1129 "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77/hackfest_3charmed_vnfd/charms/simple",
1130 )
1131
1132 self.assertEqual(fs.sync.call_count, 2)
1133 self.assertEqual(mock_charm_hash.call_count, 1)
1134
1135 mock_juju_bundle.assert_not_called()
1136 mock_charm_upgrade.assert_not_called()
1137
1138 self.assertEqual(return_ns_state, expected_ns_state)
1139 self.assertEqual(return_operation_state, expected_operation_state)
1140 self.assertEqual(return_operation_error, expected_operation_error)
1141 self.assertEqual(
1142 return_ns_operational_state, expected_ns_operational_state
1143 )
1144 self.assertEqual(return_vnfr_revision, expected_vnfr_revision)
1145
1146 mock_reset()
1147
1148 with self.subTest(
1149 i=4,
1150 t="Update type: CHANGE_VNFPKG, latest_vnfd revision changed, "
1151 "Charm package exists, sw-version changed.",
1152 ):
1153
1154 self.db.set_one(
1155 "vnfds",
1156 q_filter={"_id": vnfd_id},
1157 update_dict={"_admin.revision": 3, "software-version": "3.0"},
1158 )
1159
1160 self.db.set_one(
1161 "vnfds_revisions",
1162 q_filter={"_id": vnfd_id + ":1"},
1163 update_dict={"_admin.revision": 1},
1164 )
1165
1166 self.db.set_one(
1167 "vnfrs",
1168 q_filter={"_id": vnfr_id},
1169 update_dict={"revision": 1},
1170 )
1171
1172 mock_charm_hash = Mock(autospec=True)
1173 mock_charm_hash.return_value = False
1174
1175 mock_juju_bundle = Mock(return_value=None)
1176
1177 mock_charm_upgrade = asynctest.Mock(autospec=True)
1178 task = asyncio.Future()
1179 task.set_result(("COMPLETED", "some_output"))
1180 mock_charm_upgrade.return_value = task
1181
1182 mock_charm_artifact = Mock(autospec=True)
1183 mock_charm_artifact.side_effect = [
1184 "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77:1/hackfest_3charmed_vnfd/charms/simple",
1185 "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77/hackfest_3charmed_vnfd/charms/simple",
1186 ]
1187
1188 fs = Mock(autospec=True)
1189 fs.path.__add__ = Mock()
1190 fs.path.side_effect = ["/", "/", "/", "/"]
1191 fs.sync.side_effect = [None, None]
1192
1193 instance = self.my_ns
1194
1195 expected_operation_state = "FAILED"
1196 expected_operation_error = "FAILED Checking if existing VNF has charm: Software version change is not supported as VNF instance 6421c7c9-d865-4fb4-9a13-d4275d243e01 has charm."
1197 expected_vnfr_revision = 1
1198 expected_ns_state = "INSTANTIATED"
1199 expected_ns_operational_state = "running"
1200
1201 with patch.object(instance, "fs", fs), patch(
1202 "osm_lcm.lcm_utils.LcmBase.check_charm_hash_changed", mock_charm_hash
1203 ), patch("osm_lcm.ns.NsLcm._ns_charm_upgrade", mock_charm_upgrade), patch(
1204 "osm_lcm.lcm_utils.get_charm_artifact_path", mock_charm_artifact
1205 ):
1206
1207 await instance.update(nsr_id, nslcmop_id)
1208
1209 return_operation_state = self.db.get_one(
1210 "nslcmops", {"_id": nslcmop_id}
1211 ).get("operationState")
1212
1213 return_operation_error = self.db.get_one(
1214 "nslcmops", {"_id": nslcmop_id}
1215 ).get("errorMessage")
1216
1217 return_ns_operational_state = self.db.get_one(
1218 "nsrs", {"_id": nsr_id}
1219 ).get("operational-status")
1220
1221 return_vnfr_revision = self.db.get_one("vnfrs", {"_id": vnfr_id}).get(
1222 "revision"
1223 )
1224
1225 return_ns_state = self.db.get_one("nsrs", {"_id": nsr_id}).get(
1226 "nsState"
1227 )
1228
1229 self.assertEqual(fs.sync.call_count, 2)
1230 mock_charm_hash.assert_not_called()
1231
1232 mock_juju_bundle.assert_not_called()
1233 mock_charm_upgrade.assert_not_called()
1234
1235 self.assertEqual(return_ns_state, expected_ns_state)
1236 self.assertEqual(return_operation_state, expected_operation_state)
1237 self.assertEqual(return_operation_error, expected_operation_error)
1238 self.assertEqual(
1239 return_ns_operational_state, expected_ns_operational_state
1240 )
1241 self.assertEqual(return_vnfr_revision, expected_vnfr_revision)
1242
1243 mock_reset()
1244
1245 with self.subTest(
1246 i=5,
1247 t="Update type: CHANGE_VNFPKG, latest_vnfd revision changed,"
1248 "Charm package exists, sw-version not changed, juju-bundle exists",
1249 ):
1250
1251 self.db.set_one(
1252 "vnfds",
1253 q_filter={"_id": vnfd_id},
1254 update_dict={
1255 "_admin.revision": 3,
1256 "software-version": "1.0",
1257 "kdu.0.juju-bundle": "stable/native-kdu",
1258 },
1259 )
1260
1261 self.db.set_one(
1262 "vnfds_revisions",
1263 q_filter={"_id": vnfd_id + ":1"},
1264 update_dict={
1265 "_admin.revision": 1,
1266 "software-version": "1.0",
1267 "kdu.0.juju-bundle": "stable/native-kdu",
1268 },
1269 )
1270
1271 self.db.set_one(
1272 "vnfrs", q_filter={"_id": vnfr_id}, update_dict={"revision": 1}
1273 )
1274
1275 mock_charm_hash = Mock(autospec=True)
1276 mock_charm_hash.return_value = True
1277
1278 mock_charm_artifact = Mock(autospec=True)
1279 mock_charm_artifact.side_effect = [
1280 "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77:1/hackfest_3charmed_vnfd/charms/simple",
1281 "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77/hackfest_3charmed_vnfd/charms/simple",
1282 ]
1283
1284 fs = Mock(autospec=True)
1285 fs.path.__add__ = Mock()
1286 fs.path.side_effect = ["/", "/", "/", "/"]
1287 fs.sync.side_effect = [None, None]
1288
1289 instance = self.my_ns
1290
1291 expected_operation_state = "FAILED"
1292 expected_operation_error = "FAILED Checking whether VNF uses juju bundle: Charm upgrade is not supported for the instance which uses juju-bundle: stable/native-kdu"
1293 expected_vnfr_revision = 1
1294 expected_ns_state = "INSTANTIATED"
1295 expected_ns_operational_state = "running"
1296
1297 with patch.object(instance, "fs", fs), patch(
1298 "osm_lcm.lcm_utils.LcmBase.check_charm_hash_changed", mock_charm_hash
1299 ), patch("osm_lcm.lcm_utils.get_charm_artifact_path", mock_charm_artifact):
1300
1301 await instance.update(nsr_id, nslcmop_id)
1302
1303 return_operation_state = self.db.get_one(
1304 "nslcmops", {"_id": nslcmop_id}
1305 ).get("operationState")
1306
1307 return_operation_error = self.db.get_one(
1308 "nslcmops", {"_id": nslcmop_id}
1309 ).get("errorMessage")
1310
1311 return_ns_operational_state = self.db.get_one(
1312 "nsrs", {"_id": nsr_id}
1313 ).get("operational-status")
1314
1315 return_vnfr_revision = self.db.get_one("vnfrs", {"_id": vnfr_id}).get(
1316 "revision"
1317 )
1318
1319 return_ns_state = self.db.get_one("nsrs", {"_id": nsr_id}).get(
1320 "nsState"
1321 )
1322
1323 self.assertEqual(fs.sync.call_count, 2)
1324 self.assertEqual(mock_charm_hash.call_count, 1)
1325 self.assertEqual(mock_charm_hash.call_count, 1)
1326
1327 mock_charm_upgrade.assert_not_called()
1328
1329 self.assertEqual(return_ns_state, expected_ns_state)
1330 self.assertEqual(return_operation_state, expected_operation_state)
1331 self.assertEqual(return_operation_error, expected_operation_error)
1332 self.assertEqual(
1333 return_ns_operational_state, expected_ns_operational_state
1334 )
1335 self.assertEqual(return_vnfr_revision, expected_vnfr_revision)
1336
1337 mock_reset()
1338
1339 with self.subTest(
1340 i=6,
1341 t="Update type: CHANGE_VNFPKG, latest_vnfd revision changed,"
1342 "Charm package exists, sw-version not changed, charm-upgrade failed",
1343 ):
1344
1345 self.db.set_one(
1346 "vnfds",
1347 q_filter={"_id": vnfd_id},
1348 update_dict={
1349 "_admin.revision": 3,
1350 "software-version": "1.0",
1351 "kdu": [],
1352 },
1353 )
1354
1355 self.db.set_one(
1356 "vnfds_revisions",
1357 q_filter={"_id": vnfd_id + ":1"},
1358 update_dict={
1359 "_admin.revision": 1,
1360 "software-version": "1.0",
1361 "kdu": [],
1362 },
1363 )
1364
1365 self.db.set_one(
1366 "vnfrs", q_filter={"_id": vnfr_id}, update_dict={"revision": 1}
1367 )
1368
1369 mock_charm_hash = Mock(autospec=True)
1370 mock_charm_hash.return_value = True
1371
1372 mock_charm_upgrade = asynctest.Mock(autospec=True)
1373 task = asyncio.Future()
1374 task.set_result(("FAILED", "some_error"))
1375 mock_charm_upgrade.return_value = task
1376
1377 mock_charm_artifact = Mock(autospec=True)
1378 mock_charm_artifact.side_effect = [
1379 "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77:1/hackfest_3charmed_vnfd/charms/simple",
1380 "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77/hackfest_3charmed_vnfd/charms/simple",
1381 ]
1382
1383 fs = Mock(autospec=True)
1384 fs.path.__add__ = Mock()
1385 fs.path.side_effect = ["/", "/", "/", "/"]
1386 fs.sync.side_effect = [None, None]
1387
1388 instance = self.my_ns
1389
1390 expected_operation_state = "FAILED"
1391 expected_operation_error = "some_error"
1392 expected_vnfr_revision = 1
1393 expected_ns_state = "INSTANTIATED"
1394 expected_ns_operational_state = "running"
1395
1396 with patch.object(instance, "fs", fs), patch(
1397 "osm_lcm.lcm_utils.LcmBase.check_charm_hash_changed", mock_charm_hash
1398 ), patch("osm_lcm.ns.NsLcm._ns_charm_upgrade", mock_charm_upgrade):
1399
1400 await instance.update(nsr_id, nslcmop_id)
1401
1402 return_operation_state = self.db.get_one(
1403 "nslcmops", {"_id": nslcmop_id}
1404 ).get("operationState")
1405
1406 return_operation_error = self.db.get_one(
1407 "nslcmops", {"_id": nslcmop_id}
1408 ).get("errorMessage")
1409
1410 return_ns_operational_state = self.db.get_one(
1411 "nsrs", {"_id": nsr_id}
1412 ).get("operational-status")
1413
1414 return_vnfr_revision = self.db.get_one("vnfrs", {"_id": vnfr_id}).get(
1415 "revision"
1416 )
1417
1418 return_ns_state = self.db.get_one("nsrs", {"_id": nsr_id}).get(
1419 "nsState"
1420 )
1421
1422 self.assertEqual(fs.sync.call_count, 2)
1423 self.assertEqual(mock_charm_hash.call_count, 1)
1424 self.assertEqual(mock_charm_upgrade.call_count, 1)
1425
1426 self.assertEqual(return_ns_state, expected_ns_state)
1427 self.assertEqual(return_operation_state, expected_operation_state)
1428 self.assertEqual(return_operation_error, expected_operation_error)
1429 self.assertEqual(
1430 return_ns_operational_state, expected_ns_operational_state
1431 )
1432 self.assertEqual(return_vnfr_revision, expected_vnfr_revision)
1433
1434 mock_reset()
1435
1436 def test_ns_update_helper_methods(self):
1437 def mock_reset():
1438 fs.mock_reset()
1439 mock_path.mock_reset()
1440 mock_checksumdir.mock_reset()
1441
1442 with self.subTest(
1443 i=1, t="Find software version, VNFD does not have have software version"
1444 ):
1445 # Testing method find_software_version
1446
1447 db_vnfd = self.db.get_one(
1448 "vnfds", {"_id": "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77"}
1449 )
1450 expected_result = "1.0"
1451 result = find_software_version(db_vnfd)
1452 self.assertEqual(
1453 result, expected_result, "Default sw version should be 1.0"
1454 )
1455
1456 with self.subTest(
1457 i=2, t="Find software version, VNFD includes software version"
1458 ):
1459 # Testing method find_software_version
1460
1461 db_vnfd = self.db.get_one(
1462 "vnfds", {"_id": "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77"}
1463 )
1464 db_vnfd["software-version"] = "3.1"
1465 expected_result = "3.1"
1466 result = find_software_version(db_vnfd)
1467 self.assertEqual(result, expected_result, "VNFD software version is wrong")
1468
1469 with self.subTest(i=3, t="Check charm hash, Hash has did not change"):
1470 # Testing method check_charm_hash_changed
1471
1472 current_path, target_path = "/tmp/charm1", "/tmp/charm1"
1473 fs = Mock(autospec=True)
1474 fs.path.__add__ = Mock()
1475 fs.path.side_effect = ["/", "/", "/", "/"]
1476
1477 mock_path = Mock(autospec=True)
1478 mock_path.exists.side_effect = [True, True]
1479
1480 mock_checksumdir = Mock(autospec=True)
1481 mock_checksumdir.dirhash.side_effect = ["hash_value", "hash_value"]
1482
1483 instance = self.my_ns
1484 expected_result = False
1485
1486 with patch.object(instance, "fs", fs), patch(
1487 "checksumdir.dirhash", mock_checksumdir.dirhash
1488 ), patch("os.path.exists", mock_path.exists):
1489
1490 result = instance.check_charm_hash_changed(current_path, target_path)
1491 self.assertEqual(
1492 result, expected_result, "Wrong charm hash control value"
1493 )
1494 self.assertEqual(mock_path.exists.call_count, 2)
1495 self.assertEqual(mock_checksumdir.dirhash.call_count, 2)
1496
1497 mock_reset()
1498
1499 with self.subTest(i=4, t="Check charm hash, Hash has changed"):
1500 # Testing method check_charm_hash_changed
1501
1502 current_path, target_path = "/tmp/charm1", "/tmp/charm2"
1503 fs = Mock(autospec=True)
1504 fs.path.__add__ = Mock()
1505 fs.path.side_effect = ["/", "/", "/", "/"]
1506
1507 mock_path = Mock(autospec=True)
1508 mock_path.exists.side_effect = [True, True]
1509
1510 mock_checksumdir = Mock(autospec=True)
1511 mock_checksumdir.dirhash.side_effect = ["hash_value", "another_hash_value"]
1512
1513 instance = self.my_ns
1514 expected_result = True
1515
1516 with patch.object(instance, "fs", fs), patch(
1517 "checksumdir.dirhash", mock_checksumdir.dirhash
1518 ), patch("os.path.exists", mock_path.exists):
1519
1520 result = instance.check_charm_hash_changed(current_path, target_path)
1521 self.assertEqual(
1522 result, expected_result, "Wrong charm hash control value"
1523 )
1524 self.assertEqual(mock_path.exists.call_count, 2)
1525 self.assertEqual(mock_checksumdir.dirhash.call_count, 2)
1526
1527 mock_reset()
1528
1529 with self.subTest(i=5, t="Check charm hash, Charm path does not exists"):
1530 # Testing method check_charm_hash_changed
1531
1532 current_path, target_path = "/tmp/charm1", "/tmp/charm2"
1533 fs = Mock(autospec=True)
1534 fs.path.__add__ = Mock()
1535 fs.path.side_effect = ["/", "/", "/", "/"]
1536
1537 mock_path = Mock(autospec=True)
1538 mock_path.exists.side_effect = [True, False]
1539
1540 mock_checksumdir = Mock(autospec=True)
1541 mock_checksumdir.dirhash.side_effect = ["hash_value", "hash_value"]
1542
1543 instance = self.my_ns
1544
1545 with patch.object(instance, "fs", fs), patch(
1546 "checksumdir.dirhash", mock_checksumdir.dirhash
1547 ), patch("os.path.exists", mock_path.exists):
1548
1549 with self.assertRaises(LcmException):
1550
1551 instance.check_charm_hash_changed(current_path, target_path)
1552 self.assertEqual(mock_path.exists.call_count, 2)
1553 self.assertEqual(mock_checksumdir.dirhash.call_count, 0)
1554
1555 mock_reset()
1556
1557 with self.subTest(i=6, t="Check juju bundle existence"):
1558 # Testing method check_juju_bundle_existence
1559
1560 test_vnfd1 = self.db.get_one(
1561 "vnfds", {"_id": "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77"}
1562 )
1563 test_vnfd2 = self.db.get_one(
1564 "vnfds", {"_id": "d96b1cdf-5ad6-49f7-bf65-907ada989293"}
1565 )
1566
1567 expected_result = None
1568 result = check_juju_bundle_existence(test_vnfd1)
1569 self.assertEqual(result, expected_result, "Wrong juju bundle name")
1570
1571 expected_result = "stable/native-kdu"
1572 result = check_juju_bundle_existence(test_vnfd2)
1573 self.assertEqual(result, expected_result, "Wrong juju bundle name")
1574
1575 with self.subTest(i=7, t="Check charm artifacts"):
1576 # Testing method check_juju_bundle_existence
1577
1578 base_folder = {
1579 "folder": "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77",
1580 "pkg-dir": "hackfest_3charmed_vnfd",
1581 }
1582 charm_name = "simple"
1583 charm_type = "lxc_proxy_charm"
1584 revision = 3
1585
1586 expected_result = "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77:3/hackfest_3charmed_vnfd/charms/simple"
1587 result = get_charm_artifact_path(
1588 base_folder, charm_name, charm_type, revision
1589 )
1590 self.assertEqual(result, expected_result, "Wrong charm artifact path")
1591
1592 # SOL004 packages
1593 base_folder = {
1594 "folder": "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77",
1595 }
1596 charm_name = "basic"
1597 charm_type = ""
1598 revision = ""
1599
1600 expected_result = (
1601 "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77/Scripts/helm-charts/basic"
1602 )
1603 result = get_charm_artifact_path(
1604 base_folder, charm_name, charm_type, revision
1605 )
1606 self.assertEqual(result, expected_result, "Wrong charm artifact path")
1607
1608
1609 if __name__ == "__main__":
1610 asynctest.main()