Fixing Basic 12 Ns Primitives
[osm/LCM.git] / osm_lcm / tests / test_ns.py
index 5e248a2..560edc9 100644 (file)
@@ -122,19 +122,19 @@ class TestMyNS(asynctest.TestCase):
         print("kwargs > {}".format(kwargs))
         if args:
             if "update" in args:
-                ro_ns_desc = yaml.load(
-                    descriptors.ro_update_action_text, Loader=yaml.Loader
+                ro_ns_desc = yaml.safe_load(
+                    descriptors.ro_update_action_text
                 )
                 while True:
                     yield ro_ns_desc
         if kwargs.get("delete"):
-            ro_ns_desc = yaml.load(
-                descriptors.ro_delete_action_text, Loader=yaml.Loader
+            ro_ns_desc = yaml.safe_load(
+                descriptors.ro_delete_action_text
             )
             while True:
                 yield ro_ns_desc
 
-        ro_ns_desc = yaml.load(descriptors.ro_ns_text, Loader=yaml.Loader)
+        ro_ns_desc = yaml.safe_load(descriptors.ro_ns_text)
 
         # if ip address provided, replace descriptor
         ip_addresses = getenv("OSMLCMTEST_NS_IPADDRESS", "")
@@ -177,34 +177,34 @@ class TestMyNS(asynctest.TestCase):
 
             self.db = Database({"database": {"driver": "memory"}}).instance.db
             self.db.create_list(
-                "vnfds", yaml.load(descriptors.db_vnfds_text, Loader=yaml.Loader)
+                "vnfds", yaml.safe_load(descriptors.db_vnfds_text)
             )
             self.db.create_list(
                 "vnfds_revisions",
-                yaml.load(descriptors.db_vnfds_revisions_text, Loader=yaml.Loader),
+                yaml.safe_load(descriptors.db_vnfds_revisions_text),
             )
             self.db.create_list(
-                "nsds", yaml.load(descriptors.db_nsds_text, Loader=yaml.Loader)
+                "nsds", yaml.safe_load(descriptors.db_nsds_text)
             )
             self.db.create_list(
-                "nsrs", yaml.load(descriptors.db_nsrs_text, Loader=yaml.Loader)
+                "nsrs", yaml.safe_load(descriptors.db_nsrs_text)
             )
             self.db.create_list(
                 "vim_accounts",
-                yaml.load(descriptors.db_vim_accounts_text, Loader=yaml.Loader),
+                yaml.safe_load(descriptors.db_vim_accounts_text),
             )
             self.db.create_list(
                 "k8sclusters",
-                yaml.load(descriptors.db_k8sclusters_text, Loader=yaml.Loader),
+                yaml.safe_load(descriptors.db_k8sclusters_text),
             )
             self.db.create_list(
-                "nslcmops", yaml.load(descriptors.db_nslcmops_text, Loader=yaml.Loader)
+                "nslcmops", yaml.safe_load(descriptors.db_nslcmops_text)
             )
             self.db.create_list(
-                "vnfrs", yaml.load(descriptors.db_vnfrs_text, Loader=yaml.Loader)
+                "vnfrs", yaml.safe_load(descriptors.db_vnfrs_text)
             )
-            self.db_vim_accounts = yaml.load(
-                descriptors.db_vim_accounts_text, Loader=yaml.Loader
+            self.db_vim_accounts = yaml.safe_load(
+                descriptors.db_vim_accounts_text
             )
 
         # Mock kafka
@@ -390,6 +390,38 @@ class TestMyNS(asynctest.TestCase):
     #     await self.test_instantiate()
     #     # this will check that the initial-congig-primitive 'not_to_be_called' is not called
 
+    @asynctest.fail_on(active_handles=True)
+    async def test_start_stop_rebuild_pass(self):
+        nsr_id = descriptors.test_ids["TEST-OP-VNF"]["ns"]
+        nslcmop_id = descriptors.test_ids["TEST-OP-VNF"]["nslcmops"]
+        vnf_id = descriptors.test_ids["TEST-OP-VNF"]["vnfrs"]
+        additional_param = {"count-index": "0"}
+        operation_type = "start"
+        await self.my_ns.rebuild_start_stop(
+            nsr_id, nslcmop_id, vnf_id, additional_param, operation_type
+        )
+        expected_value = "COMPLETED"
+        return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get(
+            "operationState"
+        )
+        self.assertEqual(return_value, expected_value)
+
+    @asynctest.fail_on(active_handles=True)
+    async def test_start_stop_rebuild_fail(self):
+        nsr_id = descriptors.test_ids["TEST-OP-VNF"]["ns"]
+        nslcmop_id = descriptors.test_ids["TEST-OP-VNF"]["nslcmops1"]
+        vnf_id = descriptors.test_ids["TEST-OP-VNF"]["vnfrs"]
+        additional_param = {"count-index": "0"}
+        operation_type = "stop"
+        await self.my_ns.rebuild_start_stop(
+            nsr_id, nslcmop_id, vnf_id, additional_param, operation_type
+        )
+        expected_value = "Error"
+        return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get(
+            "operationState"
+        )
+        self.assertEqual(return_value, expected_value)
+
     # Test scale() and related methods
     @asynctest.fail_on(active_handles=True)  # all async tasks must be completed
     async def test_scale(self):
@@ -809,6 +841,39 @@ class TestMyNS(asynctest.TestCase):
                 self.db.get_one("vnfrs", {"_id": vnf_instance_id})
             self.assertTrue("database exception Not found entry with filter" in str(context.exception))
 
+    # test vertical scale executes sucessfully
+    # @patch("osm_lcm.ng_ro.status.response")
+    @asynctest.fail_on(active_handles=True)
+    async def test_vertical_scaling(self):
+        nsr_id = descriptors.test_ids["TEST-V-SCALE"]["ns"]
+        nslcmop_id = descriptors.test_ids["TEST-V-SCALE"]["instantiate"]
+
+        # calling the vertical scale fucntion
+        # self.my_ns.RO.status = asynctest.CoroutineMock(self.my_ns.RO.status, side_effect=self._ro_status("update"))
+        mock_wait_ng_ro = asynctest.CoroutineMock()
+        with patch("osm_lcm.ns.NsLcm._wait_ng_ro", mock_wait_ng_ro):
+            await self.my_ns.vertical_scale(nsr_id, nslcmop_id)
+            return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get(
+                "operationState"
+            )
+            expected_value = "COMPLETED"
+            self.assertEqual(return_value, expected_value)
+
+    # test vertical scale executes fail
+    @asynctest.fail_on(active_handles=True)
+    async def test_vertical_scaling_fail(self):
+        # get th nsr nad nslcmops id from descriptors
+        nsr_id = descriptors.test_ids["TEST-V-SCALE"]["ns"]
+        nslcmop_id = descriptors.test_ids["TEST-V-SCALE"]["instantiate-1"]
+
+        # calling the vertical scale fucntion
+        await self.my_ns.vertical_scale(nsr_id, nslcmop_id)
+        return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get(
+            "operationState"
+        )
+        expected_value = "FAILED"
+        self.assertEqual(return_value, expected_value)
+
     # async def test_instantiate_pdu(self):
     #     nsr_id = descriptors.test_ids["TEST-A"]["ns"]
     #     nslcmop_id = descriptors.test_ids["TEST-A"]["instantiate"]
@@ -1492,7 +1557,7 @@ class TestMyNS(asynctest.TestCase):
             result = find_software_version(db_vnfd)
             self.assertEqual(result, expected_result, "VNFD software version is wrong")
 
-        with self.subTest(i=3, t="Check charm hash, Hash has did not change"):
+        with self.subTest(i=3, t="Check charm hash, Hash did not change"):
             # Testing method check_charm_hash_changed
 
             current_path, target_path = "/tmp/charm1", "/tmp/charm1"