--- /dev/null
+#!/usr/bin/env python3
+
+############################################################################
+# Copyright 2016 RIFT.io Inc #
+# #
+# Licensed under the Apache License, Version 2.0 (the "License"); #
+# you may not use this file except in compliance with the License. #
+# You may obtain a copy of the License at #
+# #
+# http://www.apache.org/licenses/LICENSE-2.0 #
+# #
+# Unless required by applicable law or agreed to in writing, software #
+# distributed under the License is distributed on an "AS IS" BASIS, #
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
+# See the License for the specific language governing permissions and #
+# limitations under the License. #
+############################################################################
+
+import argparse
+import logging
+import os
+import shutil
+import sys
+import tarfile
+import tempfile
+import unittest
+import xmlrunner
+
+import rift.mano.examples.ping_pong_nsd as ping_pong_nsd
+
+from rift.mano.utils.compare_desc import CompareDescShell
+
+from rift.tasklets.rwlaunchpad.tosca import ExportTosca
+from rift.tasklets.rwlaunchpad.tosca import ImportTosca
+
+from rift.package.package import TarPackageArchive
+
+class PingPongDescriptors(object):
+ def __init__(self):
+ ping_vnfd, pong_vnfd, nsd = \
+ ping_pong_nsd.generate_ping_pong_descriptors(
+ pingcount=1,
+ external_vlr_count=1,
+ internal_vlr_count=0,
+ num_vnf_vms=1,
+ ping_md5sum='1234567890abcdefg',
+ pong_md5sum='1234567890abcdefg',
+ mano_ut=False,
+ use_scale_group=True,
+ use_mon_params=True,
+ use_placement_group=False,
+ use_ns_init_conf=False,
+ )
+ self.ping_pong_nsd = nsd.descriptor.nsd[0]
+ self.ping_vnfd = ping_vnfd.descriptor.vnfd[0]
+ self.pong_vnfd = pong_vnfd.descriptor.vnfd[0]
+
+
+class ToscaTestCase(unittest.TestCase):
+ """ Unittest for YANG to TOSCA and back translations
+
+ This generates the Ping Pong descrptors using the script
+ in examles and then converts it to TOSCA and back to YANG.
+ """
+ default_timeout = 0
+ top_dir = __file__[:__file__.find('/modules/core/')]
+ log_level = logging.WARN
+ log = None
+
+ @classmethod
+ def setUpClass(cls):
+ fmt = logging.Formatter(
+ '%(asctime)-23s %(levelname)-5s (%(name)s@%(process)d:%(filename)s:%(lineno)d) - %(message)s')
+ stderr_handler = logging.StreamHandler(stream=sys.stderr)
+ stderr_handler.setFormatter(fmt)
+ logging.basicConfig(level=cls.log_level)
+ cls.log = logging.getLogger('tosca-ut')
+ cls.log.addHandler(stderr_handler)
+
+ def setUp(self):
+ """Run before each test method to initialize test environment."""
+
+ super(ToscaTestCase, self).setUp()
+ self.output_dir = tempfile.mkdtemp()
+
+ def compare_dict(self, gen_d, exp_d):
+ gen = "--generated="+str(gen_d)
+ exp = "--expected="+str(exp_d)
+ CompareDescShell.compare_dicts(gen, exp, log=self.log)
+
+ def yang_to_tosca(self, descs):
+ """Convert YANG model to TOSCA model"""
+ pkg = ExportTosca(self.log)
+ nsd_id = pkg.add_nsd(descs.ping_pong_nsd)
+ pkg.add_vnfd(nsd_id, descs.ping_vnfd)
+ pkg.add_vnfd(nsd_id, descs.pong_vnfd)
+
+ return pkg.create_archive('ping_pong_nsd', self.output_dir)
+
+ def tosca_to_yang(self, tosca_file):
+ """Convert TOSCA model to YANG model"""
+ if ImportTosca.is_tosca_package(tosca_file):
+ # This could be a tosca package, try processing
+ tosca = ImportTosca(self.log, tosca_file, out_dir=self.output_dir)
+ files = tosca.translate()
+ if files is None or len(files) < 3:
+ raise ValueError("Could not process as a "
+ "TOSCA package {}: {}".format(tosca_file, files))
+ else:
+ self.log.info("Tosca package was translated successfully")
+ return files
+ else:
+ raise ValueError("Not a valid TOSCA archive: {}".
+ format(tosca_file))
+
+ def compare_descs(self, descs, yang_files):
+ """Compare the sescriptors generated with original"""
+ for yang_file in yang_files:
+ if tarfile.is_tarfile(yang_file):
+ with open(yang_file, "r+b") as tar:
+ archive = TarPackageArchive(self.log, tar)
+ pkg = archive.create_package()
+ desc_type = pkg.descriptor_type
+ if desc_type == 'nsd':
+ nsd_yang = pkg.descriptor_msg.as_dict()
+ self.compare_dict(nsd_yang,
+ descs.ping_pong_nsd.as_dict())
+ elif desc_type == 'vnfd':
+ vnfd_yang = pkg.descriptor_msg.as_dict()
+ if 'ping_vnfd' == vnfd_yang['name']:
+ self.compare_dict(vnfd_yang,
+ descs.ping_vnfd.as_dict())
+ elif 'pong_vnfd' == vnfd_yang['name']:
+ self.compare_dict(vnfd_yang,
+ descs.pong_vnfd.as_dict())
+ else:
+ raise Exception("Unknown descriptor type {} found: {}".
+ format(desc_type, pkg.files))
+ else:
+ raise Exception("Did not find a valid tar file for yang model: {}".
+ format(yang_file))
+
+ def test_output(self):
+ try:
+ # Generate the Ping Pong descriptors
+ descs = PingPongDescriptors()
+
+ # Translate the descriptors to TOSCA
+ tosca_file = self.yang_to_tosca(descs)
+
+ # Now translate back to YANG
+ yang_files = self.tosca_to_yang(tosca_file)
+
+ # Compare the generated YANG to original
+ self.compare_descs(descs, yang_files)
+
+ # Removing temp dir only on success to allow debug in case of failures
+ if self.output_dir is not None:
+ shutil.rmtree(self.output_dir)
+ self.output_dir = None
+
+ except Exception as e:
+ self.log.exception(e)
+ self.fail("Exception {}".format(e))
+
+
+
+def main():
+ parser = argparse.ArgumentParser()
+ parser.add_argument('-v', '--verbose', action='store_true')
+ parser.add_argument('-n', '--no-runner', action='store_true')
+ args, unittest_args = parser.parse_known_args()
+ if args.no_runner:
+ runner = None
+ else:
+ runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])
+
+ ToscaTestCase.log_level = logging.DEBUG if args.verbose else logging.WARN
+
+ unittest.main(testRunner=runner, argv=[sys.argv[0]] + unittest_args)
+
+if __name__ == '__main__':
+ main()