Bug 2210 NS instantiation fails in basic12
[osm/LCM.git] / osm_lcm / tests / test_ns.py
1 #
2 # Licensed under the Apache License, Version 2.0 (the "License"); you may
3 # not use this file except in compliance with the License. You may obtain
4 # a copy of the License at
5 #
6 # http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
10 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
11 # License for the specific language governing permissions and limitations
12 # under the License.
13 #
14 # For those usages not covered by the Apache License, Version 2.0 please
15 # contact: alfonso.tiernosepulveda@telefonica.com
16 ##
17
18
19 import asynctest # pip3 install asynctest --user
20 import asyncio
21 from copy import deepcopy
22 import yaml
23 import copy
24 from n2vc.exceptions import N2VCException
25 from os import getenv
26 from osm_lcm import ns
27 from osm_common.msgkafka import MsgKafka
28
29 from osm_lcm.data_utils.lcm_config import LcmCfg
30 from osm_lcm.lcm_utils import TaskRegistry
31 from osm_lcm.ng_ro import NgRoClient
32 from osm_lcm.data_utils.database.database import Database
33 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
34 from osm_lcm.data_utils.vca import Relation, EERelation, DeployedVCA
35 from osm_lcm.data_utils.vnfd import find_software_version
36 from osm_lcm.lcm_utils import check_juju_bundle_existence, get_charm_artifact_path
37 from osm_lcm.lcm_utils import LcmException
38 from uuid import uuid4
39 from unittest.mock import Mock, patch
40
41 from osm_lcm.tests import test_db_descriptors as descriptors
42
43 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
44
45 """ Perform unittests using asynctest of osm_lcm.ns module
46 It allows, if some testing ENV are supplied, testing without mocking some external libraries for debugging:
47 OSMLCMTEST_NS_PUBKEY: public ssh-key returned by N2VC to inject to VMs
48 OSMLCMTEST_NS_NAME: change name of NS
49 OSMLCMTEST_PACKAGES_PATH: path where the vnf-packages are stored (de-compressed), each one on a 'vnfd_id' folder
50 OSMLCMTEST_NS_IPADDRESS: IP address where emulated VMs are reached. Comma-separated list
51 OSMLCMTEST_RO_VIMID: VIM id of RO target vim IP. Obtain it with openmano datacenter-list on RO container
52 OSMLCMTEST_VCA_NOMOCK: Do not mock the VCA, N2VC library, for debugging it
53 OSMLCMTEST_RO_NOMOCK: Do not mock the ROClient library, for debugging it
54 OSMLCMTEST_DB_NOMOCK: Do not mock the database library, for debugging it
55 OSMLCMTEST_FS_NOMOCK: Do not mock the File Storage library, for debugging it
56 OSMLCMTEST_LOGGING_NOMOCK: Do not mock the logging
57 OSMLCM_VCA_XXX: configuration of N2VC
58 OSMLCM_RO_XXX: configuration of RO
59 """
60
# Settings of the VCA (N2VC) section; every value can be overridden through
# the matching OSMLCM_VCA_* environment variable.
_vca_section = {  # TODO replace with os.get_env to get other configurations
    "host": getenv("OSMLCM_VCA_HOST", "vca"),
    "port": getenv("OSMLCM_VCA_PORT", 17070),
    "user": getenv("OSMLCM_VCA_USER", "admin"),
    "secret": getenv("OSMLCM_VCA_SECRET", "vca"),
    "public_key": getenv("OSMLCM_VCA_PUBKEY", None),
    "ca_cert": getenv("OSMLCM_VCA_CACERT", None),
    "apiproxy": getenv("OSMLCM_VCA_APIPROXY", "192.168.1.1"),
}

# Settings of the RO section; host, port and tenant come from OSMLCM_RO_*.
_ro_section = {
    "uri": "http://{}:{}/openmano".format(
        getenv("OSMLCM_RO_HOST", "ro"), getenv("OSMLCM_RO_PORT", "9090")
    ),
    "tenant": getenv("OSMLCM_RO_TENANT", "osm"),
    "logger_name": "lcm.ROclient",
    "loglevel": "DEBUG",
    "ng": True,
}

lcm_config_dict = {
    "global": {"loglevel": "DEBUG"},
    "timeout": {},
    "VCA": _vca_section,
    "RO": _ro_section,
}

# Configuration object handed to ns.NsLcm by the test classes below.
lcm_config = LcmCfg()
lcm_config.set_from_dict(lcm_config_dict)
lcm_config.transform()

# Module-level identifiers and a filesystem mock with canned side effects.
_test_a_ids = descriptors.test_ids["TEST-A"]
nsr_id = _test_a_ids["ns"]
nslcmop_id = _test_a_ids["update"]
vnfr_id = "6421c7c9-d865-4fb4-9a13-d4275d243e01"
vnfd_id = "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77"
update_fs = Mock(autospec=True)
update_fs.path.__add__ = Mock()
update_fs.path.side_effect = ["/"] * 4
update_fs.sync.side_effect = [None] * 2
96
97
def callable(a):
    """Identity helper: give back the argument untouched.

    NOTE: the name shadows the ``callable`` builtin inside this module.
    """
    unchanged = a
    return unchanged
100
101
102 class TestBaseNS(asynctest.TestCase):
103 async def _n2vc_DeployCharms(
104 self,
105 model_name,
106 application_name,
107 vnfd,
108 charm_path,
109 params={},
110 machine_spec={},
111 callback=None,
112 *callback_args,
113 ):
114 if callback:
115 for status, message in (
116 ("maintenance", "installing sofwware"),
117 ("active", "Ready!"),
118 ):
119 # call callback after some time
120 asyncio.sleep(5, loop=self.loop)
121 callback(model_name, application_name, status, message, *callback_args)
122
123 @staticmethod
124 def _n2vc_FormatApplicationName(*args):
125 num_calls = 0
126 while True:
127 yield "app_name-{}".format(num_calls)
128 num_calls += 1
129
130 def _n2vc_CreateExecutionEnvironment(
131 self, namespace, reuse_ee_id, db_dict, *args, **kwargs
132 ):
133 k_list = namespace.split(".")
134 ee_id = k_list[1] + "."
135 if len(k_list) >= 2:
136 for k in k_list[2:4]:
137 ee_id += k[:8]
138 else:
139 ee_id += "_NS_"
140 return ee_id, {}
141
def _ro_status(self, *args, **kwargs):
    """Generator yielding RO status descriptors loaded from the test descriptors.

    - If "update" is among the positional arguments, the update-action
      descriptor is yielded forever.
    - Else if the ``delete`` keyword is truthy, the delete-action descriptor
      is yielded forever.
    - Otherwise the NS descriptor is yielded forever; after each yield one
      non-ACTIVE net is switched to ACTIVE, or, when every net is ACTIVE,
      the first non-ACTIVE vm of each vnf is switched to ACTIVE.

    IP addresses listed in OSMLCMTEST_NS_IPADDRESS (comma separated) replace
    the ones of the NS descriptor.
    """
    print("Args > {}".format(args))
    print("kwargs > {}".format(kwargs))
    if args and "update" in args:
        update_desc = yaml.safe_load(descriptors.ro_update_action_text)
        while True:
            yield update_desc
    if kwargs.get("delete"):
        delete_desc = yaml.safe_load(descriptors.ro_delete_action_text)
        while True:
            yield delete_desc

    ro_ns_desc = yaml.safe_load(descriptors.ro_ns_text)

    # if ip address provided, replace descriptor
    ip_addresses = getenv("OSMLCMTEST_NS_IPADDRESS", "")
    if ip_addresses:
        pending_ips = ip_addresses.split(",")
        for vnf in ro_ns_desc["vnfs"]:
            if not pending_ips:
                break
            # the vnf shares the address that its first vm will consume
            vnf["ip_address"] = pending_ips[0]
            for vm in vnf["vms"]:
                if not pending_ips:
                    break
                vm["ip_address"] = pending_ips.pop(0)

    while True:
        yield ro_ns_desc
        inactive_net = next(
            (net for net in ro_ns_desc["nets"] if net["status"] != "ACTIVE"), None
        )
        if inactive_net is not None:
            inactive_net["status"] = "ACTIVE"
            continue
        # all nets are ACTIVE: advance one vm per vnf
        for vnf in ro_ns_desc["vnfs"]:
            inactive_vm = next(
                (vm for vm in vnf["vms"] if vm["status"] != "ACTIVE"), None
            )
            if inactive_vm is not None:
                inactive_vm["status"] = "ACTIVE"
181 break
182
183 def _ro_deploy(self, *args, **kwargs):
184 return {"action_id": args[1]["action_id"], "nsr_id": args[0], "status": "ok"}
185
186 def _return_uuid(self, *args, **kwargs):
187 return str(uuid4())
188
async def setUp(self):
    """Build all mocks and the NsLcm instance under test.

    The order is significant: create_nslcm_class() reads self.msg,
    self.lcm_tasks, self.fs and self.db, while the logging, N2VC and RO
    mocks are attached to self.my_ns after it exists.
    """
    preparation_steps = (
        self.mock_db,
        self.mock_kafka,
        self.mock_filesystem,
        self.mock_task_registry,
        self.mock_vca_k8s,
        self.create_nslcm_class,
        self.mock_logging,
        self.mock_vca_n2vc,
        self.mock_ro,
    )
    for prepare in preparation_steps:
        prepare()
199
def mock_db(self):
    """Create an in-memory database filled with the test descriptors.

    Nothing is done when OSMLCMTEST_DB_NOMOCK is set.
    """
    if getenv("OSMLCMTEST_DB_NOMOCK"):
        return

    # Cleanup singleton Database instance
    Database.instance = None

    self.db = Database({"database": {"driver": "memory"}}).instance.db
    # (collection name, YAML text) pairs, loaded in this order
    fixtures = (
        ("vnfds", descriptors.db_vnfds_text),
        ("vnfds_revisions", descriptors.db_vnfds_revisions_text),
        ("nsds", descriptors.db_nsds_text),
        ("nsrs", descriptors.db_nsrs_text),
        ("vim_accounts", descriptors.db_vim_accounts_text),
        ("k8sclusters", descriptors.db_k8sclusters_text),
        ("nslcmops", descriptors.db_nslcmops_text),
        ("vnfrs", descriptors.db_vnfrs_text),
    )
    for collection, yaml_text in fixtures:
        self.db.create_list(collection, yaml.safe_load(yaml_text))
    self.db_vim_accounts = yaml.safe_load(descriptors.db_vim_accounts_text)
223
def mock_kafka(self):
    """Expose self.msg as a mock created from a MsgKafka instance."""
    kafka_spec = MsgKafka()
    self.msg = asynctest.Mock(kafka_spec)
226
def mock_filesystem(self):
    """Expose self.fs as a mock of the local filesystem driver.

    Nothing is done when OSMLCMTEST_FS_NOMOCK is set. The packages path
    reported by get_params() can be changed with OSMLCMTEST_PACKAGES_PATH.
    """
    if getenv("OSMLCMTEST_FS_NOMOCK"):
        return
    local_fs = Filesystem({"storage": {"driver": "local", "path": "/"}}).instance.fs
    self.fs = asynctest.Mock(local_fs)
    packages_path = getenv("OSMLCMTEST_PACKAGES_PATH", "./test/temp/packages")
    self.fs.get_params.return_value = {"path": packages_path}
    self.fs.file_open = asynctest.mock_open()
    # self.fs.file_open.return_value.__enter__.return_value = asynctest.MagicMock()  # called on a python "with"
    # self.fs.file_open.return_value.__enter__.return_value.read.return_value = ""   # empty file
238
def mock_task_registry(self):
    """Expose self.lcm_tasks as a TaskRegistry mock with canned answers."""
    self.lcm_tasks = registry = asynctest.Mock(TaskRegistry())
    registry.lock_HA.return_value = True
    registry.waitfor_related_HA.return_value = None
    registry.lookfor_related.return_value = ("", [])
244
def mock_vca_k8s(self):
    """Replace the connector classes referenced by the ns module with MagicMocks.

    The K8s connectors are kept real when OSMLCMTEST_VCA_K8s_NOMOCK is set,
    the N2VC/Helm ones when OSMLCMTEST_VCA_NOMOCK is set.
    """

    def replace_with_mocks(class_names):
        # each mock is created from the class it replaces
        for class_name in class_names:
            original = getattr(ns, class_name)
            setattr(ns, class_name, asynctest.MagicMock(original))

    if not getenv("OSMLCMTEST_VCA_K8s_NOMOCK"):
        replace_with_mocks(
            ("K8sJujuConnector", "K8sHelmConnector", "K8sHelm3Connector")
        )

    if not getenv("OSMLCMTEST_VCA_NOMOCK"):
        replace_with_mocks(("N2VCJujuConnector", "LCMHelmConn"))
254
def create_nslcm_class(self):
    """Instantiate ns.NsLcm as self.my_ns and plug the fs and db fixtures into it.

    _wait_dependent_n2vc is replaced by a CoroutineMock.
    """
    self.my_ns = lcm = ns.NsLcm(self.msg, self.lcm_tasks, lcm_config, self.loop)
    lcm.fs = self.fs
    lcm.db = self.db
    lcm._wait_dependent_n2vc = asynctest.CoroutineMock()
260
def mock_logging(self):
    """Swap the NsLcm logger for a mock unless OSMLCMTEST_LOGGING_NOMOCK is set."""
    if getenv("OSMLCMTEST_LOGGING_NOMOCK"):
        return
    real_logger = self.my_ns.logger
    self.my_ns.logger = asynctest.Mock(real_logger)
264
def mock_vca_n2vc(self):
    """Give the n2vc connector of self.my_ns canned, awaitable answers.

    Nothing is done when OSMLCMTEST_VCA_NOMOCK is set. The ssh public key
    returned by the fake connector can be changed with OSMLCMTEST_NS_PUBKEY
    and the VCA public key with OSMLCM_VCA_PUBKEY.
    """
    if getenv("OSMLCMTEST_VCA_NOMOCK"):
        return

    n2vc = self.my_ns.n2vc
    coroutine_mock = asynctest.CoroutineMock
    pub_key = getenv("OSMLCMTEST_NS_PUBKEY", "ssh-rsa test-pub-key t@osm.com")
    vca_pub_key = getenv("OSMLCM_VCA_PUBKEY", "public_key")

    # self.my_ns.n2vc = asynctest.Mock(N2VC())
    n2vc.GetPublicKey.return_value = vca_pub_key
    # allow several versions of n2vc
    n2vc.FormatApplicationName = asynctest.Mock(
        side_effect=self._n2vc_FormatApplicationName()
    )
    n2vc.DeployCharms = coroutine_mock(side_effect=self._n2vc_DeployCharms)
    n2vc.create_execution_environment = coroutine_mock(
        side_effect=self._n2vc_CreateExecutionEnvironment
    )
    n2vc.install_configuration_sw = coroutine_mock(return_value=pub_key)
    n2vc.get_ee_ssh_public__key = coroutine_mock(return_value=pub_key)
    # exec_primitive used to be assigned twice with identical mocks; once is enough
    n2vc.exec_primitive = coroutine_mock(side_effect=self._return_uuid)
    n2vc.GetPrimitiveStatus = coroutine_mock(return_value="completed")
    n2vc.GetPrimitiveOutput = coroutine_mock(
        return_value={"result": "ok", "pubkey": pub_key}
    )
    n2vc.delete_execution_environment = coroutine_mock(return_value=None)
    n2vc.get_public_key = coroutine_mock(return_value=vca_pub_key)
    n2vc.delete_namespace = coroutine_mock(return_value=None)
    n2vc.register_execution_environment = coroutine_mock(
        return_value="model-name.application-name.k8s"
    )
312
def mock_ro(self):
    """Replace the RO client of self.my_ns with a mock of NgRoClient.

    Nothing is done when OSMLCMTEST_RO_NOMOCK is set. ``deploy`` answers
    through self._ro_deploy; ``delete`` is a plain CoroutineMock.
    """
    if getenv("OSMLCMTEST_RO_NOMOCK"):
        return

    ro_client = NgRoClient(self.loop, **lcm_config.RO.to_dict())
    self.my_ns.RO = asynctest.Mock(ro_client)
    ro_mock = self.my_ns.RO
    # TODO first time should be empty list, following should return a dict
    # self.my_ns.RO.get_list = asynctest.CoroutineMock(self.my_ns.RO.get_list, return_value=[])
    ro_mock.deploy = asynctest.CoroutineMock(
        ro_mock.deploy, side_effect=self._ro_deploy
    )
    # self.my_ns.RO.status = asynctest.CoroutineMock(self.my_ns.RO.status, side_effect=self._ro_status)
    # self.my_ns.RO.create_action = asynctest.CoroutineMock(self.my_ns.RO.create_action,
    #                                                       return_value={"vm-id": {"vim_result": 200,
    #                                                                               "description": "done"}})
    ro_mock.delete = asynctest.CoroutineMock(ro_mock.delete)
328
329 # @asynctest.fail_on(active_handles=True) # all async tasks must be completed
330 # async def test_instantiate(self):
331 # nsr_id = descriptors.test_ids["TEST-A"]["ns"]
332 # nslcmop_id = descriptors.test_ids["TEST-A"]["instantiate"]
333 # # print("Test instantiate started")
334
335 # # delete deployed information of database
336 # if not getenv("OSMLCMTEST_DB_NOMOCK"):
337 # if self.db.get_list("nsrs")[0]["_admin"].get("deployed"):
338 # del self.db.get_list("nsrs")[0]["_admin"]["deployed"]
339 # for db_vnfr in self.db.get_list("vnfrs"):
340 # db_vnfr.pop("ip_address", None)
341 # for db_vdur in db_vnfr["vdur"]:
342 # db_vdur.pop("ip_address", None)
343 # db_vdur.pop("mac_address", None)
344 # if getenv("OSMLCMTEST_RO_VIMID"):
345 # self.db.get_list("vim_accounts")[0]["_admin"]["deployed"]["RO"] = getenv("OSMLCMTEST_RO_VIMID")
346 # if getenv("OSMLCMTEST_RO_VIMID"):
347 # self.db.get_list("nsrs")[0]["_admin"]["deployed"]["RO"] = getenv("OSMLCMTEST_RO_VIMID")
348
349 # await self.my_ns.instantiate(nsr_id, nslcmop_id)
350
351 # self.msg.aiowrite.assert_called_once_with("ns", "instantiated",
352 # {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
353 # "operationState": "COMPLETED"},
354 # loop=self.loop)
355 # self.lcm_tasks.lock_HA.assert_called_once_with('ns', 'nslcmops', nslcmop_id)
356 # if not getenv("OSMLCMTEST_LOGGING_NOMOCK"):
357 # self.assertTrue(self.my_ns.logger.debug.called, "Debug method not called")
358 # self.my_ns.logger.error.assert_not_called()
359 # self.my_ns.logger.exception().assert_not_called()
360
361 # if not getenv("OSMLCMTEST_DB_NOMOCK"):
362 # self.assertTrue(self.db.set_one.called, "db.set_one not called")
363 # db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
364 # db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
365 # self.assertEqual(db_nsr["_admin"].get("nsState"), "INSTANTIATED", "Not instantiated")
366 # for vnfr in db_vnfrs_list:
367 # self.assertEqual(vnfr["_admin"].get("nsState"), "INSTANTIATED", "Not instantiated")
368
369 # if not getenv("OSMLCMTEST_VCA_NOMOCK"):
370 # # check intial-primitives called
371 # self.assertTrue(self.my_ns.n2vc.exec_primitive.called,
372 # "Exec primitive not called for initial config primitive")
373 # for _call in self.my_ns.n2vc.exec_primitive.call_args_list:
374 # self.assertIn(_call[1]["primitive_name"], ("config", "touch"),
375 # "called exec primitive with a primitive different than config or touch")
376
377 # # TODO add more checks of called methods
378 # # TODO add a terminate
379
380 # async def test_instantiate_ee_list(self):
381 # # Using modern IM where configuration is in the new format of execution_environment_list
382 # ee_descriptor_id = "charm_simple"
383 # non_used_initial_primitive = {
384 # "name": "not_to_be_called",
385 # "seq": 3,
386 # "execution-environment-ref": "not_used_ee"
387 # }
388 # ee_list = [
389 # {
390 # "id": ee_descriptor_id,
391 # "juju": {"charm": "simple"},
392
393 # },
394 # ]
395
396 # self.db.set_one(
397 # "vnfds",
398 # q_filter={"_id": "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77"},
399 # update_dict={"vnf-configuration.0.execution-environment-list": ee_list,
400 # "vnf-configuration.0.initial-config-primitive.0.execution-environment-ref": ee_descriptor_id,
401 # "vnf-configuration.0.initial-config-primitive.1.execution-environment-ref": ee_descriptor_id,
402 # "vnf-configuration.0.initial-config-primitive.2": non_used_initial_primitive,
403 # "vnf-configuration.0.config-primitive.0.execution-environment-ref": ee_descriptor_id,
404 # "vnf-configuration.0.config-primitive.0.execution-environment-primitive": "touch_charm",
405 # },
406 # unset={"vnf-configuration.juju": None})
407 # await self.test_instantiate()
408 # # this will check that the initial-congig-primitive 'not_to_be_called' is not called
409
410
411 class TestMyNS(TestBaseNS):
@asynctest.fail_on(active_handles=True)
async def test_start_stop_rebuild_pass(self):
    """A "start" operation on the TEST-OP-VNF records must end as COMPLETED."""
    ids = descriptors.test_ids["TEST-OP-VNF"]
    nsr_id, nslcmop_id, vnf_id = ids["ns"], ids["nslcmops"], ids["vnfrs"]
    await self.my_ns.rebuild_start_stop(
        nsr_id, nslcmop_id, vnf_id, {"count-index": "0"}, "start"
    )
    db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
    self.assertEqual(db_nslcmop.get("operationState"), "COMPLETED")
427
@asynctest.fail_on(active_handles=True)
async def test_start_stop_rebuild_fail(self):
    """A "stop" operation using the "nslcmops1" record is expected to leave "Error"."""
    ids = descriptors.test_ids["TEST-OP-VNF"]
    nsr_id, nslcmop_id, vnf_id = ids["ns"], ids["nslcmops1"], ids["vnfrs"]
    await self.my_ns.rebuild_start_stop(
        nsr_id, nslcmop_id, vnf_id, {"count-index": "0"}, "stop"
    )
    db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
    self.assertEqual(db_nslcmop.get("operationState"), "Error")
443
444 # Test scale() and related methods
@asynctest.fail_on(active_handles=True)  # all async tasks must be completed
async def test_scale(self):
    """Exercise scale() on three fixtures: TEST-A, TEST-NATIVE-KDU and TEST-NATIVE-KDU-2."""
    # TODO: Add more higher-lever tests here, for example:
    # scale-out/scale-in operations with success/error result

    def operation_state(op_id):
        # operationState currently stored for the given nslcmop
        return self.db.get_one("nslcmops", {"_id": op_id}).get("operationState")

    # Test scale() with missing 'scaleVnfData', should return operationState = 'FAILED'
    ids = descriptors.test_ids["TEST-A"]
    nsr_id, nslcmop_id = ids["ns"], ids["instantiate"]
    await self.my_ns.scale(nsr_id, nslcmop_id)
    self.assertEqual(operation_state(nslcmop_id), "FAILED")

    # Test scale() for native kdu
    # this also includes testing _scale_kdu()
    ids = descriptors.test_ids["TEST-NATIVE-KDU"]
    nsr_id, nslcmop_id = ids["ns"], ids["instantiate"]

    juju_cluster = self.my_ns.k8sclusterjuju
    juju_cluster.scale = asynctest.mock.CoroutineMock()
    juju_cluster.exec_primitive = asynctest.mock.CoroutineMock()
    juju_cluster.get_scale_count = asynctest.mock.CoroutineMock(return_value=1)
    await self.my_ns.scale(nsr_id, nslcmop_id)
    self.assertEqual(operation_state(nslcmop_id), "COMPLETED")
    juju_cluster.scale.assert_called_once()

    # Test scale() for native kdu with 2 resource
    ids = descriptors.test_ids["TEST-NATIVE-KDU-2"]
    nsr_id, nslcmop_id = ids["ns"], ids["instantiate"]

    juju_cluster.get_scale_count.return_value = 2
    await self.my_ns.scale(nsr_id, nslcmop_id)
    self.assertEqual(operation_state(nslcmop_id), "COMPLETED")
    juju_cluster.scale.assert_called()
493
async def test_vca_status_refresh(self):
    """vcaStatus of an nsr must be unchanged by _on_update_n2vc_db after a refresh.

    For every day1-2 entry of the stored vnfds whose first execution
    environment is a juju one, the vcaStatus read before and after calling
    _on_update_n2vc_db is remembered; the last pair is compared at the end.
    """
    nsr_id = descriptors.test_ids["TEST-A"]["ns"]
    nslcmop_id = descriptors.test_ids["TEST-A"]["instantiate"]
    await self.my_ns.vca_status_refresh(nsr_id, nslcmop_id)
    expected_value = dict()
    return_value = dict()
    for vnfd_pos, vnfd in enumerate(self.db.get_list("vnfds")):
        for df in vnfd["df"]:
            if "lcm-operations-configuration" not in df:
                continue
            op_config = df["lcm-operations-configuration"]["operate-vnf-op-config"]
            if "day1-2" not in op_config:
                continue
            for vca_pos, day12 in enumerate(op_config["day1-2"]):
                ee_list = day12.get("execution-environment-list")
                if not (ee_list and "juju" in ee_list[0]):
                    continue
                # NOTE(review): the nsrs list is indexed with the vnfd position
                expected_value = self.db.get_list("nsrs")[vnfd_pos]["vcaStatus"]
                await self.my_ns._on_update_n2vc_db(
                    "nsrs",
                    {"_id": nsr_id},
                    "_admin.deployed.VCA.{}".format(vca_pos),
                    {},
                )
                return_value = self.db.get_list("nsrs")[vnfd_pos]["vcaStatus"]
    self.assertEqual(return_value, expected_value)
530
531 # Test _retry_or_skip_suboperation()
532 # Expected result:
533 # - if a suboperation's 'operationState' is marked as 'COMPLETED', SUBOPERATION_STATUS_SKIP is expected
534 # - if marked as anything but 'COMPLETED', the suboperation index is expected
def test_scale_retry_or_skip_suboperation(self):
    """COMPLETED sub-operations are skipped; any other state returns the index."""
    # Load an alternative 'nslcmops' YAML for this test
    nslcmop_id = descriptors.test_ids["TEST-A"]["instantiate"]
    db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
    op_index = 2
    suboperation = db_nslcmop["_admin"]["operations"][op_index]

    # Test when 'operationState' is 'COMPLETED'
    suboperation["operationState"] = "COMPLETED"
    outcome = self.my_ns._retry_or_skip_suboperation(db_nslcmop, op_index)
    self.assertEqual(outcome, self.my_ns.SUBOPERATION_STATUS_SKIP)

    # Test when 'operationState' is not 'COMPLETED'
    suboperation["operationState"] = None
    outcome = self.my_ns._retry_or_skip_suboperation(db_nslcmop, op_index)
    self.assertEqual(outcome, op_index)
550
551 # Test _find_suboperation()
552 # Expected result: index of the found sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if not found
def test_scale_find_suboperation(self):
    """_find_suboperation() returns the matching index or SUBOPERATION_STATUS_NOT_FOUND."""
    # Load an alternative 'nslcmops' YAML for this test
    nslcmop_id = descriptors.test_ids["TEST-A"]["instantiate"]
    db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
    # Find this sub-operation
    op_index = 2
    target = db_nslcmop["_admin"]["operations"][op_index]
    match = {
        "member_vnf_index": target["member_vnf_index"],
        "primitive": target["primitive"],
        "primitive_params": target["primitive_params"],
    }
    self.assertEqual(self.my_ns._find_suboperation(db_nslcmop, match), op_index)

    # Test with not-matching params
    mismatch = dict(match, primitive="")
    self.assertEqual(
        self.my_ns._find_suboperation(db_nslcmop, mismatch),
        self.my_ns.SUBOPERATION_STATUS_NOT_FOUND,
    )

    # Test with None
    self.assertEqual(
        self.my_ns._find_suboperation(db_nslcmop, None),
        self.my_ns.SUBOPERATION_STATUS_NOT_FOUND,
    )
583
584 # Test _update_suboperation_status()
def test_scale_update_suboperation_status(self):
    """_update_suboperation_status() must issue a single db.set_one with the new values."""
    self.db.set_one = asynctest.Mock()
    nslcmop_id = descriptors.test_ids["TEST-A"]["instantiate"]
    db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
    op_index = 0
    # Test to change 'operationState' and 'detailed-status'
    new_state, new_details = "COMPLETED", "Done"

    self.my_ns._update_suboperation_status(
        db_nslcmop, op_index, new_state, new_details
    )

    self.db.set_one.assert_called_once_with(
        "nslcmops",
        q_filter={"_id": db_nslcmop["_id"]},
        update_dict={
            "_admin.operations.0.operationState": new_state,
            "_admin.operations.0.detailed-status": new_details,
        },
        fail_on_empty=False,
    )
608
def test_scale_add_suboperation(self):
    """_add_suboperation() appends sub-operations and returns their index."""
    nslcmop_id = descriptors.test_ids["TEST-A"]["instantiate"]
    db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
    num_ops_before = len(db_nslcmop.get("_admin", {}).get("operations", [])) - 1
    mapped_primitive_params = {
        "parameter": [
            {
                "data-type": "STRING",
                "name": "filename",
                "default-value": "<touch_filename2>",
            }
        ],
        "name": "touch",
    }
    # vnf_index, vdu_id, vdu_count_index, vdu_name, primitive, params,
    # operationState, detailed_status, operationType
    common_args = (
        "1",
        None,
        None,
        None,
        "touch",
        mapped_primitive_params,
        "PROCESSING",
        "In progress",
        "PRE-SCALE",
    )

    # Add a 'pre-scale' suboperation
    op_index_after = self.my_ns._add_suboperation(db_nslcmop, *common_args)
    self.assertEqual(op_index_after, num_ops_before + 1)

    # Delete all suboperations and add the same operation again
    del db_nslcmop["_admin"]["operations"]
    op_index_zero = self.my_ns._add_suboperation(db_nslcmop, *common_args)
    self.assertEqual(op_index_zero, 0)

    # Add a 'RO' suboperation
    RO_nsr_id = "1234567890"
    RO_scaling_info = [
        {
            "type": "create",
            "count": 1,
            "member-vnf-index": "1",
            "osm_vdu_id": "dataVM",
        }
    ]
    op_index = self.my_ns._add_suboperation(
        db_nslcmop, *common_args, RO_nsr_id, RO_scaling_info
    )
    db_RO_nsr_id = db_nslcmop["_admin"]["operations"][op_index]["RO_nsr_id"]
    self.assertEqual(op_index, 1)
    self.assertEqual(RO_nsr_id, db_RO_nsr_id)

    # Try to add an invalid suboperation, should return SUBOPERATION_STATUS_NOT_FOUND
    op_index_invalid = self.my_ns._add_suboperation(*([None] * 11))
    self.assertEqual(op_index_invalid, self.my_ns.SUBOPERATION_STATUS_NOT_FOUND)
695
696 # Test _check_or_add_scale_suboperation() and _check_or_add_scale_suboperation_RO()
697 # check the possible return values:
698 # - SUBOPERATION_STATUS_NEW: This is a new sub-operation
699 # - op_index (non-negative number): This is an existing sub-operation, operationState != 'COMPLETED'
700 # - SUBOPERATION_STATUS_SKIP: This is an existing sub-operation, operationState == 'COMPLETED'
def test_scale_check_or_add_scale_suboperation(self):
    """Check the NEW / existing-index / SKIP answers for primitive and RO sub-operations."""
    nslcmop_id = descriptors.test_ids["TEST-A"]["instantiate"]
    db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
    primitive_params = {
        "parameter": [
            {
                "data-type": "STRING",
                "name": "filename",
                "default-value": "<touch_filename2>",
            }
        ],
        "name": "touch",
    }
    # db_nslcmop, vnf_index, primitive, primitive_params, operationType
    primitive_args = (db_nslcmop, "1", "touch", primitive_params, "PRE-SCALE")

    # Delete all sub-operations to be sure this is a new sub-operation
    del db_nslcmop["_admin"]["operations"]

    # Add a new sub-operation
    # For new sub-operations, operationState is set to 'PROCESSING' by default
    op_index_new = self.my_ns._check_or_add_scale_suboperation(*primitive_args)
    self.assertEqual(op_index_new, self.my_ns.SUBOPERATION_STATUS_NEW)

    # Use the same parameters again to match the already added sub-operation
    # which has status 'PROCESSING' (!= 'COMPLETED') by default
    # The expected return value is a non-negative number
    op_index_existing = self.my_ns._check_or_add_scale_suboperation(*primitive_args)
    self.assertTrue(op_index_existing >= 0)

    # Change operationState 'manually' for this sub-operation
    operations = db_nslcmop["_admin"]["operations"]
    operations[op_index_existing]["operationState"] = "COMPLETED"
    # Then use the same parameters again to match the already added sub-operation,
    # which now has status 'COMPLETED'
    # The expected return value is SUBOPERATION_STATUS_SKIP
    op_index_skip = self.my_ns._check_or_add_scale_suboperation(*primitive_args)
    self.assertEqual(op_index_skip, self.my_ns.SUBOPERATION_STATUS_SKIP)

    # RO sub-operation test:
    # Repeat tests for the very similar _check_or_add_scale_suboperation_RO(),
    RO_nsr_id = "1234567890"
    RO_scaling_info = [
        {
            "type": "create",
            "count": 1,
            "member-vnf-index": "1",
            "osm_vdu_id": "dataVM",
        }
    ]
    ro_args = (db_nslcmop, "1", None, None, "SCALE-RO", RO_nsr_id, RO_scaling_info)
    op_index_new_RO = self.my_ns._check_or_add_scale_suboperation(*ro_args)
    self.assertEqual(op_index_new_RO, self.my_ns.SUBOPERATION_STATUS_NEW)

    # Use the same parameters again to match the already added RO sub-operation
    op_index_existing_RO = self.my_ns._check_or_add_scale_suboperation(*ro_args)
    self.assertTrue(op_index_existing_RO >= 0)

    # Change operationState 'manually' for this RO sub-operation
    operations = db_nslcmop["_admin"]["operations"]
    operations[op_index_existing_RO]["operationState"] = "COMPLETED"
    # Then use the same parameters again to match the already added sub-operation,
    # which now has status 'COMPLETED'
    # The expected return value is SUBOPERATION_STATUS_SKIP
    op_index_skip_RO = self.my_ns._check_or_add_scale_suboperation(*ro_args)
    self.assertEqual(op_index_skip_RO, self.my_ns.SUBOPERATION_STATUS_SKIP)
781
async def test_deploy_kdus(self):
    """deploy_kdus() must register both KDUs of the "multikdu" vnf under _admin.deployed.K8s.

    The helm3 connector is mocked; every generated KDU instance is named
    "k8s_id". The stored entries are compared against the expected values
    for the "ldap" and "mongo" KDUs.
    """
    nsr_id = descriptors.test_ids["TEST-KDU"]["ns"]
    nslcmop_id = descriptors.test_ids["TEST-KDU"]["instantiate"]
    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
    db_vnfr = self.db.get_one(
        "vnfrs", {"nsr-id-ref": nsr_id, "member-vnf-index-ref": "multikdu"}
    )
    db_vnfrs = {"multikdu": db_vnfr}
    db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
    db_vnfds = [db_vnfd]
    task_register = {}
    logging_text = "KDU"

    helm3 = self.my_ns.k8sclusterhelm3
    helm3.generate_kdu_instance_name = asynctest.mock.Mock()
    helm3.generate_kdu_instance_name.return_value = "k8s_id"
    helm3.install = asynctest.CoroutineMock()
    helm3.synchronize_repos = asynctest.CoroutineMock(return_value=("", ""))
    helm3.get_services = asynctest.CoroutineMock(return_value=([]))

    await self.my_ns.deploy_kdus(
        logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_register
    )
    # the keys of task_register are the tasks to wait for
    await asyncio.wait(list(task_register.keys()), timeout=100)

    db_nsr = self.db.get_list("nsrs")[1]
    self.assertIn(
        "K8s",
        db_nsr["_admin"]["deployed"],
        "K8s entry not created at '_admin.deployed'",
    )
    deployed_k8s = db_nsr["_admin"]["deployed"]["K8s"]
    self.assertIsInstance(deployed_k8s, list, "K8s entry is not of type list")
    # this message used to repeat the "not of type list" text of the check above
    self.assertEqual(
        len(deployed_k8s), 2, "K8s entry does not contain exactly 2 elements"
    )

    k8s_instance_info = {
        "kdu-instance": "k8s_id",
        "k8scluster-uuid": "73d96432-d692-40d2-8440-e0c73aee209c",
        "k8scluster-type": "helm-chart-v3",
        "kdu-name": "ldap",
        "member-vnf-index": "multikdu",
        "namespace": None,
        "kdu-deployment-name": None,
    }

    # (index in the K8s list, expected kdu-name, expected kdu-model)
    for position, kdu_name, expected_kdu_model in (
        (0, "ldap", "stable/openldap:1.2.1"),
        (1, "mongo", "stable/mongodb"),
    ):
        nsr_result = copy.deepcopy(deployed_k8s[position])
        nsr_kdu_model_result = nsr_result.pop("kdu-model")
        k8s_instance_info["kdu-name"] = kdu_name
        self.assertEqual(nsr_result, k8s_instance_info)
        # the stored model and the expected one may differ by a version suffix
        self.assertTrue(
            nsr_kdu_model_result in expected_kdu_model
            or expected_kdu_model in nsr_kdu_model_result
        )
846
# Test remove_vnf() and related methods
@asynctest.fail_on(active_handles=True)  # all async tasks must be completed
async def test_remove_vnf(self):
    """Run a REMOVE_VNF update and verify the outcome stored in the database.

    ``NsLcm._wait_ng_ro`` is replaced with a coroutine mock while the update
    runs. Afterwards the operation must be ``COMPLETED`` and looking up the
    removed VNF record must raise a "not found" database exception.
    """
    update_ids = descriptors.test_ids["TEST-UPDATE"]
    nsr_id = update_ids["ns"]
    nslcmop_id = update_ids["removeVnf"]
    vnf_instance_id = update_ids["vnf"]

    with patch("osm_lcm.ns.NsLcm._wait_ng_ro", asynctest.CoroutineMock()):
        await self.my_ns.update(nsr_id, nslcmop_id)

        nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        self.assertEqual(nslcmop.get("operationState"), "COMPLETED")

        with self.assertRaises(Exception) as context:
            self.db.get_one("vnfrs", {"_id": vnf_instance_id})
        # Checked after the context manager exits, where context.exception is
        # populated; assertIn shows the real message if the check fails.
        self.assertIn(
            "database exception Not found entry with filter",
            str(context.exception),
        )
868
# test vertical scale executes successfully
# @patch("osm_lcm.ng_ro.status.response")
@asynctest.fail_on(active_handles=True)
async def test_vertical_scaling(self):
    """Vertical scale with ``NsLcm._wait_ng_ro`` mocked must end as COMPLETED."""
    scale_ids = descriptors.test_ids["TEST-V-SCALE"]
    nsr_id, nslcmop_id = scale_ids["ns"], scale_ids["instantiate"]

    # calling the vertical scale function
    # self.my_ns.RO.status = asynctest.CoroutineMock(self.my_ns.RO.status, side_effect=self._ro_status("update"))
    with patch("osm_lcm.ns.NsLcm._wait_ng_ro", asynctest.CoroutineMock()):
        await self.my_ns.vertical_scale(nsr_id, nslcmop_id)
        nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        self.assertEqual(nslcmop.get("operationState"), "COMPLETED")
886
# test vertical scale execution fails
@asynctest.fail_on(active_handles=True)
async def test_vertical_scaling_fail(self):
    """Vertical scale for the "instantiate-1" operation must end as FAILED.

    Unlike the successful case, ``NsLcm._wait_ng_ro`` is not patched here.
    """
    # get the nsr and nslcmop ids from the descriptors
    scale_ids = descriptors.test_ids["TEST-V-SCALE"]
    nsr_id, nslcmop_id = scale_ids["ns"], scale_ids["instantiate-1"]

    # calling the vertical scale function
    await self.my_ns.vertical_scale(nsr_id, nslcmop_id)

    nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
    self.assertEqual(nslcmop.get("operationState"), "FAILED")
901
902 # async def test_instantiate_pdu(self):
903 # nsr_id = descriptors.test_ids["TEST-A"]["ns"]
904 # nslcmop_id = descriptors.test_ids["TEST-A"]["instantiate"]
905 # # Modify vnfd/vnfr to change KDU for PDU. Adding keys that NBI will already set
906 # self.db.set_one("vnfrs", {"nsr-id-ref": nsr_id, "member-vnf-index-ref": "1"},
907 # update_dict={"ip-address": "10.205.1.46",
908 # "vdur.0.pdu-id": "53e1ec21-2464-451e-a8dc-6e311d45b2c8",
909 # "vdur.0.pdu-type": "PDU-TYPE-1",
910 # "vdur.0.ip-address": "10.205.1.46",
911 # },
912 # unset={"vdur.status": None})
913 # self.db.set_one("vnfrs", {"nsr-id-ref": nsr_id, "member-vnf-index-ref": "2"},
914 # update_dict={"ip-address": "10.205.1.47",
915 # "vdur.0.pdu-id": "53e1ec21-2464-451e-a8dc-6e311d45b2c8",
916 # "vdur.0.pdu-type": "PDU-TYPE-1",
917 # "vdur.0.ip-address": "10.205.1.47",
918 # },
919 # unset={"vdur.status": None})
920
921 # await self.my_ns.instantiate(nsr_id, nslcmop_id)
922 # db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
923 # self.assertEqual(db_nsr.get("nsState"), "READY", str(db_nsr.get("errorDescription ")))
924 # self.assertEqual(db_nsr.get("currentOperation"), "IDLE", "currentOperation different than 'IDLE'")
925 # self.assertEqual(db_nsr.get("currentOperationID"), None, "currentOperationID different than None")
926 # self.assertEqual(db_nsr.get("errorDescription "), None, "errorDescription different than None")
927 # self.assertEqual(db_nsr.get("errorDetail"), None, "errorDetail different than None")
928
929 # @asynctest.fail_on(active_handles=True) # all async tasks must be completed
930 # async def test_terminate_without_configuration(self):
931 # nsr_id = descriptors.test_ids["TEST-A"]["ns"]
932 # nslcmop_id = descriptors.test_ids["TEST-A"]["terminate"]
933 # # set instantiation task as completed
934 # self.db.set_list("nslcmops", {"nsInstanceId": nsr_id, "_id.ne": nslcmop_id},
935 # update_dict={"operationState": "COMPLETED"})
936 # self.db.set_one("nsrs", {"_id": nsr_id},
937 # update_dict={"_admin.deployed.VCA.0": None, "_admin.deployed.VCA.1": None})
938
939 # await self.my_ns.terminate(nsr_id, nslcmop_id)
940 # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
941 # self.assertEqual(db_nslcmop.get("operationState"), 'COMPLETED', db_nslcmop.get("detailed-status"))
942 # db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
943 # self.assertEqual(db_nsr.get("nsState"), "NOT_INSTANTIATED", str(db_nsr.get("errorDescription ")))
944 # self.assertEqual(db_nsr["_admin"].get("nsState"), "NOT_INSTANTIATED", str(db_nsr.get("errorDescription ")))
945 # self.assertEqual(db_nsr.get("currentOperation"), "IDLE", "currentOperation different than 'IDLE'")
946 # self.assertEqual(db_nsr.get("currentOperationID"), None, "currentOperationID different than None")
947 # self.assertEqual(db_nsr.get("errorDescription "), None, "errorDescription different than None")
948 # self.assertEqual(db_nsr.get("errorDetail"), None, "errorDetail different than None")
949 # db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
950 # for vnfr in db_vnfrs_list:
951 # self.assertEqual(vnfr["_admin"].get("nsState"), "NOT_INSTANTIATED", "Not instantiated")
952
953 # @asynctest.fail_on(active_handles=True) # all async tasks must be completed
954 # async def test_terminate_primitive(self):
955 # nsr_id = descriptors.test_ids["TEST-A"]["ns"]
956 # nslcmop_id = descriptors.test_ids["TEST-A"]["terminate"]
957 # # set instantiation task as completed
958 # self.db.set_list("nslcmops", {"nsInstanceId": nsr_id, "_id.ne": nslcmop_id},
959 # update_dict={"operationState": "COMPLETED"})
960
961 # # modify vnfd descriptor to include terminate_primitive
962 # terminate_primitive = [{
963 # "name": "touch",
964 # "parameter": [{"name": "filename", "value": "terminate_filename"}],
965 # "seq": '1'
966 # }]
967 # db_vnfr = self.db.get_one("vnfrs", {"nsr-id-ref": nsr_id, "member-vnf-index-ref": "1"})
968 # self.db.set_one("vnfds", {"_id": db_vnfr["vnfd-id"]},
969 # {"vnf-configuration.0.terminate-config-primitive": terminate_primitive})
970
971 # await self.my_ns.terminate(nsr_id, nslcmop_id)
972 # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
973 # self.assertEqual(db_nslcmop.get("operationState"), 'COMPLETED', db_nslcmop.get("detailed-status"))
974 # db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
975 # self.assertEqual(db_nsr.get("nsState"), "NOT_INSTANTIATED", str(db_nsr.get("errorDescription ")))
976 # self.assertEqual(db_nsr["_admin"].get("nsState"), "NOT_INSTANTIATED", str(db_nsr.get("errorDescription ")))
977 # self.assertEqual(db_nsr.get("currentOperation"), "IDLE", "currentOperation different than 'IDLE'")
978 # self.assertEqual(db_nsr.get("currentOperationID"), None, "currentOperationID different than None")
979 # self.assertEqual(db_nsr.get("errorDescription "), None, "errorDescription different than None")
980 # self.assertEqual(db_nsr.get("errorDetail"), None, "errorDetail different than None")
981
@patch("osm_lcm.lcm_utils.LcmBase.check_charm_hash_changed")
@patch(
    "osm_lcm.ns.NsLcm._ns_charm_upgrade", new_callable=asynctest.Mock(autospec=True)
)
@patch("osm_lcm.data_utils.vnfd.find_software_version")
@patch("osm_lcm.lcm_utils.check_juju_bundle_existence")
async def test_update_change_vnfpkg_sw_version_not_changed(
    self,
    mock_juju_bundle,
    mock_software_version,
    mock_charm_upgrade,
    mock_charm_hash,
):
    """Update type: CHANGE_VNFPKG, latest_vnfd revision changed,
    Charm package changed, sw-version is not changed.

    The VNFD is moved to revision 3 while the VNFR stays on revision 1, the
    charm-hash check reports a change and the mocked charm upgrade resolves
    to COMPLETED. The operation must complete without error and the VNFR
    must end up on revision 3.
    """
    # Database fixture: latest VNFD at revision 3, stored revision 1, VNFR on 1.
    self.db.set_one(
        "vnfds",
        q_filter={"_id": vnfd_id},
        update_dict={"_admin.revision": 3, "kdu": []},
    )
    self.db.set_one(
        "vnfds_revisions",
        q_filter={"_id": vnfd_id + ":1"},
        update_dict={"_admin.revision": 1, "kdu": []},
    )
    self.db.set_one("vnfrs", q_filter={"_id": vnfr_id}, update_dict={"revision": 1})

    mock_charm_hash.return_value = True
    mock_software_version.side_effect = ["1.0", "1.0"]

    # The charm upgrade mock hands back an already-resolved future.
    upgrade_outcome = asyncio.Future()
    upgrade_outcome.set_result(("COMPLETED", "some_output"))
    mock_charm_upgrade.return_value = upgrade_outcome

    ns_lcm = self.my_ns
    fs_copy = deepcopy(update_fs)
    ns_lcm.fs = fs_copy

    await ns_lcm.update(nsr_id, nslcmop_id)

    nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
    nsr = self.db.get_one("nsrs", {"_id": nsr_id})
    vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

    mock_charm_hash.assert_called_with(
        f"{vnfd_id}:1/hackfest_3charmed_vnfd/charms/simple",
        f"{vnfd_id}:3/hackfest_3charmed_vnfd/charms/simple",
    )
    self.assertEqual(fs_copy.sync.call_count, 2)
    self.assertEqual(nsr.get("nsState"), "INSTANTIATED")
    self.assertEqual(nslcmop.get("operationState"), "COMPLETED")
    self.assertEqual(nslcmop.get("errorMessage"), "")
    self.assertEqual(nsr.get("operational-status"), "running")
    self.assertEqual(vnfr.get("revision"), 3)
1052
@patch("osm_lcm.lcm_utils.LcmBase.check_charm_hash_changed")
@patch(
    "osm_lcm.ns.NsLcm._ns_charm_upgrade", new_callable=asynctest.Mock(autospec=True)
)
@patch("osm_lcm.data_utils.vnfd.find_software_version")
@patch("osm_lcm.lcm_utils.check_juju_bundle_existence")
async def test_update_change_vnfpkg_vnfd_revision_not_changed(
    self,
    mock_juju_bundle,
    mock_software_version,
    mock_charm_upgrade,
    mock_charm_hash,
):
    """Update type: CHANGE_VNFPKG, latest_vnfd revision not changed.

    VNFD and VNFR are both on revision 1, so none of the patched helpers nor
    the shared ``update_fs.sync`` may be called; the operation still ends as
    COMPLETED and the VNFR stays on revision 1.
    """
    self.db.set_one(
        "vnfds", q_filter={"_id": vnfd_id}, update_dict={"_admin.revision": 1}
    )
    self.db.set_one("vnfrs", q_filter={"_id": vnfr_id}, update_dict={"revision": 1})

    mock_charm_hash.return_value = True

    upgrade_outcome = asyncio.Future()
    upgrade_outcome.set_result(("COMPLETED", "some_output"))
    mock_charm_upgrade.return_value = upgrade_outcome

    await self.my_ns.update(nsr_id, nslcmop_id)

    nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
    nsr = self.db.get_one("nsrs", {"_id": nsr_id})
    vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

    # Nothing related to charm upgrade should have been touched.
    mock_charm_hash.assert_not_called()
    mock_software_version.assert_not_called()
    mock_juju_bundle.assert_not_called()
    mock_charm_upgrade.assert_not_called()
    update_fs.sync.assert_not_called()

    self.assertEqual(nsr.get("nsState"), "INSTANTIATED")
    self.assertEqual(nslcmop.get("operationState"), "COMPLETED")
    self.assertEqual(nslcmop.get("errorMessage"), "")
    self.assertEqual(nsr.get("operational-status"), "running")
    self.assertEqual(vnfr.get("revision"), 1)
1110
@patch("osm_lcm.lcm_utils.LcmBase.check_charm_hash_changed")
@patch(
    "osm_lcm.ns.NsLcm._ns_charm_upgrade", new_callable=asynctest.Mock(autospec=True)
)
@patch("osm_lcm.data_utils.vnfd.find_software_version")
@patch("osm_lcm.lcm_utils.check_juju_bundle_existence")
async def test_update_change_vnfpkg_charm_is_not_changed(
    self,
    mock_juju_bundle,
    mock_software_version,
    mock_charm_upgrade,
    mock_charm_hash,
):
    """Update type: CHANGE_VNFPKG, latest_vnfd revision changed,
    Charm package is not changed, sw-version is not changed.

    The charm-hash check reports no change, so neither the juju-bundle check
    nor the charm upgrade may run; the operation completes and the VNFR moves
    to revision 3.
    """
    self.db.set_one(
        "vnfds",
        q_filter={"_id": vnfd_id},
        update_dict={"_admin.revision": 3, "kdu": []},
    )
    self.db.set_one(
        "vnfds_revisions",
        q_filter={"_id": vnfd_id + ":1"},
        update_dict={"_admin.revision": 1, "kdu": []},
    )
    self.db.set_one("vnfrs", q_filter={"_id": vnfr_id}, update_dict={"revision": 1})

    mock_charm_hash.return_value = False
    mock_software_version.side_effect = ["1.0", "1.0"]

    ns_lcm = self.my_ns
    fs_copy = deepcopy(update_fs)
    ns_lcm.fs = fs_copy

    await ns_lcm.update(nsr_id, nslcmop_id)

    nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
    nsr = self.db.get_one("nsrs", {"_id": nsr_id})
    vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

    mock_charm_hash.assert_called_with(
        f"{vnfd_id}:1/hackfest_3charmed_vnfd/charms/simple",
        f"{vnfd_id}:3/hackfest_3charmed_vnfd/charms/simple",
    )
    self.assertEqual(fs_copy.sync.call_count, 2)
    self.assertEqual(mock_charm_hash.call_count, 1)
    mock_juju_bundle.assert_not_called()
    mock_charm_upgrade.assert_not_called()
    self.assertEqual(nsr.get("nsState"), "INSTANTIATED")
    self.assertEqual(nslcmop.get("operationState"), "COMPLETED")
    self.assertEqual(nslcmop.get("errorMessage"), "")
    self.assertEqual(nsr.get("operational-status"), "running")
    self.assertEqual(vnfr.get("revision"), 3)
1177
@patch("osm_lcm.lcm_utils.check_juju_bundle_existence")
@patch("osm_lcm.lcm_utils.LcmBase.check_charm_hash_changed")
@patch(
    "osm_lcm.ns.NsLcm._ns_charm_upgrade", new_callable=asynctest.Mock(autospec=True)
)
@patch("osm_lcm.lcm_utils.get_charm_artifact_path")
async def test_update_change_vnfpkg_sw_version_changed(
    self, mock_charm_artifact, mock_charm_upgrade, mock_charm_hash, mock_juju_bundle
):
    """Update type: CHANGE_VNFPKG, latest_vnfd revision changed,
    Charm package exists, sw-version changed.

    The latest VNFD declares software-version "3.0". The operation is
    expected to fail with the "Software version change is not supported"
    message, leaving the VNFR on revision 1 and without calling the hash,
    bundle or upgrade helpers.
    """
    self.db.set_one(
        "vnfds",
        q_filter={"_id": vnfd_id},
        update_dict={"_admin.revision": 3, "software-version": "3.0", "kdu": []},
    )
    self.db.set_one(
        "vnfds_revisions",
        q_filter={"_id": vnfd_id + ":1"},
        update_dict={"_admin.revision": 1, "kdu": []},
    )
    self.db.set_one("vnfrs", q_filter={"_id": vnfr_id}, update_dict={"revision": 1})

    mock_charm_hash.return_value = False
    mock_charm_artifact.side_effect = [
        f"{vnfd_id}:1/hackfest_3charmed_vnfd/charms/simple",
        f"{vnfd_id}:3/hackfest_3charmed_vnfd/charms/simple",
    ]

    ns_lcm = self.my_ns
    fs_copy = deepcopy(update_fs)
    ns_lcm.fs = fs_copy

    failure_message = "FAILED Checking if existing VNF has charm: Software version change is not supported as VNF instance 6421c7c9-d865-4fb4-9a13-d4275d243e01 has charm."

    await ns_lcm.update(nsr_id, nslcmop_id)

    nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
    nsr = self.db.get_one("nsrs", {"_id": nsr_id})
    vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

    self.assertEqual(fs_copy.sync.call_count, 2)
    mock_charm_hash.assert_not_called()
    mock_juju_bundle.assert_not_called()
    mock_charm_upgrade.assert_not_called()
    self.assertEqual(nsr.get("nsState"), "INSTANTIATED")
    self.assertEqual(nslcmop.get("operationState"), "FAILED")
    self.assertEqual(nslcmop.get("errorMessage"), failure_message)
    self.assertEqual(nsr.get("operational-status"), "running")
    self.assertEqual(vnfr.get("revision"), 1)
1239
@patch("osm_lcm.lcm_utils.check_juju_bundle_existence")
@patch("osm_lcm.lcm_utils.LcmBase.check_charm_hash_changed")
@patch(
    "osm_lcm.ns.NsLcm._ns_charm_upgrade", new_callable=asynctest.Mock(autospec=True)
)
@patch("osm_lcm.data_utils.vnfd.find_software_version")
async def test_update_change_vnfpkg_juju_bundle_exists(
    self,
    mock_software_version,
    mock_charm_upgrade,
    mock_charm_hash,
    mock_juju_bundle,
):
    """Update type: CHANGE_VNFPKG, latest_vnfd revision changed,
    Charm package exists, sw-version not changed, juju-bundle exists.

    Both VNFD revisions carry a KDU with a juju bundle. The charm upgrade and
    the charm-hash check must not be called, and the VNFR must remain on
    revision 1.
    """
    # Upgrade is not allowed with juju bundles, this will cause TypeError
    bundle_kdu = [{"kdu_name": "native-kdu", "juju-bundle": "stable/native-kdu"}]
    self.db.set_one(
        "vnfds",
        q_filter={"_id": vnfd_id},
        update_dict={
            "_admin.revision": 5,
            "software-version": "1.0",
            "kdu": bundle_kdu,
        },
    )
    self.db.set_one(
        "vnfds_revisions",
        q_filter={"_id": vnfd_id + ":1"},
        update_dict={
            "_admin.revision": 1,
            "software-version": "1.0",
            # Separate list object, as in the two independent literals before.
            "kdu": [dict(kdu) for kdu in bundle_kdu],
        },
    )
    self.db.set_one(
        "nsrs",
        q_filter={"_id": nsr_id},
        update_dict={"_admin.deployed.VCA.0.kdu_name": "native-kdu"},
    )
    self.db.set_one("vnfrs", q_filter={"_id": vnfr_id}, update_dict={"revision": 1})

    mock_charm_hash.side_effect = [True]
    mock_software_version.side_effect = ["1.0", "1.0"]
    mock_juju_bundle.return_value = True

    ns_lcm = self.my_ns
    fs_copy = deepcopy(update_fs)
    ns_lcm.fs = fs_copy

    await ns_lcm.update(nsr_id, nslcmop_id)

    nsr = self.db.get_one("nsrs", {"_id": nsr_id})
    vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

    self.assertEqual(fs_copy.sync.call_count, 2)
    mock_charm_upgrade.assert_not_called()
    mock_charm_hash.assert_not_called()
    self.assertEqual(nsr.get("nsState"), "INSTANTIATED")
    self.assertEqual(nsr.get("operational-status"), "running")
    self.assertEqual(vnfr.get("revision"), 1)
1306
@patch("osm_lcm.lcm_utils.LcmBase.check_charm_hash_changed")
@patch(
    "osm_lcm.ns.NsLcm._ns_charm_upgrade", new_callable=asynctest.Mock(autospec=True)
)
async def test_update_change_vnfpkg_charm_upgrade_failed(
    self, mock_charm_upgrade, mock_charm_hash
):
    """Update type: CHANGE_VNFPKG, latest_vnfd revision changed,
    Charm package exists, sw-version not changed, charm-upgrade failed.

    The mocked charm upgrade resolves to ("FAILED", "some_error"); the
    operation must be FAILED with that error and the VNFR must stay on
    revision 1.
    """
    self.db.set_one(
        "vnfds",
        q_filter={"_id": vnfd_id},
        update_dict={"_admin.revision": 3, "software-version": "1.0", "kdu": []},
    )
    self.db.set_one(
        "vnfds_revisions",
        q_filter={"_id": vnfd_id + ":1"},
        update_dict={"_admin.revision": 1, "software-version": "1.0", "kdu": []},
    )
    self.db.set_one("vnfrs", q_filter={"_id": vnfr_id}, update_dict={"revision": 1})

    mock_charm_hash.return_value = True

    upgrade_outcome = asyncio.Future()
    upgrade_outcome.set_result(("FAILED", "some_error"))
    mock_charm_upgrade.return_value = upgrade_outcome

    ns_lcm = self.my_ns
    fs_copy = deepcopy(update_fs)
    ns_lcm.fs = fs_copy

    await ns_lcm.update(nsr_id, nslcmop_id)

    nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
    nsr = self.db.get_one("nsrs", {"_id": nsr_id})
    vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

    self.assertEqual(fs_copy.sync.call_count, 2)
    self.assertEqual(mock_charm_hash.call_count, 1)
    self.assertEqual(mock_charm_upgrade.call_count, 1)
    self.assertEqual(nsr.get("nsState"), "INSTANTIATED")
    self.assertEqual(nslcmop.get("operationState"), "FAILED")
    self.assertEqual(nslcmop.get("errorMessage"), "some_error")
    self.assertEqual(nsr.get("operational-status"), "running")
    self.assertEqual(vnfr.get("revision"), 1)
1365
def test_ns_update_find_sw_version_vnfd_not_includes(self):
    """Find software version, VNFD does not have software version.

    The stored VNFD is passed unmodified and "1.0" is expected back.
    """
    vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
    self.assertEqual(
        find_software_version(vnfd), "1.0", "Default sw version should be 1.0"
    )
1373
def test_ns_update_find_sw_version_vnfd_includes(self):
    """Find software version, VNFD includes software version.

    "software-version" is set to "3.1" on the fetched VNFD and the same value
    is expected back.
    """
    vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
    vnfd["software-version"] = "3.1"
    self.assertEqual(
        find_software_version(vnfd), "3.1", "VNFD software version is wrong"
    )
1382
@patch("os.path.exists")
@patch("osm_lcm.lcm_utils.LcmBase.compare_charmdir_hash")
@patch("osm_lcm.lcm_utils.LcmBase.compare_charm_hash")
def test_ns_update_check_charm_hash_not_changed(
    self, mock_compare_charm_hash, mock_compare_charmdir_hash, mock_path_exists
):
    """Check charm hash, Hash did not change.

    Both paths are reported as existing and the comparison mocks return
    False; ``check_charm_hash_changed`` must return False after one
    directory-hash comparison and no ``.charm`` file comparison.
    """
    current_path = target_path = "/tmp/charm1"

    # Fake filesystem whose ``path + <suffix>`` yields the two charm paths.
    fake_fs = Mock()
    fake_fs.path.__add__ = Mock()
    fake_fs.path.side_effect = [current_path, target_path]
    fake_fs.path.__add__.side_effect = [current_path, target_path]

    mock_path_exists.side_effect = [True, True]

    # callable(False) evaluates to False (a bool is not callable).
    mock_compare_charmdir_hash.return_value = callable(False)
    mock_compare_charm_hash.return_value = callable(False)

    ns_lcm = self.my_ns
    ns_lcm.fs = fake_fs

    outcome = ns_lcm.check_charm_hash_changed(current_path, target_path)

    self.assertEqual(outcome, False, "Wrong charm hash control value")
    self.assertEqual(mock_path_exists.call_count, 2)
    self.assertEqual(mock_compare_charmdir_hash.call_count, 1)
    self.assertEqual(mock_compare_charm_hash.call_count, 0)
1412
@patch("os.path.exists")
@patch("osm_lcm.lcm_utils.LcmBase.compare_charmdir_hash")
@patch("osm_lcm.lcm_utils.LcmBase.compare_charm_hash")
def test_ns_update_check_charm_hash_changed(
    self, mock_compare_charm_hash, mock_compare_charmdir_hash, mock_path_exists
):
    """Check charm hash, Hash has changed.

    Both paths are reported as existing; ``check_charm_hash_changed`` is
    expected to return True after one directory-hash comparison and no
    ``.charm`` file comparison.
    """
    current_path, target_path = "/tmp/charm1", "/tmp/charm2"
    path_sequence = [current_path, target_path, current_path, target_path]

    # Fake filesystem whose ``path + <suffix>`` yields the charm paths in turn.
    fake_fs = Mock()
    fake_fs.path.__add__ = Mock()
    fake_fs.path.side_effect = list(path_sequence)
    fake_fs.path.__add__.side_effect = list(path_sequence)

    mock_path_exists.side_effect = [True, True]

    # NOTE(review): callable(True) evaluates to False because a bool is not
    # callable, so both mocks return False here although True is expected
    # below. Confirm against check_charm_hash_changed whether the intended
    # return value is the literal True.
    mock_compare_charmdir_hash.return_value = callable(True)
    mock_compare_charm_hash.return_value = callable(True)

    ns_lcm = self.my_ns
    ns_lcm.fs = fake_fs

    outcome = ns_lcm.check_charm_hash_changed(current_path, target_path)

    self.assertEqual(outcome, True, "Wrong charm hash control value")
    self.assertEqual(mock_path_exists.call_count, 2)
    self.assertEqual(mock_compare_charmdir_hash.call_count, 1)
    self.assertEqual(mock_compare_charm_hash.call_count, 0)
1446
@patch("os.path.exists")
@patch("osm_lcm.lcm_utils.LcmBase.compare_charmdir_hash")
@patch("osm_lcm.lcm_utils.LcmBase.compare_charm_hash")
def test_ns_update_check_no_charm_path(
    self, mock_compare_charm_hash, mock_compare_charmdir_hash, mock_path_exists
):
    """Check charm hash, Charm path does not exist.

    The second ``os.path.exists`` call reports False; an ``LcmException``
    is expected and neither hash comparison may be called.
    """
    current_path, target_path = "/tmp/charm1", "/tmp/charm2"
    path_sequence = [current_path, target_path, current_path, target_path]

    fake_fs = Mock()
    fake_fs.path.__add__ = Mock()
    fake_fs.path.side_effect = list(path_sequence)
    fake_fs.path.__add__.side_effect = list(path_sequence)

    mock_path_exists.side_effect = [True, False]

    # callable(False) evaluates to False (a bool is not callable).
    mock_compare_charmdir_hash.return_value = callable(False)
    mock_compare_charm_hash.return_value = callable(False)

    ns_lcm = self.my_ns
    ns_lcm.fs = fake_fs

    with self.assertRaises(LcmException):
        ns_lcm.check_charm_hash_changed(current_path, target_path)

    # Verified after the context manager so the checks run once the
    # exception has been caught.
    self.assertEqual(mock_path_exists.call_count, 2)
    self.assertEqual(mock_compare_charmdir_hash.call_count, 0)
    self.assertEqual(mock_compare_charm_hash.call_count, 0)
1479
def test_ns_update_check_juju_bundle_existence_bundle_exists(self):
    """Check juju bundle existence: "stable/native-kdu" is expected for this VNFD."""
    vnfd_with_bundle = self.db.get_one(
        "vnfds", {"_id": "d96b1cdf-5ad6-49f7-bf65-907ada989293"}
    )
    self.assertEqual(
        check_juju_bundle_existence(vnfd_with_bundle),
        "stable/native-kdu",
        "Wrong juju bundle name",
    )
1488
def test_ns_update_check_juju_bundle_existence_bundle_does_not_exist(self):
    """Check juju bundle existence: None is expected for the default test VNFD."""
    vnfd_without_bundle = self.db.get_one("vnfds", {"_id": vnfd_id})
    self.assertEqual(
        check_juju_bundle_existence(vnfd_without_bundle),
        None,
        "Wrong juju bundle name",
    )
1495
def test_ns_update_check_juju_bundle_existence_empty_vnfd(self):
    """Check juju bundle existence: an empty VNFD dict is expected to yield None."""
    self.assertEqual(
        check_juju_bundle_existence({}), None, "Wrong juju bundle name"
    )
1502
def test_ns_update_check_juju_bundle_existence_invalid_vnfd(self):
    """Check juju bundle existence: a list instead of a dict must raise AttributeError."""
    not_a_vnfd = [{"_id": vnfd_id}]
    with self.assertRaises(AttributeError):
        check_juju_bundle_existence(not_a_vnfd)
1508
def test_ns_update_check_juju_charm_artifacts_base_folder_wth_pkgdir(self):
    """Check charm artifacts when the base folder includes a "pkg-dir" entry."""
    artifact_path = get_charm_artifact_path(
        {"folder": vnfd_id, "pkg-dir": "hackfest_3charmed_vnfd"},
        "simple",
        "lxc_proxy_charm",
        3,
    )
    self.assertEqual(
        artifact_path,
        f"{vnfd_id}:3/hackfest_3charmed_vnfd/charms/simple",
        "Wrong charm artifact path",
    )
1518
def test_ns_update_check_juju_charm_artifacts_base_folder_wthout_pkgdir(self):
    """Check charm artifacts, SOL004 packages (base folder has no "pkg-dir")."""
    # Charm type and revision are both passed as empty strings.
    artifact_path = get_charm_artifact_path({"folder": vnfd_id}, "basic", "", "")
    self.assertEqual(
        artifact_path,
        f"{vnfd_id}/Scripts/helm-charts/basic",
        "Wrong charm artifact path",
    )
1527
1528
1529 class TestInstantiateN2VC(TestBaseNS):
1530 async def setUp(self):
1531 await super().setUp()
1532 self.db_nsr = yaml.safe_load(descriptors.db_nsrs_text)[0]
1533 self.db_vnfr = yaml.safe_load(descriptors.db_vnfrs_text)[0]
1534 self.vca_index = 1
1535 self.my_ns._write_configuration_status = Mock()
1536
1537 async def call_instantiate_N2VC(self):
1538 logging_text = "N2VC Instantiation"
1539 config_descriptor = {"config-access": {"ssh-access": {"default-user": "admin"}}}
1540 base_folder = {"pkg-dir": "", "folder": "~"}
1541 stage = ["Stage", "Message"]
1542
1543 await self.my_ns.instantiate_N2VC(
1544 logging_text=logging_text,
1545 vca_index=self.vca_index,
1546 nsi_id="nsi_id",
1547 db_nsr=self.db_nsr,
1548 db_vnfr=self.db_vnfr,
1549 vdu_id=None,
1550 kdu_name=None,
1551 vdu_index=None,
1552 kdu_index=None,
1553 config_descriptor=config_descriptor,
1554 deploy_params={},
1555 base_folder=base_folder,
1556 nslcmop_id="nslcmop_id",
1557 stage=stage,
1558 vca_type="native_charm",
1559 vca_name="vca_name",
1560 ee_config_descriptor={},
1561 )
1562
1563 def check_config_status(self, expected_status):
1564 self.my_ns._write_configuration_status.assert_called_with(
1565 nsr_id=self.db_nsr["_id"], vca_index=self.vca_index, status=expected_status
1566 )
1567
1568 async def call_ns_add_relation(self):
1569 ee_relation = EERelation(
1570 {
1571 "nsr-id": self.db_nsr["_id"],
1572 "vdu-profile-id": None,
1573 "kdu-resource-profile-id": None,
1574 "vnf-profile-id": "hackfest_vnf1",
1575 "execution-environment-ref": "f48163a6-c807-47bc-9682-f72caef5af85.alf-c-ab",
1576 "endpoint": "127.0.0.1",
1577 }
1578 )
1579
1580 relation = Relation("relation-name", ee_relation, ee_relation)
1581 cached_vnfrs = {"hackfest_vnf1": self.db_vnfr}
1582
1583 return await self.my_ns._add_relation(
1584 relation=relation,
1585 vca_type="native_charm",
1586 db_nsr=self.db_nsr,
1587 cached_vnfds={},
1588 cached_vnfrs=cached_vnfrs,
1589 )
1590
1591 async def test_add_relation_ok(self):
1592 await self.call_instantiate_N2VC()
1593 self.check_config_status(expected_status="READY")
1594
1595 async def test_add_relation_returns_false_raises_exception(self):
1596 self.my_ns._add_vca_relations = asynctest.CoroutineMock(return_value=False)
1597
1598 with self.assertRaises(LcmException) as exception:
1599 await self.call_instantiate_N2VC()
1600
1601 exception_msg = "Relations could not be added to VCA."
1602 self.assertTrue(exception_msg in str(exception.exception))
1603 self.check_config_status(expected_status="BROKEN")
1604
async def test_add_relation_raises_lcm_exception(self):
    """An LcmException from ``_add_vca_relations`` must reach the caller.

    ``_add_vca_relations`` is replaced by a coroutine mock that raises
    ``LcmException``. The instantiation call is expected to raise an
    ``LcmException`` containing the same text, and the configuration
    status must end up BROKEN.
    """
    exception_msg = "Relations FAILED"
    self.my_ns._add_vca_relations = asynctest.CoroutineMock(
        side_effect=LcmException(exception_msg)
    )

    with self.assertRaises(LcmException) as exception:
        await self.call_instantiate_N2VC()

    # assertIn shows both the expected text and the actual message on
    # failure, whereas assertTrue(a in b) only reports "False is not true".
    self.assertIn(exception_msg, str(exception.exception))
    self.check_config_status(expected_status="BROKEN")
1616
async def test_n2vc_add_relation_fails_raises_exception(self):
    """An N2VCException from ``n2vc.add_relation`` must become an LcmException.

    ``self.my_ns.n2vc.add_relation`` is replaced by a coroutine mock that
    raises ``N2VCException``. ``call_ns_add_relation`` is then expected to
    raise an ``LcmException`` whose text contains the N2VC message.
    """
    exception_msg = "N2VC failed to add relations"
    self.my_ns.n2vc.add_relation = asynctest.CoroutineMock(
        side_effect=N2VCException(exception_msg)
    )

    with self.assertRaises(LcmException) as exception:
        await self.call_ns_add_relation()

    # The caught exception is inspected after the ``with`` block: any
    # statement placed after the raising call inside it would never run.
    # assertIn is used so a failure shows both strings.
    self.assertIn(exception_msg, str(exception.exception))
1625
async def test_n2vc_add_relation_ok_returns_true(self):
    """``call_ns_add_relation`` is truthy when N2VC ``add_relation`` succeeds."""
    n2vc_add_relation = asynctest.CoroutineMock(return_value=None)
    self.my_ns.n2vc.add_relation = n2vc_add_relation

    relation_added = await self.call_ns_add_relation()
    self.assertTrue(relation_added)
1629
1630
class TestGetVNFRelations(TestBaseNS):
    """Tests for ``self.my_ns._get_vnf_relations`` using the NSD fixtures."""

    async def setUp(self):
        """Run the base set-up, then load the first NSD from the descriptors."""
        await super().setUp()
        nsds = yaml.safe_load(descriptors.db_nsds_text)
        self.db_nsd = nsds[0]

    def test_ns_charm_vca_returns_empty_relations(self):
        """A VCA targeting the NS itself yields no VNF relations."""
        # The NSD id doubles as the NS record id in these tests.
        nsr_id = self.db_nsd["id"]
        deployed_vca = DeployedVCA(
            nsr_id, {"member-vnf-index": None, "target_element": "ns"}
        )

        relations_found = self.my_ns._get_vnf_relations(
            nsr_id=nsr_id, nsd=self.db_nsd, vca=deployed_vca, cached_vnfds={}
        )

        self.assertEqual([], relations_found)

    def test_vnf_returns_relation(self):
        """A VNF-level VCA yields exactly the expected mgmtVM/dataVM relation."""
        # The NSD id doubles as the NS record id in these tests.
        nsr_id = self.db_nsd["id"]
        deployed_vca = DeployedVCA(
            nsr_id,
            {
                "member-vnf-index": "1",
                "target_element": "vnf/0",
                "ee_descriptor_id": "simple-ee",
                "vdu_id": "mgmtVM",
            },
        )

        def make_endpoint(vdu_profile_id):
            # Both sides of the relation differ only in the VDU profile id.
            return EERelation(
                {
                    "nsr-id": nsr_id,
                    "vnf-profile-id": "1",
                    "vdu-profile-id": vdu_profile_id,
                    "kdu-resource-profile-id": None,
                    "execution-environment-ref": "simple-ee",
                    "endpoint": "interface",
                }
            )

        expected_relation = Relation(
            "relation", make_endpoint("mgmtVM"), make_endpoint("dataVM")
        )

        relations_found = self.my_ns._get_vnf_relations(
            nsr_id=nsr_id, nsd=self.db_nsd, vca=deployed_vca, cached_vnfds={}
        )

        self.assertEqual(1, len(relations_found))
        self.assertEqual(expected_relation, relations_found[0])
1687
1688
# Run the tests through asynctest's entry point when this file is executed
# directly as a script.
if __name__ == "__main__":
    asynctest.main()