update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwlaunchpad / test / tosca_ut.py
1 #!/usr/bin/env python3
2
3 ############################################################################
4 # Copyright 2016 RIFT.io Inc #
5 # #
6 # Licensed under the Apache License, Version 2.0 (the "License"); #
7 # you may not use this file except in compliance with the License. #
8 # You may obtain a copy of the License at #
9 # #
10 # http://www.apache.org/licenses/LICENSE-2.0 #
11 # #
12 # Unless required by applicable law or agreed to in writing, software #
13 # distributed under the License is distributed on an "AS IS" BASIS, #
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
15 # See the License for the specific language governing permissions and #
16 # limitations under the License. #
17 ############################################################################
18
19 import argparse
20 import logging
21 import os
22 import shutil
23 import sys
24 import tarfile
25 import tempfile
26 import unittest
27 import xmlrunner
28
# Unit-test execution needs RIFT_VAR_ROOT; when the environment does not
# already provide one, default it to var/rift/unittest under RIFT_INSTALL.
if os.environ.get('RIFT_VAR_ROOT') is None:
    os.environ['RIFT_VAR_ROOT'] = os.path.join(
        os.environ['RIFT_INSTALL'],
        'var/rift/unittest',
    )
32
33 import rift.mano.examples.ping_pong_nsd as ping_pong_nsd
34
35 from rift.mano.utils.compare_desc import CompareDescShell
36
37 from rift.tasklets.rwlaunchpad.tosca import ExportTosca
38 from rift.tasklets.rwlaunchpad.tosca import ImportTosca
39
40 from rift.package.package import TarPackageArchive
41
class PingPongDescriptors(object):
    """Container for the generated ping-pong NSD and its two VNFDs.

    Attributes set by the constructor:
        ping_pong_nsd: first entry of the generated NSD descriptor list.
        ping_vnfd: first entry of the generated ping VNFD descriptor list.
        pong_vnfd: first entry of the generated pong VNFD descriptor list.
    """

    def __init__(self):
        # Fixed generation options used by this unit test.
        options = dict(
            pingcount=1,
            external_vlr_count=1,
            internal_vlr_count=0,
            num_vnf_vms=1,
            ping_md5sum='1234567890abcdefg',
            pong_md5sum='1234567890abcdefg',
            mano_ut=False,
            use_scale_group=True,
            use_mon_params=True,
            use_placement_group=False,
            use_ns_init_conf=False,
        )
        generated = ping_pong_nsd.generate_ping_pong_descriptors(**options)
        ping, pong, ns = generated

        # Keep only the first descriptor from each generated package.
        self.ping_pong_nsd = ns.descriptor.nsd[0]
        self.ping_vnfd = ping.descriptor.vnfd[0]
        self.pong_vnfd = pong.descriptor.vnfd[0]
61
62
class ToscaTestCase(unittest.TestCase):
    """Unittest for YANG to TOSCA and back translations.

    This generates the Ping Pong descriptors using the script in examples,
    converts them to TOSCA and back to YANG, and compares the result with
    the original descriptors.
    """

    default_timeout = 0
    # Prefix of this file's path that precedes '/modules/core/'.
    # NOTE(review): if that marker is absent, find() returns -1 and this
    # slice only drops the last character of __file__.
    top_dir = __file__[:__file__.find('/modules/core/')]
    # Level handed to logging.basicConfig() in setUpClass.
    log_level = logging.WARN
    # Logger shared by all tests; created in setUpClass.
    log = None

    @classmethod
    def setUpClass(cls):
        """Configure logging once for the class and create ``cls.log``."""
        fmt = logging.Formatter(
            '%(asctime)-23s %(levelname)-5s (%(name)s@%(process)d:%(filename)s:%(lineno)d) - %(message)s')
        stderr_handler = logging.StreamHandler(stream=sys.stderr)
        stderr_handler.setFormatter(fmt)
        logging.basicConfig(level=cls.log_level)
        cls.log = logging.getLogger('tosca-ut')
        cls.log.addHandler(stderr_handler)

    def setUp(self):
        """Run before each test method to initialize test environment."""
        super(ToscaTestCase, self).setUp()
        # Scratch directory that receives the generated archives.
        self.output_dir = tempfile.mkdtemp()

    def compare_dict(self, gen_d, exp_d):
        """Hand the generated and expected dicts to CompareDescShell.

        Both dicts are stringified and passed as ``--generated=...`` and
        ``--expected=...`` arguments.
        """
        gen = "--generated=" + str(gen_d)
        exp = "--expected=" + str(exp_d)
        CompareDescShell.compare_dicts(gen, exp, log=self.log)

    def yang_to_tosca(self, descs):
        """Convert YANG model to TOSCA model.

        Adds the NSD and both VNFDs from ``descs`` to an ExportTosca
        package and returns the result of creating the 'ping_pong_nsd'
        archive in ``self.output_dir``.
        """
        pkg = ExportTosca(self.log)
        nsd_id = pkg.add_nsd(descs.ping_pong_nsd)
        pkg.add_vnfd(nsd_id, descs.ping_vnfd)
        pkg.add_vnfd(nsd_id, descs.pong_vnfd)

        return pkg.create_archive('ping_pong_nsd', self.output_dir)

    def tosca_to_yang(self, tosca_file):
        """Convert TOSCA model to YANG model.

        Returns the files produced by the translation.

        Raises:
            ValueError: if ``tosca_file`` is not a TOSCA package, or the
                translation yields no result or fewer than three files.
        """
        if not ImportTosca.is_tosca_package(tosca_file):
            raise ValueError("Not a valid TOSCA archive: {}".
                             format(tosca_file))

        # This could be a tosca package, try processing
        tosca = ImportTosca(self.log, tosca_file, out_dir=self.output_dir)
        files = tosca.translate()
        if files is None or len(files) < 3:
            raise ValueError("Could not process as a "
                             "TOSCA package {}: {}".format(tosca_file, files))

        self.log.info("Tosca package was translated successfully")
        return files

    def compare_descs(self, descs, yang_files):
        """Compare the generated descriptors with the originals.

        Each file in ``yang_files`` is opened as a tar package; its
        descriptor is compared with the matching original in ``descs``
        according to the package's descriptor type and, for VNFDs, name.

        Raises:
            Exception: if a file is not a tar archive or the package has a
                descriptor type other than 'nsd' or 'vnfd'.
        """
        for yang_file in yang_files:
            if not tarfile.is_tarfile(yang_file):
                raise Exception("Did not find a valid tar file for yang model: {}".
                                format(yang_file))

            with open(yang_file, "r+b") as tar:
                archive = TarPackageArchive(self.log, tar)
                pkg = archive.create_package()
                desc_type = pkg.descriptor_type
                if desc_type == 'nsd':
                    nsd_yang = pkg.descriptor_msg.as_dict()
                    self.compare_dict(nsd_yang,
                                      descs.ping_pong_nsd.as_dict())
                elif desc_type == 'vnfd':
                    vnfd_yang = pkg.descriptor_msg.as_dict()
                    if 'ping_vnfd' == vnfd_yang['name']:
                        self.compare_dict(vnfd_yang,
                                          descs.ping_vnfd.as_dict())
                    elif 'pong_vnfd' == vnfd_yang['name']:
                        self.compare_dict(vnfd_yang,
                                          descs.pong_vnfd.as_dict())
                    else:
                        # Make an uncompared VNFD visible instead of
                        # skipping it without a trace.
                        self.log.warning(
                            "VNFD %s in %s matches neither ping_vnfd nor "
                            "pong_vnfd; not compared",
                            vnfd_yang['name'], yang_file)
                else:
                    raise Exception("Unknown descriptor type {} found: {}".
                                    format(desc_type, pkg.files))

    def test_output(self):
        """Round-trip the Ping Pong descriptors through TOSCA and compare."""
        try:
            # Generate the Ping Pong descriptors
            descs = PingPongDescriptors()

            # Translate the descriptors to TOSCA
            tosca_file = self.yang_to_tosca(descs)

            # Now translate back to YANG
            yang_files = self.tosca_to_yang(tosca_file)

            # Compare the generated YANG to original
            self.compare_descs(descs, yang_files)

            # Removing temp dir only on success to allow debug in case of failures
            if self.output_dir is not None:
                shutil.rmtree(self.output_dir)
                self.output_dir = None

        except Exception as e:
            # Log the traceback, then report the error as a test failure.
            self.log.exception(e)
            self.fail("Exception {}".format(e))
169
170
171
def main():
    """Parse command-line options and run this module's unit tests.

    -v/--verbose sets ToscaTestCase.log_level to DEBUG (WARN otherwise).
    -n/--no-runner leaves the test runner unset; without it an
    XMLTestRunner is built with the RIFT_MODULE_TEST environment value as
    its output. Arguments argparse does not recognize are forwarded to
    unittest.main().
    """
    cli = argparse.ArgumentParser()
    cli.add_argument('-v', '--verbose', action='store_true')
    cli.add_argument('-n', '--no-runner', action='store_true')
    options, passthrough = cli.parse_known_args()

    runner = None
    if not options.no_runner:
        runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])

    if options.verbose:
        ToscaTestCase.log_level = logging.DEBUG
    else:
        ToscaTestCase.log_level = logging.WARN

    unittest.main(testRunner=runner, argv=[sys.argv[0], *passthrough])


if __name__ == '__main__':
    main()