Fixing flake and black issues in code, enabling the same in tox
[osm/LCM.git] / osm_lcm / tests / test_ns.py
index 4234562..2004701 100644 (file)
@@ -122,19 +122,15 @@ class TestMyNS(asynctest.TestCase):
         print("kwargs > {}".format(kwargs))
         if args:
             if "update" in args:
-                ro_ns_desc = yaml.load(
-                    descriptors.ro_update_action_text, Loader=yaml.Loader
-                )
+                ro_ns_desc = yaml.safe_load(descriptors.ro_update_action_text)
                 while True:
                     yield ro_ns_desc
         if kwargs.get("delete"):
-            ro_ns_desc = yaml.load(
-                descriptors.ro_delete_action_text, Loader=yaml.Loader
-            )
+            ro_ns_desc = yaml.safe_load(descriptors.ro_delete_action_text)
             while True:
                 yield ro_ns_desc
 
-        ro_ns_desc = yaml.load(descriptors.ro_ns_text, Loader=yaml.Loader)
+        ro_ns_desc = yaml.safe_load(descriptors.ro_ns_text)
 
         # if ip address provided, replace descriptor
         ip_addresses = getenv("OSMLCMTEST_NS_IPADDRESS", "")
@@ -176,36 +172,26 @@ class TestMyNS(asynctest.TestCase):
             Database.instance = None
 
             self.db = Database({"database": {"driver": "memory"}}).instance.db
-            self.db.create_list(
-                "vnfds", yaml.load(descriptors.db_vnfds_text, Loader=yaml.Loader)
-            )
+            self.db.create_list("vnfds", yaml.safe_load(descriptors.db_vnfds_text))
             self.db.create_list(
                 "vnfds_revisions",
-                yaml.load(descriptors.db_vnfds_revisions_text, Loader=yaml.Loader),
-            )
-            self.db.create_list(
-                "nsds", yaml.load(descriptors.db_nsds_text, Loader=yaml.Loader)
-            )
-            self.db.create_list(
-                "nsrs", yaml.load(descriptors.db_nsrs_text, Loader=yaml.Loader)
+                yaml.safe_load(descriptors.db_vnfds_revisions_text),
             )
+            self.db.create_list("nsds", yaml.safe_load(descriptors.db_nsds_text))
+            self.db.create_list("nsrs", yaml.safe_load(descriptors.db_nsrs_text))
             self.db.create_list(
                 "vim_accounts",
-                yaml.load(descriptors.db_vim_accounts_text, Loader=yaml.Loader),
+                yaml.safe_load(descriptors.db_vim_accounts_text),
             )
             self.db.create_list(
                 "k8sclusters",
-                yaml.load(descriptors.db_k8sclusters_text, Loader=yaml.Loader),
+                yaml.safe_load(descriptors.db_k8sclusters_text),
             )
             self.db.create_list(
-                "nslcmops", yaml.load(descriptors.db_nslcmops_text, Loader=yaml.Loader)
-            )
-            self.db.create_list(
-                "vnfrs", yaml.load(descriptors.db_vnfrs_text, Loader=yaml.Loader)
-            )
-            self.db_vim_accounts = yaml.load(
-                descriptors.db_vim_accounts_text, Loader=yaml.Loader
+                "nslcmops", yaml.safe_load(descriptors.db_nslcmops_text)
             )
+            self.db.create_list("vnfrs", yaml.safe_load(descriptors.db_vnfrs_text))
+            self.db_vim_accounts = yaml.safe_load(descriptors.db_vim_accounts_text)
 
         # Mock kafka
         self.msg = asynctest.Mock(MsgKafka())
@@ -390,6 +376,38 @@ class TestMyNS(asynctest.TestCase):
     #     await self.test_instantiate()
     #     # this will check that the initial-congig-primitive 'not_to_be_called' is not called
 
+    @asynctest.fail_on(active_handles=True)
+    async def test_start_stop_rebuild_pass(self):
+        nsr_id = descriptors.test_ids["TEST-OP-VNF"]["ns"]
+        nslcmop_id = descriptors.test_ids["TEST-OP-VNF"]["nslcmops"]
+        vnf_id = descriptors.test_ids["TEST-OP-VNF"]["vnfrs"]
+        additional_param = {"count-index": "0"}
+        operation_type = "start"
+        await self.my_ns.rebuild_start_stop(
+            nsr_id, nslcmop_id, vnf_id, additional_param, operation_type
+        )
+        expected_value = "COMPLETED"
+        return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get(
+            "operationState"
+        )
+        self.assertEqual(return_value, expected_value)
+
+    @asynctest.fail_on(active_handles=True)
+    async def test_start_stop_rebuild_fail(self):
+        nsr_id = descriptors.test_ids["TEST-OP-VNF"]["ns"]
+        nslcmop_id = descriptors.test_ids["TEST-OP-VNF"]["nslcmops1"]
+        vnf_id = descriptors.test_ids["TEST-OP-VNF"]["vnfrs"]
+        additional_param = {"count-index": "0"}
+        operation_type = "stop"
+        await self.my_ns.rebuild_start_stop(
+            nsr_id, nslcmop_id, vnf_id, additional_param, operation_type
+        )
+        expected_value = "Error"
+        return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get(
+            "operationState"
+        )
+        self.assertEqual(return_value, expected_value)
+
     # Test scale() and related methods
     @asynctest.fail_on(active_handles=True)  # all async tasks must be completed
     async def test_scale(self):
@@ -797,16 +815,53 @@ class TestMyNS(asynctest.TestCase):
         nsr_id = descriptors.test_ids["TEST-UPDATE"]["ns"]
         nslcmop_id = descriptors.test_ids["TEST-UPDATE"]["removeVnf"]
         vnf_instance_id = descriptors.test_ids["TEST-UPDATE"]["vnf"]
-        self.my_ns.RO.status = asynctest.CoroutineMock(self.my_ns.RO.status, side_effect=self._ro_status("update"))
-        await self.my_ns.update(nsr_id, nslcmop_id)
-        expected_value = "COMPLETED"
+        mock_wait_ng_ro = asynctest.CoroutineMock()
+        with patch("osm_lcm.ns.NsLcm._wait_ng_ro", mock_wait_ng_ro):
+            await self.my_ns.update(nsr_id, nslcmop_id)
+            expected_value = "COMPLETED"
+            return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get(
+                "operationState"
+            )
+            self.assertEqual(return_value, expected_value)
+            with self.assertRaises(Exception) as context:
+                self.db.get_one("vnfrs", {"_id": vnf_instance_id})
+            self.assertTrue(
+                "database exception Not found entry with filter"
+                in str(context.exception)
+            )
+
+    # test vertical scale executes sucessfully
+    # @patch("osm_lcm.ng_ro.status.response")
+    @asynctest.fail_on(active_handles=True)
+    async def test_vertical_scaling(self):
+        nsr_id = descriptors.test_ids["TEST-V-SCALE"]["ns"]
+        nslcmop_id = descriptors.test_ids["TEST-V-SCALE"]["instantiate"]
+
+        # calling the vertical scale fucntion
+        # self.my_ns.RO.status = asynctest.CoroutineMock(self.my_ns.RO.status, side_effect=self._ro_status("update"))
+        mock_wait_ng_ro = asynctest.CoroutineMock()
+        with patch("osm_lcm.ns.NsLcm._wait_ng_ro", mock_wait_ng_ro):
+            await self.my_ns.vertical_scale(nsr_id, nslcmop_id)
+            return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get(
+                "operationState"
+            )
+            expected_value = "COMPLETED"
+            self.assertEqual(return_value, expected_value)
+
+    # test vertical scale executes fail
+    @asynctest.fail_on(active_handles=True)
+    async def test_vertical_scaling_fail(self):
+        # get th nsr nad nslcmops id from descriptors
+        nsr_id = descriptors.test_ids["TEST-V-SCALE"]["ns"]
+        nslcmop_id = descriptors.test_ids["TEST-V-SCALE"]["instantiate-1"]
+
+        # calling the vertical scale fucntion
+        await self.my_ns.vertical_scale(nsr_id, nslcmop_id)
         return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get(
             "operationState"
         )
+        expected_value = "FAILED"
         self.assertEqual(return_value, expected_value)
-        with self.assertRaises(Exception) as context:
-            self.db.get_one("vnfrs", {"_id": vnf_instance_id})
-        self.assertTrue("database exception Not found entry with filter" in str(context.exception))
 
     # async def test_instantiate_pdu(self):
     #     nsr_id = descriptors.test_ids["TEST-A"]["ns"]
@@ -1491,7 +1546,7 @@ class TestMyNS(asynctest.TestCase):
             result = find_software_version(db_vnfd)
             self.assertEqual(result, expected_result, "VNFD software version is wrong")
 
-        with self.subTest(i=3, t="Check charm hash, Hash has did not change"):
+        with self.subTest(i=3, t="Check charm hash, Hash did not change"):
             # Testing method check_charm_hash_changed
 
             current_path, target_path = "/tmp/charm1", "/tmp/charm1"