1b5b156b8204bd4163e1e7b65d994d579ce71cdb
[osm/SO.git] / common / python / rift / mano / tosca_translator / test / tosca_translator_ut.py
1 #!/usr/bin/env python3
2
3 # Copyright 2016 RIFT.io Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
17 '''
18 Unittest for TOSCA translator to RIFT.io YANG model
19 '''
20
21 import argparse
22 import logging
23 import os
24 import shutil
25 import sys
26 import tarfile
27 import tempfile
28 import xmlrunner
29
30 import unittest
31
32 import rift.mano.examples.ping_pong_nsd as ping_pong_nsd
33
34 from rift.mano.tosca_translator.common.utils import _
35 import rift.mano.tosca_translator.shell as shell
36
37 from rift.mano.utils.compare_desc import CompareDescShell
38
39 from rift.package import convert
40
41 from toscaparser.common.exception import TOSCAException
42
43
44 _TRUE_VALUES = ('True', 'true', '1', 'yes')
45
46
class PingPongDescriptors(object):
    """Holder for the ping/pong descriptors used as the expected results.

    The descriptors are produced by
    ``ping_pong_nsd.generate_ping_pong_descriptors()``.  The first ``nsd``
    entry of the generated NSD and the first ``vnfd`` entry of each generated
    VNFD are exposed as ``ping_pong_nsd``, ``ping_vnfd`` and ``pong_vnfd``.
    """

    def __init__(self):
        generated = ping_pong_nsd.generate_ping_pong_descriptors(
            pingcount=1,
            external_vlr_count=1,
            internal_vlr_count=0,
            num_vnf_vms=1,
            ping_md5sum='1234567890abcdefg',
            pong_md5sum='1234567890abcdefg',
            mano_ut=False,
            use_scale_group=True,
            use_mon_params=True,
        )
        # The generator returns the ping VNFD, the pong VNFD and the NSD,
        # in that order.
        ping, pong, ns = generated

        self.ping_pong_nsd = ns.descriptor.nsd[0]
        self.ping_vnfd = ping.descriptor.vnfd[0]
        self.pong_vnfd = pong.descriptor.vnfd[0]
64
class TestToscaTranslator(unittest.TestCase):
    """Run the TOSCA translator shell against the bundled test data.

    The argument-handling tests call ``shell.main`` with bad input and expect
    it to raise.  The translation tests write descriptors to a temporary
    directory and compare them with the reference ping/pong descriptors held
    in ``exp_descs``.

    Test methods are documented with comments rather than docstrings so that
    unittest keeps reporting them by method name (a docstring's first line
    would replace the name in verbose output).
    """

    tosca_helloworld = os.path.join(
        os.path.dirname(os.path.abspath(__file__)),
        "data/tosca_helloworld.yaml")
    template_file = '--template-file=' + tosca_helloworld
    template_validation = "--validate-only"
    debug = "--debug"
    failure_msg = _('The program raised an exception unexpectedly.')

    # Overridden by main() when the script is run with --verbose.
    log_level = logging.WARN
    log = None

    # Reference descriptors; populated once in setUpClass().
    exp_descs = None

    # File name suffixes that identify a descriptor file.
    _YAML_SUFFIXES = ('.yaml', '.yml')

    @classmethod
    def setUpClass(cls):
        # Two adjacent literals are joined by the parser.  Writing this as a
        # single literal with a backslash continuation put stray '"'
        # characters and the continuation line's indentation into every
        # log record.
        fmt = logging.Formatter(
            '%(asctime)-23s %(levelname)-5s '
            '(%(name)s@%(process)d:%(filename)s:%(lineno)d) - %(message)s')
        stderr_handler = logging.StreamHandler(stream=sys.stderr)
        stderr_handler.setFormatter(fmt)
        logging.basicConfig(level=cls.log_level)
        cls.log = logging.getLogger('tosca-translator-ut')
        cls.log.addHandler(stderr_handler)
        cls.exp_descs = PingPongDescriptors()

    # NOTE(review): the next three tests hand shell.main a bare string where
    # the other tests pass a list or tuple of arguments.  The calls are kept
    # exactly as they were; confirm that SystemExit is raised for the
    # intended reason and not merely because the string is not an argument
    # list.
    def test_missing_arg(self):
        self.assertRaises(SystemExit, shell.main, '')

    def test_invalid_file_arg(self):
        self.assertRaises(SystemExit, shell.main, 'translate me')

    def test_invalid_file_value(self):
        self.assertRaises(SystemExit,
                          shell.main,
                          '--template-file=template.txt')

    def test_invalid_type_value(self):
        self.assertRaises(SystemExit, shell.main,
                          (self.template_file, '--template-type=xyz'))

    def test_invalid_parameters(self):
        self.assertRaises(ValueError, shell.main,
                          (self.template_file,
                           '--parameters=key'))

    def test_valid_template(self):
        # Translating the hello-world template must not raise.
        try:
            shell.main([self.template_file])
        except Exception as e:
            self.log.exception(e)
            self.fail(self.failure_msg)

    def test_validate_only(self):
        # Validation of the good template must not raise ...
        try:
            shell.main([self.template_file,
                        self.template_validation])
        except Exception as e:
            self.log.exception(e)
            self.fail(self.failure_msg)

        # ... while the invalid template must raise a TOSCAException.
        template = os.path.join(
            os.path.dirname(os.path.abspath(__file__)),
            "data/tosca_helloworld_invalid.yaml")
        invalid_template = '--template-file=' + template
        self.assertRaises(TOSCAException, shell.main,
                          [invalid_template,
                           self.template_validation])

    def compare_dict(self, gen_d, exp_d):
        """Hand the generated and expected dicts to CompareDescShell.

        Both dicts are stringified and passed as ``--generated=...`` and
        ``--expected=...`` option strings together with the test logger.
        NOTE(review): presumably compare_dicts raises on a mismatch --
        confirm against rift.mano.utils.compare_desc.
        """
        gen = "--generated=" + str(gen_d)
        exp = "--expected=" + str(exp_d)
        CompareDescShell.compare_dicts(gen, exp, log=self.log)

    def _find_descriptor(self, entry, archive):
        """Return the path of the first YAML file found in ``entry``.

        ``entry`` is resolved against the current working directory.  With
        ``archive`` true it must be a regular file, which is opened with
        tarfile; the first YAML member is extracted into the current
        directory.  Otherwise it must be a directory, which is listed.
        Returns None when the entry is of the wrong kind or holds no YAML
        file.
        """
        if archive:
            if not os.path.isfile(entry):
                return None
            self.log.debug("Checking archive: {}".format(entry))
            with tarfile.open(entry, 'r') as tar:
                for member in tar.getnames():
                    if member.endswith(self._YAML_SUFFIXES):
                        # Descriptor file
                        tar.extract(member)
                        self.log.debug(
                            "Extracted file: {}".format(member))
                        return member
            return None

        if not os.path.isdir(entry):
            return None
        self.log.debug("Checking directory: {}".format(entry))
        for name in os.listdir(entry):
            if name.endswith(self._YAML_SUFFIXES):
                return os.path.join(entry, name)
        return None

    def _load_descriptors(self, desc):
        """Return ``(generated, expected)`` dicts for descriptor file ``desc``.

        The file name selects both the serializer (VNFD or NSD) and the
        reference descriptor to compare with.  Raises Exception when the
        name matches none of the known descriptors.
        """
        _, ext = os.path.splitext(desc)
        with open(desc, 'r') as hdl:
            if '_vnfd.y' in desc:
                vnfd = convert.VnfdSerializer().from_file_hdl(hdl, ext)
                gen_desc = vnfd.as_dict()
                if 'ping_vnfd.y' in desc:
                    exp_desc = self.exp_descs.ping_vnfd.as_dict()
                elif 'pong_vnfd.y' in desc:
                    exp_desc = self.exp_descs.pong_vnfd.as_dict()
                else:
                    raise Exception("Unknown VNFD descriptor: {}".
                                    format(desc))
            elif '_nsd.y' in desc:
                nsd = convert.NsdSerializer().from_file_hdl(hdl, ext)
                gen_desc = nsd.as_dict()
                exp_desc = self.exp_descs.ping_pong_nsd.as_dict()
            else:
                raise Exception("Unknown file: {}".format(desc))
        return gen_desc, exp_desc

    def check_output(self, out_dir, archive=False):
        """Compare every descriptor generated in ``out_dir`` with its reference.

        Args:
            out_dir: directory the translator wrote its output to.
            archive: True when the output entries are archives, False when
                they are directories.

        Raises:
            AssertionError: ``out_dir`` holds fewer than three entries.
            Exception: a descriptor file name is not recognised, or the
                number of descriptors compared is not exactly three.
        """
        # The desc dirs are using uuid, so cannot match name; only check
        # that there are at least 3 dirs or files.  This runs before the
        # chdir so that a failed assertion cannot leave the process in
        # out_dir.
        entries = os.listdir(out_dir)
        self.assertGreaterEqual(len(entries), 3)

        prev_dir = os.getcwd()
        os.chdir(out_dir)
        try:
            count = 0
            for entry in entries:
                desc = self._find_descriptor(entry, archive)
                if not desc:
                    continue

                self.log.debug("Checking descriptor: {}".format(desc))
                gen_desc, exp_desc = self._load_descriptors(desc)

                # Compare the descriptors
                self.compare_dict(gen_desc, exp_desc)

                # Increment the count of descriptors found
                count += 1

            if count != 3:
                raise Exception(
                    "Did not find expected number of descriptors: {}".
                    format(count))
        except Exception as e:
            self.log.exception(e)
            # Bare raise keeps the original traceback intact.
            raise
        finally:
            os.chdir(prev_dir)

    def _cleanup_output(self, temp_dir):
        """Remove ``temp_dir``, or keep it and log its path in debug mode."""
        if self.log_level != logging.DEBUG:
            if os.path.exists(temp_dir):
                shutil.rmtree(temp_dir)
        else:
            self.log.warning("Generated desc in {}".format(temp_dir))

    def _translate_and_check(self, template_rel_path, label,
                             extra_args=(), archive=False):
        """Translate a template from the data directory and verify the output.

        Args:
            template_rel_path: template path relative to the ``data``
                directory that sits next to this file.
            label: name used in the failure message if translation raises.
            extra_args: additional command-line options for shell.main,
                appended after the template and output-dir options.
            archive: forwarded to check_output().
        """
        test_base_dir = os.path.join(
            os.path.dirname(os.path.abspath(__file__)),
            'data')
        template_file = os.path.join(test_base_dir, template_rel_path)
        template = '--template-file=' + template_file
        temp_dir = tempfile.mkdtemp()
        output_dir = "--output-dir=" + temp_dir

        try:
            shell.main([template, output_dir] + list(extra_args),
                       log=self.log)
        except Exception as e:
            self.log.exception(e)
            self.fail("Exception in {}: {}".format(label, e))
        else:
            self.check_output(temp_dir, archive=archive)
        finally:
            self._cleanup_output(temp_dir)

    def test_output_dir(self):
        self._translate_and_check(
            "ping_pong_csar/Definitions/ping_pong_nsd.yaml",
            "test_output_dir")

    def test_input_csar(self):
        # The failure label used to say "test_output_dir" here, which
        # pointed at the wrong test.
        self._translate_and_check(
            "ping_pong_csar.zip",
            "test_input_csar",
            extra_args=('--archive',),
            archive=True)

    def test_input_csar_no_gi(self):
        self._translate_and_check(
            "ping_pong_csar.zip",
            "input_csar_no_gi",
            extra_args=('--no-gi', '--archive'),
            archive=True)
289
def main():
    """Parse the script's own options and run the unit tests.

    Options:
        -v / --verbose    run the tests with the log level set to DEBUG.
        -n / --no-runner  use unittest's default runner instead of the XML
                          runner.

    Arguments the parser does not recognise are passed on to unittest.

    Raises:
        KeyError: the XML runner is requested but the RIFT_MODULE_TEST
            environment variable, which names its output location, is not
            set.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-v', '--verbose', action='store_true')
    parser.add_argument('-n', '--no-runner', action='store_true')
    args, unittest_args = parser.parse_known_args()

    # Build the XML runner only when it is wanted, so that --no-runner
    # works without RIFT_MODULE_TEST in the environment.
    runner = None
    if not args.no_runner:
        runner = xmlrunner.XMLTestRunner(
            output=os.environ["RIFT_MODULE_TEST"])

    TestToscaTranslator.log_level = \
        logging.DEBUG if args.verbose else logging.WARN

    unittest.main(testRunner=runner, argv=[sys.argv[0]] + unittest_args)


if __name__ == '__main__':
    main()