Feature 10911-Vertical scaling of VM instances from OSM
[osm/LCM.git] / osm_lcm / tests / test_ns.py
index 7613a88..d7192c9 100644 (file)
@@ -390,6 +390,38 @@ class TestMyNS(asynctest.TestCase):
     #     await self.test_instantiate()
     #     # this will check that the initial-congig-primitive 'not_to_be_called' is not called
 
+    @asynctest.fail_on(active_handles=True)
+    async def test_start_stop_rebuild_pass(self):
+        nsr_id = descriptors.test_ids["TEST-OP-VNF"]["ns"]
+        nslcmop_id = descriptors.test_ids["TEST-OP-VNF"]["nslcmops"]
+        vnf_id = descriptors.test_ids["TEST-OP-VNF"]["vnfrs"]
+        additional_param = {"count-index": "0"}
+        operation_type = "start"
+        await self.my_ns.rebuild_start_stop(
+            nsr_id, nslcmop_id, vnf_id, additional_param, operation_type
+        )
+        expected_value = "COMPLETED"
+        return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get(
+            "operationState"
+        )
+        self.assertEqual(return_value, expected_value)
+
+    @asynctest.fail_on(active_handles=True)
+    async def test_start_stop_rebuild_fail(self):
+        nsr_id = descriptors.test_ids["TEST-OP-VNF"]["ns"]
+        nslcmop_id = descriptors.test_ids["TEST-OP-VNF"]["nslcmops1"]
+        vnf_id = descriptors.test_ids["TEST-OP-VNF"]["vnfrs"]
+        additional_param = {"count-index": "0"}
+        operation_type = "stop"
+        await self.my_ns.rebuild_start_stop(
+            nsr_id, nslcmop_id, vnf_id, additional_param, operation_type
+        )
+        expected_value = "Error"
+        return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get(
+            "operationState"
+        )
+        self.assertEqual(return_value, expected_value)
+
     # Test scale() and related methods
     @asynctest.fail_on(active_handles=True)  # all async tasks must be completed
     async def test_scale(self):
@@ -797,16 +829,50 @@ class TestMyNS(asynctest.TestCase):
         nsr_id = descriptors.test_ids["TEST-UPDATE"]["ns"]
         nslcmop_id = descriptors.test_ids["TEST-UPDATE"]["removeVnf"]
         vnf_instance_id = descriptors.test_ids["TEST-UPDATE"]["vnf"]
-        self.my_ns.RO.status = asynctest.CoroutineMock(self.my_ns.RO.status, side_effect=self._ro_status("update"))
-        await self.my_ns.update(nsr_id, nslcmop_id)
-        expected_value = "COMPLETED"
+        mock_wait_ng_ro = asynctest.CoroutineMock()
+        with patch("osm_lcm.ns.NsLcm._wait_ng_ro", mock_wait_ng_ro):
+            await self.my_ns.update(nsr_id, nslcmop_id)
+            expected_value = "COMPLETED"
+            return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get(
+                "operationState"
+            )
+            self.assertEqual(return_value, expected_value)
+            with self.assertRaises(Exception) as context:
+                self.db.get_one("vnfrs", {"_id": vnf_instance_id})
+            self.assertTrue("database exception Not found entry with filter" in str(context.exception))
+
+    # test vertical scale executes sucessfully
+    # @patch("osm_lcm.ng_ro.status.response")
+    @asynctest.fail_on(active_handles=True)
+    async def test_vertical_scaling(self):
+        nsr_id = descriptors.test_ids["TEST-V-SCALE"]["ns"]
+        nslcmop_id = descriptors.test_ids["TEST-V-SCALE"]["instantiate"]
+
+        # calling the vertical scale fucntion
+        # self.my_ns.RO.status = asynctest.CoroutineMock(self.my_ns.RO.status, side_effect=self._ro_status("update"))
+        mock_wait_ng_ro = asynctest.CoroutineMock()
+        with patch("osm_lcm.ns.NsLcm._wait_ng_ro", mock_wait_ng_ro):
+            await self.my_ns.vertical_scale(nsr_id, nslcmop_id)
+            return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get(
+                "operationState"
+            )
+            expected_value = "COMPLETED"
+            self.assertEqual(return_value, expected_value)
+
+    # test vertical scale executes fail
+    @asynctest.fail_on(active_handles=True)
+    async def test_vertical_scaling_fail(self):
+        # get th nsr nad nslcmops id from descriptors
+        nsr_id = descriptors.test_ids["TEST-V-SCALE"]["ns"]
+        nslcmop_id = descriptors.test_ids["TEST-V-SCALE"]["instantiate-1"]
+
+        # calling the vertical scale fucntion
+        await self.my_ns.vertical_scale(nsr_id, nslcmop_id)
         return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get(
             "operationState"
         )
+        expected_value = "FAILED"
         self.assertEqual(return_value, expected_value)
-        with self.assertRaises(Exception) as context:
-            self.db.get_one("vnfrs", {"_id": vnf_instance_id})
-        self.assertTrue("database exception Not found entry with filter" in str(context.exception))
 
     # async def test_instantiate_pdu(self):
     #     nsr_id = descriptors.test_ids["TEST-A"]["ns"]
@@ -981,7 +1047,7 @@ class TestMyNS(asynctest.TestCase):
 
                 mock_charm_hash.assert_called_with(
                     "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77:1/hackfest_3charmed_vnfd/charms/simple",
-                    "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77/hackfest_3charmed_vnfd/charms/simple",
+                    "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77:3/hackfest_3charmed_vnfd/charms/simple",
                 )
 
                 self.assertEqual(fs.sync.call_count, 2)
@@ -1151,7 +1217,7 @@ class TestMyNS(asynctest.TestCase):
 
                 mock_charm_hash.assert_called_with(
                     "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77:1/hackfest_3charmed_vnfd/charms/simple",
-                    "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77/hackfest_3charmed_vnfd/charms/simple",
+                    "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77:3/hackfest_3charmed_vnfd/charms/simple",
                 )
 
                 self.assertEqual(fs.sync.call_count, 2)