update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / common / python / rift / mano / tosca_translator / test / tosca_translator_ut.py
1 #!/usr/bin/env python3
2
3 # Copyright 2016 RIFT.io Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
17 '''
18 Unittest for TOSCA tranlator to RIFT.io YANG model
19 '''
20
21 import argparse
22 import logging
23 import os
24 import shutil
25 import sys
26 import tarfile
27 import tempfile
28 import xmlrunner
29
30 import unittest
31
32 import rift.mano.examples.ping_pong_nsd as ping_pong_nsd
33
34 from rift.mano.tosca_translator.common.utils import _
35 import rift.mano.tosca_translator.shell as shell
36
37 from rift.mano.utils.compare_desc import CompareDescShell
38
39 from rift.package import convert
40
41 from toscaparser.common.exception import TOSCAException
42
43
44 _TRUE_VALUES = ('True', 'true', '1', 'yes')
45
46
47 class PingPongDescriptors(object):
48 def __init__(self):
49 ping_vnfd, pong_vnfd, nsd = \
50 ping_pong_nsd.generate_ping_pong_descriptors(
51 pingcount=1,
52 external_vlr_count=1,
53 internal_vlr_count=0,
54 num_vnf_vms=1,
55 ping_md5sum='1234567890abcdefg',
56 pong_md5sum='1234567890abcdefg',
57 mano_ut=False,
58 use_scale_group=True,
59 use_mon_params=True,
60 )
61 self.ping_pong_nsd = nsd.descriptor.nsd[0]
62 self.ping_vnfd = ping_vnfd.descriptor.vnfd[0]
63 self.pong_vnfd = pong_vnfd.descriptor.vnfd[0]
64
65 class TestToscaTranslator(unittest.TestCase):
66
67 tosca_helloworld = os.path.join(
68 os.path.dirname(os.path.abspath(__file__)),
69 "data/tosca_helloworld_nfv.yaml")
70 template_file = '--template-file=' + tosca_helloworld
71 template_validation = "--validate-only"
72 debug="--debug"
73 failure_msg = _('The program raised an exception unexpectedly.')
74
75 log_level = logging.WARN
76 log = None
77
78 exp_descs = None
79
80 @classmethod
81 def setUpClass(cls):
82 fmt = logging.Formatter(
83 '%(asctime)-23s %(levelname)-5s " \
84 "(%(name)s@%(process)d:%(filename)s:%(lineno)d) - %(message)s')
85 stderr_handler = logging.StreamHandler(stream=sys.stderr)
86 stderr_handler.setFormatter(fmt)
87 logging.basicConfig(level=cls.log_level)
88 cls.log = logging.getLogger('tosca-translator-ut')
89 cls.log.addHandler(stderr_handler)
90 cls.exp_descs = PingPongDescriptors()
91
92 def test_missing_arg(self):
93 self.assertRaises(SystemExit, shell.main, '')
94
95 def test_invalid_file_arg(self):
96 self.assertRaises(SystemExit, shell.main, 'translate me')
97
98 def test_invalid_file_value(self):
99 self.assertRaises(SystemExit,
100 shell.main,
101 ('--template-file=template.txt'))
102
103 def test_invalid_type_value(self):
104 self.assertRaises(SystemExit, shell.main,
105 (self.template_file, '--template-type=xyz'))
106
107 def test_invalid_parameters(self):
108 self.assertRaises(ValueError, shell.main,
109 (self.template_file,
110 '--parameters=key'))
111
112 @unittest.skip
113 def test_valid_template(self):
114 try:
115 shell.main([self.template_file])
116 except Exception as e:
117 self.log.exception(e)
118 self.fail(self.failure_msg)
119
120 @unittest.skip
121 def test_validate_only(self):
122 try:
123 shell.main([self.template_file,
124 self.template_validation])
125 except Exception as e:
126 self.log.exception(e)
127 self.fail(self.failure_msg)
128
129 template = os.path.join(
130 os.path.dirname(os.path.abspath(__file__)),
131 "data/tosca_helloworld_invalid.yaml")
132 invalid_template = '--template-file=' + template
133 self.assertRaises(TOSCAException, shell.main,
134 [invalid_template,
135 self.template_validation])
136
137 def compare_dict(self, gen_d, exp_d):
138 gen = "--generated="+str(gen_d)
139 exp = "--expected="+str(exp_d)
140 CompareDescShell.compare_dicts(gen, exp, log=self.log)
141
142 def check_output(self, out_dir, archive=False):
143 prev_dir = os.getcwd()
144 os.chdir(out_dir)
145 # Check the archives or directories are present
146 dirs = os.listdir(out_dir)
147 # The desc dirs are using uuid, so cannot match name
148 # Check there are 3 dirs or files
149 self.assertTrue(len(dirs) >= 3)
150
151 try:
152 count = 0
153 for a in dirs:
154 desc = None
155 if archive:
156 if os.path.isfile(a):
157 self.log.debug("Checking archive: {}".format(a))
158 with tarfile.open(a, 'r') as t:
159 for m in t.getnames():
160 if m.endswith('.yaml') or m.endswith('.yml'):
161 # Descriptor file
162 t.extract(m)
163 self.log.debug("Extracted file: {}".format(m))
164 desc = m
165 break
166 else:
167 continue
168
169 else:
170 if os.path.isdir(a):
171 self.log.debug("Checking directory: {}".format(a))
172 for m in os.listdir(a):
173 if m.endswith('.yaml') or m.endswith('.yml'):
174 desc = os.path.join(a, m)
175 break
176
177 if desc:
178 self.log.debug("Checking descriptor: {}".format(desc))
179 with open(desc, 'r') as d:
180 rest, ext = os.path.splitext(desc)
181 if '_vnfd.y' in desc:
182 vnfd = convert.VnfdSerializer().from_file_hdl(d, ext)
183 gen_desc = vnfd.as_dict()
184 if 'ping_vnfd.y' in desc:
185 exp_desc = self.exp_descs.ping_vnfd.as_dict()
186 elif 'pong_vnfd.y' in desc:
187 exp_desc = self.exp_descs.pong_vnfd.as_dict()
188 else:
189 raise Exception("Unknown VNFD descriptor: {}".
190 format(desc))
191 elif '_nsd.y' in desc:
192 nsd = convert.NsdSerializer().from_file_hdl(d, ext)
193 gen_desc = nsd.as_dict()
194 exp_desc = self.exp_descs.ping_pong_nsd.as_dict()
195 else:
196 raise Exception("Unknown file: {}".format(desc))
197
198 # Compare the descriptors
199 self.compare_dict(gen_desc, exp_desc)
200
201 # Increment the count of descriptiors found
202 count += 1
203
204 if count != 3:
205 raise Exception("Did not find expected number of descriptors: {}".
206 format(count))
207 except Exception as e:
208 self.log.exception(e)
209 raise e
210
211 finally:
212 os.chdir(prev_dir)
213
214 def test_output_dir(self):
215 test_base_dir = os.path.join(os.path.dirname(
216 os.path.abspath(__file__)), 'data')
217 template_file = os.path.join(test_base_dir,
218 "tosca_ping_pong_epa/Definitions/ping_pong_nsd.yaml")
219 template = '--template-file='+template_file
220 temp_dir = tempfile.mkdtemp()
221 output_dir = "--output-dir=" + temp_dir
222 try:
223 shell.main([template, output_dir], log=self.log)
224
225 except Exception as e:
226 self.log.exception(e)
227 self.fail("Exception in test_output_dir: {}".format(e))
228
229 else:
230 self.check_output(temp_dir)
231
232 finally:
233 if self.log_level != logging.DEBUG:
234 if os.path.exists(temp_dir):
235 shutil.rmtree(temp_dir)
236 else:
237 self.log.warn("Generated desc in {}".format(temp_dir))
238
239
240 def test_input_csar(self):
241 test_base_dir = os.path.join(
242 os.path.dirname(os.path.abspath(__file__)),
243 'data')
244 template_file = os.path.join(test_base_dir, "tosca_ping_pong_epa.zip")
245 template = '--template-file='+template_file
246 temp_dir = tempfile.mkdtemp()
247 output_dir = "--output-dir=" + temp_dir
248
249 try:
250 shell.main([template, output_dir, '--archive'], log=self.log)
251
252 except Exception as e:
253 self.log.exception(e)
254 self.fail("Exception in test_output_dir: {}".format(e))
255
256 else:
257 self.check_output(temp_dir, archive=True)
258
259 finally:
260 if self.log_level != logging.DEBUG:
261 if os.path.exists(temp_dir):
262 shutil.rmtree(temp_dir)
263 else:
264 self.log.warn("Generated desc in {}".format(temp_dir))
265
266 @unittest.skip
267 def test_input_csar_no_gi(self):
268 test_base_dir = os.path.join(
269 os.path.dirname(os.path.abspath(__file__)),
270 'data')
271 template_file = os.path.join(test_base_dir, "tosca_ping_pong_epa.zip")
272 template = '--template-file='+template_file
273 temp_dir = tempfile.mkdtemp()
274 output_dir = "--output-dir=" + temp_dir
275 no_gi = '--no-gi'
276
277 try:
278 shell.main([template, output_dir, no_gi, '--archive'], log=self.log)
279
280 except Exception as e:
281 self.log.exception(e)
282 self.fail("Exception in input_csar_no_gi: {}".format(e))
283
284 else:
285 self.check_output(temp_dir, archive=True)
286
287 finally:
288 if self.log_level != logging.DEBUG:
289 if os.path.exists(temp_dir):
290 shutil.rmtree(temp_dir)
291 else:
292 self.log.warn("Generated desc in {}".format(temp_dir))
293
294 def main():
295 runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])
296
297 parser = argparse.ArgumentParser()
298 parser.add_argument('-v', '--verbose', action='store_true')
299 parser.add_argument('-n', '--no-runner', action='store_true')
300 args, unittest_args = parser.parse_known_args()
301 if args.no_runner:
302 runner = None
303
304 TestToscaTranslator.log_level = logging.DEBUG if args.verbose else logging.WARN
305
306 unittest.main(testRunner=runner, argv=[sys.argv[0]] + unittest_args)
307
308 if __name__ == '__main__':
309 main()