| aticig | 9bc63ac | 2022-07-27 09:32:06 +0300 | [diff] [blame] | 1 | # Copyright 2022 Canonical Ltd. |
| 2 | # |
| 3 | # Licensed under the Apache License, Version 2.0 (the "License"); you may |
| 4 | # not use this file except in compliance with the License. You may obtain |
| 5 | # a copy of the License at |
| 6 | # |
| 7 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | # |
| 9 | # Unless required by applicable law or agreed to in writing, software |
| 10 | # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| 11 | # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| 12 | # License for the specific language governing permissions and limitations |
| 13 | # under the License. |
| 14 | # |
| 15 | # For those usages not covered by the Apache License, Version 2.0 please |
| 16 | # contact: alfonso.tiernosepulveda@telefonica.com |
| 17 | ## |
| 18 | import logging |
| 19 | import os |
| 20 | from unittest.mock import Mock, patch, MagicMock |
| 21 | from unittest import TestCase |
| 22 | |
| 23 | from osm_common.msgkafka import MsgKafka |
| 24 | from osm_common import fslocal |
| 25 | from osm_lcm.data_utils.database.database import Database |
| 26 | from osm_lcm.data_utils.filesystem.filesystem import Filesystem |
| 27 | from osm_lcm.lcm_utils import LcmBase, LcmException |
| 28 | from osm_lcm.tests import test_db_descriptors as descriptors |
| 29 | import yaml |
| 30 | from zipfile import BadZipfile |
| 31 | |
| 32 | |
| 33 | class TestLcmBase(TestCase): |
| 34 | |
| 35 | test_nsr_id = "b63aa1ba-996e-43a7-921a-1aca5ccbc63f" |
| 36 | nsd_package_path = "/" + test_nsr_id |
| 37 | nsd_package_name = "test_nsd" |
| 38 | charm_metadata_file = "/path/charm/metadata.yaml" |
| 39 | |
| 40 | def setUp(self): |
| 41 | # DB |
| 42 | Database.instance = None |
| 43 | self.db = Database({"database": {"driver": "memory"}}).instance.db |
| 44 | self.db.create_list("nsds", yaml.safe_load(descriptors.db_nsds_text)) |
| 45 | self.db.create_list("nsrs", yaml.safe_load(descriptors.db_nsrs_text)) |
| 46 | # Filesystem |
| 47 | self.fs = Filesystem({"storage": {"driver": "local", "path": "/"}}).instance.fs |
| 48 | # Create LCMBase class |
| 49 | self.msg = Mock(MsgKafka()) |
| 50 | self.logger = Mock(logging) |
| 51 | self.my_ns = LcmBase(self.msg, self.logger) |
| 52 | self.my_ns.fs = self.fs |
| 53 | self.my_ns.db = self.db |
| 54 | |
| 55 | def test_get_charm_name_successfully(self): |
| 56 | instance = self.my_ns |
| 57 | mock_open = MagicMock(open) |
| 58 | mock_yaml = MagicMock(yaml) |
| 59 | mock_yaml.safe_load.return_value = {"name": "test_charm"} |
| 60 | expected_result = "test_charm" |
| 61 | |
| 62 | with patch("osm_lcm.lcm_utils.open", mock_open), patch( |
| 63 | "osm_lcm.lcm_utils.yaml.safe_load", mock_yaml.safe_load |
| 64 | ): |
| 65 | result = instance.get_charm_name(TestLcmBase.charm_metadata_file) |
| 66 | self.assertEqual(result, expected_result, "wrong charm name") |
| 67 | self.assertEqual(mock_yaml.safe_load.call_count, 1) |
| 68 | self.assertEqual(mock_open.call_count, 1) |
| 69 | |
| 70 | def test_get_charm_name_can_not_open_metadata_file(self): |
| 71 | instance = self.my_ns |
| 72 | mock_open = MagicMock(open) |
| 73 | mock_open.side_effect = IOError |
| 74 | mock_yaml = MagicMock(create_autospec=True) |
| 75 | |
| 76 | with patch("osm_lcm.lcm_utils.open", mock_open), patch( |
| 77 | "osm_lcm.lcm_utils.yaml.safe_load", mock_yaml.safe_load |
| 78 | ): |
| 79 | with self.assertRaises(IOError): |
| 80 | instance.get_charm_name(TestLcmBase.charm_metadata_file) |
| 81 | mock_yaml.safe_load.assert_not_called() |
| 82 | self.assertEqual(mock_open.call_count, 1) |
| 83 | |
| 84 | def test_get_charm_name_wrong_metadata_file_format(self): |
| 85 | instance = self.my_ns |
| 86 | mock_open = MagicMock(open) |
| 87 | mock_yaml = MagicMock(create_autospec=True) |
| 88 | mock_yaml.safe_load.return_value = {} |
| 89 | |
| 90 | with patch("osm_lcm.lcm_utils.open", mock_open), patch( |
| 91 | "osm_lcm.lcm_utils.yaml.safe_load", mock_yaml.safe_load |
| 92 | ): |
| 93 | with self.assertRaises(KeyError): |
| 94 | instance.get_charm_name(TestLcmBase.charm_metadata_file) |
| 95 | self.assertEqual(mock_open.call_count, 1) |
| 96 | self.assertEqual(mock_yaml.safe_load.call_count, 1) |
| 97 | |
| 98 | def test_get_charm_path_successfully(self): |
| 99 | instance = self.my_ns |
| 100 | fs = fslocal.FsLocal() |
| 101 | fs.path = "/app/storage" |
| 102 | instance.fs = fs |
| 103 | charm_folder_name = "simple_charm" |
| 104 | expected_result = "/app/storage/b63aa1ba-996e-43a7-921a-1aca5ccbc63f/test_nsd/charms/simple_charm" |
| 105 | result = instance._get_charm_path( |
| 106 | TestLcmBase.nsd_package_path, TestLcmBase.nsd_package_name, charm_folder_name |
| 107 | ) |
| 108 | self.assertEqual(result, expected_result, "wrong_charm_path") |
| 109 | |
| 110 | def test_get_charm_metadata_file_charm_is_not_zipped(self): |
| 111 | instance = self.my_ns |
| 112 | fs = fslocal.FsLocal() |
| 113 | fs.path = "/app/storage" |
| 114 | instance.fs = fs |
| 115 | mock_zipfile = MagicMock(create_autospec=True) |
| 116 | charm_folder_name = "simple_charm" |
| 117 | charm_path = "/app/storage/b63aa1ba-996e-43a7-921a-1aca5ccbc63f/test_nsd/charms/simple_charm" |
| 118 | expected_result = "/app/storage/b63aa1ba-996e-43a7-921a-1aca5ccbc63f/test_nsd/charms/simple_charm/metadata.yaml" |
| 119 | |
| 120 | with patch("osm_lcm.lcm_utils.ZipFile", mock_zipfile): |
| 121 | result = instance._get_charm_metadata_file( |
| 122 | charm_folder_name, |
| 123 | TestLcmBase.nsd_package_path, |
| 124 | TestLcmBase.nsd_package_name, |
| 125 | charm_path=charm_path, |
| 126 | ) |
| 127 | self.assertEqual(result, expected_result, "wrong charm metadata path") |
| 128 | mock_zipfile.assert_not_called() |
| 129 | |
| 130 | def test_get_charm_metadata_file_charm_is_zipped(self): |
| 131 | instance = self.my_ns |
| 132 | fs = fslocal.FsLocal() |
| 133 | fs.path = "/app/storage" |
| 134 | instance.fs = fs |
| 135 | mock_zipfile = MagicMock(create_autospec=True) |
| 136 | mock_zipfile.side_effect = None |
| 137 | charm_folder_name = "simple_charm2.charm" |
| 138 | charm_path = "/app/storage/b63aa1ba-996e-43a7-921a-1aca5ccbc63f/test_nsd/charms/simple_charm" |
| 139 | expected_result = "/app/storage/b63aa1ba-996e-43a7-921a-1aca5ccbc63f/test_nsd/charms/simple_charm2/metadata.yaml" |
| 140 | |
| 141 | with patch("osm_lcm.lcm_utils.ZipFile", mock_zipfile): |
| 142 | result = instance._get_charm_metadata_file( |
| 143 | charm_folder_name, |
| 144 | TestLcmBase.nsd_package_path, |
| 145 | TestLcmBase.nsd_package_name, |
| 146 | charm_path=charm_path, |
| 147 | ) |
| 148 | self.assertEqual(result, expected_result, "wrong charm metadata path") |
| 149 | self.assertEqual(mock_zipfile.call_count, 1) |
| 150 | |
| 151 | def test_find_charm_name_successfully(self): |
| 152 | db_nsr = self.db.get_one("nsrs", {"_id": TestLcmBase.test_nsr_id}) |
| 153 | instance = self.my_ns |
| 154 | mock_listdir = MagicMock(os.listdir()) |
| 155 | mock_listdir.side_effect = None |
| 156 | mock_charm_path = MagicMock() |
| 157 | mock_metadata_file = MagicMock() |
| 158 | mock_metadata_file.return_value = ( |
| 159 | "/b63aa1ba-996e-43a7-921a-1aca5ccbc63f/test_nsd/charms/simple/metadata.yaml" |
| 160 | ) |
| 161 | mock_charm_name = MagicMock() |
| 162 | mock_charm_name.return_value = "test_charm" |
| 163 | expected_result = "test_charm" |
| 164 | |
| 165 | with patch("osm_lcm.lcm_utils.LcmBase._get_charm_path", mock_charm_path), patch( |
| 166 | "osm_lcm.lcm_utils.LcmBase._get_charm_metadata_file", mock_metadata_file |
| 167 | ), patch("osm_lcm.lcm_utils.LcmBase.get_charm_name", mock_charm_name), patch( |
| 168 | "osm_lcm.lcm_utils.os.listdir", mock_listdir |
| 169 | ): |
| 170 | |
| 171 | result = instance.find_charm_name(db_nsr, "simple") |
| 172 | self.assertEqual(result, expected_result, "Wrong charm name") |
| 173 | mock_charm_path.assert_called_once() |
| 174 | mock_metadata_file.assert_called_once() |
| 175 | mock_charm_name.assert_called_once_with( |
| 176 | "/b63aa1ba-996e-43a7-921a-1aca5ccbc63f/test_nsd/charms/simple/metadata.yaml" |
| 177 | ) |
| 178 | self.assertEqual(mock_listdir.call_count, 1) |
| 179 | |
| 180 | def test_find_charm_name_charm_bad_zipfile(self): |
| 181 | db_nsr = self.db.get_one("nsrs", {"_id": TestLcmBase.test_nsr_id}) |
| 182 | instance = self.my_ns |
| 183 | mock_listdir = MagicMock(os.listdir()) |
| 184 | mock_listdir.side_effect = None |
| 185 | mock_charm_path = MagicMock() |
| 186 | mock_metadata_file = MagicMock() |
| 187 | mock_metadata_file.side_effect = BadZipfile |
| 188 | mock_charm_name = MagicMock() |
| 189 | |
| 190 | with patch("osm_lcm.lcm_utils.LcmBase._get_charm_path", mock_charm_path), patch( |
| 191 | "osm_lcm.lcm_utils.LcmBase._get_charm_metadata_file", mock_metadata_file |
| 192 | ), patch("osm_lcm.lcm_utils.LcmBase.get_charm_name", mock_charm_name), patch( |
| 193 | "osm_lcm.lcm_utils.os.listdir", mock_listdir |
| 194 | ): |
| 195 | |
| 196 | with self.assertRaises(LcmException): |
| 197 | |
| 198 | instance.find_charm_name(db_nsr, "simple") |
| 199 | self.assertEqual(mock_listdir.call_count, 1) |
| 200 | self.assertEqual(mock_charm_path.call_count, 1) |
| 201 | self.assertEqual(mock_metadata_file.call_count, 1) |
| 202 | mock_charm_name.assert_not_called() |
| 203 | |
| 204 | def test_find_charm_name_charm_nsd_package_does_not_exist(self): |
| 205 | db_nsr = self.db.get_one("nsrs", {"_id": TestLcmBase.test_nsr_id}) |
| 206 | instance = self.my_ns |
| 207 | mock_listdir = MagicMock(os.listdir()) |
| 208 | mock_listdir.side_effect = FileNotFoundError |
| 209 | mock_charm_path = MagicMock() |
| 210 | mock_metadata_file = MagicMock() |
| 211 | mock_charm_name = MagicMock() |
| 212 | |
| 213 | with patch("osm_lcm.lcm_utils.LcmBase._get_charm_path", mock_charm_path), patch( |
| 214 | "osm_lcm.lcm_utils.LcmBase._get_charm_metadata_file", mock_metadata_file |
| 215 | ), patch("osm_lcm.lcm_utils.LcmBase.get_charm_name", mock_charm_name), patch( |
| 216 | "osm_lcm.lcm_utils.os.listdir", mock_listdir |
| 217 | ): |
| 218 | |
| 219 | with self.assertRaises(LcmException): |
| 220 | instance.find_charm_name(db_nsr, "simple") |
| 221 | self.assertEqual(mock_listdir.call_count, 1) |
| 222 | mock_charm_path.assert_not_called() |
| 223 | mock_metadata_file.assert_not_called() |
| 224 | mock_charm_name.assert_not_called() |
| 225 | |
| 226 | def test_find_charm_name_missing_input_charm_folder_name(self): |
| 227 | db_nsr = self.db.get_one("nsrs", {"_id": TestLcmBase.test_nsr_id}) |
| 228 | instance = self.my_ns |
| 229 | mock_listdir = MagicMock(os.listdir()) |
| 230 | mock_charm_path = MagicMock() |
| 231 | mock_metadata_file = MagicMock() |
| 232 | mock_charm_name = MagicMock() |
| 233 | |
| 234 | with patch("osm_lcm.lcm_utils.LcmBase._get_charm_path", mock_charm_path), patch( |
| 235 | "osm_lcm.lcm_utils.LcmBase._get_charm_metadata_file", mock_metadata_file |
| 236 | ), patch("osm_lcm.lcm_utils.LcmBase.get_charm_name", mock_charm_name), patch( |
| 237 | "osm_lcm.lcm_utils.os.listdir", mock_listdir |
| 238 | ): |
| 239 | |
| 240 | with self.assertRaises(LcmException): |
| 241 | instance.find_charm_name(db_nsr, "") |
| 242 | mock_charm_path.assert_not_called() |
| 243 | mock_metadata_file.assert_not_called() |
| 244 | mock_charm_name.assert_not_called() |
| 245 | mock_listdir.assert_not_called() |
| 246 | |
| 247 | def test_find_charm_name_can_not_open_metadata_file(self): |
| 248 | db_nsr = self.db.get_one("nsrs", {"_id": TestLcmBase.test_nsr_id}) |
| 249 | instance = self.my_ns |
| 250 | mock_listdir = MagicMock(os.listdir()) |
| 251 | mock_charm_path = MagicMock() |
| 252 | mock_metadata_file = MagicMock() |
| 253 | mock_charm_name = MagicMock() |
| 254 | mock_charm_name.side_effect = yaml.YAMLError |
| 255 | |
| 256 | with patch("osm_lcm.lcm_utils.LcmBase._get_charm_path", mock_charm_path), patch( |
| 257 | "osm_lcm.lcm_utils.LcmBase._get_charm_metadata_file", mock_metadata_file |
| 258 | ), patch("osm_lcm.lcm_utils.LcmBase.get_charm_name", mock_charm_name), patch( |
| 259 | "osm_lcm.lcm_utils.os.listdir", mock_listdir |
| 260 | ): |
| 261 | |
| 262 | with self.assertRaises(LcmException): |
| 263 | instance.find_charm_name(db_nsr, "simple") |
| 264 | self.assertEqual(mock_charm_path.call_count, 1) |
| 265 | self.assertEqual(mock_metadata_file.call_count, 1) |
| 266 | self.assertEqual(mock_charm_name.call_count, 1) |
| 267 | self.assertEqual(mock_listdir.call_count, 1) |