3 ############################################################################
4 # Copyright 2016 RIFT.io Inc #
6 # Licensed under the Apache License, Version 2.0 (the "License"); #
7 # you may not use this file except in compliance with the License. #
8 # You may obtain a copy of the License at #
10 # http://www.apache.org/licenses/LICENSE-2.0 #
12 # Unless required by applicable law or agreed to in writing, software #
13 # distributed under the License is distributed on an "AS IS" BASIS, #
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
15 # See the License for the specific language governing permissions and #
16 # limitations under the License. #
17 ############################################################################
29 #Setting RIFT_VAR_ROOT if not already set for unit test execution
30 if "RIFT_VAR_ROOT" not in os
.environ
:
31 os
.environ
['RIFT_VAR_ROOT'] = os
.path
.join(os
.environ
['RIFT_INSTALL'], 'var/rift/unittest')
33 import rift
.mano
.examples
.ping_pong_nsd
as ping_pong_nsd
35 from rift
.mano
.utils
.compare_desc
import CompareDescShell
37 from rift
.tasklets
.rwlaunchpad
.tosca
import ExportTosca
38 from rift
.tasklets
.rwlaunchpad
.tosca
import ImportTosca
40 from rift
.package
.package
import TarPackageArchive
42 class PingPongDescriptors(object):
44 ping_vnfd
, pong_vnfd
, nsd
= \
45 ping_pong_nsd
.generate_ping_pong_descriptors(
50 ping_md5sum
='1234567890abcdefg',
51 pong_md5sum
='1234567890abcdefg',
55 use_placement_group
=False,
56 use_ns_init_conf
=False,
58 self
.ping_pong_nsd
= nsd
.descriptor
.nsd
[0]
59 self
.ping_vnfd
= ping_vnfd
.descriptor
.vnfd
[0]
60 self
.pong_vnfd
= pong_vnfd
.descriptor
.vnfd
[0]
63 class ToscaTestCase(unittest
.TestCase
):
64 """ Unittest for YANG to TOSCA and back translations
66 This generates the Ping Pong descrptors using the script
67 in examles and then converts it to TOSCA and back to YANG.
70 top_dir
= __file__
[:__file__
.find('/modules/core/')]
71 log_level
= logging
.WARN
76 fmt
= logging
.Formatter(
77 '%(asctime)-23s %(levelname)-5s (%(name)s@%(process)d:%(filename)s:%(lineno)d) - %(message)s')
78 stderr_handler
= logging
.StreamHandler(stream
=sys
.stderr
)
79 stderr_handler
.setFormatter(fmt
)
80 logging
.basicConfig(level
=cls
.log_level
)
81 cls
.log
= logging
.getLogger('tosca-ut')
82 cls
.log
.addHandler(stderr_handler
)
85 """Run before each test method to initialize test environment."""
87 super(ToscaTestCase
, self
).setUp()
88 self
.output_dir
= tempfile
.mkdtemp()
90 def compare_dict(self
, gen_d
, exp_d
):
91 gen
= "--generated="+str(gen_d
)
92 exp
= "--expected="+str(exp_d
)
93 CompareDescShell
.compare_dicts(gen
, exp
, log
=self
.log
)
95 def yang_to_tosca(self
, descs
):
96 """Convert YANG model to TOSCA model"""
97 pkg
= ExportTosca(self
.log
)
98 nsd_id
= pkg
.add_nsd(descs
.ping_pong_nsd
)
99 pkg
.add_vnfd(nsd_id
, descs
.ping_vnfd
)
100 pkg
.add_vnfd(nsd_id
, descs
.pong_vnfd
)
102 return pkg
.create_archive('ping_pong_nsd', self
.output_dir
)
104 def tosca_to_yang(self
, tosca_file
):
105 """Convert TOSCA model to YANG model"""
106 if ImportTosca
.is_tosca_package(tosca_file
):
107 # This could be a tosca package, try processing
108 tosca
= ImportTosca(self
.log
, tosca_file
, out_dir
=self
.output_dir
)
109 files
= tosca
.translate()
110 if files
is None or len(files
) < 3:
111 raise ValueError("Could not process as a "
112 "TOSCA package {}: {}".format(tosca_file
, files
))
114 self
.log
.info("Tosca package was translated successfully")
117 raise ValueError("Not a valid TOSCA archive: {}".
120 def compare_descs(self
, descs
, yang_files
):
121 """Compare the sescriptors generated with original"""
122 for yang_file
in yang_files
:
123 if tarfile
.is_tarfile(yang_file
):
124 with
open(yang_file
, "r+b") as tar
:
125 archive
= TarPackageArchive(self
.log
, tar
)
126 pkg
= archive
.create_package()
127 desc_type
= pkg
.descriptor_type
128 if desc_type
== 'nsd':
129 nsd_yang
= pkg
.descriptor_msg
.as_dict()
130 self
.compare_dict(nsd_yang
,
131 descs
.ping_pong_nsd
.as_dict())
132 elif desc_type
== 'vnfd':
133 vnfd_yang
= pkg
.descriptor_msg
.as_dict()
134 if 'ping_vnfd' == vnfd_yang
['name']:
135 self
.compare_dict(vnfd_yang
,
136 descs
.ping_vnfd
.as_dict())
137 elif 'pong_vnfd' == vnfd_yang
['name']:
138 self
.compare_dict(vnfd_yang
,
139 descs
.pong_vnfd
.as_dict())
141 raise Exception("Unknown descriptor type {} found: {}".
142 format(desc_type
, pkg
.files
))
144 raise Exception("Did not find a valid tar file for yang model: {}".
147 def test_output(self
):
149 # Generate the Ping Pong descriptors
150 descs
= PingPongDescriptors()
152 # Translate the descriptors to TOSCA
153 tosca_file
= self
.yang_to_tosca(descs
)
155 # Now translate back to YANG
156 yang_files
= self
.tosca_to_yang(tosca_file
)
158 # Compare the generated YANG to original
159 self
.compare_descs(descs
, yang_files
)
161 # Removing temp dir only on success to allow debug in case of failures
162 if self
.output_dir
is not None:
163 shutil
.rmtree(self
.output_dir
)
164 self
.output_dir
= None
166 except Exception as e
:
167 self
.log
.exception(e
)
168 self
.fail("Exception {}".format(e
))
173 parser
= argparse
.ArgumentParser()
174 parser
.add_argument('-v', '--verbose', action
='store_true')
175 parser
.add_argument('-n', '--no-runner', action
='store_true')
176 args
, unittest_args
= parser
.parse_known_args()
180 runner
= xmlrunner
.XMLTestRunner(output
=os
.environ
["RIFT_MODULE_TEST"])
182 ToscaTestCase
.log_level
= logging
.DEBUG
if args
.verbose
else logging
.WARN
184 unittest
.main(testRunner
=runner
, argv
=[sys
.argv
[0]] + unittest_args
)
186 if __name__
== '__main__':