RIFT OSM R1 Initial Submission
Signed-off-by: Jeremy Mordkoff <jeremy.mordkoff@riftio.com>
diff --git a/rwlaunchpad/test/CMakeLists.txt b/rwlaunchpad/test/CMakeLists.txt
new file mode 100644
index 0000000..bd1a51e
--- /dev/null
+++ b/rwlaunchpad/test/CMakeLists.txt
@@ -0,0 +1,65 @@
+#
+# Copyright 2016 RIFT.IO Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Author(s): Joshua Downer
+# Creation Date: 10/01/2015
+#
+
cmake_minimum_required(VERSION 2.8)

# Demo launcher script, installed into the demos/ directory.
install(
  PROGRAMS
    launchpad.py
  DESTINATION demos
  COMPONENT ${PKG_LONG_NAME}
  )

# Launchpad pytest module, installed into the systemtest tree.
install(
  FILES
    pytest/lp_test.py
  DESTINATION
    usr/rift/systemtest/pytest/launchpad
  COMPONENT ${PKG_LONG_NAME}
  )

# Recovery system test driver.
install(
  PROGRAMS
    launchpad_recovery
  DESTINATION
    usr/rift/systemtest/launchpad
  COMPONENT ${PKG_LONG_NAME}
  )

# Test runner wrapper installed into usr/bin.
# NOTE(review): this one is installed under COMPONENT rwcal-1.0 while the
# others use ${PKG_LONG_NAME} — looks inconsistent; confirm intended.
install(
  PROGRAMS
    launchpad
  DESTINATION usr/bin
  COMPONENT rwcal-1.0
  )

# Unit-test registrations (run via the rift_py3test harness).
rift_py3test(utest_rwmonitor
  TEST_ARGS
  ${CMAKE_CURRENT_SOURCE_DIR}/utest_rwmonitor.py
  )

rift_py3test(utest_rwnsm
  TEST_ARGS
  ${CMAKE_CURRENT_SOURCE_DIR}/utest_rwnsm.py
  )

rift_py3test(tosca_ut
  TEST_ARGS
  ${CMAKE_CURRENT_SOURCE_DIR}/tosca_ut.py
  )
diff --git a/rwlaunchpad/test/launchpad b/rwlaunchpad/test/launchpad
new file mode 100644
index 0000000..6e423ac
--- /dev/null
+++ b/rwlaunchpad/test/launchpad
@@ -0,0 +1,145 @@
+#!/usr/bin/env python3
+
+import argparse
+import contextlib
+import os
+import signal
+import subprocess
+import sys
+
+import gi
+gi.require_version('RwcalYang', '1.0')
+gi.require_version('RwCal', '1.0')
+gi.require_version('RwLog', '1.0')
+
+
+TEST_PARSER = "test"
+
+
class PyTestRunner:
    """Boots a standalone launchpad system and runs the pytest system
    tests against it.

    All paths are resolved relative to $RIFT_INSTALL.
    """

    # Launch command for the launchpad system in ethsim mode, relative to
    # $RIFT_INSTALL ("-c" selects collapsed mode).
    SYS_CMD = "demos/launchpad.py -m ethsim --skip-prepare-vm -c"
    # Command that starts the cloudsim server (needed for the lxc CAL only).
    CLOUDSIM_CMD = "cloudsim start"

    @property
    def rift_install(self):
        # Root of the RIFT install tree; None if the env var is unset.
        return os.getenv('RIFT_INSTALL')

    @property
    def account_script(self):
        # Pytest module that configures the cloud account.
        return os.path.join(
            self.rift_install,
            "usr/rift/systemtest/pytest/mission_control/test_mission_control.py")

    @property
    def onboard_script(self):
        # Pytest module that onboards the ping-pong VNFs.
        return os.path.join(
            self.rift_install,
            "usr/rift/systemtest/pytest/mission_control/pingpong_vnf/test_onboard_vnf.py")

    @property
    def records_script(self):
        # Pytest module that verifies NSR/VNFR records.
        return os.path.join(
            self.rift_install,
            "usr/rift/systemtest/pytest/mission_control/pingpong_vnf/test_records.py")

    def run_cmd(self, scripts=None, cal_account="mock"):
        """Run py.test over *scripts* (defaults to account + onboard).

        Arguments:
            scripts     - list of pytest files to run, or None for defaults
            cal_account - CAL flavor flag passed through to pytest
        """
        scripts = scripts or [self.account_script, self.onboard_script]

        cmd = "py.test -v "

        # In mock-cal mode we don't need the images.
        if cal_account == "mock":
            cmd += "--{} --lp-standalone --network-service pingpong_noimg ".format(cal_account)
        else:
            cmd += "--{} --lp-standalone --network-service pingpong ".format(cal_account)

        cmd += " ".join(scripts)
        subprocess.call(cmd, shell=True)

    @contextlib.contextmanager
    def system_start(self, debug_mode=False, cal_account="mock"):
        """Start the launchpad system (and cloudsim for the lxc CAL),
        yield while it runs, then tear everything down.
        """
        # Preload the xerces workaround library for all child processes.
        os.environ['LD_PRELOAD'] = os.path.join(
            self.rift_install,
            "usr/lib/rift/preloads/librwxercespreload.so")

        sys_cmd = os.path.join(self.rift_install, self.SYS_CMD)
        if debug_mode:
            sys_cmd += " --mock-cli"

        # setsid puts each child in its own process group so the whole
        # process tree can be terminated with killpg below.
        process = subprocess.Popen(
            sys_cmd,
            shell=True,
            preexec_fn=os.setsid)

        cloudsim_process = None
        if cal_account == "lxc":
            # If in LXC start the cloudsim server.
            cloudsim_process = subprocess.Popen(
                PyTestRunner.CLOUDSIM_CMD,
                shell=True,
                preexec_fn=os.setsid)

        # Fixed: kill() is registered as a signal handler below, so it is
        # invoked with (signum, frame); the previous zero-argument
        # signature raised TypeError whenever SIGHUP/SIGTERM fired.
        def kill(*args):
            os.killpg(process.pid, signal.SIGTERM)
            if cloudsim_process:
                os.killpg(cloudsim_process.pid, signal.SIGTERM)
                cloudsim_process.wait()

            process.wait()

        signal.signal(signal.SIGHUP, kill)
        signal.signal(signal.SIGTERM, kill)

        yield

        kill()
+
+
def test_launchpad(args):
    """Boot the launchpad system and run the pytest suites against it."""
    runner = PyTestRunner()

    # The lxc CAL exercises the full flow, including the records checks;
    # otherwise fall back to the runner's default script set.
    if args.cal == "lxc":
        scripts = [
            runner.account_script,
            runner.onboard_script,
            runner.records_script,
        ]
    else:
        scripts = None

    with runner.system_start(cal_account=args.cal):
        runner.run_cmd(scripts=scripts, cal_account=args.cal)
+
+
def parse(arguments):
    """Parse command line *arguments* and return the populated namespace.

    Raises:
        SystemExit: via argparse on invalid input.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument(
        '--log-level', '-l',
        default="WARNING",
        type=str,
        choices=["INFO", "DEBUG", "WARNING", "ERROR"],
        help="Set log level, defaults to warning and above.")

    # Fixed: without a subcommand, "which" was absent from the namespace
    # and main() crashed with AttributeError; default it explicitly.
    parser.set_defaults(which=None)

    subparsers = parser.add_subparsers()

    start_parser = subparsers.add_parser(TEST_PARSER, help="Test the LP")
    start_parser.add_argument(
        '--cal', "-c",
        # Fixed: the help text was copy-pasted from an unrelated option
        # ("Run the server in the foreground...") and did not describe
        # this flag at all.
        help="CAL account type to run the launchpad tests against.",
        default="mock",
        choices=["lxc", "mock"])
    start_parser.set_defaults(which=TEST_PARSER)

    args = parser.parse_args(arguments)

    return args
+
+
def main(args):
    """Entry point: parse *args* and dispatch the selected subcommand."""
    args = parse(args)

    # getattr guard: older parse() versions omit "which" entirely when no
    # subcommand is given; avoid AttributeError in that case.
    if getattr(args, "which", None) == TEST_PARSER:
        test_launchpad(args)
+
+
+if __name__ == "__main__":
+ main(sys.argv[1:])
\ No newline at end of file
diff --git a/rwlaunchpad/test/launchpad.py b/rwlaunchpad/test/launchpad.py
new file mode 100755
index 0000000..239f91b
--- /dev/null
+++ b/rwlaunchpad/test/launchpad.py
@@ -0,0 +1,520 @@
+#!/usr/bin/env python3
+
+#
+# Copyright 2016 RIFT.IO Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
import glob
import logging
import os
import resource
import shlex
import shutil
import socket
import subprocess
import sys

import netifaces
+
+from rift.rwlib.util import certs
+import rift.rwcal.cloudsim
+import rift.rwcal.cloudsim.net
+import rift.vcs
+import rift.vcs.core as core
+import rift.vcs.demo
+import rift.vcs.vms
+
+import rift.rwcal.cloudsim
+import rift.rwcal.cloudsim.net
+
+from rift.vcs.ext import ClassProperty
+
+logger = logging.getLogger(__name__)
+
+
class NsmTasklet(rift.vcs.core.Tasklet):
    """Tasklet that runs the network services manager (rwnsmtasklet)."""

    def __init__(self, name='network-services-manager', uid=None,
                 config_ready=True,
                 recovery_action=core.RecoveryType.FAILCRITICAL.value,
                 data_storetype=core.DataStore.NOSTORE.value,
                 ):
        """Create an NsmTasklet.

        Arguments:
            name - the name of the tasklet
            uid  - a unique identifier
        """
        super().__init__(
            name=name,
            uid=uid,
            config_ready=config_ready,
            recovery_action=recovery_action,
            data_storetype=data_storetype,
        )

    # Plugin loaded for this tasklet (path relative to $RIFT_INSTALL).
    plugin_directory = ClassProperty('./usr/lib/rift/plugins/rwnsmtasklet')
    plugin_name = ClassProperty('rwnsmtasklet')
+
+
class VnsTasklet(rift.vcs.core.Tasklet):
    # Fixed: the docstring previously said "network services manager" — a
    # copy-paste from NsmTasklet; this class loads the VNS plugin.
    """Tasklet that runs the virtual network service (rwvnstasklet)."""

    def __init__(self, name='virtual-network-service', uid=None,
                 config_ready=True,
                 recovery_action=core.RecoveryType.FAILCRITICAL.value,
                 data_storetype=core.DataStore.NOSTORE.value,
                 ):
        """Create a VnsTasklet.

        Arguments:
            name - the name of the tasklet
            uid  - a unique identifier
        """
        super(VnsTasklet, self).__init__(name=name, uid=uid,
                                         config_ready=config_ready,
                                         recovery_action=recovery_action,
                                         data_storetype=data_storetype,
                                         )

    # Plugin loaded for this tasklet (path relative to $RIFT_INSTALL).
    plugin_directory = ClassProperty('./usr/lib/rift/plugins/rwvnstasklet')
    plugin_name = ClassProperty('rwvnstasklet')
+
+
class VnfmTasklet(rift.vcs.core.Tasklet):
    """Tasklet that runs the virtual network function manager (rwvnfmtasklet)."""

    def __init__(self, name='virtual-network-function-manager', uid=None,
                 config_ready=True,
                 recovery_action=core.RecoveryType.FAILCRITICAL.value,
                 data_storetype=core.DataStore.NOSTORE.value,
                 ):
        """Create a VnfmTasklet.

        Arguments:
            name - the name of the tasklet
            uid  - a unique identifier
        """
        super().__init__(
            name=name,
            uid=uid,
            config_ready=config_ready,
            recovery_action=recovery_action,
            data_storetype=data_storetype,
        )

    # Plugin loaded for this tasklet (path relative to $RIFT_INSTALL).
    plugin_directory = ClassProperty('./usr/lib/rift/plugins/rwvnfmtasklet')
    plugin_name = ClassProperty('rwvnfmtasklet')
+
+
class ResMgrTasklet(rift.vcs.core.Tasklet):
    """Tasklet that runs the resource manager (rwresmgrtasklet)."""

    def __init__(self, name='Resource-Manager', uid=None,
                 config_ready=True,
                 recovery_action=core.RecoveryType.FAILCRITICAL.value,
                 data_storetype=core.DataStore.NOSTORE.value,
                 ):
        """Create a ResMgrTasklet.

        Arguments:
            name - the name of the tasklet
            uid  - a unique identifier
        """
        super().__init__(
            name=name,
            uid=uid,
            config_ready=config_ready,
            recovery_action=recovery_action,
            data_storetype=data_storetype,
        )

    # Plugin loaded for this tasklet (path relative to $RIFT_INSTALL).
    plugin_directory = ClassProperty('./usr/lib/rift/plugins/rwresmgrtasklet')
    plugin_name = ClassProperty('rwresmgrtasklet')
+
+
class ImageMgrTasklet(rift.vcs.core.Tasklet):
    """Tasklet that runs the image manager (rwimagemgrtasklet)."""

    def __init__(self, name='Image-Manager', uid=None,
                 config_ready=True,
                 recovery_action=core.RecoveryType.FAILCRITICAL.value,
                 data_storetype=core.DataStore.NOSTORE.value,
                 ):
        """Create an ImageMgrTasklet.

        Arguments:
            name - the name of the tasklet
            uid  - a unique identifier
        """
        super().__init__(
            name=name,
            uid=uid,
            config_ready=config_ready,
            recovery_action=recovery_action,
            data_storetype=data_storetype,
        )

    # Plugin loaded for this tasklet (path relative to $RIFT_INSTALL).
    plugin_directory = ClassProperty('./usr/lib/rift/plugins/rwimagemgrtasklet')
    plugin_name = ClassProperty('rwimagemgrtasklet')
+
+
class MonitorTasklet(rift.vcs.core.Tasklet):
    """Tasklet that monitors NFVI metrics (rwmonitor)."""

    def __init__(self, name='nfvi-metrics-monitor', uid=None,
                 config_ready=True,
                 recovery_action=core.RecoveryType.FAILCRITICAL.value,
                 data_storetype=core.DataStore.NOSTORE.value,
                 ):
        """Create a MonitorTasklet.

        Arguments:
            name - the name of the tasklet
            uid  - a unique identifier
        """
        super().__init__(
            name=name,
            uid=uid,
            config_ready=config_ready,
            recovery_action=recovery_action,
            data_storetype=data_storetype,
        )

    # Plugin loaded for this tasklet (path relative to $RIFT_INSTALL).
    plugin_directory = ClassProperty('./usr/lib/rift/plugins/rwmonitor')
    plugin_name = ClassProperty('rwmonitor')
+
class RedisServer(rift.vcs.NativeProcess):
    """Native redis-server process used as the HA datastore backend."""

    def __init__(self, name="RW.Redis.Server",
                 config_ready=True,
                 recovery_action=core.RecoveryType.FAILCRITICAL.value,
                 data_storetype=core.DataStore.NOSTORE.value,
                 ):
        super().__init__(
            name=name,
            exe="/usr/bin/redis-server",
            config_ready=config_ready,
            recovery_action=recovery_action,
            data_storetype=data_storetype,
        )

    @property
    def args(self):
        # Config path is resolved relative to the process working directory
        # (the RIFT install root); port 9999 is the launchpad redis port.
        return "./usr/bin/active_redis.conf --port 9999"
+
+
class MonitoringParameterTasklet(rift.vcs.core.Tasklet):
    """Tasklet that generates monitoring parameters (rwmonparam)."""

    def __init__(self, name='Monitoring-Parameter', uid=None,
                 config_ready=True,
                 recovery_action=core.RecoveryType.FAILCRITICAL.value,
                 data_storetype=core.DataStore.NOSTORE.value,
                 ):
        """Create a MonitoringParameterTasklet.

        Arguments:
            name - the name of the tasklet
            uid  - a unique identifier
        """
        super().__init__(
            name=name,
            uid=uid,
            config_ready=config_ready,
            recovery_action=recovery_action,
            data_storetype=data_storetype,
        )

    # Plugin loaded for this tasklet (path relative to $RIFT_INSTALL).
    plugin_directory = ClassProperty('./usr/lib/rift/plugins/rwmonparam')
    plugin_name = ClassProperty('rwmonparam')
+
+
class AutoscalerTasklet(rift.vcs.core.Tasklet):
    # Fixed: the class and __init__ docstrings previously described the
    # MonitoringParameterTasklet — a copy-paste; this class loads the
    # autoscaler plugin.
    """Tasklet that runs the autoscaler (rwautoscaler)."""

    def __init__(self, name='Autoscaler', uid=None,
                 config_ready=True,
                 recovery_action=core.RecoveryType.FAILCRITICAL.value,
                 data_storetype=core.DataStore.NOSTORE.value,
                 ):
        """Create an AutoscalerTasklet.

        Arguments:
            name - the name of the tasklet
            uid  - a unique identifier
        """
        super(AutoscalerTasklet, self).__init__(name=name, uid=uid,
                                                config_ready=config_ready,
                                                recovery_action=recovery_action,
                                                data_storetype=data_storetype,
                                                )

    # Plugin loaded for this tasklet (path relative to $RIFT_INSTALL).
    plugin_directory = ClassProperty('./usr/lib/rift/plugins/rwautoscaler')
    plugin_name = ClassProperty('rwautoscaler')
+
+
def get_ui_ssl_args():
    """Return the SSL parameter string for launchpad UI processes.

    Returns an empty string when no bootstrap certificates are available
    (SSL disabled).
    """
    use_ssl = False
    try:
        use_ssl, certfile_path, keyfile_path = certs.get_bootstrap_cert_and_key()
    except certs.BootstrapSslMissingException:
        logger.error('No bootstrap certificates found. Disabling UI SSL')

    # If we're not using SSL, no SSL arguments are necessary
    if not use_ssl:
        return ""

    return "--enable-https --keyfile-path=%s --certfile-path=%s" % (
        keyfile_path, certfile_path)
+
+
class UIServer(rift.vcs.NativeProcess):
    """Native process that launches the launchpad web UI (skyquake)."""

    def __init__(self, name="RW.MC.UI",
                 config_ready=True,
                 recovery_action=core.RecoveryType.FAILCRITICAL.value,
                 data_storetype=core.DataStore.NOSTORE.value,
                 ):
        super().__init__(
            name=name,
            exe="./usr/share/rw.ui/skyquake/scripts/launch_ui.sh",
            config_ready=config_ready,
            recovery_action=recovery_action,
            data_storetype=data_storetype,
        )

    @property
    def args(self):
        # HTTPS flags when bootstrap certificates exist, "" otherwise.
        return get_ui_ssl_args()
+
class ConfigManagerTasklet(rift.vcs.core.Tasklet):
    # Fixed: the docstring previously said "Resource Manager tasklet" — a
    # copy-paste from ResMgrTasklet; this class loads the config manager.
    """Tasklet that runs the configuration manager (rwconmantasklet)."""

    def __init__(self, name='Configuration-Manager', uid=None,
                 config_ready=True,
                 recovery_action=core.RecoveryType.FAILCRITICAL.value,
                 data_storetype=core.DataStore.NOSTORE.value,
                 ):
        """Create a ConfigManagerTasklet.

        Arguments:
            name - the name of the tasklet
            uid  - a unique identifier
        """
        super(ConfigManagerTasklet, self).__init__(name=name, uid=uid,
                                                   config_ready=config_ready,
                                                   recovery_action=recovery_action,
                                                   data_storetype=data_storetype,
                                                   )

    # Plugin loaded for this tasklet (path relative to $RIFT_INSTALL).
    plugin_directory = ClassProperty('./usr/lib/rift/plugins/rwconmantasklet')
    plugin_name = ClassProperty('rwconmantasklet')
+
class GlanceServer(rift.vcs.NativeProcess):
    """Native process that runs the glance image catalog service."""

    def __init__(self, name="glance-image-catalog",
                 config_ready=True,
                 recovery_action=core.RecoveryType.FAILCRITICAL.value,
                 data_storetype=core.DataStore.NOSTORE.value,
                 ):
        super().__init__(
            name=name,
            exe="./usr/bin/glance_start_wrapper",
            config_ready=config_ready,
            recovery_action=recovery_action,
            data_storetype=data_storetype,
        )

    @property
    def args(self):
        # Glance configuration directory, relative to the install root.
        return "./etc/glance"
+
+
class Demo(rift.vcs.demo.Demo):
    """Demo system definition for a standalone launchpad.

    Builds one colony containing a leader launchpad VM with all launchpad
    tasklets (restartable ones in restart_procs), plus an optional standby
    VM when HA mode is enabled.
    """

    def __init__(self, no_ui=False, ha_mode=None, mgmt_ip_list=None, test_name=None):
        """
        Arguments:
            no_ui        - skip starting the web UI process
            ha_mode      - enable the redis-backed HA layout when truthy
            mgmt_ip_list - management IPs; defaults to ["127.0.0.1"]
            test_name    - passed through to SystemInfo
        """
        # Fixed: mgmt_ip_list previously defaulted to a mutable [] that was
        # mutated below (append of the loopback address), so state leaked
        # across Demo() calls.  Passing [] still behaves as before.
        if not mgmt_ip_list:
            mgmt_ip_list = ["127.0.0.1"]

        procs = [
            ConfigManagerTasklet(),
            GlanceServer(),
            rift.vcs.DtsRouterTasklet(),
            rift.vcs.MsgBrokerTasklet(),
            rift.vcs.RestPortForwardTasklet(),
            rift.vcs.RestconfTasklet(),
            rift.vcs.RiftCli(),
            rift.vcs.uAgentTasklet(),
            rift.vcs.Launchpad(),
        ]

        standby_procs = [
            RedisServer(),
            rift.vcs.DtsRouterTasklet(),
            rift.vcs.MsgBrokerTasklet(),
        ]

        # Redis is only started (and used as the datastore) in HA mode.
        datastore = core.DataStore.BDB.value
        if ha_mode:
            procs.append(RedisServer())
            datastore = core.DataStore.REDIS.value

        if not no_ui:
            procs.append(UIServer())

        restart_procs = [
            VnfmTasklet(recovery_action=core.RecoveryType.RESTART.value, data_storetype=datastore),
            VnsTasklet(recovery_action=core.RecoveryType.RESTART.value, data_storetype=datastore),
            MonitorTasklet(recovery_action=core.RecoveryType.RESTART.value, data_storetype=datastore),
            MonitoringParameterTasklet(recovery_action=core.RecoveryType.RESTART.value, data_storetype=datastore),
            NsmTasklet(recovery_action=core.RecoveryType.RESTART.value, data_storetype=datastore),
            ResMgrTasklet(recovery_action=core.RecoveryType.RESTART.value, data_storetype=datastore),
            ImageMgrTasklet(recovery_action=core.RecoveryType.RESTART.value, data_storetype=datastore),
            AutoscalerTasklet(recovery_action=core.RecoveryType.RESTART.value, data_storetype=datastore),
        ]

        colony = rift.vcs.core.Colony(name='top', uid=1)

        lead_lp_vm = rift.vcs.VirtualMachine(
            name='vm-launchpad-1',
            ip=mgmt_ip_list[0],
            procs=procs,
            restart_procs=restart_procs,
        )
        lead_lp_vm.leader = True
        colony.append(lead_lp_vm)

        if ha_mode:
            # NOTE(review): HA mode assumes at least two management IPs;
            # mgmt_ip_list[1] raises IndexError otherwise — confirm callers.
            stby_lp_vm = rift.vcs.VirtualMachine(
                name='launchpad-vm-2',
                ip=mgmt_ip_list[1],
                procs=standby_procs,
                start=False,
            )
            # WA to Agent mode_active flag reset
            stby_lp_vm.add_tasklet(rift.vcs.uAgentTasklet(), mode_active=False)
            colony.append(stby_lp_vm)

        sysinfo = rift.vcs.SystemInfo(
            mode='ethsim',
            zookeeper=rift.vcs.manifest.RaZookeeper(master_ip=mgmt_ip_list[0]),
            colonies=[colony],
            multi_broker=True,
            multi_dtsrouter=True,
            mgmt_ip_list=mgmt_ip_list,
            test_name=test_name,
        )

        super(Demo, self).__init__(
            # Construct the system. This system consists of 1 cluster in 1
            # colony. The master cluster houses CLI and management VMs
            sysinfo = sysinfo,

            # Define the generic portmap.
            port_map = {},

            # Define a mapping from the placeholder logical names to the real
            # port names for each of the different modes supported by this demo.
            port_names = {
                'ethsim': {
                },
                'pci': {
                }
            },

            # Define the connectivity between logical port names.
            port_groups = {},
        )
+
+
def main(argv=sys.argv[1:]):
    """Parse arguments, clean stale state, build the Demo system and start it."""
    logging.basicConfig(format='%(asctime)-15s %(levelname)s %(message)s')

    # Create a parser which includes all generic demo arguments
    parser = rift.vcs.demo.DemoArgParser()
    parser.add_argument("--no-ui", action='store_true')
    args = parser.parse_args(argv)

    # Disable loading any kernel modules for the launchpad VM
    # since it doesn't need it and it will fail within containers
    os.environ["NO_KERNEL_MODS"] = "1"

    # Remove persisted Redis (.aof/.rdb) and DTS recovery (.db) state in a
    # single pass so a previous run cannot leak into this one.
    install_dir = os.environ["INSTALLDIR"]
    for f in os.listdir(install_dir):
        if f.endswith((".aof", ".rdb", ".db")):
            os.remove(os.path.join(install_dir, f))

    # Remove stale zookeeper and tmp directories.  Fixed: shutil.rmtree()
    # does not expand wildcards, so the old literal "var/rift/tmp*" never
    # matched anything (and the bare except hid that); expand explicitly.
    stale_dirs = [os.path.join(install_dir, "zk/server-1")]
    stale_dirs.extend(glob.glob(os.path.join(install_dir, "var/rift/tmp*")))
    for path in stale_dirs:
        try:
            shutil.rmtree(path)
        except OSError:
            # Best effort: the path may simply not exist.
            pass

    ha_mode = args.ha_mode
    mgmt_ip_list = [] if not args.mgmt_ip_list else args.mgmt_ip_list

    # Load demo info and create Demo object
    demo = Demo(args.no_ui, ha_mode, mgmt_ip_list, args.test_name)

    # Create the prepared system from the demo
    system = rift.vcs.demo.prepared_system_from_demo_and_args(
        demo, args,
        northbound_listing="cli_launchpad_schema_listing.txt",
        netconf_trace_override=True)

    # Prefer the eth0 address for confd; fall back to the hostname lookup.
    confd_ip = socket.gethostbyname(socket.gethostname())
    intf = netifaces.ifaddresses('eth0')
    if intf and netifaces.AF_INET in intf and len(intf[netifaces.AF_INET]):
        confd_ip = intf[netifaces.AF_INET][0]['addr']
    rift.vcs.logger.configure_sink(config_file=None, confd_ip=confd_ip)

    # Start the prepared system
    system.start()
+
+
if __name__ == "__main__":
    # Allow unlimited-size core dumps so crashes of the launched system can
    # be debugged post-mortem.
    resource.setrlimit(resource.RLIMIT_CORE, (resource.RLIM_INFINITY, resource.RLIM_INFINITY) )
    try:
        main()
    except rift.vcs.demo.ReservationError:
        print("ERROR: unable to retrieve a list of IP addresses from the reservation system")
        sys.exit(1)
    except rift.vcs.demo.MissingModeError:
        print("ERROR: you need to provide a mode to run the script")
        sys.exit(1)
    finally:
        # Child processes can leave the terminal in a raw state; restore it.
        os.system("stty sane")
diff --git a/rwlaunchpad/test/launchpad_recovery b/rwlaunchpad/test/launchpad_recovery
new file mode 100755
index 0000000..eea5d4a
--- /dev/null
+++ b/rwlaunchpad/test/launchpad_recovery
@@ -0,0 +1,793 @@
+#!/usr/bin/env python3
+
+#
+# Copyright 2016 RIFT.IO Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import argparse
+import asyncio
+import logging
+import os
+import sys
+import unittest
+import re
+import psutil
+import types
+
+import xmlrunner
+
+import gi
+gi.require_version('RwDtsToyTaskletYang', '1.0')
+gi.require_version('RwManifestYang', '1.0')
+gi.require_version('RwVcsYang', '1.0')
+
+import gi.repository.RwManifestYang as rwmanifest
+import gi.repository.RwVcsYang as rwvcs
+import gi.repository.RwDtsToyTaskletYang as toyyang
+import gi.repository.RwYang as RwYang
+import rift.auto.session
+import rift.vcs.vcs
+
+import rift.tasklets
+import rift.test.dts
+
# Python < 3.4.4 lacks asyncio.ensure_future; fall back to the legacy
# coroutine scheduler.  Fixed: the attribute must be fetched via getattr()
# because "async" became a keyword in Python 3.7, making the literal
# expression asyncio.async a SyntaxError on modern interpreters.
if sys.version_info < (3, 4, 4):
    asyncio.ensure_future = getattr(asyncio, "async")
+
+class LaunchPad(rift.test.dts.AbstractDTSTest):
+ """
+ DTS GI interface unittests
+
+ Note: Each tests uses a list of asyncio.Events for staging through the
+ test. These are required here because we are bring up each coroutine
+ ("tasklet") at the same time and are not implementing any re-try
+ mechanisms. For instance, this is used in numerous tests to make sure that
+ a publisher is up and ready before the subscriber sends queries. Such
+ event lists should not be used in production software.
+ """
    def setUp(self):
        """
        1. Creates an asyncio loop
        2. Triggers the hook configure_test
        """
        def scheduler_tick(self, *args):
            # Runs one loop iteration: stop() is queued before run_forever()
            # starts, so the loop processes pending callbacks then returns.
            self.call_soon(self.stop)
            self.run_forever()

        # Init params: loop & timers
        self.loop = asyncio.new_event_loop()

        # Bind scheduler_tick onto this loop instance so tests can
        # single-step the event loop.
        self.loop.scheduler_tick = types.MethodType(scheduler_tick, self.loop)

        self.asyncio_timer = None
        self.stop_timer = None
        # Each test run gets a fresh, unique id for configure_test.
        self.__class__.id_cnt += 1
        self.configure_test(self.loop, self.__class__.id_cnt)
+
+ @classmethod
+ def configure_schema(cls):
+ schema = RwYang.Model.load_and_merge_schema(rwvcs.get_schema(), 'librwcal_yang_gen.so', 'Rwcal')
+ cls.model = RwYang.Model.create_libncx()
+ cls.model.load_schema_ypbc(schema)
+ xml = cls.manifest.to_xml_v2(cls.model, 1)
+ xml = re.sub('rw-manifest:', '', xml)
+ xml = re.sub('<manifest xmlns:rw-manifest="http://riftio.com/ns/riftware-1.0/rw-manifest">', '<?xml version="1.0" ?>\n<manifest xmlns="http://riftio.com/ns/riftware-1.0/rw-manifest" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://riftio.com/ns/riftware-1.0/rw-manifest ./rw-manifest.xsd">', xml)
+ xml = '\n'.join(xml.split('\n')[1:])
+ with open('lptestmanifest.xml', 'w') as f:
+ f.write(str(xml))
+ f.close()
+ return schema
+
+
+ @classmethod
+ def configure_manifest(cls):
+ manifest = rwmanifest.Manifest()
+ manifest.bootstrap_phase = rwmanifest.BootstrapPhase.from_dict({
+ "rwmgmt": {
+ "northbound_listing": [ "cli_launchpad_schema_listing.txt" ]
+ },
+ "rwtasklet": {
+ "plugin_name": "rwinit-c"
+ },
+ "rwtrace": {
+ "enable": True,
+ "level": 5,
+ },
+ "log": {
+ "enable": True,
+ "severity": 4,
+ "bootstrap_time": 30,
+ "console_severity": 4
+ },
+ "ip_addrs_list": [
+ {
+ "ip_addr": "127.0.0.1",
+ }
+ ],
+ "zookeeper": {
+ "master_ip": "127.0.0.1",
+ "unique_ports": False,
+ "zake": False
+ },
+ "serf": {
+ "start": True
+ },
+ "rwvm": {
+ "instances": [
+ {
+ "component_name": "msgbroker",
+ "config_ready": True
+ },
+ {
+ "component_name": "dtsrouter",
+ "config_ready": True
+ }
+ ]
+ },
+# "rwsecurity": {
+# "use_ssl": True,
+# "cert": "/net/mahi/localdisk/kelayath/ws/coreha/etc/ssl/current.cert",
+# "key": "/net/mahi/localdisk/kelayath/ws/coreha/etc/ssl/current.key"
+# }
+ })
+ manifest.init_phase = rwmanifest.InitPhase.from_dict({
+ "environment": {
+ "python_variable": [
+ "vm_ip_address = '127.0.0.1'",
+ "rw_component_name = 'vm-launchpad'",
+ "instance_id = 1",
+ "component_type = 'rwvm'",
+ ],
+ "component_name": "$python(rw_component_name)",
+ "instance_id": "$python(instance_id)",
+ "component_type": "$python(rw_component_type)"
+ },
+ "settings": {
+ "rwmsg": {
+ "multi_broker": {
+ "enable": False
+ }
+ },
+ "rwdtsrouter": {
+ "multi_dtsrouter": {
+ "enable": True
+ }
+ },
+ "rwvcs": {
+ "collapse_each_rwvm": False,
+ "collapse_each_rwprocess": False
+ }
+ }
+ })
+ manifest.inventory = rwmanifest.Inventory.from_dict({
+ "component": [
+ {
+ "component_name": "master",
+ "component_type": "RWCOLLECTION",
+ "rwcollection": {
+ "collection_type": "rwcolony",
+ "event_list": {
+ "event": [{
+ "name": "onentry",
+ "action": [{
+ "name": "Start vm-launchpad for master",
+ "start": {
+ "python_variable": ["vm_ip_address = '127.0.0.1'"],
+ "component_name": "vm-launchpad",
+ "instance_id": "1",
+ "config_ready": True
+ }
+ }]
+ }]
+ }
+ }
+ },
+ {
+ "component_name": "vm-launchpad",
+ "component_type": "RWVM",
+ "rwvm": {
+ "leader": True,
+ "event_list": {
+ "event": [{
+ "name": "onentry",
+ "action": [
+ {
+ "name": "Start the master",
+ "start": {
+ "component_name": "master",
+ "recovery_action": "RESTART",
+ "config_ready": True
+ }
+ },
+# {
+# "name": "Start the RW.CLI",
+# "start": {
+# "component_name": "RW.CLI",
+# "recovery_action": "RESTART",
+# "config_ready": True
+# }
+# },
+ {
+ "name": "Start the RW.Proc_1.Restconf",
+ "start": {
+ "component_name": "RW.Proc_1.Restconf",
+ "recovery_action": "RESTART",
+ "config_ready": True
+ }
+ },
+# {
+# "name": "Start the RW.Proc_2.RestPortForward",
+# "start": {
+# "component_name": "RW.Proc_2.RestPortForward",
+# "recovery_action": "RESTART",
+# "config_ready": True
+# }
+# },
+ {
+ "name": "Start the RW.Proc_3.CalProxy",
+ "start": {
+ "component_name": "RW.Proc_3.CalProxy",
+ "recovery_action": "RESTART",
+ "config_ready": True
+ }
+ },
+ {
+ "name": "Start the RW.Proc_4.nfvi-metrics-monitor",
+ "start": {
+ "component_name": "RW.Proc_4.nfvi-metrics-monitor",
+ "recovery_action": "RESTART",
+ "config_ready": True
+ }
+ },
+ {
+ "name": "Start the RW.Proc_5.network-services-manager",
+ "start": {
+ "component_name": "RW.Proc_5.network-services-manager",
+ "recovery_action": "RESTART",
+ "config_ready": True
+ }
+ },
+ {
+ "name": "Start the RW.Proc_6.virtual-network-function-manager",
+ "start": {
+ "component_name": "RW.Proc_6.virtual-network-function-manager",
+ "recovery_action": "RESTART",
+ "config_ready": True
+ }
+ },
+ {
+ "name": "Start the RW.Proc_7.virtual-network-service",
+ "start": {
+ "component_name": "RW.Proc_7.virtual-network-service",
+ "recovery_action": "RESTART",
+ "config_ready": True
+ }
+ },
+ {
+ "name": "Start the RW.Proc_8.nfvi-metrics-monitor",
+ "start": {
+ "component_name": "RW.Proc_8.nfvi-metrics-monitor",
+ "recovery_action": "RESTART",
+ "config_ready": True
+ }
+ },
+ {
+ "name": "Start the RW.MC.UI",
+ "start": {
+ "component_name": "RW.MC.UI",
+ "recovery_action": "RESTART",
+ "config_ready": True
+ }
+ },
+# {
+# "name": "Start the RW.COMPOSER.UI",
+# "start": {
+# "component_name": "RW.COMPOSER.UI",
+# "config_ready": True
+# }
+# },
+ {
+ "name": "Start the RW.Proc_10.launchpad",
+ "start": {
+ "component_name": "RW.Proc_10.launchpad",
+ "recovery_action": "RESTART",
+ "config_ready": True
+ }
+ },
+ {
+ "name": "Start the RW.Proc_11.Resource-Manager",
+ "start": {
+ "component_name": "RW.Proc_11.Resource-Manager",
+ "recovery_action": "RESTART",
+ "config_ready": True
+ }
+ },
+ {
+ "name": "Start the RW.uAgent",
+ "start": {
+ "python_variable": ["cmdargs_str = '--confd-proto AF_INET --confd-ip 127.0.0.1'"],
+ "component_name": "RW.uAgent",
+ "recovery_action": "RESTART",
+ "config_ready": True
+ }
+ },
+ {
+ "name": "Start the logd",
+ "start": {
+ "component_name": "logd",
+ "recovery_action": "RESTART",
+ "config_ready": True
+ }
+ }
+ ]
+ }]
+ }
+ }
+ },
+# {
+# "component_name": "RW.CLI",
+# "component_type": "PROC",
+# "native_proc": {
+# "exe_path": "./usr/bin/rwcli",
+# "args": "--netconf_host 127.0.0.1 --netconf_port 2022 --schema_listing cli_launchpad_schema_listing.txt",
+# }
+# },
+ {
+ "component_name": "RW.Proc_1.Restconf",
+ "component_type": "RWPROC",
+ "rwproc": {
+ "tasklet": [{
+ "name": "Start RW.Restconf for RW.Proc_1.Restconf",
+ "component_name": "RW.Restconf",
+ "recovery_action": "RESTART",
+ "config_ready": True
+ }]
+ }
+ },
+ {
+ "component_name": "RW.Restconf",
+ "component_type": "RWTASKLET",
+ "rwtasklet": {
+ "plugin_directory": "./usr/lib/rift/plugins/restconf",
+ "plugin_name": "restconf"
+ }
+ },
+# {
+# "component_name": "RW.Proc_2.RestPortForward",
+# "component_type": "RWPROC",
+# "rwproc": {
+# "tasklet": [{
+# "name": "Start RW.RestPortForward for RW.Proc_2.RestPortForward",
+# "component_name": "RW.RestPortForward",
+# "recovery_action": "RESTART",
+# "config_ready": True
+# }]
+# }
+# },
+# {
+# "component_name": "RW.RestPortForward",
+# "component_type": "RWTASKLET",
+# "rwtasklet": {
+# "plugin_directory": "./usr/lib/rift/plugins/restportforward",
+# "plugin_name": "restportforward"
+# }
+# },
+ {
+ "component_name": "RW.Proc_3.CalProxy",
+ "component_type": "RWPROC",
+ "rwproc": {
+ "tasklet": [{
+ "name": "Start RW.CalProxy for RW.Proc_3.CalProxy",
+ "component_name": "RW.CalProxy",
+ "recovery_action": "RESTART",
+ "config_ready": True
+ }]
+ }
+ },
+ {
+ "component_name": "RW.CalProxy",
+ "component_type": "RWTASKLET",
+ "rwtasklet": {
+ "plugin_directory": "./usr/lib/rift/plugins/rwcalproxytasklet",
+ "plugin_name": "rwcalproxytasklet"
+ }
+ },
+ {
+ "component_name": "RW.Proc_4.nfvi-metrics-monitor",
+ "component_type": "RWPROC",
+ "rwproc": {
+ "tasklet": [{
+ "name": "Start nfvi-metrics-monitor for RW.Proc_4.nfvi-metrics-monitor",
+ "component_name": "nfvi-metrics-monitor",
+ "recovery_action": "RESTART",
+ "config_ready": True
+ }]
+ }
+ },
+ {
+ "component_name": "nfvi-metrics-monitor",
+ "component_type": "RWTASKLET",
+ "rwtasklet": {
+ "plugin_directory": "./usr/lib/rift/plugins/rwmonitor",
+ "plugin_name": "rwmonitor"
+ }
+ },
+ {
+ "component_name": "RW.Proc_5.network-services-manager",
+ "component_type": "RWPROC",
+ "rwproc": {
+ "tasklet": [{
+ "name": "Start network-services-manager for RW.Proc_5.network-services-manager",
+ "component_name": "network-services-manager",
+ "recovery_action": "RESTART",
+ "config_ready": True
+ }]
+ }
+ },
+ {
+ "component_name": "network-services-manager",
+ "component_type": "RWTASKLET",
+ "rwtasklet": {
+ "plugin_directory": "./usr/lib/rift/plugins/rwnsmtasklet",
+ "plugin_name": "rwnsmtasklet"
+ }
+ },
+ {
+ "component_name": "RW.Proc_6.virtual-network-function-manager",
+ "component_type": "RWPROC",
+ "rwproc": {
+ "tasklet": [{
+ "name": "Start virtual-network-function-manager for RW.Proc_6.virtual-network-function-manager",
+ "component_name": "virtual-network-function-manager",
+ "recovery_action": "RESTART",
+ "config_ready": True
+ }]
+ }
+ },
+ {
+ "component_name": "virtual-network-function-manager",
+ "component_type": "RWTASKLET",
+ "rwtasklet": {
+ "plugin_directory": "./usr/lib/rift/plugins/rwvnfmtasklet",
+ "plugin_name": "rwvnfmtasklet"
+ }
+ },
+ {
+ "component_name": "RW.Proc_7.virtual-network-service",
+ "component_type": "RWPROC",
+ "rwproc": {
+ "tasklet": [{
+ "name": "Start virtual-network-service for RW.Proc_7.virtual-network-service",
+ "component_name": "virtual-network-service",
+ "recovery_action": "RESTART",
+ "config_ready": True
+ }]
+ }
+ },
+ {
+ "component_name": "virtual-network-service",
+ "component_type": "RWTASKLET",
+ "rwtasklet": {
+ "plugin_directory": "./usr/lib/rift/plugins/rwvnstasklet",
+ "plugin_name": "rwvnstasklet"
+ }
+ },
+ {
+ "component_name": "RW.Proc_8.nfvi-metrics-monitor",
+ "component_type": "RWPROC",
+ "rwproc": {
+ "tasklet": [{
+ "name": "Start nfvi-metrics-monitor for RW.Proc_8.nfvi-metrics-monitor",
+ "component_name": "nfvi-metrics-monitor",
+ "recovery_action": "RESTART",
+ "config_ready": True
+ }]
+ }
+ },
+ {
+ "component_name": "RW.MC.UI",
+ "component_type": "PROC",
+ "native_proc": {
+ "exe_path": "./usr/share/rw.ui/skyquake/scripts/launch_ui.sh",
+ }
+ },
+ {
+ "component_name": "RW.COMPOSER.UI",
+ "component_type": "PROC",
+ "native_proc": {
+ "exe_path": "./usr/share/composer/scripts/launch_composer.sh",
+ }
+ },
+ {
+ "component_name": "RW.Proc_9.Configuration-Manager",
+ "component_type": "RWPROC",
+ "rwproc": {
+ "tasklet": [{
+ "name": "Start Configuration-Manager for RW.Proc_9.Configuration-Manager",
+ "component_name": "Configuration-Manager",
+ "recovery_action": "RESTART",
+ "config_ready": True
+ }]
+ }
+ },
+ {
+ "component_name": "Configuration-Manager",
+ "component_type": "RWTASKLET",
+ "rwtasklet": {
+ "plugin_directory": "./usr/lib/rift/plugins/rwconmantasklet",
+ "plugin_name": "rwconmantasklet"
+ }
+ },
+ {
+ "component_name": "RW.Proc_10.launchpad",
+ "component_type": "RWPROC",
+ "rwproc": {
+ "tasklet": [{
+ "name": "Start launchpad for RW.Proc_10.launchpad",
+ "component_name": "launchpad",
+ "recovery_action": "RESTART",
+ "config_ready": True
+ }]
+ }
+ },
+ {
+ "component_name": "launchpad",
+ "component_type": "RWTASKLET",
+ "rwtasklet": {
+ "plugin_directory": "./usr/lib/rift/plugins/rwlaunchpad",
+ "plugin_name": "rwlaunchpad"
+ }
+ },
+ {
+ "component_name": "RW.Proc_11.Resource-Manager",
+ "component_type": "RWPROC",
+ "rwproc": {
+ "tasklet": [{
+ "name": "Start Resource-Manager for RW.Proc_11.Resource-Manager",
+ "component_name": "Resource-Manager",
+ "recovery_action": "RESTART",
+ "config_ready": True
+ }]
+ }
+ },
+ {
+ "component_name": "Resource-Manager",
+ "component_type": "RWTASKLET",
+ "rwtasklet": {
+ "plugin_directory": "./usr/lib/rift/plugins/rwresmgrtasklet",
+ "plugin_name": "rwresmgrtasklet"
+ }
+ },
+ {
+ "component_name": "RW.uAgent",
+ "component_type": "RWTASKLET",
+ "rwtasklet": {
+ "plugin_directory": "./usr/lib/rift/plugins/rwuagent-c",
+ "plugin_name": "rwuagent-c"
+ }
+ },
+ {
+ "component_name": "logd",
+ "component_type": "RWTASKLET",
+ "rwtasklet": {
+ "plugin_directory": "./usr/lib/rift/plugins/rwlogd-c",
+ "plugin_name": "rwlogd-c"
+ }
+ },
+ {
+ "component_name": "msgbroker",
+ "component_type": "RWTASKLET",
+ "rwtasklet": {
+ "plugin_directory": "./usr/lib/rift/plugins/rwmsgbroker-c",
+ "plugin_name": "rwmsgbroker-c"
+ }
+ },
+ {
+ "component_name": "dtsrouter",
+ "component_type": "RWTASKLET",
+ "rwtasklet": {
+ "plugin_directory": "./usr/lib/rift/plugins/rwdtsrouter-c",
+ "plugin_name": "rwdtsrouter-c"
+ }
+ }
+ ]
+ })
+ return manifest
+
+ def tearDown(self):
+ tasklist = { 'reaperd',
+ 'rwlogd-report-c',
+ 'launch_ui.sh' }
+ for proc in psutil.process_iter():
+ if proc.name() in tasklist:
+ print("killing", proc.name())
+ try:
+ proc.kill()
+ except:
+ print(proc.name(), "no longer exists")
+ self.loop.stop()
+ self.loop.close()
+
+
class LaunchPadTest(LaunchPad):
    """
    DTS GI interface unittests

    Note: Each tests uses a list of asyncio.Events for staging through the
    test. These are required here because we are bring up each coroutine
    ("tasklet") at the same time and are not implementing any re-try
    mechanisms. For instance, this is used in numerous tests to make sure that
    a publisher is up and ready before the subscriber sends queries. Such
    event lists should not be used in production software.
    """
    @asyncio.coroutine
    def inventory(self):
        # Read the whole VCS component tree over DTS and flatten it into a
        # dict: component_name -> (instance_id, parent, type, state).
        res_iter = yield from self.dts_mgmt.query_read('/rw-base:vcs/rw-base:info', flags=0)
        for i in res_iter:
            info_result = yield from i
        # NOTE(review): only the last response of the iterator is consumed
        # below — presumably the query yields a single result; confirm.
        components = info_result.result.components.component_info
        recvd_list = {}
        for component in components:
            recvd_list[component.component_name] = (component.instance_id,
                                                    component.rwcomponent_parent,
                                                    component.component_type,
                                                    component.state)
        return recvd_list

    @asyncio.coroutine
    def issue_vcrash(self, component_type):
        # Crash every component of the given type (except the critical ones
        # that the system cannot recover without) and verify each recovers.
#       critical_components = {'msgbroker', 'dtsrouter'}
        critical_components = {'msgbroker', 'dtsrouter', 'RW.uAgent'}
        comp_inventory = yield from self.inventory()
        for component in comp_inventory:
            if ((comp_inventory[component])[2] == component_type):
                inst = (comp_inventory[component])[0]
                if (component in critical_components):
                    print(component, 'Marked as CRITICAL - Not restarting')
                else:
                    print('Crashing ', component_type, component)
                    # vcrash takes "<name>-<instance>" as the instance name.
                    vcrash_input = rwvcs.VCrashInput(instance_name=component+'-'+str(inst))
                    query_iter = yield from self.dts_mgmt.query_rpc(xpath="/rw-vcs:vcrash",
                                                                    flags=0, msg=vcrash_input)
                    # Give the recovery a moment before re-reading state.
                    yield from asyncio.sleep(1, loop=self.loop)
                    restarted_inventory = yield from self.inventory()
                    # Component must have moved past the TO_RECOVER state.
                    self.assertTrue(restarted_inventory[component][3] != 'TO_RECOVER')

    def test_launch_pad(self):
        """
        Verify the launchpad setup functions
        The test will progress through stages defined by the events list:
        0: mission_control setup is brought up
        2: Tasklet/PROC/VM restarts tested to confirm recovery is proper
        """

        print("{{{{{{{{{{{{{{{{{{{{STARTING - mano recovery test")
#       confd_host="127.0.0.1"

        events = [asyncio.Event(loop=self.loop) for _ in range(2)]

        @asyncio.coroutine
        def sub():

            tinfo = self.new_tinfo('sub')
            self.dts_mgmt = rift.tasklets.DTS(tinfo, self.schema, self.loop)

            # Sleep for DTS registrations to complete
            print('.........................................................')
            print('........SLEEPING 80 seconds for system to come up........')
            yield from asyncio.sleep(80, loop=self.loop)
            print('........RESUMING........')

            @asyncio.coroutine
            def issue_vstop(component, inst, flag=0):
                # Stop a single component instance via the rw-vcs vstop RPC.
                vstop_input = rwvcs.VStopInput(instance_name=component+'-'+(str(inst)))
                query_iter = yield from self.dts_mgmt.query_rpc(xpath="/rw-vcs:vstop",
                                                                flags=flag, msg=vstop_input)
                yield from asyncio.sleep(1, loop=self.loop)



            @asyncio.coroutine
            def issue_vstart(component, parent, recover=False):
                # (Re)start a component under the given parent; recover=True
                # asks VCS to restart it in recovery mode.
                vstart_input = rwvcs.VStartInput()
                vstart_input.component_name = component
                vstart_input.parent_instance = parent
                vstart_input.recover = recover
                query_iter = yield from self.dts_mgmt.query_rpc(xpath="/rw-vcs:vstart",
                                                                flags=0, msg=vstart_input)
                yield from asyncio.sleep(1, loop=self.loop)

            @asyncio.coroutine
            def issue_start_stop(comp_inventory, component_type):
                # Stop then restart every non-critical component of the given
                # type and assert each one comes back out of TO_RECOVER.
#               critical_components = {'msgbroker', 'dtsrouter'}
                critical_components = {'msgbroker', 'dtsrouter', 'RW.uAgent'}
                for component in comp_inventory:
                    if ((comp_inventory[component])[2] == component_type):
                        inst = (comp_inventory[component])[0]
                        parent = (comp_inventory[component])[1]
                        if (component in critical_components):
                            print(component, 'Marked as CRITICAL - Not restarting')
                        else:
                            print('Stopping ', component_type, component)
                            yield from issue_vstop(component, inst)
                            restarted_inventory = yield from self.inventory()
#                           self.assertEqual(restarted_inventory[component][3],'TO_RECOVER')
                            print('Starting ', component_type, component)
                            yield from issue_vstart(component, parent, recover=True)
                            restarted_inventory = yield from self.inventory()
                            self.assertTrue(restarted_inventory[component][3] != 'TO_RECOVER')

            # Snapshot the inventory, cycle all tasklets through stop/start,
            # then verify the final inventory matches the snapshot.
            yield from asyncio.sleep(20, loop=self.loop)
            comp_inventory = yield from self.inventory()
            yield from issue_start_stop(comp_inventory, 'RWTASKLET')
#           yield from issue_start_stop(comp_inventory, 'RWPROC')
#           yield from self.issue_vcrash('RWTASKLET')

            yield from asyncio.sleep(20, loop=self.loop)
            restarted_inventory = yield from self.inventory()
#           critical_components = {'msgbroker', 'dtsrouter', 'RW.uAgent'}
            for comp in comp_inventory:
                self.assertEqual(str(comp_inventory[comp]), str(restarted_inventory[comp]))
#               if (comp not in critical_components):
#                   inst = (comp_inventory[comp])[0]
#                   yield from issue_vstop(comp,inst)

            # Signal the driver below that the coroutine finished cleanly.
            events[1].set()

        asyncio.ensure_future(sub(), loop=self.loop)
        self.run_until(events[1].is_set, timeout=260)
+
+
def main():
    """Configure the rift test environment and run the unittests under XML runner.

    Each environment variable is only defaulted when the caller has not
    already set it, so an outer harness can override any of them.
    """
    plugin_dir = os.path.join(os.environ["RIFT_INSTALL"], "usr/lib/rift/plugins")
    if 'DTS_TEST_PUB_DIR' not in os.environ:
        os.environ['DTS_TEST_PUB_DIR'] = os.path.join(plugin_dir, 'dtstestpub')

    if 'RIFT_NO_SUDO_REAPER' not in os.environ:
        os.environ['RIFT_NO_SUDO_REAPER'] = '1'

    if 'MESSAGE_BROKER_DIR' not in os.environ:
        os.environ['MESSAGE_BROKER_DIR'] = os.path.join(plugin_dir, 'rwmsgbroker-c')

    if 'ROUTER_DIR' not in os.environ:
        os.environ['ROUTER_DIR'] = os.path.join(plugin_dir, 'rwdtsrouter-c')

    if 'RW_VAR_RIFT' not in os.environ:
        os.environ['RW_VAR_RIFT'] = '1'

    if 'INSTALLDIR' in os.environ:
        os.chdir(os.environ.get('INSTALLDIR'))

#   if 'RWMSG_BROKER_SHUNT' not in os.environ:
#       os.environ['RWMSG_BROKER_SHUNT'] = '1'

    if 'TEST_ENVIRON' not in os.environ:
        os.environ['TEST_ENVIRON'] = '1'

    if 'RW_MANIFEST' not in os.environ:
        # BUG FIX: the original referenced an undefined name `install_dir`,
        # so this branch raised NameError whenever RW_MANIFEST was unset.
        # Use INSTALLDIR (we already chdir'd there above when it is set),
        # falling back to the current directory.
        install_dir = os.environ.get('INSTALLDIR', os.getcwd())
        os.environ['RW_MANIFEST'] = os.path.join(install_dir, 'lptestmanifest.xml')

    parser = argparse.ArgumentParser()
    parser.add_argument('-v', '--verbose', action='store_true')
    args, _ = parser.parse_known_args()


    runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])
    unittest.main(testRunner=runner)
+
+# vim: sw=4
diff --git a/rwlaunchpad/test/mano_error_ut.py b/rwlaunchpad/test/mano_error_ut.py
new file mode 100755
index 0000000..e593cee
--- /dev/null
+++ b/rwlaunchpad/test/mano_error_ut.py
@@ -0,0 +1,898 @@
+#!/usr/bin/env python3
+
+#
+# Copyright 2016 RIFT.IO Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import argparse
+import asyncio
+import logging
+import os
+import sys
+import time
+import unittest
+import uuid
+
+import xmlrunner
+
+import gi.repository.RwDts as rwdts
+import gi.repository.RwNsmYang as rwnsmyang
+import gi.repository.RwResourceMgrYang as RwResourceMgrYang
+import gi.repository.RwLaunchpadYang as launchpadyang
+import rift.tasklets
+import rift.test.dts
+
+import mano_ut
+
+
# Python < 3.4.4 lacks asyncio.ensure_future; alias the legacy name.
# The attribute is fetched with getattr because `async` became a reserved
# keyword in Python 3.7, so the literal `asyncio.async` is a SyntaxError at
# compile time even though this branch only executes on old interpreters.
if sys.version_info < (3, 4, 4):
    asyncio.ensure_future = getattr(asyncio, "async")
+
+
class OutOfResourceError(Exception):
    """Raised by the mock resource manager when a pool has no free handlers left."""
+
+
class ComputeResourceRequestMockEventHandler(object):
    """Mock for a single VDU (compute) resource allocation event.

    Stands in for the real resource manager in unit tests: it remembers one
    allocation request and fabricates an "active" VDU record, which
    `response_msg` wraps into the yang resource-info message.  Tests steer
    the simulated outcome with set_active()/set_failed()/set_pending().
    """

    def __init__(self):
        self._pool_name = "vm_pool"
        self._vdu_id = str(uuid.uuid4())
        # Fabricated VDU record; allocate() fills in request-specific fields.
        self._vdu_info = {
            "vdu_id": self._vdu_id,
            "state": "active",
            "management_ip": "1.1.1.1",
            "public_ip": "1.1.1.1",
            "connection_points": [],
        }

        self._resource_state = "active"

        self._event_id = None
        self._request_info = None

    def allocate(self, event_id, request_info):
        """Record an allocation request and synthesize its connection points.

        Arguments:
            event_id     - resource-mgr event id keying this allocation
            request_info - request message carrying name/flavor_id/image_id
                           and a list of connection_points
        """
        self._event_id = event_id
        self._request_info = request_info

        self._vdu_info.update({
            "name": self._request_info.name,
            "flavor_id": self._request_info.flavor_id,
            "image_id": self._request_info.image_id,
        })

        # Fabricate an active connection point for each requested one.
        # (The original assigned list.append's None return back to the loop
        # variable; that useless rebinding is dropped.)
        for cp in request_info.connection_points:
            self._vdu_info["connection_points"].append(dict(
                name=cp.name,
                virtual_link_id=cp.virtual_link_id,
                vdu_id=self._vdu_id,
                state="active",
                ip_address="1.2.3.4",
            ))

    @property
    def event_id(self):
        # Event id of the last allocate() call; None before any allocation.
        return self._event_id

    @property
    def resource_state(self):
        # Simulated state: "active", "failed" or "pending".
        return self._resource_state

    def set_active(self):
        self._resource_state = "active"

    def set_failed(self):
        self._resource_state = "failed"

    def set_pending(self):
        self._resource_state = "pending"

    @property
    def response_msg(self):
        """Yang resource-info message reflecting the current VDU state."""
        resource_info = dict(
            pool_name=self._pool_name,
            resource_state=self.resource_state,
        )
        resource_info.update(self._vdu_info)

        response = RwResourceMgrYang.VDUEventData.from_dict(dict(
            event_id=self._event_id,
            request_info=self._request_info.as_dict(),
            resource_info=resource_info,
        ))

        return response.resource_info
+
+
class NetworkResourceRequestMockEventHandler(object):
    """Mock for a single virtual-link (network) allocation event.

    The network-side twin of ComputeResourceRequestMockEventHandler: it
    records one allocation request and fabricates an "active" link record,
    which `response_msg` wraps into the yang resource-info message.  Tests
    drive the simulated outcome via set_active()/set_failed()/set_pending().
    """

    def __init__(self):
        self._event_id = None
        self._request_info = None
        self._resource_state = "active"
        self._pool_name = "network_pool"
        self._link_id = str(uuid.uuid4())
        # Fabricated link record; allocate() stamps name/subnet onto it.
        self._link_info = dict(virtual_link_id=self._link_id, state="active")

    def allocate(self, event_id, request_info):
        """Remember the request and copy its name/subnet into the link record."""
        self._event_id = event_id
        self._request_info = request_info
        self._link_info["name"] = request_info.name
        self._link_info["subnet"] = request_info.subnet

    @property
    def event_id(self):
        # Event id of the last allocate() call; None before any allocation.
        return self._event_id

    @property
    def resource_state(self):
        # Simulated state: "active", "failed" or "pending".
        return self._resource_state

    # --- state toggles used by tests to steer the mock ------------------
    def set_active(self):
        self._resource_state = "active"

    def set_failed(self):
        self._resource_state = "failed"

    def set_pending(self):
        self._resource_state = "pending"

    @property
    def response_msg(self):
        """Yang resource-info message reflecting the current link state."""
        info = dict(self._link_info)
        info["pool_name"] = self._pool_name
        info["resource_state"] = self.resource_state

        event = RwResourceMgrYang.VirtualLinkEventData.from_dict(dict(
            event_id=self._event_id,
            request_info=self._request_info.as_dict(),
            resource_info=info,
        ))
        return event.resource_info
+
+
class ResourceMgrMock(object):
    """DTS publisher that impersonates the resource manager for unit tests.

    Registers as publisher for the vdu-event and vlink-event subtrees and
    serves create/read/delete prepares from a pool of pre-created mock
    handlers (see create_compute_mock_event_handler /
    create_network_mock_event_handler).  Running out of handlers raises
    OutOfResourceError, which is how tests inject allocation failures.
    """
    VDU_REQUEST_XPATH = "D,/rw-resource-mgr:resource-mgmt/vdu-event/vdu-event-data"
    VLINK_REQUEST_XPATH = "D,/rw-resource-mgr:resource-mgmt/vlink-event/vlink-event-data"

    def __init__(self, dts, log, loop):
        self._log = log
        self._dts = dts
        self._loop = loop
        # DTS registration handles, populated by register().
        self._vdu_reg = None
        self._link_reg = None

        # Set when the corresponding registration reports ready.
        self._vdu_reg_event = asyncio.Event(loop=self._loop)
        self._link_reg_event = asyncio.Event(loop=self._loop)

        # Handlers queued by the test, popped on each allocate request.
        self._available_compute_handlers = []
        self._available_network_handlers = []

        # event_id -> handler for currently-allocated resources.
        self._used_compute_handlers = {}
        self._used_network_handlers = {}

        # Counters of allocate attempts (including ones that failed).
        self._compute_allocate_requests = 0
        self._network_allocate_requests = 0

        self._registered = False

    def _allocate_virtual_compute(self, event_id, request_info):
        # Pop the next queued compute handler and bind it to this event.
        # Raises OutOfResourceError when the test queued no more handlers.
        self._compute_allocate_requests += 1

        if not self._available_compute_handlers:
            raise OutOfResourceError("No more compute handlers")

        handler = self._available_compute_handlers.pop()
        handler.allocate(event_id, request_info)
        self._used_compute_handlers[event_id] = handler

        return handler.response_msg

    def _allocate_virtual_network(self, event_id, request_info):
        # Same as _allocate_virtual_compute, for virtual links.
        self._network_allocate_requests += 1

        if not self._available_network_handlers:
            raise OutOfResourceError("No more network handlers")

        handler = self._available_network_handlers.pop()
        handler.allocate(event_id, request_info)
        self._used_network_handlers[event_id] = handler

        return handler.response_msg

    def _release_virtual_network(self, event_id):
        # KeyError here means a release for an event never allocated.
        del self._used_network_handlers[event_id]

    def _release_virtual_compute(self, event_id):
        del self._used_compute_handlers[event_id]

    def _read_virtual_network(self, event_id):
        return self._used_network_handlers[event_id].response_msg

    def _read_virtual_compute(self, event_id):
        return self._used_compute_handlers[event_id].response_msg

    @asyncio.coroutine
    def on_link_request_prepare(self, xact_info, action, ks_path, request_msg):
        """DTS prepare callback for vlink-event-data (create/read/delete)."""
        if not self._registered:
            self._log.error("Got a prepare callback when not registered!")
            xact_info.respond_xpath(rwdts.XactRspCode.NA)
            return

        self._log.debug("Received virtual-link on_prepare callback (self: %s, xact_info: %s, action: %s): %s",
                        self, xact_info, action, request_msg)

        response_info = None
        response_xpath = ks_path.to_xpath(RwResourceMgrYang.get_schema()) + "/resource-info"

        # Extract the event-id key from the keyspec path.
        schema = RwResourceMgrYang.VirtualLinkEventData().schema()
        pathentry = schema.keyspec_to_entry(ks_path)

        if action == rwdts.QueryAction.CREATE:
            response_info = self._allocate_virtual_network(
                pathentry.key00.event_id,
                request_msg.request_info,
            )

        elif action == rwdts.QueryAction.DELETE:
            self._release_virtual_network(pathentry.key00.event_id)

        elif action == rwdts.QueryAction.READ:
            response_info = self._read_virtual_network(
                pathentry.key00.event_id
            )
        else:
            raise ValueError("Only read/create/delete actions available. Received action: %s" %(action))

        self._log.debug("Responding with VirtualLinkInfo at xpath %s: %s.",
                        response_xpath, response_info)

        xact_info.respond_xpath(rwdts.XactRspCode.ACK, response_xpath, response_info)

    @asyncio.coroutine
    def on_vdu_request_prepare(self, xact_info, action, ks_path, request_msg):
        """DTS prepare callback for vdu-event-data (create/read/delete).

        A CREATE whose handler reports 'pending' spawns monitor_vdu_state,
        which polls until the VDU goes active/failed and then publishes the
        terminal state as a DTS update.
        """
        if not self._registered:
            self._log.error("Got a prepare callback when not registered!")
            xact_info.respond_xpath(rwdts.XactRspCode.NA)
            return

        @asyncio.coroutine
        def monitor_vdu_state(response_xpath, pathentry):
            # Poll once per second, for at most 120 seconds, for the VDU to
            # reach a terminal state; publish whatever terminal state we see.
            self._log.info("Initiating VDU state monitoring for xpath: %s ", response_xpath)
            loop_cnt = 120
            while loop_cnt > 0:
                self._log.debug("VDU state monitoring: Sleeping for 1 second ")
                yield from asyncio.sleep(1, loop = self._loop)
                try:
                    response_info = self._read_virtual_compute(
                        pathentry.key00.event_id
                    )
                except Exception as e:
                    self._log.error(
                        "VDU state monitoring: Received exception %s "
                        "in VDU state monitoring for %s. Aborting monitoring",
                        str(e), response_xpath
                    )
                    raise

                if response_info.resource_state == 'active' or response_info.resource_state == 'failed':
                    self._log.info(
                        "VDU state monitoring: VDU reached terminal state."
                        "Publishing VDU info: %s at path: %s",
                        response_info, response_xpath
                    )
                    yield from self._dts.query_update(response_xpath,
                                                      rwdts.XactFlag.ADVISE,
                                                      response_info)
                    return
                else:
                    loop_cnt -= 1

            ### End of while loop. This is only possible if VDU did not reach active state
            # Timed out: publish a synthetic 'failed' state so subscribers
            # are not left waiting forever.
            self._log.info("VDU state monitoring: VDU at xpath :%s did not reached active state in 120 seconds. Aborting monitoring",
                           response_xpath)
            response_info = RwResourceMgrYang.VDUEventData_ResourceInfo()
            response_info.resource_state = 'failed'
            yield from self._dts.query_update(response_xpath,
                                              rwdts.XactFlag.ADVISE,
                                              response_info)
            return

        self._log.debug("Received vdu on_prepare callback (xact_info: %s, action: %s): %s",
                        xact_info, action, request_msg)

        response_info = None
        response_xpath = ks_path.to_xpath(RwResourceMgrYang.get_schema()) + "/resource-info"

        # Extract the event-id key from the keyspec path.
        schema = RwResourceMgrYang.VDUEventData().schema()
        pathentry = schema.keyspec_to_entry(ks_path)

        if action == rwdts.QueryAction.CREATE:
            response_info = self._allocate_virtual_compute(
                pathentry.key00.event_id,
                request_msg.request_info,
            )
            if response_info.resource_state == 'pending':
                # Fire-and-forget watcher; it publishes the terminal state.
                asyncio.ensure_future(monitor_vdu_state(response_xpath, pathentry),
                                      loop = self._loop)

        elif action == rwdts.QueryAction.DELETE:
            self._release_virtual_compute(
                pathentry.key00.event_id
            )

        elif action == rwdts.QueryAction.READ:
            response_info = self._read_virtual_compute(
                pathentry.key00.event_id
            )
        else:
            raise ValueError("Only create/delete actions available. Received action: %s" %(action))

        self._log.debug("Responding with VDUInfo at xpath %s: %s",
                        response_xpath, response_info)

        xact_info.respond_xpath(rwdts.XactRspCode.ACK, response_xpath, response_info)

    @asyncio.coroutine
    def register(self):
        """Register as DTS publisher for both the vlink and vdu event xpaths."""
        @asyncio.coroutine
        def on_request_ready(registration, status):
            # Map the ready notification back to the matching event.
            self._log.debug("Got request ready event (registration: %s) (status: %s)",
                            registration, status)

            if registration == self._link_reg:
                self._link_reg_event.set()
            elif registration == self._vdu_reg:
                self._vdu_reg_event.set()
            else:
                self._log.error("Unknown registration ready event: %s", registration)


        with self._dts.group_create() as group:
            self._log.debug("Registering for Link Resource Request using xpath: %s",
                            ResourceMgrMock.VLINK_REQUEST_XPATH)

            self._link_reg = group.register(
                xpath=ResourceMgrMock.VLINK_REQUEST_XPATH,
                handler=rift.tasklets.DTS.RegistrationHandler(on_ready=on_request_ready,
                                                              on_prepare=self.on_link_request_prepare),
                flags=rwdts.Flag.PUBLISHER)

            self._log.debug("Registering for VDU Resource Request using xpath: %s",
                            ResourceMgrMock.VDU_REQUEST_XPATH)

            self._vdu_reg = group.register(
                xpath=ResourceMgrMock.VDU_REQUEST_XPATH,
                handler=rift.tasklets.DTS.RegistrationHandler(on_ready=on_request_ready,
                                                              on_prepare=self.on_vdu_request_prepare),
                flags=rwdts.Flag.PUBLISHER)

        self._registered = True

    def unregister(self):
        """Deregister both publishers; prepares after this answer NA."""
        self._link_reg.deregister()
        self._vdu_reg.deregister()
        self._registered = False

    @asyncio.coroutine
    def wait_ready(self, timeout=5):
        """Wait (bounded) for both registrations to signal ready."""
        self._log.debug("Waiting for all request registrations to become ready.")
        yield from asyncio.wait([self._link_reg_event.wait(), self._vdu_reg_event.wait()],
                                timeout=timeout, loop=self._loop)

    def create_compute_mock_event_handler(self):
        # Queue one more compute handler for a future allocate request.
        handler = ComputeResourceRequestMockEventHandler()
        self._available_compute_handlers.append(handler)

        return handler

    def create_network_mock_event_handler(self):
        # Queue one more network handler for a future allocate request.
        handler = NetworkResourceRequestMockEventHandler()
        self._available_network_handlers.append(handler)

        return handler

    @property
    def num_compute_requests(self):
        # Total compute allocate attempts, successful or not.
        return self._compute_allocate_requests

    @property
    def num_network_requests(self):
        # Total network allocate attempts, successful or not.
        return self._network_allocate_requests

    @property
    def num_allocated_compute_resources(self):
        return len(self._used_compute_handlers)

    @property
    def num_allocated_network_resources(self):
        return len(self._used_network_handlers)
+
+
+@unittest.skip('failing and needs rework')
+class ManoErrorTestCase(rift.test.dts.AbstractDTSTest):
+ """
+ DTS GI interface unittests
+
+ Note: Each tests uses a list of asyncio.Events for staging through the
+ test. These are required here because we are bring up each coroutine
+ ("tasklet") at the same time and are not implementing any re-try
+ mechanisms. For instance, this is used in numerous tests to make sure that
+ a publisher is up and ready before the subscriber sends queries. Such
+ event lists should not be used in production software.
+ """
+
+ @classmethod
+ def configure_suite(cls, rwmain):
+ plugin_dir = os.path.join(os.environ["RIFT_INSTALL"], "usr/lib/rift/plugins")
+ rwmain.add_tasklet(
+ os.path.join(plugin_dir, 'rwvns'),
+ 'rwvnstasklet'
+ )
+
+ rwmain.add_tasklet(
+ os.path.join(plugin_dir, 'rwvnfm'),
+ 'rwvnfmtasklet'
+ )
+
+ rwmain.add_tasklet(
+ os.path.join(plugin_dir, 'rwnsm'),
+ 'rwnsmtasklet'
+ )
+
+ cls.waited_for_tasklets = False
+
+ @asyncio.coroutine
+ def register_mock_res_mgr(self):
+ self.res_mgr = ResourceMgrMock(
+ self.dts,
+ self.log,
+ self.loop,
+ )
+ yield from self.res_mgr.register()
+
+ self.log.info("Waiting for resource manager to be ready")
+ yield from self.res_mgr.wait_ready()
+
+ def unregister_mock_res_mgr(self):
+ self.res_mgr.unregister()
+
+ @classmethod
+ def configure_schema(cls):
+ return rwnsmyang.get_schema()
+
+ @classmethod
+ def configure_timeout(cls):
+ return 240
+
+ @asyncio.coroutine
+ def wait_tasklets(self):
+ if not ManoErrorTestCase.waited_for_tasklets:
+ yield from asyncio.sleep(5, loop=self.loop)
+ ManoErrorTestCase.waited_for_tasklets = True
+
+ @asyncio.coroutine
+ def publish_desciptors(self, num_external_vlrs=1, num_internal_vlrs=1, num_ping_vms=1):
+ yield from self.ping_pong.publish_desciptors(
+ num_external_vlrs,
+ num_internal_vlrs,
+ num_ping_vms
+ )
+
+ def unpublish_descriptors(self):
+ self.ping_pong.unpublish_descriptors()
+
+ @asyncio.coroutine
+ def wait_until_nsr_active_or_failed(self, nsr_id, timeout_secs=20):
+ start_time = time.time()
+ while (time.time() - start_time) < timeout_secs:
+ nsrs = yield from self.querier.get_nsr_opdatas(nsr_id)
+ self.assertEqual(1, len(nsrs))
+ if nsrs[0].operational_status in ['running', 'failed']:
+ return
+
+ self.log.debug("Rcvd NSR with %s status", nsrs[0].operational_status)
+ yield from asyncio.sleep(2, loop=self.loop)
+
+ self.assertIn(nsrs[0].operational_status, ['running', 'failed'])
+
+ def verify_number_compute_requests(self, num_requests):
+ self.assertEqual(num_requests, self.res_mgr.num_compute_requests)
+
+ def verify_number_network_requests(self, num_requests):
+ self.assertEqual(num_requests, self.res_mgr.num_network_requests)
+
+ def verify_number_allocated_compute(self, num_allocated):
+ self.assertEqual(num_allocated, self.res_mgr.num_allocated_compute_resources)
+
+ def verify_number_allocated_network(self, num_allocated):
+ self.assertEqual(num_allocated, self.res_mgr.num_allocated_network_resources)
+
+ def allocate_network_handlers(self, num_networks):
+ return [self.res_mgr.create_network_mock_event_handler() for _ in range(num_networks)]
+
+ def allocate_compute_handlers(self, num_computes):
+ return [self.res_mgr.create_compute_mock_event_handler() for _ in range(num_computes)]
+
+ @asyncio.coroutine
+ def create_mock_launchpad_tasklet(self):
+ yield from mano_ut.create_mock_launchpad_tasklet(self.log, self.dts)
+
+ def configure_test(self, loop, test_id):
+ self.log.debug("STARTING - %s", self.id())
+ self.tinfo = self.new_tinfo(self.id())
+ self.dts = rift.tasklets.DTS(self.tinfo, self.schema, self.loop)
+ self.ping_pong = mano_ut.PingPongDescriptorPublisher(self.log, self.loop, self.dts)
+ self.querier = mano_ut.ManoQuerier(self.log, self.dts)
+
+ # Add a task to wait for tasklets to come up
+ asyncio.ensure_future(self.wait_tasklets(), loop=self.loop)
+
+ @rift.test.dts.async_test
+ def test_fail_first_nsm_vlr(self):
+ yield from self.publish_desciptors(num_external_vlrs=2)
+ yield from self.register_mock_res_mgr()
+
+ nsr_id = yield from self.ping_pong.create_nsr()
+ yield from self.wait_until_nsr_active_or_failed(nsr_id)
+
+ yield from self.verify_nsd_ref_count(self.ping_pong.nsd_id, 1)
+ yield from self.verify_nsr_state(nsr_id, "failed")
+ yield from self.verify_num_vlrs(1)
+ yield from self.verify_num_nsr_vlrs(nsr_id, 2)
+ yield from self.verify_num_vnfrs(0)
+
+ nsr_vlrs = yield from self.get_nsr_vlrs(nsr_id)
+ yield from self.verify_vlr_state(nsr_vlrs[0], "failed")
+
+ self.verify_number_network_requests(1)
+ self.verify_number_compute_requests(0)
+ self.verify_number_allocated_network(0)
+ self.verify_number_allocated_compute(0)
+
+ yield from self.terminate_nsr(nsr_id)
+
+ yield from self.verify_nsr_deleted(nsr_id)
+ yield from self.verify_nsd_ref_count(self.ping_pong.nsd_id, 0)
+ yield from self.verify_num_vlrs(0)
+
+ self.verify_number_allocated_network(0)
+ self.verify_number_allocated_compute(0)
+
+ self.unregister_mock_res_mgr()
+ self.unpublish_descriptors()
+
+ @rift.test.dts.async_test
+ def test_fail_second_nsm_vlr(self):
+ yield from self.publish_desciptors(num_external_vlrs=2)
+ yield from self.register_mock_res_mgr()
+ self.allocate_network_handlers(1)
+
+ nsr_id = yield from self.ping_pong.create_nsr()
+ yield from self.wait_until_nsr_active_or_failed(nsr_id)
+
+ yield from self.verify_nsd_ref_count(self.ping_pong.nsd_id, 1)
+ yield from self.verify_nsr_state(nsr_id, "failed")
+ yield from self.verify_num_vlrs(2)
+ yield from self.verify_num_nsr_vlrs(nsr_id, 2)
+
+ nsr_vlrs = yield from self.get_nsr_vlrs(nsr_id)
+ yield from self.verify_vlr_state(nsr_vlrs[0], "running")
+ yield from self.verify_vlr_state(nsr_vlrs[1], "failed")
+
+ self.verify_number_network_requests(2)
+ self.verify_number_compute_requests(0)
+ self.verify_number_allocated_network(1)
+ self.verify_number_allocated_compute(0)
+
+ yield from self.terminate_nsr(nsr_id)
+
+ yield from self.verify_nsr_deleted(nsr_id)
+ yield from self.verify_nsd_ref_count(self.ping_pong.nsd_id, 0)
+ yield from self.verify_num_vlrs(0)
+
+ self.verify_number_allocated_network(0)
+ self.verify_number_allocated_compute(0)
+
+ self.unregister_mock_res_mgr()
+ self.unpublish_descriptors()
+
+ @rift.test.dts.async_test
+ def test_fail_first_vnf_first_vlr(self):
+ yield from self.publish_desciptors(num_internal_vlrs=2)
+ yield from self.register_mock_res_mgr()
+ self.allocate_network_handlers(1)
+
+ nsr_id = yield from self.ping_pong.create_nsr()
+ yield from self.wait_until_nsr_active_or_failed(nsr_id)
+
+ yield from self.verify_nsd_ref_count(self.ping_pong.nsd_id, 1)
+ yield from self.verify_nsr_state(nsr_id, "failed")
+ yield from self.verify_num_vlrs(2)
+ yield from self.verify_num_nsr_vlrs(nsr_id, 1)
+
+ nsr_vlrs = yield from self.get_nsr_vlrs(nsr_id)
+ yield from self.verify_vlr_state(nsr_vlrs[0], "running")
+
+ yield from self.verify_num_nsr_vnfrs(nsr_id, 2)
+
+ # Verify only a single vnfr was instantiated and is failed
+ yield from self.verify_num_vnfrs(1)
+ nsr_vnfs = yield from self.get_nsr_vnfs(nsr_id)
+ yield from self.verify_vnf_state(nsr_vnfs[0], "failed")
+
+ yield from self.verify_num_vnfr_vlrs(nsr_vnfs[0], 2)
+ vnf_vlrs = yield from self.get_vnf_vlrs(nsr_vnfs[0])
+ yield from self.verify_vlr_state(vnf_vlrs[0], "failed")
+
+ self.verify_number_network_requests(2)
+ self.verify_number_compute_requests(0)
+ self.verify_number_allocated_network(1)
+ self.verify_number_allocated_compute(0)
+
+ yield from self.terminate_nsr(nsr_id)
+
+ yield from self.verify_nsr_deleted(nsr_id)
+ yield from self.verify_nsd_ref_count(self.ping_pong.nsd_id, 0)
+ yield from self.verify_num_vlrs(0)
+
+ self.verify_number_allocated_network(0)
+ self.verify_number_allocated_compute(0)
+
+ self.unregister_mock_res_mgr()
+ self.unpublish_descriptors()
+
+ @rift.test.dts.async_test
+ def test_fail_first_vnf_second_vlr(self):
+ yield from self.publish_desciptors(num_internal_vlrs=2)
+ yield from self.register_mock_res_mgr()
+ self.allocate_network_handlers(2)
+
+ nsr_id = yield from self.ping_pong.create_nsr()
+ yield from self.wait_until_nsr_active_or_failed(nsr_id)
+
+ yield from self.verify_nsd_ref_count(self.ping_pong.nsd_id, 1)
+ yield from self.verify_nsr_state(nsr_id, "failed")
+ yield from self.verify_num_vlrs(3)
+ yield from self.verify_num_nsr_vlrs(nsr_id, 1)
+
+ nsr_vlrs = yield from self.get_nsr_vlrs(nsr_id)
+ yield from self.verify_vlr_state(nsr_vlrs[0], "running")
+
+ yield from self.verify_num_nsr_vnfrs(nsr_id, 2)
+
+ # Verify only a single vnfr was instantiated and is failed
+ yield from self.verify_num_vnfrs(1)
+ nsr_vnfs = yield from self.get_nsr_vnfs(nsr_id)
+ yield from self.verify_vnf_state(nsr_vnfs[0], "failed")
+
+ yield from self.verify_num_vnfr_vlrs(nsr_vnfs[0], 2)
+ vnf_vlrs = yield from self.get_vnf_vlrs(nsr_vnfs[0])
+ yield from self.verify_vlr_state(vnf_vlrs[0], "running")
+ yield from self.verify_vlr_state(vnf_vlrs[1], "failed")
+
+ self.verify_number_network_requests(3)
+ self.verify_number_compute_requests(0)
+ self.verify_number_allocated_network(2)
+ self.verify_number_allocated_compute(0)
+
+ yield from self.terminate_nsr(nsr_id)
+
+ yield from self.verify_nsr_deleted(nsr_id)
+ yield from self.verify_nsd_ref_count(self.ping_pong.nsd_id, 0)
+ yield from self.verify_num_vlrs(0)
+
+ self.verify_number_allocated_network(0)
+ self.verify_number_allocated_compute(0)
+
+ self.unregister_mock_res_mgr()
+ self.unpublish_descriptors()
+
+    @rift.test.dts.async_test
+    def test_fail_first_vnf_first_vdu(self):
+        """Fail the first VDU of the first VNF; NSR must end up "failed".
+
+        Network handlers are allocated but no compute handlers, so the
+        very first VDU request cannot be satisfied.  Verifies the failed
+        state propagates (VDU -> VNF -> NSR) and that termination
+        releases every allocated resource.
+        """
+        yield from self.publish_desciptors(num_internal_vlrs=2, num_ping_vms=2)
+        yield from self.register_mock_res_mgr()
+        yield from self.create_mock_launchpad_tasklet()
+        self.allocate_network_handlers(3)
+
+        nsr_id = yield from self.ping_pong.create_nsr()
+        yield from self.wait_until_nsr_active_or_failed(nsr_id)
+
+        yield from self.verify_nsd_ref_count(self.ping_pong.nsd_id, 1)
+        yield from self.verify_nsr_state(nsr_id, "failed")
+        yield from self.verify_num_vlrs(3)
+        yield from self.verify_num_nsr_vlrs(nsr_id, 1)
+
+        nsr_vlrs = yield from self.get_nsr_vlrs(nsr_id)
+        yield from self.verify_vlr_state(nsr_vlrs[0], "running")
+
+        yield from self.verify_num_nsr_vnfrs(nsr_id, 2)
+
+        # Verify only a single vnfr was instantiated and is failed
+        yield from self.verify_num_vnfrs(1)
+        nsr_vnfs = yield from self.get_nsr_vnfs(nsr_id)
+        yield from self.verify_vnf_state(nsr_vnfs[0], "failed")
+
+        # Both internal VLRs of the failed VNF still came up.
+        yield from self.verify_num_vnfr_vlrs(nsr_vnfs[0], 2)
+        vnf_vlrs = yield from self.get_vnf_vlrs(nsr_vnfs[0])
+        yield from self.verify_vlr_state(vnf_vlrs[0], "running")
+        yield from self.verify_vlr_state(vnf_vlrs[1], "running")
+
+        # One compute request was issued but never satisfied.
+        yield from self.verify_num_vnfr_vdus(nsr_vnfs[0], 2)
+        vdus = yield from self.get_vnf_vdus(nsr_vnfs[0])
+        self.verify_vdu_state(vdus[0], "failed")
+
+        self.verify_number_network_requests(3)
+        self.verify_number_compute_requests(1)
+        self.verify_number_allocated_network(3)
+        self.verify_number_allocated_compute(0)
+
+        yield from self.terminate_nsr(nsr_id)
+
+        # After termination everything must be released.
+        yield from self.verify_nsr_deleted(nsr_id)
+        yield from self.verify_nsd_ref_count(self.ping_pong.nsd_id, 0)
+        yield from self.verify_num_vlrs(0)
+
+        self.verify_number_allocated_network(0)
+        self.verify_number_allocated_compute(0)
+
+        self.unregister_mock_res_mgr()
+        self.unpublish_descriptors()
+
+    @rift.test.dts.async_test
+    def test_fail_first_vnf_second_vdu(self):
+        """Fail the second VDU of the first VNF; NSR must end up "failed".
+
+        Exactly one compute handler is allocated, so the first VDU comes
+        up and the second VDU request fails.  Verifies partial VNF
+        bring-up is reported correctly and cleaned up on termination.
+        """
+        yield from self.publish_desciptors(num_internal_vlrs=2, num_ping_vms=2)
+        yield from self.register_mock_res_mgr()
+        yield from self.create_mock_launchpad_tasklet()
+        self.allocate_network_handlers(3)
+        self.allocate_compute_handlers(1)
+
+        nsr_id = yield from self.ping_pong.create_nsr()
+        yield from self.wait_until_nsr_active_or_failed(nsr_id)
+
+        yield from self.verify_nsd_ref_count(self.ping_pong.nsd_id, 1)
+        yield from self.verify_nsr_state(nsr_id, "failed")
+        yield from self.verify_num_vlrs(3)
+        yield from self.verify_num_nsr_vlrs(nsr_id, 1)
+
+        nsr_vlrs = yield from self.get_nsr_vlrs(nsr_id)
+        yield from self.verify_vlr_state(nsr_vlrs[0], "running")
+
+        yield from self.verify_num_nsr_vnfrs(nsr_id, 2)
+
+        # Verify only a single vnfr was instantiated and is failed
+        yield from self.verify_num_vnfrs(1)
+        nsr_vnfs = yield from self.get_nsr_vnfs(nsr_id)
+        yield from self.verify_vnf_state(nsr_vnfs[0], "failed")
+
+        yield from self.verify_num_vnfr_vlrs(nsr_vnfs[0], 2)
+        vnf_vlrs = yield from self.get_vnf_vlrs(nsr_vnfs[0])
+        yield from self.verify_vlr_state(vnf_vlrs[0], "running")
+        yield from self.verify_vlr_state(vnf_vlrs[1], "running")
+
+        # First VDU succeeded, second VDU failed.
+        yield from self.verify_num_vnfr_vdus(nsr_vnfs[0], 2)
+
+        vdus = yield from self.get_vnf_vdus(nsr_vnfs[0])
+        self.verify_vdu_state(vdus[0], "running")
+        self.verify_vdu_state(vdus[1], "failed")
+
+        self.verify_number_network_requests(3)
+        self.verify_number_compute_requests(2)
+        self.verify_number_allocated_network(3)
+        self.verify_number_allocated_compute(1)
+
+        yield from self.terminate_nsr(nsr_id)
+
+        # After termination everything must be released.
+        yield from self.verify_nsr_deleted(nsr_id)
+        yield from self.verify_nsd_ref_count(self.ping_pong.nsd_id, 0)
+        yield from self.verify_num_vlrs(0)
+
+        self.verify_number_allocated_network(0)
+        self.verify_number_allocated_compute(0)
+
+        self.unregister_mock_res_mgr()
+        self.unpublish_descriptors()
+
+    @rift.test.dts.async_test
+    def test_fail_second_vnf_second_vdu(self):
+        """Fail the second VDU of the second VNF; NSR must end up "failed".
+
+        Enough handlers are allocated for the whole first VNF plus the
+        first VDU of the second VNF; the final compute request fails.
+        Verifies the first VNF runs while the second reports failure,
+        and that termination releases everything.
+        """
+        yield from self.publish_desciptors(num_internal_vlrs=2, num_ping_vms=2)
+        yield from self.register_mock_res_mgr()
+        yield from self.create_mock_launchpad_tasklet()
+        self.allocate_network_handlers(5)
+        self.allocate_compute_handlers(3)
+
+        nsr_id = yield from self.ping_pong.create_nsr()
+        yield from self.wait_until_nsr_active_or_failed(nsr_id)
+
+        yield from self.verify_nsd_ref_count(self.ping_pong.nsd_id, 1)
+        yield from self.verify_nsr_state(nsr_id, "failed")
+        yield from self.verify_num_vlrs(5)
+        yield from self.verify_num_nsr_vlrs(nsr_id, 1)
+
+        nsr_vlrs = yield from self.get_nsr_vlrs(nsr_id)
+        yield from self.verify_vlr_state(nsr_vlrs[0], "running")
+
+        yield from self.verify_num_nsr_vnfrs(nsr_id, 2)
+
+        # Both vnfrs were instantiated; the first runs, the second failed.
+        yield from self.verify_num_vnfrs(2)
+        nsr_vnfs = yield from self.get_nsr_vnfs(nsr_id)
+        yield from self.verify_vnf_state(nsr_vnfs[0], "running")
+        yield from self.verify_vnf_state(nsr_vnfs[1], "failed")
+
+        yield from self.verify_num_vnfr_vlrs(nsr_vnfs[0], 2)
+
+        # Internal VLRs of both VNFs all came up.
+        vnf_vlrs = yield from self.get_vnf_vlrs(nsr_vnfs[0])
+        yield from self.verify_vlr_state(vnf_vlrs[0], "running")
+        yield from self.verify_vlr_state(vnf_vlrs[1], "running")
+
+        vnf_vlrs = yield from self.get_vnf_vlrs(nsr_vnfs[1])
+        yield from self.verify_vlr_state(vnf_vlrs[0], "running")
+        yield from self.verify_vlr_state(vnf_vlrs[1], "running")
+
+        yield from self.verify_num_vnfr_vdus(nsr_vnfs[0], 2)
+        yield from self.verify_num_vnfr_vdus(nsr_vnfs[1], 2)
+
+        # Only the last VDU of the second VNF failed.
+        vdus = yield from self.get_vnf_vdus(nsr_vnfs[0])
+        self.verify_vdu_state(vdus[0], "running")
+        self.verify_vdu_state(vdus[1], "running")
+
+        vdus = yield from self.get_vnf_vdus(nsr_vnfs[1])
+        self.verify_vdu_state(vdus[0], "running")
+        self.verify_vdu_state(vdus[1], "failed")
+
+        self.verify_number_network_requests(5)
+        self.verify_number_compute_requests(4)
+        self.verify_number_allocated_network(5)
+        self.verify_number_allocated_compute(3)
+
+        yield from self.terminate_nsr(nsr_id)
+
+        # After termination everything must be released.
+        yield from self.verify_nsr_deleted(nsr_id)
+        yield from self.verify_nsd_ref_count(self.ping_pong.nsd_id, 0)
+        yield from self.verify_num_vlrs(0)
+
+        self.verify_number_allocated_network(0)
+        self.verify_number_allocated_compute(0)
+
+        self.unregister_mock_res_mgr()
+        self.unpublish_descriptors()
+
+
+def main():
+    """Run the module's tests under an XML-emitting test runner.
+
+    NOTE(review): os.environ["RIFT_MODULE_TEST"] raises KeyError when the
+    variable is unset -- confirm the test harness always exports it.
+    """
+    runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])
+
+    # Consume only our own flags; unittest.main() parses the remainder.
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-v', '--verbose', action='store_true')
+    args, _ = parser.parse_known_args()
+
+    ManoErrorTestCase.log_level = logging.DEBUG if args.verbose else logging.WARN
+
+    unittest.main(testRunner=runner)
+
+if __name__ == '__main__':
+    main()
+
+# vim: sw
diff --git a/rwlaunchpad/test/mano_ut.py b/rwlaunchpad/test/mano_ut.py
new file mode 100755
index 0000000..69a0d40
--- /dev/null
+++ b/rwlaunchpad/test/mano_ut.py
@@ -0,0 +1,1198 @@
+#!/usr/bin/env python3
+
+#
+# Copyright 2016 RIFT.IO Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+import asyncio
+import os
+import sys
+import unittest
+import uuid
+import xmlrunner
+import argparse
+import logging
+import time
+import types
+
+import gi
+gi.require_version('RwCloudYang', '1.0')
+gi.require_version('RwDts', '1.0')
+gi.require_version('RwNsmYang', '1.0')
+gi.require_version('RwLaunchpadYang', '1.0')
+gi.require_version('RwResourceMgrYang', '1.0')
+gi.require_version('RwcalYang', '1.0')
+gi.require_version('RwNsrYang', '1.0')
+gi.require_version('NsrYang', '1.0')
+gi.require_version('RwlogMgmtYang', '1.0')
+
+from gi.repository import (
+ RwCloudYang as rwcloudyang,
+ RwDts as rwdts,
+ RwLaunchpadYang as launchpadyang,
+ RwNsmYang as rwnsmyang,
+ RwNsrYang as rwnsryang,
+ NsrYang as nsryang,
+ RwResourceMgrYang as rmgryang,
+ RwcalYang as rwcalyang,
+ RwConfigAgentYang as rwcfg_agent,
+ RwlogMgmtYang
+)
+
+from gi.repository.RwTypes import RwStatus
+import rift.mano.examples.ping_pong_nsd as ping_pong_nsd
+import rift.tasklets
+import rift.test.dts
+import rw_peas
+
+
+# Credentials used when an 'openstack_static'/'openstack_dynamic' cloud
+# account is configured (see ManoTestCase.get_cal_account below).
+openstack_info = {
+    'username': 'pluto',
+    'password': 'mypasswd',
+    'auth_url': 'http://10.66.4.27:5000/v3/',
+    'project_name': 'demo',
+    'mgmt_network': 'private',
+    }
+
+
+if sys.version_info < (3, 4, 4):
+    # asyncio.ensure_future appeared in 3.4.4; alias the old name on
+    # older interpreters.
+    asyncio.ensure_future = asyncio.async
+
+
+class XPaths(object):
+ @staticmethod
+ def nsd(k=None):
+ return ("C,/nsd:nsd-catalog/nsd:nsd" +
+ ("[nsd:id='{}']".format(k) if k is not None else ""))
+
+ @staticmethod
+ def vld(k=None):
+ return ("C,/vld:vld-catalog/vld:vld" +
+ ("[vld:id='{}']".format(k) if k is not None else ""))
+
+ @staticmethod
+ def vnfd(k=None):
+ return ("C,/vnfd:vnfd-catalog/vnfd:vnfd" +
+ ("[vnfd:id='{}']".format(k) if k is not None else ""))
+
+ @staticmethod
+ def vnfr(k=None):
+ return ("D,/vnfr:vnfr-catalog/vnfr:vnfr" +
+ ("[vnfr:id='{}']".format(k) if k is not None else ""))
+
+ @staticmethod
+ def vlr(k=None):
+ return ("D,/vlr:vlr-catalog/vlr:vlr" +
+ ("[vlr:id='{}']".format(k) if k is not None else ""))
+
+ @staticmethod
+ def nsd_ref_count(k=None):
+ return ("D,/nsr:ns-instance-opdata/rw-nsr:nsd-ref-count" +
+ ("[rw-nsr:nsd-id-ref='{}']".format(k) if k is not None else ""))
+
+ @staticmethod
+ def vnfd_ref_count(k=None):
+ return ("D,/vnfr:vnfr-catalog/rw-vnfr:vnfd-ref-count" +
+ ("[rw-nsr:nsd-id-ref='{}']".format(k) if k is not None else ""))
+
+ @staticmethod
+ def nsr_config(k=None):
+ return ("C,/nsr:ns-instance-config/nsr:nsr" +
+ ("[nsr:id='{}']".format(k) if k is not None else ""))
+
+ @staticmethod
+ def nsr_opdata(k=None):
+ return ("D,/nsr:ns-instance-opdata/nsr:nsr" +
+ ("[nsr:ns-instance-config-ref='{}']".format(k) if k is not None else ""))
+
+ @staticmethod
+ def nsr_config_status(k=None):
+ return ("D,/nsr:ns-instance-opdata/nsr:nsr" +
+ ("[nsr:ns-instance-config-ref='{}']/config_status".format(k) if k is not None else ""))
+
+ @staticmethod
+ def cm_state(k=None):
+ if k is None:
+ return ("D,/rw-conman:cm-state/rw-conman:cm-nsr")
+ else:
+ return ("D,/rw-conman:cm-state/rw-conman:cm-nsr" +
+ ("[rw-conman:id='{}']".format(k) if k is not None else ""))
+
+ @staticmethod
+ def nsr_scale_group_instance(nsr_id=None, group_name=None, index=None):
+ return (("D,/nsr:ns-instance-opdata/nsr:nsr") +
+ ("[nsr:ns-instance-config-ref='{}']".format(nsr_id) if nsr_id is not None else "") +
+ ("/nsr:scaling-group-record") +
+ ("[nsr:scaling-group-name-ref='{}']".format(group_name) if group_name is not None else "") +
+ ("/nsr:instance") +
+ ("[nsr:scaling-group-index-ref='{}']".format(index) if index is not None else ""))
+
+ @staticmethod
+ def nsr_scale_group_instance_config(nsr_id=None, group_name=None, index=None):
+ return (("C,/nsr:ns-instance-config/nsr:nsr") +
+ ("[nsr:id='{}']".format(nsr_id) if nsr_id is not None else "") +
+ ("/nsr:scaling-group") +
+ ("[nsr:scaling-group-name-ref='{}']".format(group_name) if group_name is not None else "") +
+ ("/nsr:instance") +
+ ("[nsr:index='{}']".format(index) if index is not None else ""))
+
+
+class ManoQuerier(object):
+    """Thin read/update/delete wrapper over a DTS session.
+
+    Combines the xpath builders from XPaths with query_read /
+    query_update / query_delete so the tests can fetch and mutate MANO
+    records with one call.
+    """
+
+    def __init__(self, log, dts):
+        self.log = log    # logger shared with the owning test
+        self.dts = dts    # rift.tasklets.DTS session
+
+    @asyncio.coroutine
+    def _read_query(self, xpath, do_trace=False):
+        """Run a MERGE read on *xpath* and return a list of results.
+
+        do_trace additionally sets the TRACE transaction flag for
+        debugging.
+        """
+        self.log.debug("Running XPATH read query: %s (trace: %s)", xpath, do_trace)
+        flags = rwdts.XactFlag.MERGE
+        flags += rwdts.XactFlag.TRACE if do_trace else 0
+        res_iter = yield from self.dts.query_read(
+            xpath, flags=flags
+        )
+
+        # Each element of res_iter is itself awaitable; collect the
+        # non-None payloads.
+        results = []
+        for i in res_iter:
+            result = yield from i
+            if result is not None:
+                results.append(result.result)
+
+        return results
+
+    @asyncio.coroutine
+    def get_cm_state(self, nsr_id=None):
+        """Read config-manager state records (all if nsr_id is None)."""
+        return (yield from self._read_query(XPaths.cm_state(nsr_id), False))
+
+    @asyncio.coroutine
+    def get_nsr_opdatas(self, nsr_id=None):
+        """Read NSR operational records (all if nsr_id is None)."""
+        return (yield from self._read_query(XPaths.nsr_opdata(nsr_id), False))
+
+    @asyncio.coroutine
+    def get_nsr_scale_group_instance_opdata(self, nsr_id=None, group_name=None, index=None):
+        """Read scaling-group instance opdata records."""
+        return (yield from self._read_query(XPaths.nsr_scale_group_instance(nsr_id, group_name, index), False))
+        #return (yield from self._read_query(XPaths.nsr_scale_group_instance(nsr_id, group_name), True))
+
+    @asyncio.coroutine
+    def get_nsr_configs(self, nsr_id=None):
+        """Read NSR config records (all if nsr_id is None)."""
+        return (yield from self._read_query(XPaths.nsr_config(nsr_id)))
+
+    @asyncio.coroutine
+    def get_nsr_config_status(self, nsr_id=None):
+        """Read the config-status of the given NSR."""
+        return (yield from self._read_query(XPaths.nsr_config_status(nsr_id)))
+
+    @asyncio.coroutine
+    def get_vnfrs(self, vnfr_id=None):
+        """Read VNFR records (all if vnfr_id is None)."""
+        return (yield from self._read_query(XPaths.vnfr(vnfr_id)))
+
+    @asyncio.coroutine
+    def get_vlrs(self, vlr_id=None):
+        """Read VLR records (all if vlr_id is None)."""
+        return (yield from self._read_query(XPaths.vlr(vlr_id)))
+
+    @asyncio.coroutine
+    def get_nsd_ref_counts(self, nsd_id=None):
+        """Read NSD instance ref-count records."""
+        return (yield from self._read_query(XPaths.nsd_ref_count(nsd_id)))
+
+    @asyncio.coroutine
+    def get_vnfd_ref_counts(self, vnfd_id=None):
+        """Read VNFD instance ref-count records."""
+        return (yield from self._read_query(XPaths.vnfd_ref_count(vnfd_id)))
+
+    @asyncio.coroutine
+    def delete_nsr(self, nsr_id):
+        """Delete the NSR config entry, triggering NS termination."""
+        with self.dts.transaction() as xact:
+            yield from self.dts.query_delete(
+                XPaths.nsr_config(nsr_id),
+                0
+                #rwdts.XactFlag.TRACE,
+                #rwdts.Flag.ADVISE,
+            )
+
+    @asyncio.coroutine
+    def delete_nsd(self, nsd_id):
+        """Delete the NSD catalog entry with the given id."""
+        nsd_xpath = XPaths.nsd(nsd_id)
+        self.log.debug("Attempting to delete NSD with path = %s", nsd_xpath)
+        with self.dts.transaction() as xact:
+            yield from self.dts.query_delete(
+                nsd_xpath,
+                rwdts.XactFlag.ADVISE,
+            )
+
+    @asyncio.coroutine
+    def delete_vnfd(self, vnfd_id):
+        """Delete the VNFD catalog entry with the given id."""
+        vnfd_xpath = XPaths.vnfd(vnfd_id)
+        self.log.debug("Attempting to delete VNFD with path = %s", vnfd_xpath)
+        with self.dts.transaction() as xact:
+            yield from self.dts.query_delete(
+                vnfd_xpath,
+                rwdts.XactFlag.ADVISE,
+            )
+
+    @asyncio.coroutine
+    def update_nsd(self, nsd_id, nsd_msg):
+        """Replace the NSD catalog entry with *nsd_msg*."""
+        nsd_xpath = XPaths.nsd(nsd_id)
+        self.log.debug("Attempting to update NSD with path = %s", nsd_xpath)
+        with self.dts.transaction() as xact:
+            yield from self.dts.query_update(
+                nsd_xpath,
+                rwdts.XactFlag.ADVISE,
+                nsd_msg,
+            )
+
+    @asyncio.coroutine
+    def update_vnfd(self, vnfd_id, vnfd_msg):
+        """Replace the VNFD catalog entry with *vnfd_msg*."""
+        vnfd_xpath = XPaths.vnfd(vnfd_id)
+        self.log.debug("Attempting to delete VNFD with path = %s", vnfd_xpath)
+        with self.dts.transaction() as xact:
+            yield from self.dts.query_update(
+                vnfd_xpath,
+                rwdts.XactFlag.ADVISE,
+                vnfd_msg,
+            )
+
+    @asyncio.coroutine
+    def update_nsr_config(self, nsr_id, nsr_msg):
+        """Replace (REPLACE flag) the NSR config with *nsr_msg*."""
+        nsr_xpath = XPaths.nsr_config(nsr_id)
+        self.log.debug("Attempting to update NSR with path = %s", nsr_xpath)
+        with self.dts.transaction() as xact:
+            yield from self.dts.query_update(
+                nsr_xpath,
+                rwdts.XactFlag.ADVISE|rwdts.XactFlag.REPLACE,
+                nsr_msg,
+            )
+
+
+class ManoTestCase(rift.test.dts.AbstractDTSTest):
+    """Verification helpers built on top of self.querier (ManoQuerier).
+
+    NOTE(review): a second class with this same name is defined later in
+    this module and shadows this definition at import time -- confirm
+    whether this copy is still required.
+    """
+
+    @asyncio.coroutine
+    def verify_nsr_state(self, nsr_id, state):
+        """Assert exactly one NSR opdata exists and is in *state*."""
+        nsrs = yield from self.querier.get_nsr_opdatas(nsr_id)
+        self.assertEqual(1, len(nsrs))
+        nsr = nsrs[0]
+
+        self.log.debug("Got nsr = %s", nsr)
+        self.assertEqual(state, nsr.operational_status)
+
+    @asyncio.coroutine
+    def verify_vlr_state(self, vlr_id, state):
+        """Assert exactly one VLR exists and is in *state*."""
+        vlrs = yield from self.querier.get_vlrs(vlr_id)
+        self.assertEqual(1, len(vlrs))
+        vlr = vlrs[0]
+
+        self.assertEqual(state, vlr.operational_status)
+
+    def verify_vdu_state(self, vdu, state):
+        """Assert the given VDU record is in *state* (no query needed)."""
+        self.assertEqual(state, vdu.operational_status)
+
+    @asyncio.coroutine
+    def verify_vnf_state(self, vnfr_id, state):
+        """Assert exactly one VNFR exists and is in *state*."""
+        vnfrs = yield from self.querier.get_vnfrs(vnfr_id)
+        self.assertEqual(1, len(vnfrs))
+        vnfr = vnfrs[0]
+
+        self.assertEqual(state, vnfr.operational_status)
+
+    @asyncio.coroutine
+    def terminate_nsr(self, nsr_id):
+        """Delete the NSR config, which terminates the NS instance."""
+        self.log.debug("Terminating nsr id: %s", nsr_id)
+        yield from self.querier.delete_nsr(nsr_id)
+
+    @asyncio.coroutine
+    def verify_nsr_deleted(self, nsr_id):
+        """Assert neither opdata nor config remain for the NSR."""
+        nsr_opdatas = yield from self.querier.get_nsr_opdatas(nsr_id)
+        self.assertEqual(0, len(nsr_opdatas))
+
+        nsr_configs = yield from self.querier.get_nsr_configs(nsr_id)
+        self.assertEqual(0, len(nsr_configs))
+
+    @asyncio.coroutine
+    def verify_num_vlrs(self, num_vlrs):
+        """Assert the total number of VLR records in the system."""
+        vlrs = yield from self.querier.get_vlrs()
+        self.assertEqual(num_vlrs, len(vlrs))
+
+    @asyncio.coroutine
+    def get_nsr_vlrs(self, nsr_id):
+        """Return the VLR ids referenced by the NSR's opdata."""
+        nsrs = yield from self.querier.get_nsr_opdatas(nsr_id)
+        return [v.vlr_ref for v in nsrs[0].vlr]
+
+    @asyncio.coroutine
+    def get_nsr_vnfs(self, nsr_id):
+        """Return the constituent VNFR ids of the NSR."""
+        nsrs = yield from self.querier.get_nsr_opdatas(nsr_id)
+        return nsrs[0].constituent_vnfr_ref
+
+    @asyncio.coroutine
+    def get_vnf_vlrs(self, vnfr_id):
+        """Return the internal VLR ids of the VNFR."""
+        vnfrs = yield from self.querier.get_vnfrs(vnfr_id)
+        return [i.vlr_ref for i in vnfrs[0].internal_vlr]
+
+    @asyncio.coroutine
+    def verify_num_nsr_vlrs(self, nsr_id, num_vlrs):
+        """Assert how many VLRs the NSR references."""
+        vlrs = yield from self.get_nsr_vlrs(nsr_id)
+        self.assertEqual(num_vlrs, len(vlrs))
+
+    @asyncio.coroutine
+    def verify_num_nsr_vnfrs(self, nsr_id, num_vnfs):
+        """Assert how many constituent VNFRs the NSR references."""
+        vnfs = yield from self.get_nsr_vnfs(nsr_id)
+        self.assertEqual(num_vnfs, len(vnfs))
+
+    @asyncio.coroutine
+    def verify_num_vnfr_vlrs(self, vnfr_id, num_vlrs):
+        """Assert how many internal VLRs the VNFR references."""
+        vlrs = yield from self.get_vnf_vlrs(vnfr_id)
+        self.assertEqual(num_vlrs, len(vlrs))
+
+    @asyncio.coroutine
+    def get_vnf_vdus(self, vnfr_id):
+        """Return the VDU records of the VNFR."""
+        vnfrs = yield from self.querier.get_vnfrs(vnfr_id)
+        return [i for i in vnfrs[0].vdur]
+
+    @asyncio.coroutine
+    def verify_num_vnfr_vdus(self, vnfr_id, num_vdus):
+        """Assert how many VDUs the VNFR contains."""
+        vdus = yield from self.get_vnf_vdus(vnfr_id)
+        self.assertEqual(num_vdus, len(vdus))
+
+    @asyncio.coroutine
+    def verify_num_vnfrs(self, num_vnfrs):
+        """Assert the total number of VNFR records in the system."""
+        vnfrs = yield from self.querier.get_vnfrs()
+        self.assertEqual(num_vnfrs, len(vnfrs))
+
+    @asyncio.coroutine
+    def verify_nsd_ref_count(self, nsd_id, num_ref):
+        """Assert the NSD's instance ref-count equals *num_ref*."""
+        nsd_ref_counts = yield from self.querier.get_nsd_ref_counts(nsd_id)
+        self.assertEqual(num_ref, nsd_ref_counts[0].instance_ref_count)
+
+class DescriptorPublisher(object):
+    """Publishes descriptor objects onto DTS and tracks registrations.
+
+    publish() registers as a publisher on a wildcard path, creates the
+    element once the registration is ready, and remembers the
+    registration so unpublish_all() can tear everything down.
+    """
+
+    def __init__(self, log, loop, dts):
+        self.log = log
+        self.loop = loop
+        self.dts = dts
+
+        # Registrations created by publish(), in creation order.
+        self._registrations = []
+
+    @asyncio.coroutine
+    def publish(self, w_path, path, desc):
+        """Register on *w_path* and publish *desc* at *path*.
+
+        Blocks (via an Event) until the element has actually been
+        created inside the on_ready callback, then returns the
+        registration handle.
+        """
+        ready_event = asyncio.Event(loop=self.loop)
+
+        @asyncio.coroutine
+        def on_ready(regh, status):
+            # Called by DTS once the registration is live; only then is
+            # it safe to create the element.
+            self.log.debug("Create element: %s, obj-type:%s obj:%s",
+                           path, type(desc), desc)
+            with self.dts.transaction() as xact:
+                regh.create_element(path, desc, xact.xact)
+            self.log.debug("Created element: %s, obj:%s", path, desc)
+            ready_event.set()
+
+        handler = rift.tasklets.DTS.RegistrationHandler(
+            on_ready=on_ready
+        )
+
+        self.log.debug("Registering path: %s, obj:%s", w_path, desc)
+        reg = yield from self.dts.register(
+            w_path,
+            handler,
+            flags=rwdts.Flag.PUBLISHER | rwdts.Flag.NO_PREP_READ
+        )
+        self._registrations.append(reg)
+        self.log.debug("Registered path : %s", w_path)
+        yield from ready_event.wait()
+
+        return reg
+
+    def unpublish_all(self):
+        """Deregister every registration created by publish()."""
+        self.log.debug("Deregistering all published descriptors")
+        for reg in self._registrations:
+            reg.deregister()
+
+
+class PingPongNsrConfigPublisher(object):
+    """Builds and publishes an NS instance config for the ping-pong NSD.
+
+    The constructor assembles a complete NSR message (input parameters,
+    VNF cloud-account map, NSD/VNFD placement-group maps) and kicks off
+    an async DTS publisher registration on ns-instance-config.
+    """
+
+    XPATH = "C,/nsr:ns-instance-config"
+
+    def __init__(self, log, loop, dts, ping_pong, cloud_account_name):
+        self.dts = dts
+        self.log = log
+        self.loop = loop
+        self.ref = None
+
+        self.querier = ManoQuerier(log, dts)
+
+        self.nsr_config = rwnsryang.YangData_Nsr_NsInstanceConfig()
+
+        nsr = rwnsryang.YangData_Nsr_NsInstanceConfig_Nsr()
+        nsr.id = str(uuid.uuid4())
+        nsr.name = "ns1.{}".format(nsr.id)
+        nsr.nsd = nsryang.YangData_Nsr_NsInstanceConfig_Nsr_Nsd()
+        nsr.nsd.from_dict(ping_pong.ping_pong_nsd.nsd.as_dict())
+        nsr.cloud_account = cloud_account_name
+
+        # Map the first constituent VNF to the RiftCA config agent.
+        nsr.vnf_cloud_account_map.add().from_dict({
+            'member_vnf_index_ref': nsr.nsd.constituent_vnfd[0].member_vnf_index,
+            'config_agent_account': 'RiftCA',
+            #'cloud_account':'mock_account1'
+        })
+
+        # Exercise the input-parameter path by overriding the NSD name.
+        inputs = nsryang.YangData_Nsr_NsInstanceConfig_Nsr_InputParameter()
+        inputs.xpath = "/nsd:nsd-catalog/nsd:nsd[nsd:id={}]/nsd:name".format(ping_pong.nsd_id)
+        inputs.value = "inigo montoya"
+
+        # NSD-level placement groups: host aggregates keyed by metadata.
+        fast_cpu = {'metadata_key': 'FASTCPU', 'metadata_value': 'True'}
+        self.create_nsd_placement_group_map(nsr,
+                                            group_name='Orcus',
+                                            cloud_type='openstack',
+                                            construct_type='host_aggregate',
+                                            construct_value=[fast_cpu])
+
+        fast_storage = {'metadata_key': 'FASTSSD', 'metadata_value': 'True'}
+        self.create_nsd_placement_group_map(nsr,
+                                            group_name='Quaoar',
+                                            cloud_type='openstack',
+                                            construct_type='host_aggregate',
+                                            construct_value=[fast_storage])
+
+        # VNFD-level placement groups for the ping and pong VNFDs.
+        fast_cpu = {'metadata_key': 'BLUE_HW', 'metadata_value': 'True'}
+        self.create_vnfd_placement_group_map(nsr,
+                                             group_name='Eris',
+                                             vnfd_id=ping_pong.ping_vnfd_id,
+                                             cloud_type='openstack',
+                                             construct_type='host_aggregate',
+                                             construct_value=[fast_cpu])
+
+        fast_storage = {'metadata_key': 'YELLOW_HW', 'metadata_value': 'True'}
+        self.create_vnfd_placement_group_map(nsr,
+                                             group_name='Weywot',
+                                             vnfd_id=ping_pong.pong_vnfd_id,
+                                             cloud_type='openstack',
+                                             construct_type='host_aggregate',
+                                             construct_value=[fast_storage])
+
+
+        nsr.input_parameter.append(inputs)
+
+        self._nsr = nsr
+        self.nsr_config.nsr.append(nsr)
+
+        # Registration happens asynchronously; publish() waits on this.
+        self._ready_event = asyncio.Event(loop=self.loop)
+        asyncio.ensure_future(self.register(), loop=loop)
+
+    @asyncio.coroutine
+    def register(self):
+        """Register as publisher on ns-instance-config."""
+        @asyncio.coroutine
+        def on_ready(regh, status):
+            self._ready_event.set()
+
+        self.log.debug("Registering path: %s", PingPongNsrConfigPublisher.XPATH)
+        self.reg = yield from self.dts.register(
+            PingPongNsrConfigPublisher.XPATH,
+            flags=rwdts.Flag.PUBLISHER,
+            handler=rift.tasklets.DTS.RegistrationHandler(
+                on_ready=on_ready,
+            ),
+        )
+
+    @asyncio.coroutine
+    def publish(self):
+        """Publish the prepared NSR config; returns the NSR id."""
+        self.log.debug("Publishing NSR: {}".format(self.nsr_config))
+        yield from self._ready_event.wait()
+        with self.dts.transaction() as xact:
+            self.reg.create_element(
+                PingPongNsrConfigPublisher.XPATH,
+                self.nsr_config,
+                xact=xact.xact,
+            )
+
+        return self._nsr.id
+
+    @asyncio.coroutine
+    def create_scale_group_instance(self, group_name, index):
+        """Add a scaling-group instance to the NSR config and publish it.
+
+        NOTE(review): the *index* argument is unconditionally overridden
+        with 1 below (and 1 is returned) -- confirm whether that
+        override is a leftover from debugging.
+        """
+        index = 1
+        scaling_group = self.nsr_config.nsr[0].scaling_group.add()
+        scaling_group.from_dict({
+            "scaling_group_name_ref": group_name,
+            "instance": [{"index": index}],
+        })
+        with self.dts.transaction() as xact:
+            self.reg.update_element(
+                PingPongNsrConfigPublisher.XPATH,
+                self.nsr_config,
+                xact=xact.xact,
+            )
+
+        return index
+
+    def create_nsd_placement_group_map(self,
+                                       nsr,
+                                       group_name,
+                                       cloud_type,
+                                       construct_type,
+                                       construct_value):
+        """Append an NSD placement-group mapping to *nsr*."""
+        placement_group = nsr.nsd_placement_group_maps.add()
+        placement_group.from_dict({
+            "placement_group_ref": group_name,
+            "cloud_type": cloud_type,
+            construct_type: construct_value,
+        })
+
+
+    def create_vnfd_placement_group_map(self,
+                                        nsr,
+                                        group_name,
+                                        vnfd_id,
+                                        cloud_type,
+                                        construct_type,
+                                        construct_value):
+        """Append a VNFD placement-group mapping to *nsr*."""
+        placement_group = nsr.vnfd_placement_group_maps.add()
+        placement_group.from_dict({
+            "placement_group_ref": group_name,
+            "vnfd_id_ref": vnfd_id,
+            "cloud_type": cloud_type,
+            construct_type: construct_value,
+        })
+
+
+    @asyncio.coroutine
+    def delete_scale_group_instance(self, group_name, index):
+        """Delete one scaling-group instance from the NSR config."""
+        self.log.debug("Deleting scale group %s instance %s", group_name, index)
+        #del self.nsr_config.nsr[0].scaling_group[0].instance[0]
+        xpath = XPaths.nsr_scale_group_instance_config(self.nsr_config.nsr[0].id, group_name, index)
+        yield from self.dts.query_delete(xpath, flags=rwdts.XactFlag.ADVISE)
+        #with self.dts.transaction() as xact:
+        #    self.reg.update_element(
+        #            PingPongNsrConfigPublisher.XPATH,
+        #            self.nsr_config,
+        #            flags=rwdts.XactFlag.REPLACE,
+        #            xact=xact.xact,
+        #            )
+
+    def deregister(self):
+        """Tear down the DTS registration if one was created."""
+        if self.reg is not None:
+            self.reg.deregister()
+
+    def create_nsr_vl(self):
+        """Add a toy VL ('ping_pong_vld_2') plus its cloud-account map."""
+        vld = self.nsr_config.nsr[0].nsd.vld.add()
+        vld.id = 'ping_pong_vld_2'
+        vld.name = 'ping_pong_vld_2'  # hard coded
+        vld.short_name = vld.name
+        vld.vendor = 'RIFT.io'
+        vld.description = 'Toy VL'
+        vld.version = '1.0'
+        vld.type_yang = 'ELAN'
+
+        # cpref = vld.vnfd_connection_point_ref.add()
+        # cpref.member_vnf_index_ref = cp[0]
+        # cpref.vnfd_id_ref = cp[1]
+        # cpref.vnfd_connection_point_ref = cp[2]
+
+        vld = self.nsr_config.nsr[0].vl_cloud_account_map.add()
+        vld.vld_id_ref = 'ping_pong_vld_2'
+        vld.cloud_accounts = ["mock_account"]
+
+    @asyncio.coroutine
+    def add_nsr_vl(self):
+        """Add the toy VL and push the updated NSR config."""
+        self.create_nsr_vl()
+        yield from self.querier.update_nsr_config(
+            self.nsr_config.nsr[0].id,
+            self.nsr_config.nsr[0],
+        )
+
+    @asyncio.coroutine
+    def del_nsr_vl(self):
+        """Remove the toy VL and push the updated NSR config."""
+        for vld in self.nsr_config.nsr[0].nsd.vld:
+            if vld.id == 'ping_pong_vld_2':
+                self.nsr_config.nsr[0].nsd.vld.remove(vld)
+                break
+
+        yield from self.querier.update_nsr_config(
+            self.nsr_config.nsr[0].id,
+            self.nsr_config.nsr[0],
+        )
+
+    def update_vnf_cloud_map(self, vnf_cloud_map):
+        """Set/override the cloud account for the given member VNF indices.
+
+        vnf_cloud_map maps member_vnf_index -> cloud account name; an
+        existing map entry is updated in place, otherwise one is added.
+        """
+        self.log.debug("Modifying NSR to add VNF cloud account map: {}".format(vnf_cloud_map))
+        for vnf_index, cloud_acct in vnf_cloud_map.items():
+            vnf_maps = [vnf_map for vnf_map in self.nsr_config.nsr[0].vnf_cloud_account_map if vnf_index == vnf_map.member_vnf_index_ref]
+            if vnf_maps:
+                vnf_maps[0].cloud_account = cloud_acct
+            else:
+                self.nsr_config.nsr[0].vnf_cloud_account_map.add().from_dict({
+                    'member_vnf_index_ref': vnf_index,
+                    'cloud_account': cloud_acct
+                })
+
+
+class PingPongDescriptorPublisher(object):
+    """Generates the ping-pong descriptors and publishes them on DTS.
+
+    Wraps rift.mano.examples.ping_pong_nsd generation plus a
+    DescriptorPublisher/ManoQuerier pair for publish/update/delete.
+    """
+
+    def __init__(self, log, loop, dts, num_external_vlrs=1, num_internal_vlrs=1, num_ping_vms=1):
+        self.log = log
+        self.loop = loop
+        self.dts = dts
+
+        self.querier = ManoQuerier(self.log, self.dts)
+        self.publisher = DescriptorPublisher(self.log, self.loop, self.dts)
+        self.ping_vnfd, self.pong_vnfd, self.ping_pong_nsd = \
+                ping_pong_nsd.generate_ping_pong_descriptors(
+                    pingcount=1,
+                    external_vlr_count=num_external_vlrs,
+                    internal_vlr_count=num_internal_vlrs,
+                    num_vnf_vms=2,
+                    mano_ut=True,
+                    use_scale_group=True,
+                    use_mon_params=False,
+                )
+
+        # Where the launchpad keeps this NSD's config artifacts.
+        self.config_dir = os.path.join(os.getenv('RIFT_ARTIFACTS'),
+                                       "launchpad/libs",
+                                       self.ping_pong_nsd.id,
+                                       "config")
+
+    @property
+    def nsd_id(self):
+        return self.ping_pong_nsd.id
+
+    @property
+    def ping_vnfd_id(self):
+        return self.ping_vnfd.id
+
+    @property
+    def pong_vnfd_id(self):
+        return self.pong_vnfd.id
+
+    @asyncio.coroutine
+    def publish_desciptors(self):
+        """Publish both VNFDs and the NSD.
+
+        NOTE(review): method name carries a typo ("desciptors") but is
+        kept because callers elsewhere use this spelling.
+        """
+        # Publish ping_vnfd
+        xpath = XPaths.vnfd(self.ping_vnfd_id)
+        xpath_wild = XPaths.vnfd()
+        for obj in self.ping_vnfd.descriptor.vnfd:
+            self.log.debug("Publishing ping_vnfd path: %s - %s, type:%s, obj:%s",
+                           xpath, xpath_wild, type(obj), obj)
+            yield from self.publisher.publish(xpath_wild, xpath, obj)
+
+        # Publish pong_vnfd
+        xpath = XPaths.vnfd(self.pong_vnfd_id)
+        xpath_wild = XPaths.vnfd()
+        for obj in self.pong_vnfd.descriptor.vnfd:
+            self.log.debug("Publishing pong_vnfd path: %s, wild_path: %s, obj:%s",
+                           xpath, xpath_wild, obj)
+            yield from self.publisher.publish(xpath_wild, xpath, obj)
+
+        # Publish ping_pong_nsd
+        xpath = XPaths.nsd(self.nsd_id)
+        xpath_wild = XPaths.nsd()
+        for obj in self.ping_pong_nsd.descriptor.nsd:
+            self.log.debug("Publishing ping_pong nsd path: %s, wild_path: %s, obj:%s",
+                           xpath, xpath_wild, obj)
+            yield from self.publisher.publish(xpath_wild, xpath, obj)
+
+        self.log.debug("DONE - publish_desciptors")
+
+    def unpublish_descriptors(self):
+        """Deregister every published descriptor."""
+        self.publisher.unpublish_all()
+
+    @asyncio.coroutine
+    def delete_nsd(self):
+        """Delete the published ping-pong NSD."""
+        yield from self.querier.delete_nsd(self.ping_pong_nsd.id)
+
+    @asyncio.coroutine
+    def delete_ping_vnfd(self):
+        """Delete the published ping VNFD."""
+        yield from self.querier.delete_vnfd(self.ping_vnfd.id)
+
+    @asyncio.coroutine
+    def update_nsd(self):
+        """Re-publish (update) the ping-pong NSD."""
+        yield from self.querier.update_nsd(
+            self.ping_pong_nsd.id,
+            self.ping_pong_nsd.descriptor.nsd[0]
+        )
+
+    @asyncio.coroutine
+    def update_ping_vnfd(self):
+        """Re-publish (update) the ping VNFD."""
+        yield from self.querier.update_vnfd(
+            self.ping_vnfd.id,
+            self.ping_vnfd.descriptor.vnfd[0]
+        )
+
+
+
+
+class ManoTestCase(rift.test.dts.AbstractDTSTest):
+ """
+ DTS GI interface unittests
+
+ Note: Each tests uses a list of asyncio.Events for staging through the
+ test. These are required here because we are bring up each coroutine
+ ("tasklet") at the same time and are not implementing any re-try
+ mechanisms. For instance, this is used in numerous tests to make sure that
+ a publisher is up and ready before the subscriber sends queries. Such
+ event lists should not be used in production software.
+ """
+
+    @classmethod
+    def configure_suite(cls, rwmain):
+        """Start the VNS/VNFM/NSM/resource-mgr/conman tasklets.
+
+        Tasklet install directories come from the environment (VNS_DIR,
+        VNFM_DIR, NSM_DIR, RM_DIR).
+        """
+        vns_dir = os.environ.get('VNS_DIR')
+        vnfm_dir = os.environ.get('VNFM_DIR')
+        nsm_dir = os.environ.get('NSM_DIR')
+        rm_dir = os.environ.get('RM_DIR')
+
+        rwmain.add_tasklet(vns_dir, 'rwvnstasklet')
+        rwmain.add_tasklet(vnfm_dir, 'rwvnfmtasklet')
+        rwmain.add_tasklet(nsm_dir, 'rwnsmtasklet')
+        rwmain.add_tasklet(rm_dir, 'rwresmgrtasklet')
+        rwmain.add_tasklet(rm_dir, 'rwconmantasklet')
+
+    @classmethod
+    def configure_schema(cls):
+        """Return the yang schema for the NSM used by the DTS test base."""
+        return rwnsmyang.get_schema()
+
+    @classmethod
+    def configure_timeout(cls):
+        """Per-test timeout in seconds for the DTS test base class."""
+        return 240
+
+    @staticmethod
+    def get_cal_account(account_type, account_name):
+        """Build a CloudAccount config message for the given type.
+
+        'mock' yields a mock account; 'openstack_static' /
+        'openstack_dynamic' yield an openstack account filled in from
+        the module-level openstack_info dict.
+        """
+        account = rwcloudyang.CloudAccount()
+        if account_type == 'mock':
+            account.name = account_name
+            account.account_type = "mock"
+            account.mock.username = "mock_user"
+        elif ((account_type == 'openstack_static') or (account_type == 'openstack_dynamic')):
+            account.name = account_name
+            account.account_type = 'openstack'
+            account.openstack.key = openstack_info['username']
+            account.openstack.secret = openstack_info['password']
+            account.openstack.auth_url = openstack_info['auth_url']
+            account.openstack.tenant = openstack_info['project_name']
+            account.openstack.mgmt_network = openstack_info['mgmt_network']
+        return account
+
+    @asyncio.coroutine
+    def configure_cloud_account(self, dts, cloud_type, cloud_name="cloud1"):
+        """Create a cloud account config entry of *cloud_type* via DTS."""
+        account = self.get_cal_account(cloud_type, cloud_name)
+        account_xpath = "C,/rw-cloud:cloud/rw-cloud:account[rw-cloud:name='{}']".format(cloud_name)
+        self.log.info("Configuring cloud-account: %s", account)
+        yield from dts.query_create(account_xpath,
+                                    rwdts.XactFlag.ADVISE,
+                                    account)
+
+    @asyncio.coroutine
+    def wait_tasklets(self):
+        """Crude settle delay to let the started tasklets come up."""
+        yield from asyncio.sleep(5, loop=self.loop)
+
+    def configure_test(self, loop, test_id):
+        """Per-test setup: DTS session, descriptor and NSR publishers.
+
+        Called by the AbstractDTSTest base before each test runs.
+        """
+        self.log.debug("STARTING - %s", self.id())
+        self.tinfo = self.new_tinfo(self.id())
+        self.dts = rift.tasklets.DTS(self.tinfo, self.schema, self.loop)
+        self.ping_pong = PingPongDescriptorPublisher(self.log, self.loop, self.dts)
+        self.querier = ManoQuerier(self.log, self.dts)
+        self.nsr_publisher = PingPongNsrConfigPublisher(
+            self.log,
+            loop,
+            self.dts,
+            self.ping_pong,
+            "mock_account",
+        )
+
+ def test_create_nsr_record(self):
+
+ @asyncio.coroutine
+ def verify_cm_state(termination=False, nsrid=None):
+ self.log.debug("Verifying cm_state path = %s", XPaths.cm_state(nsrid))
+ #print("###>>> Verifying cm_state path:", XPaths.cm_state(nsrid))
+
+ loop_count = 10
+ loop_sleep = 10
+ while loop_count:
+ yield from asyncio.sleep(loop_sleep, loop=self.loop)
+ loop_count -= 1
+ cm_nsr = None
+ cm_nsr_i = yield from self.querier.get_cm_state(nsr_id=nsrid)
+ if (cm_nsr_i is not None and len(cm_nsr_i) != 0):
+ self.assertEqual(1, len(cm_nsr_i))
+ cm_nsr = cm_nsr_i[0].as_dict()
+ #print("###>>> cm_nsr=", cm_nsr)
+ if termination:
+ if len(cm_nsr_i) == 0:
+ print("\n###>>> cm-state NSR deleted OK <<<###\n")
+ return
+ elif (cm_nsr is not None and
+ 'state' in cm_nsr and
+ (cm_nsr['state'] == 'ready')):
+ self.log.debug("Got cm_nsr record %s", cm_nsr)
+ print("\n###>>> cm-state NSR 'ready' OK <<<###\n")
+ return
+
+ # if (len(cm_nsr_i) == 1 and cm_nsr_i[0].state == 'ready'):
+ # self.log.debug("Got cm_nsr record %s", cm_nsr)
+ # else:
+ # yield from asyncio.sleep(10, loop=self.loop)
+
+ print("###>>> Failed cm-state, termination:", termination)
+ self.assertEqual(1, loop_count)
+
+ @asyncio.coroutine
+ def verify_nsr_opdata(termination=False):
+ self.log.debug("Verifying nsr opdata path = %s", XPaths.nsr_opdata())
+
+ while True:
+ nsrs = yield from self.querier.get_nsr_opdatas()
+ if termination:
+ if len(nsrs) != 0:
+ for i in range(10):
+ nsrs = yield from self.querier.get_nsr_opdatas()
+ if len(nsrs) == 0:
+ self.log.debug("No active NSR records found. NSR termination successful")
+ return
+ else:
+ self.assertEqual(0, len(nsrs))
+ self.log.error("Active NSR records found. NSR termination failed")
+
+ else:
+ self.log.debug("No active NSR records found. NSR termination successful")
+ self.assertEqual(0, len(nsrs))
+ return
+
+ nsr = nsrs[0]
+ self.log.debug("Got nsr record %s", nsr)
+ if nsr.operational_status == 'running':
+ self.log.debug("!!! Rcvd NSR with running status !!!")
+ self.assertEqual("configuring", nsr.config_status)
+ break
+
+ self.log.debug("Rcvd NSR with %s status", nsr.operational_status)
+ self.log.debug("Sleeping for 10 seconds")
+ yield from asyncio.sleep(10, loop=self.loop)
+
+        @asyncio.coroutine
+        def verify_nsr_config(termination=False):
+            """Assert exactly one NSR config exists and that its first
+            input parameter's xpath targets the ping_pong NSD's name.
+
+            NOTE(review): 'termination' is accepted but unused here.
+            """
+            self.log.debug("Verifying nsr config path = %s", XPaths.nsr_config())
+
+            nsr_configs = yield from self.querier.get_nsr_configs()
+            self.assertEqual(1, len(nsr_configs))
+
+            nsr_config = nsr_configs[0]
+            self.assertEqual(
+                "/nsd:nsd-catalog/nsd:nsd[nsd:id={}]/nsd:name".format(self.ping_pong.nsd_id),
+                nsr_config.input_parameter[0].xpath,
+            )
+
+        @asyncio.coroutine
+        def verify_nsr_config_status(termination=False, nsrid=None):
+            """Poll (up to 6 tries, 10s apart) for the NSR opdata
+            config_status to become 'configured'.
+
+            Silently does nothing when termination is True or nsrid is None.
+            NOTE(review): the trailing assertEqual appears to execute on
+            every non-'configured' poll, failing the test on the first try
+            rather than after all retries — confirm intended indentation.
+            """
+            if termination is False and nsrid is not None:
+                self.log.debug("Verifying nsr config status path = %s", XPaths.nsr_opdata(nsrid))
+
+                loop_count = 6
+                loop_sleep = 10
+                while loop_count:
+                    loop_count -= 1
+                    yield from asyncio.sleep(loop_sleep, loop=self.loop)
+                    nsr_opdata_l = yield from self.querier.get_nsr_opdatas(nsrid)
+                    self.assertEqual(1, len(nsr_opdata_l))
+                    nsr_opdata = nsr_opdata_l[0].as_dict()
+                    if ("configured" == nsr_opdata['config_status']):
+                        print("\n###>>> NSR Config Status 'configured' OK <<<###\n")
+                        return
+                    self.assertEqual("configured", nsr_opdata['config_status'])
+
+        @asyncio.coroutine
+        def verify_vnfr_record(termination=False):
+            """Verify VNFR records via the querier.
+
+            termination=True: poll up to 10 times (0.5s apart) for every
+            VNFR to disappear; assert none remain afterwards.
+            termination=False: poll every 10s until the first VNFR reports
+            operational_status 'running' (return True) or 'failed'
+            (return False).
+            """
+            self.log.debug("Verifying vnfr record path = %s, Termination=%d",
+                           XPaths.vnfr(), termination)
+            if termination:
+                for i in range(10):
+                    vnfrs = yield from self.querier.get_vnfrs()
+                    if len(vnfrs) == 0:
+                        return True
+
+                    for vnfr in vnfrs:
+                        self.log.debug("VNFR still exists = %s", vnfr)
+
+                    yield from asyncio.sleep(.5, loop=self.loop)
+
+                # Only reachable when VNFRs still exist: fail loudly.
+                assert len(vnfrs) == 0
+
+            while True:
+                vnfrs = yield from self.querier.get_vnfrs()
+                if len(vnfrs) != 0 and termination is False:
+                    vnfr = vnfrs[0]
+                    self.log.debug("Rcvd VNFR with %s status", vnfr.operational_status)
+                    if vnfr.operational_status == 'running':
+                        self.log.debug("!!! Rcvd VNFR with running status !!!")
+                        return True
+
+                    elif vnfr.operational_status == "failed":
+                        self.log.debug("!!! Rcvd VNFR with failed status !!!")
+                        return False
+
+                self.log.debug("Sleeping for 10 seconds")
+                yield from asyncio.sleep(10, loop=self.loop)
+
+
+        @asyncio.coroutine
+        def verify_vnfr_cloud_account(vnf_index, cloud_account):
+            """Assert that the VNFR whose member_vnf_index_ref equals
+            vnf_index was placed on the given cloud account.
+
+            NOTE(review): raises IndexError rather than a clean assertion
+            failure if no VNFR matches vnf_index.
+            """
+            self.log.debug("Verifying vnfr record Cloud account for vnf index = %d is %s", vnf_index,cloud_account)
+            vnfrs = yield from self.querier.get_vnfrs()
+            cloud_accounts = [vnfr.cloud_account for vnfr in vnfrs if vnfr.member_vnf_index_ref == vnf_index]
+            self.log.debug("VNFR cloud account for index %d is %s", vnf_index,cloud_accounts[0])
+            assert cloud_accounts[0] == cloud_account
+
+        @asyncio.coroutine
+        def verify_vlr_record(termination=False):
+            """Read all VLR records via DTS.
+
+            When termination is True assert each query result is None
+            (no VLR records remain); otherwise just log each record.
+            """
+            vlr_xpath = XPaths.vlr()
+            self.log.debug("Verifying vlr record path = %s, termination: %s",
+                           vlr_xpath, termination)
+            res_iter = yield from self.dts.query_read(vlr_xpath)
+
+            for i in res_iter:
+                result = yield from i
+                if termination:
+                    self.assertIsNone(result)
+
+                self.log.debug("Got vlr record %s", result)
+
+        @asyncio.coroutine
+        def verify_vlrs(nsr_id, count=0):
+            """Poll (every 10s) until the NSR is 'running' AND carries
+            exactly ``count`` VLRs.
+
+            NOTE(review): always inspects the first NSR opdata record;
+            nsr_id is used only for logging — confirm single-NSR contexts.
+            """
+            while True:
+                nsrs = yield from self.querier.get_nsr_opdatas()
+                nsr = nsrs[0]
+                self.log.debug("Got nsr record %s", nsr)
+                if nsr.operational_status == 'running':
+                    self.log.debug("!!! Rcvd NSR with running status !!!")
+                    # Check the VLR count
+                    if (len(nsr.vlr)) == count:
+                        self.log.debug("NSR %s has %d VLRs", nsr_id, count)
+                        break
+
+                self.log.debug("Rcvd NSR %s with %s status", nsr_id, nsr.operational_status)
+                self.log.debug("Sleeping for 10 seconds")
+                yield from asyncio.sleep(10, loop=self.loop)
+
+        @asyncio.coroutine
+        def verify_nsd_ref_count(termination):
+            """Log NSD reference-count records.  Makes no assertions;
+            'termination' is unused."""
+            self.log.debug("Verifying nsd ref count= %s", XPaths.nsd_ref_count())
+            res_iter = yield from self.dts.query_read(XPaths.nsd_ref_count())
+
+            for i in res_iter:
+                result = yield from i
+                self.log.debug("Got nsd ref count record %s", result)
+
+        @asyncio.coroutine
+        def verify_vnfd_ref_count(termination):
+            """Log VNFD reference-count records.  Makes no assertions;
+            'termination' is unused."""
+            self.log.debug("Verifying vnfd ref count= %s", XPaths.vnfd_ref_count())
+            res_iter = yield from self.dts.query_read(XPaths.vnfd_ref_count())
+
+            for i in res_iter:
+                result = yield from i
+                self.log.debug("Got vnfd ref count record %s", result)
+
+ @asyncio.coroutine
+ def verify_scale_group_reaches_state(nsr_id, scale_group, index, state, timeout=1000):
+ start_time = time.time()
+ instance_state = None
+ while (time.time() - start_time) < timeout:
+ results = yield from self.querier.get_nsr_opdatas(nsr_id=nsr_id)
+ if len(results) == 1:
+ result = results[0]
+ if len(result.scaling_group_record) == 0:
+ continue
+
+ if len(result.scaling_group_record[0].instance) == 0:
+ continue
+
+ instance = result.scaling_group_record[0].instance[0]
+ self.assertEqual(instance.scaling_group_index_ref, index)
+
+ instance_state = instance.op_status
+ if instance_state == state:
+ self.log.debug("Scale group instance reached %s state", state)
+ return
+
+ yield from asyncio.sleep(1, loop=self.loop)
+
+ self.assertEqual(state, instance_state)
+
+        @asyncio.coroutine
+        def verify_results(termination=False, nsrid=None):
+            """Run the full battery of record verifications for one NSR,
+            either after instantiation (termination=False) or after
+            teardown (termination=True)."""
+            yield from verify_vnfr_record(termination)
+            #yield from verify_vlr_record(termination)
+            yield from verify_nsr_opdata(termination)
+            yield from verify_nsr_config(termination)
+            yield from verify_nsd_ref_count(termination)
+            yield from verify_vnfd_ref_count(termination)
+
+            # Config Manager
+            yield from verify_cm_state(termination, nsrid)
+            yield from verify_nsr_config_status(termination, nsrid)
+
+ @asyncio.coroutine
+ def verify_scale_instance(index):
+ self.log.debug("Verifying scale record path = %s, Termination=%d",
+ XPaths.vnfr(), termination)
+ if termination:
+ for i in range(5):
+ vnfrs = yield from self.querier.get_vnfrs()
+ if len(vnfrs) == 0:
+ return True
+
+ for vnfr in vnfrs:
+ self.log.debug("VNFR still exists = %s", vnfr)
+
+
+ assert len(vnfrs) == 0
+
+ while True:
+ vnfrs = yield from self.querier.get_vnfrs()
+ if len(vnfrs) != 0 and termination is False:
+ vnfr = vnfrs[0]
+ self.log.debug("Rcvd VNFR with %s status", vnfr.operational_status)
+ if vnfr.operational_status == 'running':
+ self.log.debug("!!! Rcvd VNFR with running status !!!")
+ return True
+
+ elif vnfr.operational_status == "failed":
+ self.log.debug("!!! Rcvd VNFR with failed status !!!")
+ return False
+
+ self.log.debug("Sleeping for 10 seconds")
+ yield from asyncio.sleep(10, loop=self.loop)
+
+        @asyncio.coroutine
+        def terminate_ns(nsr_id):
+            """Terminate the NS by deleting its instance-config via DTS."""
+            xpath = XPaths.nsr_config(nsr_id)
+            self.log.debug("Terminating network service with path %s", xpath)
+            yield from self.dts.query_delete(xpath, flags=rwdts.XactFlag.ADVISE)
+            self.log.debug("Terminated network service with path %s", xpath)
+
+        @asyncio.coroutine
+        def run_test():
+            """End-to-end scenario: onboard descriptors, instantiate the
+            ping-pong NS, exercise in-use update/delete rejections and VL
+            add/remove, terminate, repeat as a multi-site NS across two
+            mock cloud accounts, then delete the descriptors for real."""
+            yield from self.wait_tasklets()
+
+
+            cloud_type = "mock"
+            yield from self.configure_cloud_account(self.dts, cloud_type, "mock_account")
+            yield from self.configure_cloud_account(self.dts, cloud_type, "mock_account1")
+
+            yield from self.ping_pong.publish_desciptors()
+
+            # Attempt updating VNFD not in use
+            yield from self.ping_pong.update_ping_vnfd()
+
+            # Attempt updating NSD not in use
+            yield from self.ping_pong.update_nsd()
+
+            # Attempt deleting VNFD not in use
+            yield from self.ping_pong.delete_ping_vnfd()
+
+            # Attempt deleting NSD not in use
+            yield from self.ping_pong.delete_nsd()
+
+            # Re-publish, since the descriptors were deleted above
+            yield from self.ping_pong.publish_desciptors()
+
+            nsr_id = yield from self.nsr_publisher.publish()
+
+            yield from verify_results(nsrid=nsr_id)
+
+            # yield from self.nsr_publisher.create_scale_group_instance("ping_group", 1)
+
+            # yield from verify_scale_group_reaches_state(nsr_id, "ping_group", 1, "running")
+
+            # yield from self.nsr_publisher.delete_scale_group_instance("ping_group", 1)
+
+            yield from asyncio.sleep(10, loop=self.loop)
+
+            # Attempt deleting VNFD in use (expected to be rejected)
+            yield from self.ping_pong.delete_ping_vnfd()
+
+            # Attempt updating NSD in use
+            yield from self.ping_pong.update_nsd()
+
+            # Update NSD in use with new VL
+            yield from self.nsr_publisher.add_nsr_vl()
+
+            # Verify the new VL has been added
+            yield from verify_vlrs(nsr_id, count=2)
+
+            # Delete the added VL
+            yield from self.nsr_publisher.del_nsr_vl()
+
+            # Verify the added VL has been removed
+            yield from verify_vlrs(nsr_id, count=1)
+
+            # Attempt deleting NSD in use
+            yield from self.ping_pong.delete_nsd()
+
+            yield from terminate_ns(nsr_id)
+
+            # Give the system time to tear everything down
+            yield from asyncio.sleep(25, loop=self.loop)
+            self.log.debug("Verifying termination results")
+            yield from verify_results(termination=True, nsrid=nsr_id)
+            self.log.debug("Verified termination results")
+
+            # Multi site NS case
+            self.log.debug("Testing multi site NS")
+            self.nsr_publisher.update_vnf_cloud_map({1:"mock_account1",2:"mock_account"})
+            nsr_id = yield from self.nsr_publisher.publish()
+
+            yield from verify_results(nsrid=nsr_id)
+            yield from verify_vnfr_cloud_account(1,"mock_account1")
+            yield from verify_vnfr_cloud_account(2,"mock_account")
+            yield from verify_vlrs(nsr_id, count=2)
+
+            yield from terminate_ns(nsr_id)
+
+            yield from asyncio.sleep(25, loop=self.loop)
+            self.log.debug("Verifying termination results for multi site NS")
+            yield from verify_results(termination=True, nsrid=nsr_id)
+            self.log.debug("Verified termination results for multi site NS")
+
+            self.log.debug("Attempting to delete VNFD for real")
+            yield from self.ping_pong.delete_ping_vnfd()
+
+            self.log.debug("Attempting to delete NSD for real")
+            yield from self.ping_pong.delete_nsd()
+
+ future = asyncio.ensure_future(run_test(), loop=self.loop)
+ self.run_until(future.done)
+ if future.exception() is not None:
+ self.log.error("Caught exception during test")
+ raise future.exception()
+
+
+def main():
+    """Entry point: point tasklet env vars at the installed plugin
+    directories, parse CLI flags, and run the unittest suite (optionally
+    under an XML-emitting test runner)."""
+    plugin_dir = os.path.join(os.environ["RIFT_INSTALL"], "usr/lib/rift/plugins")
+    # Point each tasklet loader at its plugin unless already overridden.
+    if 'VNS_DIR' not in os.environ:
+        os.environ['VNS_DIR'] = os.path.join(plugin_dir, 'rwvns')
+
+    if 'VNFM_DIR' not in os.environ:
+        os.environ['VNFM_DIR'] = os.path.join(plugin_dir, 'rwvnfm')
+
+    if 'NSM_DIR' not in os.environ:
+        os.environ['NSM_DIR'] = os.path.join(plugin_dir, 'rwnsm')
+
+    if 'RM_DIR' not in os.environ:
+        os.environ['RM_DIR'] = os.path.join(plugin_dir, 'rwresmgrtasklet')
+
+    runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-v', '--verbose', action='store_true')
+    parser.add_argument('-n', '--no-runner', action='store_true')
+    args, unittest_args = parser.parse_known_args()
+    if args.no_runner:
+        runner = None
+
+    ManoTestCase.log_level = logging.DEBUG if args.verbose else logging.WARN
+
+    # Forward any unrecognised CLI args on to unittest.
+    unittest.main(testRunner=runner, argv=[sys.argv[0]] + unittest_args)
+
+# Script entry point.
+if __name__ == '__main__':
+    main()
+
+# vim: sw=4
diff --git a/rwlaunchpad/test/mgmt_recovery.py b/rwlaunchpad/test/mgmt_recovery.py
new file mode 100755
index 0000000..29f0ab0
--- /dev/null
+++ b/rwlaunchpad/test/mgmt_recovery.py
@@ -0,0 +1,385 @@
+#!/usr/bin/env python3
+
+#
+# Copyright 2016 RIFT.IO Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+import logging
+import os
+import resource
+import socket
+import sys
+import subprocess
+import shlex
+import shutil
+import netifaces
+
+from rift.rwlib.util import certs
+import rift.rwcal.cloudsim
+import rift.rwcal.cloudsim.net
+import rift.vcs
+import rift.vcs.core as core
+import rift.vcs.demo
+import rift.vcs.vms
+
+import rift.rwcal.cloudsim
+import rift.rwcal.cloudsim.net
+
+from rift.vcs.ext import ClassProperty
+
+logger = logging.getLogger(__name__)
+
+
+class NsmTasklet(rift.vcs.core.Tasklet):
+    """
+    This class represents a network services manager tasklet.
+    """
+
+    def __init__(self, name='network-services-manager', uid=None,
+                 config_ready=True,
+                 recovery_action=core.RecoveryType.FAILCRITICAL.value,
+                 data_storetype=core.DataStore.NOSTORE.value,
+                 ):
+        """
+        Creates a NsmTasklet object.
+
+        Arguments:
+            name - the name of the tasklet
+            uid - a unique identifier
+            config_ready - whether the tasklet starts config-ready
+            recovery_action - VCS recovery policy applied on failure
+            data_storetype - backing store for tasklet recovery data
+        """
+        super(NsmTasklet, self).__init__(name=name, uid=uid,
+                                         config_ready=config_ready,
+                                         recovery_action=recovery_action,
+                                         data_storetype=data_storetype,
+                                         )
+
+    # Plugin shared-object location and name resolved by the VCS loader.
+    plugin_directory = ClassProperty('./usr/lib/rift/plugins/rwnsmtasklet')
+    plugin_name = ClassProperty('rwnsmtasklet')
+
+
+class VnsTasklet(rift.vcs.core.Tasklet):
+    """
+    This class represents a virtual network service tasklet.
+    (Docstring fixed: previously a copy-paste of NsmTasklet's.)
+    """
+
+    def __init__(self, name='virtual-network-service', uid=None,
+                 config_ready=True,
+                 recovery_action=core.RecoveryType.FAILCRITICAL.value,
+                 data_storetype=core.DataStore.NOSTORE.value,
+                 ):
+        """
+        Creates a VnsTasklet object.
+
+        Arguments:
+            name - the name of the tasklet
+            uid - a unique identifier
+            config_ready - whether the tasklet starts config-ready
+            recovery_action - VCS recovery policy applied on failure
+            data_storetype - backing store for tasklet recovery data
+        """
+        super(VnsTasklet, self).__init__(name=name, uid=uid,
+                                         config_ready=config_ready,
+                                         recovery_action=recovery_action,
+                                         data_storetype=data_storetype,
+                                         )
+
+    # Plugin shared-object location and name resolved by the VCS loader.
+    plugin_directory = ClassProperty('./usr/lib/rift/plugins/rwvnstasklet')
+    plugin_name = ClassProperty('rwvnstasklet')
+
+
+class VnfmTasklet(rift.vcs.core.Tasklet):
+    """
+    This class represents a virtual network function manager tasklet.
+    """
+
+    def __init__(self, name='virtual-network-function-manager', uid=None,
+                 config_ready=True,
+                 recovery_action=core.RecoveryType.FAILCRITICAL.value,
+                 data_storetype=core.DataStore.NOSTORE.value,
+                 ):
+        """
+        Creates a VnfmTasklet object.
+
+        Arguments:
+            name - the name of the tasklet
+            uid - a unique identifier
+            config_ready - whether the tasklet starts config-ready
+            recovery_action - VCS recovery policy applied on failure
+            data_storetype - backing store for tasklet recovery data
+        """
+        super(VnfmTasklet, self).__init__(name=name, uid=uid,
+                                          config_ready=config_ready,
+                                          recovery_action=recovery_action,
+                                          data_storetype=data_storetype,
+                                          )
+
+    # Plugin shared-object location and name resolved by the VCS loader.
+    plugin_directory = ClassProperty('./usr/lib/rift/plugins/rwvnfmtasklet')
+    plugin_name = ClassProperty('rwvnfmtasklet')
+
+
+class ResMgrTasklet(rift.vcs.core.Tasklet):
+    """
+    This class represents a Resource Manager tasklet.
+    """
+
+    def __init__(self, name='Resource-Manager', uid=None,
+                 config_ready=True,
+                 recovery_action=core.RecoveryType.FAILCRITICAL.value,
+                 data_storetype=core.DataStore.NOSTORE.value,
+                 ):
+        """
+        Creates a ResMgrTasklet object.
+
+        Arguments:
+            name - the name of the tasklet
+            uid - a unique identifier
+            config_ready - whether the tasklet starts config-ready
+            recovery_action - VCS recovery policy applied on failure
+            data_storetype - backing store for tasklet recovery data
+        """
+        super(ResMgrTasklet, self).__init__(name=name, uid=uid,
+                                            config_ready=config_ready,
+                                            recovery_action=recovery_action,
+                                            data_storetype=data_storetype,
+                                            )
+
+    # Plugin shared-object location and name resolved by the VCS loader.
+    plugin_directory = ClassProperty('./usr/lib/rift/plugins/rwresmgrtasklet')
+    plugin_name = ClassProperty('rwresmgrtasklet')
+
+
+class MonitorTasklet(rift.vcs.core.Tasklet):
+    """
+    This class represents a tasklet that is used to monitor NFVI metrics.
+    """
+
+    def __init__(self, name='nfvi-metrics-monitor', uid=None,
+                 config_ready=True,
+                 recovery_action=core.RecoveryType.FAILCRITICAL.value,
+                 data_storetype=core.DataStore.NOSTORE.value,
+                 ):
+        """
+        Creates a MonitorTasklet object.
+
+        Arguments:
+            name - the name of the tasklet
+            uid - a unique identifier
+            config_ready - whether the tasklet starts config-ready
+            recovery_action - VCS recovery policy applied on failure
+            data_storetype - backing store for tasklet recovery data
+        """
+        super(MonitorTasklet, self).__init__(name=name, uid=uid,
+                                             config_ready=config_ready,
+                                             recovery_action=recovery_action,
+                                             data_storetype=data_storetype,
+                                             )
+
+    # Plugin shared-object location and name resolved by the VCS loader.
+    plugin_directory = ClassProperty('./usr/lib/rift/plugins/rwmonitor')
+    plugin_name = ClassProperty('rwmonitor')
+
+
+def get_ui_ssl_args():
+    """Returns the SSL parameter string for launchpad UI processes.
+
+    Falls back to an empty string (SSL disabled) when the bootstrap
+    certificate/key pair cannot be found.
+    """
+    try:
+        use_ssl, certfile_path, keyfile_path = certs.get_bootstrap_cert_and_key()
+    except certs.BootstrapSslMissingException:
+        logger.error('No bootstrap certificates found. Disabling UI SSL')
+        use_ssl = False
+
+    # If we're not using SSL, no SSL arguments are necessary
+    if not use_ssl:
+        return ""
+
+    return "--enable-https --keyfile-path=%s --certfile-path=%s" % (keyfile_path, certfile_path)
+
+
+class UIServer(rift.vcs.NativeProcess):
+    """Native process wrapper for the skyquake launchpad UI server."""
+
+    def __init__(self, name="RW.MC.UI",
+                 config_ready=True,
+                 recovery_action=core.RecoveryType.FAILCRITICAL.value,
+                 data_storetype=core.DataStore.NOSTORE.value,
+                 ):
+        super(UIServer, self).__init__(
+                name=name,
+                exe="./usr/share/rw.ui/skyquake/scripts/launch_ui.sh",
+                config_ready=config_ready,
+                recovery_action=recovery_action,
+                data_storetype=data_storetype,
+                )
+
+    @property
+    def args(self):
+        # Command-line args: SSL options derived from the bootstrap certs.
+        return get_ui_ssl_args()
+
+
+class RedisServer(rift.vcs.NativeProcess):
+    """Native process wrapper for the redis server used as the tasklet
+    recovery data store."""
+
+    def __init__(self, name="RW.Redis.Server",
+                 config_ready=True,
+                 recovery_action=core.RecoveryType.FAILCRITICAL.value,
+                 data_storetype=core.DataStore.NOSTORE.value,
+                 ):
+        super(RedisServer, self).__init__(
+                name=name,
+                exe="/usr/bin/redis-server",
+                config_ready=config_ready,
+                recovery_action=recovery_action,
+                data_storetype=data_storetype,
+                )
+
+    @property
+    def args(self):
+        # Config file plus listen port for the active redis instance.
+        return "./usr/bin/active_redis.conf --port 9999"
+
+class ConfigManagerTasklet(rift.vcs.core.Tasklet):
+    """
+    This class represents a Configuration Manager tasklet.
+    (Docstring fixed: previously a copy-paste of ResMgrTasklet's.)
+    """
+
+    def __init__(self, name='Configuration-Manager', uid=None,
+                 config_ready=True,
+                 recovery_action=core.RecoveryType.FAILCRITICAL.value,
+                 data_storetype=core.DataStore.NOSTORE.value,
+                 ):
+        """
+        Creates a ConfigManagerTasklet object.
+
+        Arguments:
+            name - the name of the tasklet
+            uid - a unique identifier
+            config_ready - whether the tasklet starts config-ready
+            recovery_action - VCS recovery policy applied on failure
+            data_storetype - backing store for tasklet recovery data
+        """
+        super(ConfigManagerTasklet, self).__init__(name=name, uid=uid,
+                                                   config_ready=config_ready,
+                                                   recovery_action=recovery_action,
+                                                   data_storetype=data_storetype,
+                                                   )
+
+    # Plugin shared-object location and name resolved by the VCS loader.
+    plugin_directory = ClassProperty('./usr/lib/rift/plugins/rwconmantasklet')
+    plugin_name = ClassProperty('rwconmantasklet')
+
+
+class Demo(rift.vcs.demo.Demo):
+    """VCS system description for the launchpad management-recovery demo.
+
+    Builds a single-colony system: an active VM running the launchpad
+    process set plus restartable (REDIS-backed) tasklets, and — when two
+    management IPs are supplied — a standby VM carrying only redis and a
+    standby uAgent.
+    """
+
+    def __init__(self,mgmt_ip_list):
+
+        # Processes that run on the active VM only.
+        procs = [
+            ConfigManagerTasklet(),
+            UIServer(),
+            RedisServer(),
+            rift.vcs.RestPortForwardTasklet(),
+            rift.vcs.RestconfTasklet(),
+            rift.vcs.RiftCli(),
+            rift.vcs.uAgentTasklet(),
+            rift.vcs.Launchpad(),
+            ]
+
+        # Processes brought up on the standby VM.
+        standby_procs = [
+            RedisServer(),
+            rift.vcs.uAgentTasklet(mode_active=False),
+            ]
+
+        # Tasklets restarted on failure (state recovered from redis)
+        # rather than failing the whole system.
+        restart_procs = [
+            VnfmTasklet(recovery_action=core.RecoveryType.RESTART.value, data_storetype=core.DataStore.REDIS.value),
+            VnsTasklet(recovery_action=core.RecoveryType.RESTART.value, data_storetype=core.DataStore.REDIS.value),
+            MonitorTasklet(recovery_action=core.RecoveryType.RESTART.value, data_storetype=core.DataStore.REDIS.value),
+            NsmTasklet(recovery_action=core.RecoveryType.RESTART.value, data_storetype=core.DataStore.REDIS.value),
+            ResMgrTasklet(recovery_action=core.RecoveryType.RESTART.value, data_storetype=core.DataStore.REDIS.value),
+            ]
+        super(Demo, self).__init__(
+            # Construct the system. This system consists of 1 cluster in 1
+            # colony. The master cluster houses CLI and management VMs
+            sysinfo = rift.vcs.SystemInfo(
+                    zookeeper=rift.vcs.manifest.RaZookeeper(zake=False, master_ip=mgmt_ip_list[0]),
+                    colonies=[
+                        rift.vcs.Colony(
+                            name='master',
+                            uid=1,
+                            clusters=[
+                                rift.vcs.VirtualMachine(
+                                    name='vm-templ-1',
+                                    ip=mgmt_ip_list[0],
+                                    procs=procs,
+                                    restart_procs=restart_procs,
+                                    ),
+                                rift.vcs.VirtualMachine(
+                                    name='vm-templ-2',
+                                    ip=mgmt_ip_list[1],
+                                    standby_procs=standby_procs,
+                                    start=False,
+                                    ),
+                                # Single-VM layout when only one mgmt IP
+                                # is available.
+                                ] if len(mgmt_ip_list) == 2 else [
+                                rift.vcs.VirtualMachine(
+                                    name='vm-templ-1',
+                                    ip=mgmt_ip_list[0],
+                                    procs=procs,
+                                    restart_procs=restart_procs,
+                                    ),
+                                ]
+                            )
+                        ],
+                    ),
+
+            # Define the generic portmap.
+            port_map = {},
+
+            # Define a mapping from the placeholder logical names to the real
+            # port names for each of the different modes supported by this demo.
+            port_names = {
+                'ethsim': {
+                },
+                'pci': {
+                }
+            },
+
+            # Define the connectivity between logical port names.
+            port_groups = {},
+        )
+
+
+def main(argv=sys.argv[1:]):
+    """Entry point: parse demo arguments, build the Demo system
+    description, prepare it, and start it."""
+    logging.basicConfig(format='%(asctime)-15s %(levelname)s %(message)s')
+
+    # Create a parser which includes all generic demo arguments
+    parser = rift.vcs.demo.DemoArgParser()
+
+    args = parser.parse_args(argv)
+
+    # Disable loading any kernel modules for the launchpad VM
+    # since it doesn't need it and it will fail within containers
+    os.environ["NO_KERNEL_MODS"] = "1"
+
+    # Remove the persistent DTS recovery files
+    for f in os.listdir(os.environ["INSTALLDIR"]):
+        if f.endswith(".db"):
+            os.remove(os.path.join(os.environ["INSTALLDIR"], f))
+
+    #load demo info and create Demo object
+    demo = Demo(args.mgmt_ip_list)
+
+    # Create the prepared system from the demo
+    system = rift.vcs.demo.prepared_system_from_demo_and_args(demo, args,
+              northbound_listing="cli_launchpad_schema_listing.txt",
+              netconf_trace_override=True)
+
+    # Prefer the eth0 address for confd; fall back to the hostname lookup.
+    confd_ip = socket.gethostbyname(socket.gethostname())
+    intf = netifaces.ifaddresses('eth0')
+    if intf and netifaces.AF_INET in intf and len(intf[netifaces.AF_INET]):
+        confd_ip = intf[netifaces.AF_INET][0]['addr']
+    rift.vcs.logger.configure_sink(config_file=None, confd_ip=confd_ip)
+
+    # Start the prepared system
+    system.start()
+
+
+if __name__ == "__main__":
+    # Allow unlimited-size core dumps for post-mortem debugging.
+    resource.setrlimit(resource.RLIMIT_CORE, (resource.RLIM_INFINITY, resource.RLIM_INFINITY) )
+    try:
+        main()
+    except rift.vcs.demo.ReservationError:
+        print("ERROR: unable to retrieve a list of IP addresses from the reservation system")
+        sys.exit(1)
+    except rift.vcs.demo.MissingModeError:
+        print("ERROR: you need to provide a mode to run the script")
+        sys.exit(1)
+    finally:
+        # Restore terminal settings the system may have clobbered.
+        os.system("stty sane")
diff --git a/rwlaunchpad/test/pytest/lp_kt_utm_test.py b/rwlaunchpad/test/pytest/lp_kt_utm_test.py
new file mode 100644
index 0000000..0a8d6ba
--- /dev/null
+++ b/rwlaunchpad/test/pytest/lp_kt_utm_test.py
@@ -0,0 +1,306 @@
+#!/usr/bin/env python
+"""
+#
+# Copyright 2016 RIFT.IO Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+@file lp_kt_utm_test.py
+@author Austin Cormier (Austin.Cormier@riftio.com)
+@date 10/15/2015
+@brief Launchpad Module Test
+"""
+
+import json
+import logging
+import os
+import pytest
+import shlex
+import requests
+import subprocess
+import time
+import uuid
+import gi
+
+gi.require_version('RwlogMgmtYang', '1.0')
+gi.require_version('RwBaseYang', '1.0')
+gi.require_version('RwCloudYang', '1.0')
+gi.require_version('RwNsmYang', '1.0')
+gi.require_version('RwIwpYang', '1.0')
+gi.require_version('RwNsrYang', '1.0')
+gi.require_version('RwResourceMgrYang', '1.0')
+gi.require_version('RwConmanYang', '1.0')
+gi.require_version('RwVnfdYang', '1.0')
+
+from gi.repository import (
+ NsdYang,
+ NsrYang,
+ RwBaseYang,
+ RwCloudYang,
+ RwIwpYang,
+ RwlogMgmtYang,
+ RwNsmYang,
+ RwNsrYang,
+ RwResourceMgrYang,
+ RwConmanYang,
+ RwVnfdYang,
+ VldYang,
+ )
+
+logging.basicConfig(level=logging.DEBUG)
+
+
+RW_KT_UTM_PKG_INSTALL_DIR = os.path.join(
+ os.environ["RIFT_INSTALL"],
+ "usr/rift/mano/vnfds/kt_utm"
+ )
+
+RW_KT_UTM_NSD_PKG_INSTALL_DIR = os.path.join(
+ os.environ["RIFT_INSTALL"],
+ "usr/rift/mano/nsds/utm_only"
+ )
+
+
+class PackageError(Exception):
+    """Raised when an expected descriptor package is missing on disk."""
+    pass
+
+
+def raise_package_error():
+    """Helper used by package fixtures to signal a missing package."""
+    raise PackageError("Could not find ns packages")
+
+
+@pytest.fixture(scope='module')
+def iwp_proxy(request, mgmt_session):
+    """Module-scoped proxy for the RwIwpYang model."""
+    return mgmt_session.proxy(RwIwpYang)
+
+
+@pytest.fixture(scope='module')
+def rwlog_mgmt_proxy(request, mgmt_session):
+    """Module-scoped proxy for the RwlogMgmtYang model."""
+    return mgmt_session.proxy(RwlogMgmtYang)
+
+
+@pytest.fixture(scope='module')
+def resource_mgr_proxy(request, mgmt_session):
+    """Module-scoped proxy for the RwResourceMgrYang model."""
+    return mgmt_session.proxy(RwResourceMgrYang)
+
+
+@pytest.fixture(scope='module')
+def cloud_proxy(request, mgmt_session):
+    """Module-scoped proxy for the RwCloudYang model."""
+    return mgmt_session.proxy(RwCloudYang)
+
+
+@pytest.fixture(scope='module')
+def vnfd_proxy(request, mgmt_session):
+    """Module-scoped proxy for the RwVnfdYang model."""
+    return mgmt_session.proxy(RwVnfdYang)
+
+
+@pytest.fixture(scope='module')
+def vld_proxy(request, mgmt_session):
+    """Module-scoped proxy for the VldYang model."""
+    return mgmt_session.proxy(VldYang)
+
+
+@pytest.fixture(scope='module')
+def nsd_proxy(request, mgmt_session):
+    """Module-scoped proxy for the NsdYang model."""
+    return mgmt_session.proxy(NsdYang)
+
+
+@pytest.fixture(scope='module')
+def nsr_proxy(request, mgmt_session):
+    """Module-scoped proxy for the NsrYang model."""
+    return mgmt_session.proxy(NsrYang)
+
+
+@pytest.fixture(scope='module')
+def rwnsr_proxy(request, mgmt_session):
+    """Module-scoped proxy for the RwNsrYang model."""
+    return mgmt_session.proxy(RwNsrYang)
+
+
+@pytest.fixture(scope='module')
+def base_proxy(request, mgmt_session):
+    """Module-scoped proxy for the RwBaseYang model."""
+    return mgmt_session.proxy(RwBaseYang)
+
+
+@pytest.fixture(scope='module')
+def so_proxy(request, mgmt_session):
+    """Module-scoped proxy for the RwConmanYang model."""
+    return mgmt_session.proxy(RwConmanYang)
+
+
+@pytest.fixture(scope='module')
+def nsm_proxy(request, mgmt_session):
+    """Module-scoped proxy for the RwNsmYang model."""
+    return mgmt_session.proxy(RwNsmYang)
+
+
+@pytest.fixture(scope='session')
+def kt_utm_vnfd_package_file():
+    """Path to the installed kt_utm VNFD package; raises PackageError
+    (via raise_package_error) if the package is missing."""
+    ktutm_pkg_file = os.path.join(
+            RW_KT_UTM_PKG_INSTALL_DIR,
+            "kt_utm_vnfd.tar.gz",
+            )
+    if not os.path.exists(ktutm_pkg_file):
+        raise_package_error()
+
+    return ktutm_pkg_file
+
+@pytest.fixture(scope='session')
+def utm_only_nsd_package_file():
+    """Path to the installed UTM-only NSD package; raises PackageError
+    (via raise_package_error) if the package is missing."""
+    ktutm_nsd_pkg_file = os.path.join(
+            RW_KT_UTM_NSD_PKG_INSTALL_DIR,
+            "utm_only_nsd.tar.gz",
+            )
+    if not os.path.exists(ktutm_nsd_pkg_file):
+        raise_package_error()
+
+    return ktutm_nsd_pkg_file
+
+def upload_descriptor(logger, descriptor_file, host="127.0.0.1"):
+    """Upload a descriptor package to the launchpad onboarding REST API
+    (via curl) and return the transaction id to poll for completion."""
+    curl_cmd = 'curl -F "descriptor=@{file}" http://{host}:4567/api/upload'.format(
+        file=descriptor_file,
+        host=host,
+        )
+    logger.debug("Uploading descriptor %s using cmd: %s", descriptor_file, curl_cmd)
+    stdout = subprocess.check_output(shlex.split(curl_cmd), universal_newlines=True)
+
+    # The upload endpoint answers with a JSON body containing the id.
+    json_out = json.loads(stdout)
+    transaction_id = json_out["transaction_id"]
+
+    return transaction_id
+
+
+class DescriptorOnboardError(Exception):
+    """Raised when a descriptor onboard transaction fails or times out."""
+    pass
+
+
+def wait_unboard_transaction_finished(logger, transaction_id, timeout_secs=600, host="127.0.0.1"):
+    """Poll the onboarding API once per second until the transaction
+    succeeds, fails, or timeout_secs elapses.
+
+    Raises:
+        DescriptorOnboardError - on failure, or if still pending at timeout.
+    NOTE(review): if timeout_secs <= 0 the loop never runs and the final
+    check references 'state' before assignment — confirm callers always
+    pass a positive timeout.
+    """
+    logger.info("Waiting for onboard trans_id %s to complete",
+                transaction_id)
+    start_time = time.time()
+    while (time.time() - start_time) < timeout_secs:
+        r = requests.get(
+                'http://{host}:4567/api/upload/{t_id}/state'.format(
+                    host=host, t_id=transaction_id
+                    )
+                )
+        state = r.json()
+        if state["status"] == "pending":
+            time.sleep(1)
+            continue
+
+        elif state["status"] == "success":
+            logger.info("Descriptor onboard was successful")
+            return
+
+        else:
+            raise DescriptorOnboardError(state)
+
+    # Timed out while still pending.
+    if state["status"] != "success":
+        raise DescriptorOnboardError(state)
+
+def create_nsr_from_nsd_id(nsd_id):
+    """Build an enabled NS instance-config record referencing nsd_id,
+    with a freshly generated UUID as its id."""
+    nsr = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr()
+    nsr.id = str(uuid.uuid4())
+    nsr.name = "UTM-only"
+    nsr.short_name = "UTM-only"
+    nsr.description = "1 VNFs with 5 VLs"
+    nsr.nsd_ref = nsd_id
+    nsr.admin_status = "ENABLED"
+
+    return nsr
+
+@pytest.mark.incremental
+class TestLaunchpadStartStop(object):
+    """Incremental launchpad system test: configure logging, the cloud
+    account, resource pools and orchestrator endpoints, then onboard the
+    KT UTM VNFD / UTM-only NSD and instantiate the NS."""
+
+    def test_configure_logging(self, rwlog_mgmt_proxy):
+        """Enable console logging filtered to rw-generic errors."""
+        # NOTE(review): local 'logging' shadows the imported logging module.
+        logging = RwlogMgmtYang.Logging.from_dict({
+                "console": {
+                    "on": True,
+                    "filter": {
+                        "category": [{
+                            "name": "rw-generic",
+                            "severity": "error"
+                        }],
+                    }
+                }
+            })
+        rwlog_mgmt_proxy.merge_config("/rwlog-mgmt:logging", logging)
+
+    def test_configure_cloud_account(self, cloud_proxy, logger):
+        """Configure the 'openstack' cloud account used by the NS."""
+        cloud_account = RwCloudYang.CloudAccountConfig()
+        # cloud_account.name = "cloudsim_proxy"
+        # cloud_account.account_type = "cloudsim_proxy"
+        cloud_account.name = "openstack"
+        cloud_account.account_type = "openstack"
+        cloud_account.openstack.key = 'pluto'
+        cloud_account.openstack.secret = 'mypasswd'
+        cloud_account.openstack.auth_url = 'http://10.66.4.13:5000/v3/'
+        cloud_account.openstack.tenant = 'demo'
+        cloud_account.openstack.mgmt_network = 'private'
+
+        cloud_proxy.merge_config("/rw-cloud:cloud-account", cloud_account)
+
+    def test_configure_pools(self, resource_mgr_proxy):
+        """Configure one dynamic compute pool and one dynamic network pool."""
+        pools = RwResourceMgrYang.ResourcePools.from_dict({
+            "pools": [{ "name": "vm_pool_a",
+                        "resource_type": "compute",
+                        "pool_type" : "dynamic"},
+                      {"name": "network_pool_a",
+                       "resource_type": "network",
+                       "pool_type" : "dynamic",}]})
+
+        resource_mgr_proxy.merge_config('/rw-resource-mgr:resource-mgr-config/rw-resource-mgr:resource-pools', pools)
+
+    def test_configure_resource_orchestrator(self, so_proxy):
+        """Point conman at the (local) resource orchestrator endpoint."""
+        cfg = RwConmanYang.RoEndpoint.from_dict({'ro_ip_address': '127.0.0.1',
+                                                 'ro_port' : 2022,
+                                                 'ro_username' : 'admin',
+                                                 'ro_password' : 'admin'})
+        so_proxy.merge_config('/rw-conman:cm-config', cfg)
+
+    def test_configure_service_orchestrator(self, nsm_proxy):
+        """Point NSM at the (local) config-manager endpoint."""
+        cfg = RwNsmYang.SoEndpoint.from_dict({'cm_ip_address': '127.0.0.1',
+                                              'cm_port' : 2022,
+                                              'cm_username' : 'admin',
+                                              'cm_password' : 'admin'})
+        nsm_proxy.merge_config('/rw-nsm:ro-config/rw-nsm:cm-endpoint', cfg)
+
+
+    def test_onboard_ktutm_vnfd(self, logger, vnfd_proxy, kt_utm_vnfd_package_file):
+        """Onboard the kt_utm VNFD package and verify the catalog entry."""
+        logger.info("Onboarding kt_utm_vnfd package: %s", kt_utm_vnfd_package_file)
+        trans_id = upload_descriptor(logger, kt_utm_vnfd_package_file)
+        wait_unboard_transaction_finished(logger, trans_id)
+
+        catalog = vnfd_proxy.get_config('/vnfd-catalog')
+        vnfds = catalog.vnfd
+        assert len(vnfds) == 1, "There should only be a single vnfd"
+        vnfd = vnfds[0]
+        assert vnfd.name == "kt_utm_vnfd"
+
+    def test_onboard_utm_only_nsd(self, logger, nsd_proxy, utm_only_nsd_package_file):
+        """Onboard the UTM-only NSD package and verify the catalog entry."""
+        logger.info("Onboarding utm_onlynsd package: %s", utm_only_nsd_package_file)
+        trans_id = upload_descriptor(logger, utm_only_nsd_package_file)
+        wait_unboard_transaction_finished(logger, trans_id)
+
+        catalog = nsd_proxy.get_config('/nsd-catalog')
+        nsds = catalog.nsd
+        assert len(nsds) == 1, "There should only be a single nsd"
+        nsd = nsds[0]
+
+    def test_instantiate_utm_only_nsr(self, logger, nsd_proxy, nsr_proxy, rwnsr_proxy, base_proxy):
+        """Instantiate an NSR from the onboarded NSD and verify opdata."""
+        catalog = nsd_proxy.get_config('/nsd-catalog')
+        nsd = catalog.nsd[0]
+
+        nsr = create_nsr_from_nsd_id(nsd.id)
+        nsr_proxy.merge_config('/ns-instance-config', nsr)
+
+        nsr_opdata = rwnsr_proxy.get('/ns-instance-opdata')
+        nsrs = nsr_opdata.nsr
+        assert len(nsrs) == 1
+        assert nsrs[0].ns_instance_config_ref == nsr.id
diff --git a/rwlaunchpad/test/pytest/lp_kt_utm_wims_test.py b/rwlaunchpad/test/pytest/lp_kt_utm_wims_test.py
new file mode 100644
index 0000000..705565b
--- /dev/null
+++ b/rwlaunchpad/test/pytest/lp_kt_utm_wims_test.py
@@ -0,0 +1,333 @@
+#!/usr/bin/env python
+"""
+#
+# Copyright 2016 RIFT.IO Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+@file lp_kt_utm_wims_test.py
+@author Austin Cormier (Austin.Cormier@riftio.com)
+@date 10/15/2015
+@brief Launchpad Module Test
+"""
+
+import json
+import logging
+import os
+import pytest
+import shlex
+import requests
+import subprocess
+import time
+import uuid
+import gi
+
+gi.require_version('RwlogMgmtYang', '1.0')
+gi.require_version('RwBaseYang', '1.0')
+gi.require_version('RwCloudYang', '1.0')
+gi.require_version('RwIwpYang', '1.0')
+gi.require_version('RwNsmYang', '1.0')
+gi.require_version('RwNsrYang', '1.0')
+gi.require_version('RwNsrYang', '1.0')
+gi.require_version('RwResourceMgrYang', '1.0')
+gi.require_version('RwConmanYang', '1.0')
+gi.require_version('RwVnfdYang', '1.0')
+
+from gi.repository import (
+ NsdYang,
+ NsrYang,
+ RwBaseYang,
+ RwCloudYang,
+ RwIwpYang,
+ RwlogMgmtYang,
+ RwNsmYang,
+ RwNsrYang,
+ RwResourceMgrYang,
+ RwConmanYang,
+ RwVnfdYang,
+ VldYang,
+ )
+
+logging.basicConfig(level=logging.DEBUG)
+
+
+RW_KT_UTM_PKG_INSTALL_DIR = os.path.join(
+ os.environ["RIFT_INSTALL"],
+ "usr/rift/mano/vnfds/kt_utm"
+ )
+
+RW_KT_WIMS_PKG_INSTALL_DIR = os.path.join(
+ os.environ["RIFT_INSTALL"],
+ "usr/rift/mano/vnfds/kt_wims"
+ )
+
+RW_KT_UTM_WIMS_NSD_PKG_INSTALL_DIR = os.path.join(
+ os.environ["RIFT_INSTALL"],
+ "usr/rift/mano/nsds/utm_wims"
+ )
+
+
+class PackageError(Exception):
+ pass
+
+
+def raise_package_error():
+ raise PackageError("Could not find ns packages")
+
+
+@pytest.fixture(scope='module')
+def iwp_proxy(request, mgmt_session):
+ return mgmt_session.proxy(RwIwpYang)
+
+
+@pytest.fixture(scope='module')
+def rwlog_mgmt_proxy(request, mgmt_session):
+ return mgmt_session.proxy(RwlogMgmtYang)
+
+
+@pytest.fixture(scope='module')
+def resource_mgr_proxy(request, mgmt_session):
+ return mgmt_session.proxy(RwResourceMgrYang)
+
+
+@pytest.fixture(scope='module')
+def cloud_proxy(request, mgmt_session):
+ return mgmt_session.proxy(RwCloudYang)
+
+
+@pytest.fixture(scope='module')
+def vnfd_proxy(request, mgmt_session):
+ return mgmt_session.proxy(RwVnfdYang)
+
+
+@pytest.fixture(scope='module')
+def vld_proxy(request, mgmt_session):
+ return mgmt_session.proxy(VldYang)
+
+
+@pytest.fixture(scope='module')
+def nsd_proxy(request, mgmt_session):
+ return mgmt_session.proxy(NsdYang)
+
+
+@pytest.fixture(scope='module')
+def nsr_proxy(request, mgmt_session):
+ return mgmt_session.proxy(NsrYang)
+
+
+@pytest.fixture(scope='module')
+def rwnsr_proxy(request, mgmt_session):
+ return mgmt_session.proxy(RwNsrYang)
+
+
+@pytest.fixture(scope='module')
+def base_proxy(request, mgmt_session):
+ return mgmt_session.proxy(RwBaseYang)
+
+
+@pytest.fixture(scope='module')
+def so_proxy(request, mgmt_session):
+ return mgmt_session.proxy(RwConmanYang)
+
+
+@pytest.fixture(scope='module')
+def nsm_proxy(request, mgmt_session):
+ return mgmt_session.proxy(RwNsmYang)
+
+
+@pytest.fixture(scope='session')
+def kt_utm_vnfd_package_file():
+ ktutm_pkg_file = os.path.join(
+ RW_KT_UTM_PKG_INSTALL_DIR,
+ "kt_utm_vnfd.tar.gz",
+ )
+ if not os.path.exists(ktutm_pkg_file):
+ raise_package_error()
+
+ return ktutm_pkg_file
+
+@pytest.fixture(scope='session')
+def kt_wims_vnfd_package_file():
+ ktwims_pkg_file = os.path.join(
+ RW_KT_WIMS_PKG_INSTALL_DIR,
+ "kt_wims_vnfd.tar.gz",
+ )
+ if not os.path.exists(ktwims_pkg_file):
+ raise_package_error()
+
+ return ktwims_pkg_file
+
+@pytest.fixture(scope='session')
+def utm_wims_nsd_package_file():
+ ktutm_wims_nsd_pkg_file = os.path.join(
+ RW_KT_UTM_WIMS_NSD_PKG_INSTALL_DIR,
+ "utm_wims_nsd.tar.gz",
+ )
+ if not os.path.exists(ktutm_wims_nsd_pkg_file):
+ raise_package_error()
+
+ return ktutm_wims_nsd_pkg_file
+
+def upload_descriptor(logger, descriptor_file, host="127.0.0.1"):
+ curl_cmd = 'curl -F "descriptor=@{file}" http://{host}:4567/api/upload'.format(
+ file=descriptor_file,
+ host=host,
+ )
+ logger.debug("Uploading descriptor %s using cmd: %s", descriptor_file, curl_cmd)
+ stdout = subprocess.check_output(shlex.split(curl_cmd), universal_newlines=True)
+
+ json_out = json.loads(stdout)
+ transaction_id = json_out["transaction_id"]
+
+ return transaction_id
+
+
+class DescriptorOnboardError(Exception):
+ pass
+
+
+def wait_unboard_transaction_finished(logger, transaction_id, timeout_secs=600, host="127.0.0.1"):
+ logger.info("Waiting for onboard trans_id %s to complete",
+ transaction_id)
+ start_time = time.time()
+ while (time.time() - start_time) < timeout_secs:
+ r = requests.get(
+ 'http://{host}:4567/api/upload/{t_id}/state'.format(
+ host=host, t_id=transaction_id
+ )
+ )
+ state = r.json()
+ if state["status"] == "pending":
+ time.sleep(1)
+ continue
+
+ elif state["status"] == "success":
+ logger.info("Descriptor onboard was successful")
+ return
+
+ else:
+ raise DescriptorOnboardError(state)
+
+ if state["status"] != "success":
+ raise DescriptorOnboardError(state)
+
+def create_nsr_from_nsd_id(nsd_id):
+ nsr = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr()
+ nsr.id = str(uuid.uuid4())
+ nsr.name = "UTM-WIMS"
+ nsr.short_name = "UTM-WIMS"
+ nsr.description = "2 VNFs with 4 VLs"
+ nsr.nsd_ref = nsd_id
+ nsr.admin_status = "ENABLED"
+
+ return nsr
+
+@pytest.mark.incremental
+class TestLaunchpadStartStop(object):
+ def test_configure_logging(self, rwlog_mgmt_proxy):
+ logging = RwlogMgmtYang.Logging.from_dict({
+ "console": {
+ "on": True,
+ "filter": {
+ "category": [{
+ "name": "rw-generic",
+ "severity": "error"
+ }],
+ }
+ }
+ })
+ rwlog_mgmt_proxy.merge_config("/rwlog-mgmt:logging", logging)
+
+ def test_configure_cloud_account(self, cloud_proxy, logger):
+ cloud_account = RwCloudYang.CloudAccountConfig()
+ # cloud_account.name = "cloudsim_proxy"
+ # cloud_account.account_type = "cloudsim_proxy"
+ cloud_account.name = "openstack"
+ cloud_account.account_type = "openstack"
+ cloud_account.openstack.key = 'pluto'
+ cloud_account.openstack.secret = 'mypasswd'
+ cloud_account.openstack.auth_url = 'http://10.66.4.xx:5000/v3/'
+ cloud_account.openstack.tenant = 'demo'
+ cloud_account.openstack.mgmt_network = 'private'
+
+ cloud_proxy.merge_config("/rw-cloud:cloud-account", cloud_account)
+
+ def test_configure_pools(self, resource_mgr_proxy):
+ pools = RwResourceMgrYang.ResourcePools.from_dict({
+ "pools": [{ "name": "vm_pool_a",
+ "resource_type": "compute",
+ "pool_type" : "dynamic"},
+ {"name": "network_pool_a",
+ "resource_type": "network",
+ "pool_type" : "dynamic",}]})
+
+ resource_mgr_proxy.merge_config('/rw-resource-mgr:resource-mgr-config/rw-resource-mgr:resource-pools', pools)
+
+ def test_configure_resource_orchestrator(self, so_proxy):
+ cfg = RwConmanYang.RoEndpoint.from_dict({'ro_ip_address': '127.0.0.1',
+ 'ro_port' : 2022,
+ 'ro_username' : 'admin',
+ 'ro_password' : 'admin'})
+ so_proxy.merge_config('/rw-conman:cm-config', cfg)
+
+ def test_configure_service_orchestrator(self, nsm_proxy):
+ cfg = RwNsmYang.SoEndpoint.from_dict({'cm_ip_address': '127.0.0.1',
+ 'cm_port' : 2022,
+ 'cm_username' : 'admin',
+ 'cm_password' : 'admin'})
+ nsm_proxy.merge_config('/rw-nsm:ro-config/rw-nsm:cm-endpoint', cfg)
+
+
+ def test_onboard_ktutm_vnfd(self, logger, vnfd_proxy, kt_utm_vnfd_package_file):
+ logger.info("Onboarding kt_utm_vnfd package: %s", kt_utm_vnfd_package_file)
+ trans_id = upload_descriptor(logger, kt_utm_vnfd_package_file)
+ wait_unboard_transaction_finished(logger, trans_id)
+
+ catalog = vnfd_proxy.get_config('/vnfd-catalog')
+ vnfds = catalog.vnfd
+ assert len(vnfds) == 1, "There should only be a single vnfd"
+ vnfd = vnfds[0]
+ assert vnfd.name == "kt_utm_vnfd"
+
+ def test_onboard_ktwims_vnfd(self, logger, vnfd_proxy, kt_wims_vnfd_package_file):
+ logger.info("Onboarding kt_wims_vnfd package: %s", kt_wims_vnfd_package_file)
+ trans_id = upload_descriptor(logger, kt_wims_vnfd_package_file)
+ wait_unboard_transaction_finished(logger, trans_id)
+
+ catalog = vnfd_proxy.get_config('/vnfd-catalog')
+ vnfds = catalog.vnfd
+ assert len(vnfds) == 2, "There should only be two vnfd"
+ assert "kt_wims_vnfd" in [vnfds[0].name, vnfds[1].name]
+
+ def test_onboard_utm_wims_nsd(self, logger, nsd_proxy, utm_wims_nsd_package_file):
+ logger.info("Onboarding utm_wims_nsd package: %s", utm_wims_nsd_package_file)
+ trans_id = upload_descriptor(logger, utm_wims_nsd_package_file)
+ wait_unboard_transaction_finished(logger, trans_id)
+
+ catalog = nsd_proxy.get_config('/nsd-catalog')
+ nsds = catalog.nsd
+ assert len(nsds) == 1, "There should only be a single nsd"
+ nsd = nsds[0]
+
+ def test_instantiate_utm_wims_nsr(self, logger, nsd_proxy, nsr_proxy, rwnsr_proxy, base_proxy):
+ catalog = nsd_proxy.get_config('/nsd-catalog')
+ nsd = catalog.nsd[0]
+
+ nsr = create_nsr_from_nsd_id(nsd.id)
+ nsr_proxy.merge_config('/ns-instance-config', nsr)
+
+ nsr_opdata = rwnsr_proxy.get('/ns-instance-opdata')
+ nsrs = nsr_opdata.nsr
+ assert len(nsrs) == 1
+ assert nsrs[0].ns_instance_config_ref == nsr.id
diff --git a/rwlaunchpad/test/pytest/lp_test.py b/rwlaunchpad/test/pytest/lp_test.py
new file mode 100644
index 0000000..b987b35
--- /dev/null
+++ b/rwlaunchpad/test/pytest/lp_test.py
@@ -0,0 +1,390 @@
+#!/usr/bin/env python
+"""
+#
+# Copyright 2016 RIFT.IO Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+@file lp_test.py
+@author Austin Cormier (Austin.Cormier@riftio.com)
+@date 10/15/2015
+@brief Launchpad Module Test
+"""
+
+import json
+import logging
+import os
+import pytest
+import shlex
+import requests
+import subprocess
+import time
+import uuid
+import datetime
+
+import gi
+gi.require_version('RwBaseYang', '1.0')
+gi.require_version('RwCloudYang', '1.0')
+gi.require_version('RwIwpYang', '1.0')
+gi.require_version('RwlogMgmtYang', '1.0')
+gi.require_version('RwNsmYang', '1.0')
+gi.require_version('RwNsmYang', '1.0')
+gi.require_version('RwResourceMgrYang', '1.0')
+gi.require_version('RwConmanYang', '1.0')
+gi.require_version('RwVnfdYang', '1.0')
+
+from gi.repository import (
+ NsdYang,
+ NsrYang,
+ RwBaseYang,
+ RwCloudYang,
+ RwIwpYang,
+ RwlogMgmtYang,
+ RwNsmYang,
+ RwNsrYang,
+ RwResourceMgrYang,
+ RwConmanYang,
+ RwVnfdYang,
+ VldYang,
+ )
+
+logging.basicConfig(level=logging.DEBUG)
+
+
+RW_PING_PONG_PKG_INSTALL_DIR = os.path.join(
+ os.environ["RIFT_ROOT"],
+ "images"
+ )
+
+class PackageError(Exception):
+ pass
+
+
+def raise_package_error():
+ raise PackageError("Could not find ns packages")
+
+
+@pytest.fixture(scope='module')
+def iwp_proxy(request, mgmt_session):
+ return mgmt_session.proxy(RwIwpYang)
+
+
+@pytest.fixture(scope='module')
+def rwlog_mgmt_proxy(request, mgmt_session):
+ return mgmt_session.proxy(RwlogMgmtYang)
+
+
+@pytest.fixture(scope='module')
+def resource_mgr_proxy(request, mgmt_session):
+ return mgmt_session.proxy(RwResourceMgrYang)
+
+
+@pytest.fixture(scope='module')
+def cloud_proxy(request, mgmt_session):
+ return mgmt_session.proxy(RwCloudYang)
+
+
+@pytest.fixture(scope='module')
+def vnfd_proxy(request, mgmt_session):
+ return mgmt_session.proxy(RwVnfdYang)
+
+
+@pytest.fixture(scope='module')
+def vld_proxy(request, mgmt_session):
+ return mgmt_session.proxy(VldYang)
+
+
+@pytest.fixture(scope='module')
+def nsd_proxy(request, mgmt_session):
+ return mgmt_session.proxy(NsdYang)
+
+
+@pytest.fixture(scope='module')
+def nsr_proxy(request, mgmt_session):
+ return mgmt_session.proxy(NsrYang)
+
+
+@pytest.fixture(scope='module')
+def rwnsr_proxy(request, mgmt_session):
+ return mgmt_session.proxy(RwNsrYang)
+
+
+@pytest.fixture(scope='module')
+def base_proxy(request, mgmt_session):
+ return mgmt_session.proxy(RwBaseYang)
+
+
+@pytest.fixture(scope='module')
+def so_proxy(request, mgmt_session):
+ return mgmt_session.proxy(RwConmanYang)
+
+
+@pytest.fixture(scope='module')
+def nsm_proxy(request, mgmt_session):
+ return mgmt_session.proxy(RwNsmYang)
+
+
+@pytest.fixture(scope='session')
+def ping_vnfd_package_file():
+ ping_pkg_file = os.path.join(
+ RW_PING_PONG_PKG_INSTALL_DIR,
+ "ping_vnfd_with_image.tar.gz",
+ )
+ if not os.path.exists(ping_pkg_file):
+ raise_package_error()
+
+ return ping_pkg_file
+
+
+@pytest.fixture(scope='session')
+def pong_vnfd_package_file():
+ pong_pkg_file = os.path.join(
+ RW_PING_PONG_PKG_INSTALL_DIR,
+ "pong_vnfd_with_image.tar.gz",
+ )
+ if not os.path.exists(pong_pkg_file):
+ raise_package_error()
+
+ return pong_pkg_file
+
+
+@pytest.fixture(scope='session')
+def ping_pong_nsd_package_file():
+ ping_pong_pkg_file = os.path.join(
+ RW_PING_PONG_PKG_INSTALL_DIR,
+ "ping_pong_nsd.tar.gz",
+ )
+ if not os.path.exists(ping_pong_pkg_file):
+ raise_package_error()
+
+ return ping_pong_pkg_file
+
+
+def create_nsr_from_nsd_id(nsd_id):
+ nsr = RwNsrYang.YangData_Nsr_NsInstanceConfig_Nsr()
+ nsr.id = str(uuid.uuid4())
+ nsr.name = "pingpong_{}".format(datetime.datetime.now().strftime("%Y%m%d_%H%M%S"))
+ nsr.short_name = "nsr_short_name"
+ nsr.description = "This is a description"
+ nsr.nsd_ref = nsd_id
+ nsr.admin_status = "ENABLED"
+ nsr.cloud_account = "openstack"
+
+ param = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_InputParameter()
+ param.xpath = '/nsd:nsd-catalog/nsd:nsd/nsd:vendor'
+ param.value = "rift-o-matic"
+
+ nsr.input_parameter.append(param)
+
+ return nsr
+
+
+def upload_descriptor(logger, descriptor_file, host="127.0.0.1"):
+ curl_cmd = 'curl -F "descriptor=@{file}" http://{host}:4567/api/upload'.format(
+ file=descriptor_file,
+ host=host,
+ )
+ logger.debug("Uploading descriptor %s using cmd: %s", descriptor_file, curl_cmd)
+ stdout = subprocess.check_output(shlex.split(curl_cmd), universal_newlines=True)
+
+ json_out = json.loads(stdout)
+ transaction_id = json_out["transaction_id"]
+
+ return transaction_id
+
+
+class DescriptorOnboardError(Exception):
+ pass
+
+
+def wait_unboard_transaction_finished(logger, transaction_id, timeout_secs=600, host="127.0.0.1"):
+ logger.info("Waiting for onboard trans_id %s to complete",
+ transaction_id)
+ start_time = time.time()
+ while (time.time() - start_time) < timeout_secs:
+ r = requests.get(
+ 'http://{host}:4567/api/upload/{t_id}/state'.format(
+ host=host, t_id=transaction_id
+ )
+ )
+ state = r.json()
+ if state["status"] == "pending":
+ time.sleep(1)
+ continue
+
+ elif state["status"] == "success":
+ logger.info("Descriptor onboard was successful")
+ return
+
+ else:
+ raise DescriptorOnboardError(state)
+
+ if state["status"] != "success":
+ raise DescriptorOnboardError(state)
+
+
+@pytest.mark.incremental
+class TestLaunchpadStartStop(object):
+ def test_configure_logging(self, rwlog_mgmt_proxy):
+ logging = RwlogMgmtYang.Logging.from_dict({
+ "console": {
+ "on": True,
+ "filter": {
+ "category": [{
+ "name": "rw-generic",
+ "severity": "error"
+ }],
+ }
+ }
+ })
+ rwlog_mgmt_proxy.merge_config("/rwlog-mgmt:logging", logging)
+
+ def test_configure_cloud_account(self, cloud_proxy, logger):
+ cloud_account = RwCloudYang.CloudAccount()
+ # cloud_account.name = "cloudsim_proxy"
+ # cloud_account.account_type = "cloudsim_proxy"
+ cloud_account.name = "openstack"
+ cloud_account.account_type = "openstack"
+ cloud_account.openstack.key = 'pluto'
+ cloud_account.openstack.secret = 'mypasswd'
+ cloud_account.openstack.auth_url = 'http://10.96.4.2:5000/v3/'
+ cloud_account.openstack.tenant = 'mano1'
+ cloud_account.openstack.mgmt_network = 'private1'
+
+ cloud_proxy.merge_config("/rw-cloud:cloud/account", cloud_account)
+
+ def test_onboard_ping_vnfd(self, logger, vnfd_proxy, ping_vnfd_package_file):
+ logger.info("Onboarding ping_vnfd package: %s", ping_vnfd_package_file)
+ trans_id = upload_descriptor(logger, ping_vnfd_package_file)
+ wait_unboard_transaction_finished(logger, trans_id)
+
+ catalog = vnfd_proxy.get_config('/vnfd-catalog')
+ vnfds = catalog.vnfd
+ assert len(vnfds) == 1, "There should only be a single vnfd"
+ vnfd = vnfds[0]
+ assert vnfd.name == "ping_vnfd"
+
+ def test_onboard_pong_vnfd(self, logger, vnfd_proxy, pong_vnfd_package_file):
+ logger.info("Onboarding pong_vnfd package: %s", pong_vnfd_package_file)
+ trans_id = upload_descriptor(logger, pong_vnfd_package_file)
+ wait_unboard_transaction_finished(logger, trans_id)
+
+ catalog = vnfd_proxy.get_config('/vnfd-catalog')
+ vnfds = catalog.vnfd
+ assert len(vnfds) == 2, "There should be two vnfds"
+ assert "pong_vnfd" in [vnfds[0].name, vnfds[1].name]
+
+ def test_onboard_ping_pong_nsd(self, logger, nsd_proxy, ping_pong_nsd_package_file):
+ logger.info("Onboarding ping_pong_nsd package: %s", ping_pong_nsd_package_file)
+ trans_id = upload_descriptor(logger, ping_pong_nsd_package_file)
+ wait_unboard_transaction_finished(logger, trans_id)
+
+ catalog = nsd_proxy.get_config('/nsd-catalog')
+ nsds = catalog.nsd
+ assert len(nsds) == 1, "There should only be a single nsd"
+ nsd = nsds[0]
+ assert nsd.name == "ping_pong_nsd"
+
+ def test_instantiate_ping_pong_nsr(self, logger, nsd_proxy, nsr_proxy, rwnsr_proxy, base_proxy):
+ catalog = nsd_proxy.get_config('/nsd-catalog')
+ nsd = catalog.nsd[0]
+
+ nsr = create_nsr_from_nsd_id(nsd.id)
+ rwnsr_proxy.merge_config('/ns-instance-config', nsr)
+
+ nsr_opdata = rwnsr_proxy.get('/ns-instance-opdata')
+ nsrs = nsr_opdata.nsr
+ assert len(nsrs) == 1
+ assert nsrs[0].ns_instance_config_ref == nsr.id
+
+ # logger.info("Waiting up to 30 seconds for ping and pong components to show "
+ # "up in show tasklet info")
+
+ # start_time = time.time()
+ # while (time.time() - start_time) < 30:
+ # vcs_info = base_proxy.get('/vcs/info')
+ # components = vcs_info.components.component_info
+
+ # def find_component_by_name(name):
+ # for component in components:
+ # if name in component.component_name:
+ # return component
+
+ # logger.warning("Did not find %s component name in show tasklet info",
+ # name)
+
+ # return None
+
+ # """
+ # ping_cluster_component = find_component_by_name(
+ # "rw_ping_vnfd:rwping_cluster"
+ # )
+ # if ping_cluster_component is None:
+ # continue
+
+ # pong_cluster_component = find_component_by_name(
+ # "rw_pong_vnfd:rwpong_cluster"
+ # )
+ # if pong_cluster_component is None:
+ # continue
+ # """
+
+ # ping_vm_component = find_component_by_name(
+ # "rw_ping_vnfd:rwping_vm"
+ # )
+ # if ping_vm_component is None:
+ # continue
+
+ # pong_vm_component = find_component_by_name(
+ # "rw_pong_vnfd:rwpong_vm"
+ # )
+ # if pong_vm_component is None:
+ # continue
+
+ # ping_proc_component = find_component_by_name(
+ # "rw_ping_vnfd:rwping_proc"
+ # )
+ # if ping_proc_component is None:
+ # continue
+
+ # pong_proc_component = find_component_by_name(
+ # "rw_pong_vnfd:rwpong_proc"
+ # )
+ # if pong_proc_component is None:
+ # continue
+
+ # ping_tasklet_component = find_component_by_name(
+ # "rw_ping_vnfd:rwping_tasklet"
+ # )
+ # if ping_tasklet_component is None:
+ # continue
+
+ # pong_tasklet_component = find_component_by_name(
+ # "rw_pong_vnfd:rwpong_tasklet"
+ # )
+ # if pong_tasklet_component is None:
+ # continue
+
+ # logger.info("TEST SUCCESSFUL: All ping and pong components were found in show tasklet info")
+ # break
+
+ # else:
+ # assert False, "Did not find all ping and pong component in time"
+
+ #def test_terminate_ping_pong_ns(self, logger, nsd_proxy, nsr_proxy, rwnsr_proxy, base_proxy):
+ # nsr_configs = nsr_proxy.get_config('/ns-instance-config')
+ # nsr = nsr_configs.nsr[0]
+ # nsr_id = nsr.id
+
+ # nsr_configs = nsr_proxy.delete_config("/ns-instance-config/nsr[id='{}']".format(nsr_id))
diff --git a/rwlaunchpad/test/pytest/lp_tg_2vrouter_ts_epa_test.py b/rwlaunchpad/test/pytest/lp_tg_2vrouter_ts_epa_test.py
new file mode 100644
index 0000000..16a8990
--- /dev/null
+++ b/rwlaunchpad/test/pytest/lp_tg_2vrouter_ts_epa_test.py
@@ -0,0 +1,325 @@
+#!/usr/bin/env python
+"""
+#
+# Copyright 2016 RIFT.IO Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+@file lp_tg_2vrouter_ts_epa_test.py
+@author Austin Cormier (Austin.Cormier@riftio.com)
+@date 10/15/2015
+@brief Launchpad Module Test ExtVNF
+"""
+
+import json
+import logging
+import os
+import pytest
+import shlex
+import requests
+import subprocess
+import time
+import uuid
+
+import gi
+gi.require_version('RwIwpYang', '1.0')
+gi.require_version('RwNsrYang', '1.0')
+gi.require_version('RwVnfdYang', '1.0')
+gi.require_version('RwCloudYang', '1.0')
+gi.require_version('RwBaseYang', '1.0')
+gi.require_version('RwResourceMgrYang', '1.0')
+gi.require_version('RwConmanYang', '1.0')
+gi.require_version('RwNsmYang', '1.0')
+
+
+
+from gi.repository import RwIwpYang, NsdYang, NsrYang, RwNsrYang, VldYang, RwVnfdYang, RwCloudYang, RwBaseYang, RwResourceMgrYang, RwConmanYang, RwNsmYang
+
+logging.basicConfig(level=logging.DEBUG)
+
+
+RW_VROUTER_PKG_INSTALL_DIR = os.path.join(
+ os.environ["RIFT_INSTALL"],
+ "usr/rift/mano/vnfds/vrouter"
+ )
+RW_TRAFGEN_PKG_INSTALL_DIR = os.path.join(
+ os.environ["RIFT_INSTALL"],
+ "usr/rift/mano/vnfds/trafgen"
+ )
+RW_TRAFSINK_PKG_INSTALL_DIR = os.path.join(
+ os.environ["RIFT_INSTALL"],
+ "usr/rift/mano/vnfds/trafsink"
+ )
+RW_TG_2VROUTER_TS_NSD_PKG_INSTALL_DIR = os.path.join(
+ os.environ["RIFT_INSTALL"],
+ "usr/rift/mano/nsds/tg_2vrouter_ts"
+ )
+
+
+class PackageError(Exception):
+ pass
+
+
+def raise_package_error():
+ raise PackageError("Could not find ns packages")
+
+
+@pytest.fixture(scope='module')
+def iwp_proxy(request, mgmt_session):
+ return mgmt_session.proxy(RwIwpYang)
+
+@pytest.fixture(scope='module')
+def resource_mgr_proxy(request, mgmt_session):
+ return mgmt_session.proxy(RwResourceMgrYang)
+
+
+@pytest.fixture(scope='module')
+def cloud_proxy(request, mgmt_session):
+ return mgmt_session.proxy(RwCloudYang)
+
+
+@pytest.fixture(scope='module')
+def vnfd_proxy(request, mgmt_session):
+ return mgmt_session.proxy(RwVnfdYang)
+
+
+@pytest.fixture(scope='module')
+def vld_proxy(request, mgmt_session):
+ return mgmt_session.proxy(VldYang)
+
+
+@pytest.fixture(scope='module')
+def nsd_proxy(request, mgmt_session):
+ return mgmt_session.proxy(NsdYang)
+
+
+@pytest.fixture(scope='module')
+def nsr_proxy(request, mgmt_session):
+ return mgmt_session.proxy(NsrYang)
+
+
+@pytest.fixture(scope='module')
+def rwnsr_proxy(request, mgmt_session):
+ return mgmt_session.proxy(RwNsrYang)
+
+
+@pytest.fixture(scope='module')
+def base_proxy(request, mgmt_session):
+ return mgmt_session.proxy(RwBaseYang)
+
+@pytest.fixture(scope='module')
+def so_proxy(request, mgmt_session):
+ return mgmt_session.proxy(RwConmanYang)
+
+@pytest.fixture(scope='module')
+def nsm_proxy(request, mgmt_session):
+ return mgmt_session.proxy(RwNsmYang)
+
+@pytest.fixture(scope='session')
+def vrouter_vnfd_package_file():
+ vrouter_pkg_file = os.path.join(
+ RW_VROUTER_PKG_INSTALL_DIR,
+ "vrouter_vnfd_with_epa.tar.gz",
+ )
+ if not os.path.exists(vrouter_pkg_file):
+ raise_package_error()
+
+ return vrouter_pkg_file
+
+@pytest.fixture(scope='session')
+def tg_vnfd_package_file():
+ tg_pkg_file = os.path.join(
+ RW_TRAFGEN_PKG_INSTALL_DIR,
+ "trafgen_vnfd_with_epa.tar.gz",
+ )
+ if not os.path.exists(tg_pkg_file):
+ raise_package_error()
+
+ return tg_pkg_file
+
+@pytest.fixture(scope='session')
+def ts_vnfd_package_file():
+ ts_pkg_file = os.path.join(
+ RW_TRAFSINK_PKG_INSTALL_DIR,
+ "trafsink_vnfd_with_epa.tar.gz",
+ )
+ if not os.path.exists(ts_pkg_file):
+ raise_package_error()
+
+ return ts_pkg_file
+
+@pytest.fixture(scope='session')
+def tg_2vrouter_ts_nsd_package_file():
+ tg_2vrouter_ts_nsd_pkg_file = os.path.join(
+ RW_TG_2VROUTER_TS_NSD_PKG_INSTALL_DIR,
+ "tg_2vrouter_ts_nsd_with_epa.tar.gz",
+ )
+ if not os.path.exists(tg_2vrouter_ts_nsd_pkg_file):
+ raise_package_error()
+
+ return tg_2vrouter_ts_nsd_pkg_file
+
+
+def create_nsr_from_nsd_id(nsd_id):
+ nsr = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr()
+ nsr.id = str(uuid.uuid4())
+ nsr.name = "TG-2Vrouter-TS EPA"
+ nsr.short_name = "TG-2Vrouter-TS EPA"
+ nsr.description = "4 VNFs with Trafgen, 2 Vrouters and Trafsink EPA"
+ nsr.nsd_ref = nsd_id
+ nsr.admin_status = "ENABLED"
+
+ return nsr
+
+
+def upload_descriptor(logger, descriptor_file, host="127.0.0.1"):
+ curl_cmd = 'curl -F "descriptor=@{file}" http://{host}:4567/api/upload'.format(
+ file=descriptor_file,
+ host=host,
+ )
+ logger.debug("Uploading descriptor %s using cmd: %s", descriptor_file, curl_cmd)
+ stdout = subprocess.check_output(shlex.split(curl_cmd), universal_newlines=True)
+
+ json_out = json.loads(stdout)
+ transaction_id = json_out["transaction_id"]
+
+ return transaction_id
+
+
+class DescriptorOnboardError(Exception):
+ pass
+
+
+def wait_unboard_transaction_finished(logger, transaction_id, timeout_secs=600, host="127.0.0.1"):
+ logger.info("Waiting for onboard trans_id %s to complete",
+ transaction_id)
+ start_time = time.time()
+ while (time.time() - start_time) < timeout_secs:
+ r = requests.get(
+ 'http://{host}:4567/api/upload/{t_id}/state'.format(
+ host=host, t_id=transaction_id
+ )
+ )
+ state = r.json()
+ if state["status"] == "pending":
+ time.sleep(1)
+ continue
+
+ elif state["status"] == "success":
+ logger.info("Descriptor onboard was successful")
+ return
+
+ else:
+ raise DescriptorOnboardError(state)
+
+ if state["status"] != "success":
+ raise DescriptorOnboardError(state)
+
+@pytest.mark.incremental
+class TestLaunchpadStartStop(object):
+ def test_configure_cloud_account(self, cloud_proxy, logger):
+ cloud_account = RwCloudYang.CloudAccountConfig()
+ #cloud_account.name = "cloudsim_proxy"
+ #cloud_account.account_type = "cloudsim_proxy"
+ cloud_account.name = "riftuser1"
+ cloud_account.account_type = "openstack"
+ cloud_account.openstack.key = 'pluto'
+ cloud_account.openstack.secret = 'mypasswd'
+ cloud_account.openstack.auth_url = 'http://10.66.4.xx:5000/v3/'
+ cloud_account.openstack.tenant = 'demo'
+ cloud_account.openstack.mgmt_network = 'private'
+
+ cloud_proxy.merge_config("/rw-cloud:cloud-account", cloud_account)
+
+ def test_configure_pools(self, resource_mgr_proxy):
+ pools = RwResourceMgrYang.ResourcePools.from_dict({
+ "pools": [{ "name": "vm_pool_a",
+ "resource_type": "compute",
+ "pool_type" : "dynamic"},
+ {"name": "network_pool_a",
+ "resource_type": "network",
+ "pool_type" : "dynamic",}]})
+
+ resource_mgr_proxy.merge_config('/rw-resource-mgr:resource-mgr-config/rw-resource-mgr:resource-pools', pools)
+
+ def test_configure_resource_orchestrator(self, so_proxy):
+ cfg = RwConmanYang.RoEndpoint.from_dict({'ro_ip_address': '127.0.0.1',
+ 'ro_port' : 2022,
+ 'ro_username' : 'admin',
+ 'ro_password' : 'admin'})
+ so_proxy.merge_config('/rw-conman:cm-config', cfg)
+
+ def test_configure_service_orchestrator(self, nsm_proxy):
+ cfg = RwNsmYang.SoEndpoint.from_dict({'cm_ip_address': '127.0.0.1',
+ 'cm_port' : 2022,
+ 'cm_username' : 'admin',
+ 'cm_password' : 'admin'})
+ nsm_proxy.merge_config('/rw-nsm:ro-config/rw-nsm:cm-endpoint', cfg)
+
+
+ def test_onboard_tg_vnfd(self, logger, vnfd_proxy, tg_vnfd_package_file):
+ logger.info("Onboarding trafgen_vnfd package: %s", tg_vnfd_package_file)
+ trans_id = upload_descriptor(logger, tg_vnfd_package_file)
+ wait_unboard_transaction_finished(logger, trans_id)
+
+ catalog = vnfd_proxy.get_config('/vnfd-catalog')
+ vnfds = catalog.vnfd
+ assert len(vnfds) == 1, "There should be one vnfds"
+ assert "trafgen_vnfd" in [vnfds[0].name]
+
+ def test_onboard_vrouter_vnfd(self, logger, vnfd_proxy, vrouter_vnfd_package_file):
+ logger.info("Onboarding vrouter_vnfd package: %s", vrouter_vnfd_package_file)
+ trans_id = upload_descriptor(logger, vrouter_vnfd_package_file)
+ wait_unboard_transaction_finished(logger, trans_id)
+
+ catalog = vnfd_proxy.get_config('/vnfd-catalog')
+ vnfds = catalog.vnfd
+ assert len(vnfds) == 2, "There should be two vnfds"
+ assert "vrouter_vnfd" in [vnfds[0].name, vnfds[1].name]
+
+ def test_onboard_ts_vnfd(self, logger, vnfd_proxy, ts_vnfd_package_file):
+ logger.info("Onboarding trafsink_vnfd package: %s", ts_vnfd_package_file)
+ trans_id = upload_descriptor(logger, ts_vnfd_package_file)
+ wait_unboard_transaction_finished(logger, trans_id)
+
+ catalog = vnfd_proxy.get_config('/vnfd-catalog')
+ vnfds = catalog.vnfd
+ assert len(vnfds) == 3, "There should be three vnfds"
+ assert "trafsink_vnfd" in [vnfds[0].name, vnfds[1].name, vnfds[2].name]
+
+ def test_onboard_tg_2vrouter_ts_nsd(self, logger, nsd_proxy, tg_2vrouter_ts_nsd_package_file):
+ logger.info("Onboarding tg_2vrouter_ts nsd package: %s", tg_2vrouter_ts_nsd_package_file)
+ trans_id = upload_descriptor(logger, tg_2vrouter_ts_nsd_package_file)
+ wait_unboard_transaction_finished(logger, trans_id)
+
+ catalog = nsd_proxy.get_config('/nsd-catalog')
+ nsds = catalog.nsd
+ assert len(nsds) == 1, "There should only be a single nsd"
+ nsd = nsds[0]
+ assert nsd.name == "tg_vrouter_ts_nsd"
+ assert nsd.short_name == "tg_2vrouter_ts_nsd"
+
+ def test_instantiate_tg_2vrouter_ts_nsr(self, logger, nsd_proxy, nsr_proxy, rwnsr_proxy, base_proxy):
+ catalog = nsd_proxy.get_config('/nsd-catalog')
+ nsd = catalog.nsd[0]
+
+ nsr = create_nsr_from_nsd_id(nsd.id)
+ nsr_proxy.merge_config('/ns-instance-config', nsr)
+
+ nsr_opdata = rwnsr_proxy.get('/ns-instance-opdata')
+ nsrs = nsr_opdata.nsr
+ assert len(nsrs) == 1
+ assert nsrs[0].ns_instance_config_ref == nsr.id
+
+
diff --git a/rwlaunchpad/test/pytest/lp_tg_2vrouter_ts_test.py b/rwlaunchpad/test/pytest/lp_tg_2vrouter_ts_test.py
new file mode 100644
index 0000000..ed00a25
--- /dev/null
+++ b/rwlaunchpad/test/pytest/lp_tg_2vrouter_ts_test.py
@@ -0,0 +1,325 @@
+#!/usr/bin/env python
+"""
+#
+# Copyright 2016 RIFT.IO Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+@file lp_tg_2vrouter_ts_test.py
+@author Austin Cormier (Austin.Cormier@riftio.com)
+@date 10/15/2015
+@brief Launchpad Module Test ExtVNF
+"""
+
+import json
+import logging
+import os
+import pytest
+import shlex
+import requests
+import subprocess
+import time
+import uuid
+
+import gi
+gi.require_version('RwIwpYang', '1.0')
+gi.require_version('RwNsrYang', '1.0')
+gi.require_version('RwVnfdYang', '1.0')
+gi.require_version('RwCloudYang', '1.0')
+gi.require_version('RwBaseYang', '1.0')
+gi.require_version('RwResourceMgrYang', '1.0')
+gi.require_version('RwConmanYang', '1.0')
+gi.require_version('RwNsmYang', '1.0')
+
+
+
+from gi.repository import RwIwpYang, NsdYang, NsrYang, RwNsrYang, VldYang, RwVnfdYang, RwCloudYang, RwBaseYang, RwResourceMgrYang, RwConmanYang, RwNsmYang
+
+logging.basicConfig(level=logging.DEBUG)
+
+
+RW_VROUTER_PKG_INSTALL_DIR = os.path.join(
+ os.environ["RIFT_INSTALL"],
+ "usr/rift/mano/vnfds/vrouter"
+ )
+RW_TRAFGEN_PKG_INSTALL_DIR = os.path.join(
+ os.environ["RIFT_INSTALL"],
+ "usr/rift/mano/vnfds/trafgen"
+ )
+RW_TRAFSINK_PKG_INSTALL_DIR = os.path.join(
+ os.environ["RIFT_INSTALL"],
+ "usr/rift/mano/vnfds/trafsink"
+ )
+RW_TG_2VROUTER_TS_NSD_PKG_INSTALL_DIR = os.path.join(
+ os.environ["RIFT_INSTALL"],
+ "usr/rift/mano/nsds/tg_2vrouter_ts"
+ )
+
+
+class PackageError(Exception):
+ pass
+
+
+def raise_package_error():
+ raise PackageError("Could not find ns packages")
+
+
+@pytest.fixture(scope='module')
+def iwp_proxy(request, mgmt_session):
+ return mgmt_session.proxy(RwIwpYang)
+
+@pytest.fixture(scope='module')
+def resource_mgr_proxy(request, mgmt_session):
+ return mgmt_session.proxy(RwResourceMgrYang)
+
+
+@pytest.fixture(scope='module')
+def cloud_proxy(request, mgmt_session):
+ return mgmt_session.proxy(RwCloudYang)
+
+
+@pytest.fixture(scope='module')
+def vnfd_proxy(request, mgmt_session):
+ return mgmt_session.proxy(RwVnfdYang)
+
+
+@pytest.fixture(scope='module')
+def vld_proxy(request, mgmt_session):
+ return mgmt_session.proxy(VldYang)
+
+
+@pytest.fixture(scope='module')
+def nsd_proxy(request, mgmt_session):
+ return mgmt_session.proxy(NsdYang)
+
+
+@pytest.fixture(scope='module')
+def nsr_proxy(request, mgmt_session):
+ return mgmt_session.proxy(NsrYang)
+
+
+@pytest.fixture(scope='module')
+def rwnsr_proxy(request, mgmt_session):
+ return mgmt_session.proxy(RwNsrYang)
+
+
+@pytest.fixture(scope='module')
+def base_proxy(request, mgmt_session):
+ return mgmt_session.proxy(RwBaseYang)
+
+@pytest.fixture(scope='module')
+def so_proxy(request, mgmt_session):
+ return mgmt_session.proxy(RwConmanYang)
+
+@pytest.fixture(scope='module')
+def nsm_proxy(request, mgmt_session):
+ return mgmt_session.proxy(RwNsmYang)
+
+@pytest.fixture(scope='session')
+def vrouter_vnfd_package_file():
+ vrouter_pkg_file = os.path.join(
+ RW_VROUTER_PKG_INSTALL_DIR,
+ "vrouter_vnfd.tar.gz",
+ )
+ if not os.path.exists(vrouter_pkg_file):
+ raise_package_error()
+
+ return vrouter_pkg_file
+
+@pytest.fixture(scope='session')
+def tg_vnfd_package_file():
+ tg_pkg_file = os.path.join(
+ RW_TRAFGEN_PKG_INSTALL_DIR,
+ "trafgen_vnfd.tar.gz",
+ )
+ if not os.path.exists(tg_pkg_file):
+ raise_package_error()
+
+ return tg_pkg_file
+
+@pytest.fixture(scope='session')
+def ts_vnfd_package_file():
+ ts_pkg_file = os.path.join(
+ RW_TRAFSINK_PKG_INSTALL_DIR,
+ "trafsink_vnfd.tar.gz",
+ )
+ if not os.path.exists(ts_pkg_file):
+ raise_package_error()
+
+ return ts_pkg_file
+
+@pytest.fixture(scope='session')
+def tg_2vrouter_ts_nsd_package_file():
+ tg_2vrouter_ts_nsd_pkg_file = os.path.join(
+ RW_TG_2VROUTER_TS_NSD_PKG_INSTALL_DIR,
+ "tg_2vrouter_ts_nsd.tar.gz",
+ )
+ if not os.path.exists(tg_2vrouter_ts_nsd_pkg_file):
+ raise_package_error()
+
+ return tg_2vrouter_ts_nsd_pkg_file
+
+
+def create_nsr_from_nsd_id(nsd_id):
+ nsr = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr()
+ nsr.id = str(uuid.uuid4())
+ nsr.name = "TG-2Vrouter-TS EPA"
+ nsr.short_name = "TG-2Vrouter-TS EPA"
+ nsr.description = "4 VNFs with Trafgen, 2 Vrouters and Trafsink EPA"
+ nsr.nsd_ref = nsd_id
+ nsr.admin_status = "ENABLED"
+
+ return nsr
+
+
+def upload_descriptor(logger, descriptor_file, host="127.0.0.1"):
+ curl_cmd = 'curl -F "descriptor=@{file}" http://{host}:4567/api/upload'.format(
+ file=descriptor_file,
+ host=host,
+ )
+ logger.debug("Uploading descriptor %s using cmd: %s", descriptor_file, curl_cmd)
+ stdout = subprocess.check_output(shlex.split(curl_cmd), universal_newlines=True)
+
+ json_out = json.loads(stdout)
+ transaction_id = json_out["transaction_id"]
+
+ return transaction_id
+
+
+class DescriptorOnboardError(Exception):
+ pass
+
+
+def wait_unboard_transaction_finished(logger, transaction_id, timeout_secs=600, host="127.0.0.1"):
+ logger.info("Waiting for onboard trans_id %s to complete",
+ transaction_id)
+ start_time = time.time()
+ while (time.time() - start_time) < timeout_secs:
+ r = requests.get(
+ 'http://{host}:4567/api/upload/{t_id}/state'.format(
+ host=host, t_id=transaction_id
+ )
+ )
+ state = r.json()
+ if state["status"] == "pending":
+ time.sleep(1)
+ continue
+
+ elif state["status"] == "success":
+ logger.info("Descriptor onboard was successful")
+ return
+
+ else:
+ raise DescriptorOnboardError(state)
+
+ if state["status"] != "success":
+ raise DescriptorOnboardError(state)
+
+@pytest.mark.incremental
+class TestLaunchpadStartStop(object):
+ def test_configure_cloud_account(self, cloud_proxy, logger):
+ cloud_account = RwCloudYang.CloudAccountConfig()
+ #cloud_account.name = "cloudsim_proxy"
+ #cloud_account.account_type = "cloudsim_proxy"
+ cloud_account.name = "riftuser1"
+ cloud_account.account_type = "openstack"
+ cloud_account.openstack.key = 'pluto'
+ cloud_account.openstack.secret = 'mypasswd'
+ cloud_account.openstack.auth_url = 'http://10.66.4.xx:5000/v3/'
+ cloud_account.openstack.tenant = 'demo'
+ cloud_account.openstack.mgmt_network = 'private'
+
+ cloud_proxy.merge_config("/rw-cloud:cloud-account", cloud_account)
+
+ def test_configure_pools(self, resource_mgr_proxy):
+ pools = RwResourceMgrYang.ResourcePools.from_dict({
+ "pools": [{ "name": "vm_pool_a",
+ "resource_type": "compute",
+ "pool_type" : "dynamic"},
+ {"name": "network_pool_a",
+ "resource_type": "network",
+ "pool_type" : "dynamic",}]})
+
+ resource_mgr_proxy.merge_config('/rw-resource-mgr:resource-mgr-config/rw-resource-mgr:resource-pools', pools)
+
+ def test_configure_resource_orchestrator(self, so_proxy):
+ cfg = RwConmanYang.RoEndpoint.from_dict({'ro_ip_address': '127.0.0.1',
+ 'ro_port' : 2022,
+ 'ro_username' : 'admin',
+ 'ro_password' : 'admin'})
+ so_proxy.merge_config('/rw-conman:cm-config', cfg)
+
+ def test_configure_service_orchestrator(self, nsm_proxy):
+ cfg = RwNsmYang.SoEndpoint.from_dict({'cm_ip_address': '127.0.0.1',
+ 'cm_port' : 2022,
+ 'cm_username' : 'admin',
+ 'cm_password' : 'admin'})
+ nsm_proxy.merge_config('/rw-nsm:ro-config/rw-nsm:cm-endpoint', cfg)
+
+
+ def test_onboard_tg_vnfd(self, logger, vnfd_proxy, tg_vnfd_package_file):
+ logger.info("Onboarding trafgen_vnfd package: %s", tg_vnfd_package_file)
+ trans_id = upload_descriptor(logger, tg_vnfd_package_file)
+ wait_unboard_transaction_finished(logger, trans_id)
+
+ catalog = vnfd_proxy.get_config('/vnfd-catalog')
+ vnfds = catalog.vnfd
+ assert len(vnfds) == 1, "There should be one vnfds"
+ assert "trafgen_vnfd" in [vnfds[0].name]
+
+ def test_onboard_vrouter_vnfd(self, logger, vnfd_proxy, vrouter_vnfd_package_file):
+ logger.info("Onboarding vrouter_vnfd package: %s", vrouter_vnfd_package_file)
+ trans_id = upload_descriptor(logger, vrouter_vnfd_package_file)
+ wait_unboard_transaction_finished(logger, trans_id)
+
+ catalog = vnfd_proxy.get_config('/vnfd-catalog')
+ vnfds = catalog.vnfd
+ assert len(vnfds) == 2, "There should be two vnfds"
+ assert "vrouter_vnfd" in [vnfds[0].name, vnfds[1].name]
+
+ def test_onboard_ts_vnfd(self, logger, vnfd_proxy, ts_vnfd_package_file):
+ logger.info("Onboarding trafsink_vnfd package: %s", ts_vnfd_package_file)
+ trans_id = upload_descriptor(logger, ts_vnfd_package_file)
+ wait_unboard_transaction_finished(logger, trans_id)
+
+ catalog = vnfd_proxy.get_config('/vnfd-catalog')
+ vnfds = catalog.vnfd
+ assert len(vnfds) == 3, "There should be three vnfds"
+ assert "trafsink_vnfd" in [vnfds[0].name, vnfds[1].name, vnfds[2].name]
+
+ def test_onboard_tg_2vrouter_ts_nsd(self, logger, nsd_proxy, tg_2vrouter_ts_nsd_package_file):
+ logger.info("Onboarding tg_2vrouter_ts nsd package: %s", tg_2vrouter_ts_nsd_package_file)
+ trans_id = upload_descriptor(logger, tg_2vrouter_ts_nsd_package_file)
+ wait_unboard_transaction_finished(logger, trans_id)
+
+ catalog = nsd_proxy.get_config('/nsd-catalog')
+ nsds = catalog.nsd
+ assert len(nsds) == 1, "There should only be a single nsd"
+ nsd = nsds[0]
+ assert nsd.name == "tg_vrouter_ts_nsd"
+ assert nsd.short_name == "tg_2vrouter_ts_nsd"
+
+ def test_instantiate_tg_2vrouter_ts_nsr(self, logger, nsd_proxy, nsr_proxy, rwnsr_proxy, base_proxy):
+ catalog = nsd_proxy.get_config('/nsd-catalog')
+ nsd = catalog.nsd[0]
+
+ nsr = create_nsr_from_nsd_id(nsd.id)
+ nsr_proxy.merge_config('/ns-instance-config', nsr)
+
+ nsr_opdata = rwnsr_proxy.get('/ns-instance-opdata')
+ nsrs = nsr_opdata.nsr
+ assert len(nsrs) == 1
+ assert nsrs[0].ns_instance_config_ref == nsr.id
+
+
diff --git a/rwlaunchpad/test/pytest/lp_tg_vrouter_ts_epa_sriov_test.py b/rwlaunchpad/test/pytest/lp_tg_vrouter_ts_epa_sriov_test.py
new file mode 100644
index 0000000..4d6e345
--- /dev/null
+++ b/rwlaunchpad/test/pytest/lp_tg_vrouter_ts_epa_sriov_test.py
@@ -0,0 +1,323 @@
+#!/usr/bin/env python
+"""
+#
+# Copyright 2016 RIFT.IO Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+@file lp_tg_vrouter_ts_epa_sriov_test.py
+@author Austin Cormier (Austin.Cormier@riftio.com)
+@date 10/15/2015
+@brief Launchpad Module Test ExtVNF
+"""
+
+import json
+import logging
+import os
+import pytest
+import shlex
+import requests
+import subprocess
+import time
+import uuid
+
+import gi
+gi.require_version('RwIwpYang', '1.0')
+gi.require_version('RwNsrYang', '1.0')
+gi.require_version('RwVnfdYang', '1.0')
+gi.require_version('RwCloudYang', '1.0')
+gi.require_version('RwBaseYang', '1.0')
+gi.require_version('RwResourceMgrYang', '1.0')
+gi.require_version('RwConmanYang', '1.0')
+gi.require_version('RwNsmYang', '1.0')
+
+
+from gi.repository import RwIwpYang, NsdYang, NsrYang, RwNsrYang, VldYang, RwVnfdYang, RwCloudYang, RwBaseYang, RwResourceMgrYang, RwConmanYang, RwNsmYang
+
+logging.basicConfig(level=logging.DEBUG)
+
+
+RW_VROUTER_PKG_INSTALL_DIR = os.path.join(
+ os.environ["RIFT_INSTALL"],
+ "usr/rift/mano/vnfds/vrouter"
+ )
+RW_TRAFGEN_PKG_INSTALL_DIR = os.path.join(
+ os.environ["RIFT_INSTALL"],
+ "usr/rift/mano/vnfds/trafgen"
+ )
+RW_TRAFSINK_PKG_INSTALL_DIR = os.path.join(
+ os.environ["RIFT_INSTALL"],
+ "usr/rift/mano/vnfds/trafsink"
+ )
+RW_TG_VROUTER_TS_NSD_PKG_INSTALL_DIR = os.path.join(
+ os.environ["RIFT_INSTALL"],
+ "usr/rift/mano/nsds/tg_vrouter_ts"
+ )
+
+
+class PackageError(Exception):
+ pass
+
+
+def raise_package_error():
+ raise PackageError("Could not find ns packages")
+
+
+@pytest.fixture(scope='module')
+def iwp_proxy(request, mgmt_session):
+ return mgmt_session.proxy(RwIwpYang)
+
+@pytest.fixture(scope='module')
+def resource_mgr_proxy(request, mgmt_session):
+ return mgmt_session.proxy(RwResourceMgrYang)
+
+
+@pytest.fixture(scope='module')
+def cloud_proxy(request, mgmt_session):
+ return mgmt_session.proxy(RwCloudYang)
+
+
+@pytest.fixture(scope='module')
+def vnfd_proxy(request, mgmt_session):
+ return mgmt_session.proxy(RwVnfdYang)
+
+
+@pytest.fixture(scope='module')
+def vld_proxy(request, mgmt_session):
+ return mgmt_session.proxy(VldYang)
+
+
+@pytest.fixture(scope='module')
+def nsd_proxy(request, mgmt_session):
+ return mgmt_session.proxy(NsdYang)
+
+
+@pytest.fixture(scope='module')
+def nsr_proxy(request, mgmt_session):
+ return mgmt_session.proxy(NsrYang)
+
+
+@pytest.fixture(scope='module')
+def rwnsr_proxy(request, mgmt_session):
+ return mgmt_session.proxy(RwNsrYang)
+
+
+@pytest.fixture(scope='module')
+def base_proxy(request, mgmt_session):
+ return mgmt_session.proxy(RwBaseYang)
+
+@pytest.fixture(scope='module')
+def so_proxy(request, mgmt_session):
+ return mgmt_session.proxy(RwConmanYang)
+
+@pytest.fixture(scope='module')
+def nsm_proxy(request, mgmt_session):
+ return mgmt_session.proxy(RwNsmYang)
+
+@pytest.fixture(scope='session')
+def vrouter_vnfd_package_file():
+ vrouter_pkg_file = os.path.join(
+ RW_VROUTER_PKG_INSTALL_DIR,
+ "vrouter_vnfd_with_epa_sriov.tar.gz",
+ )
+ if not os.path.exists(vrouter_pkg_file):
+ raise_package_error()
+
+ return vrouter_pkg_file
+
+@pytest.fixture(scope='session')
+def tg_vnfd_package_file():
+ tg_pkg_file = os.path.join(
+ RW_TRAFGEN_PKG_INSTALL_DIR,
+ "trafgen_vnfd_with_epa_sriov.tar.gz",
+ )
+ if not os.path.exists(tg_pkg_file):
+ raise_package_error()
+
+ return tg_pkg_file
+
+@pytest.fixture(scope='session')
+def ts_vnfd_package_file():
+ ts_pkg_file = os.path.join(
+ RW_TRAFSINK_PKG_INSTALL_DIR,
+ "trafsink_vnfd_with_epa_sriov.tar.gz",
+ )
+ if not os.path.exists(ts_pkg_file):
+ raise_package_error()
+
+ return ts_pkg_file
+
+@pytest.fixture(scope='session')
+def tg_vrouter_ts_nsd_package_file():
+ tg_vrouter_ts_nsd_pkg_file = os.path.join(
+ RW_TG_VROUTER_TS_NSD_PKG_INSTALL_DIR,
+ "tg_vrouter_ts_nsd_with_epa_sriov.tar.gz",
+ )
+ if not os.path.exists(tg_vrouter_ts_nsd_pkg_file):
+ raise_package_error()
+
+ return tg_vrouter_ts_nsd_pkg_file
+
+
+def create_nsr_from_nsd_id(nsd_id):
+ nsr = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr()
+ nsr.id = str(uuid.uuid4())
+ nsr.name = "TG-Vrouter-TS-EPA-SRIOV"
+ nsr.short_name = "TG-Vrouter-TS-EPA-SRIOV"
+ nsr.description = "3 VNFs with Trafgen, Vrouter and Trafsink EPA SRIOV"
+ nsr.nsd_ref = nsd_id
+ nsr.admin_status = "ENABLED"
+
+ return nsr
+
+
+def upload_descriptor(logger, descriptor_file, host="127.0.0.1"):
+ curl_cmd = 'curl -F "descriptor=@{file}" http://{host}:4567/api/upload'.format(
+ file=descriptor_file,
+ host=host,
+ )
+ logger.debug("Uploading descriptor %s using cmd: %s", descriptor_file, curl_cmd)
+ stdout = subprocess.check_output(shlex.split(curl_cmd), universal_newlines=True)
+
+ json_out = json.loads(stdout)
+ transaction_id = json_out["transaction_id"]
+
+ return transaction_id
+
+
+class DescriptorOnboardError(Exception):
+ pass
+
+
+def wait_unboard_transaction_finished(logger, transaction_id, timeout_secs=600, host="127.0.0.1"):
+ logger.info("Waiting for onboard trans_id %s to complete",
+ transaction_id)
+ start_time = time.time()
+ while (time.time() - start_time) < timeout_secs:
+ r = requests.get(
+ 'http://{host}:4567/api/upload/{t_id}/state'.format(
+ host=host, t_id=transaction_id
+ )
+ )
+ state = r.json()
+ if state["status"] == "pending":
+ time.sleep(1)
+ continue
+
+ elif state["status"] == "success":
+ logger.info("Descriptor onboard was successful")
+ return
+
+ else:
+ raise DescriptorOnboardError(state)
+
+ if state["status"] != "success":
+ raise DescriptorOnboardError(state)
+
+@pytest.mark.incremental
+class TestLaunchpadStartStop(object):
+ def test_configure_cloud_account(self, cloud_proxy, logger):
+ cloud_account = RwCloudYang.CloudAccountConfig()
+ #cloud_account.name = "cloudsim_proxy"
+ #cloud_account.account_type = "cloudsim_proxy"
+ cloud_account.name = "riftuser1"
+ cloud_account.account_type = "openstack"
+ cloud_account.openstack.key = 'pluto'
+ cloud_account.openstack.secret = 'mypasswd'
+ cloud_account.openstack.auth_url = 'http://10.66.4.xx:5000/v3/'
+ cloud_account.openstack.tenant = 'demo'
+ cloud_account.openstack.mgmt_network = 'private'
+
+ cloud_proxy.merge_config("/rw-cloud:cloud-account", cloud_account)
+
+ def test_configure_pools(self, resource_mgr_proxy):
+ pools = RwResourceMgrYang.ResourcePools.from_dict({
+ "pools": [{ "name": "vm_pool_a",
+ "resource_type": "compute",
+ "pool_type" : "dynamic"},
+ {"name": "network_pool_a",
+ "resource_type": "network",
+ "pool_type" : "dynamic",}]})
+
+ resource_mgr_proxy.merge_config('/rw-resource-mgr:resource-mgr-config/rw-resource-mgr:resource-pools', pools)
+
+ def test_configure_resource_orchestrator(self, so_proxy):
+ cfg = RwConmanYang.RoEndpoint.from_dict({'ro_ip_address': '127.0.0.1',
+ 'ro_port' : 2022,
+ 'ro_username' : 'admin',
+ 'ro_password' : 'admin'})
+ so_proxy.merge_config('/rw-conman:cm-config', cfg)
+
+ def test_configure_service_orchestrator(self, nsm_proxy):
+ cfg = RwNsmYang.SoEndpoint.from_dict({'cm_ip_address': '127.0.0.1',
+ 'cm_port' : 2022,
+ 'cm_username' : 'admin',
+ 'cm_password' : 'admin'})
+ nsm_proxy.merge_config('/rw-nsm:ro-config/rw-nsm:cm-endpoint', cfg)
+
+
+ def test_onboard_tg_vnfd(self, logger, vnfd_proxy, tg_vnfd_package_file):
+ logger.info("Onboarding trafgen_vnfd package: %s", tg_vnfd_package_file)
+ trans_id = upload_descriptor(logger, tg_vnfd_package_file)
+ wait_unboard_transaction_finished(logger, trans_id)
+
+ catalog = vnfd_proxy.get_config('/vnfd-catalog')
+ vnfds = catalog.vnfd
+ assert len(vnfds) == 1, "There should be one vnfds"
+ assert "trafgen_vnfd" in [vnfds[0].name]
+
+ def test_onboard_vrouter_vnfd(self, logger, vnfd_proxy, vrouter_vnfd_package_file):
+ logger.info("Onboarding vrouter_vnfd package: %s", vrouter_vnfd_package_file)
+ trans_id = upload_descriptor(logger, vrouter_vnfd_package_file)
+ wait_unboard_transaction_finished(logger, trans_id)
+
+ catalog = vnfd_proxy.get_config('/vnfd-catalog')
+ vnfds = catalog.vnfd
+ assert len(vnfds) == 2, "There should be two vnfds"
+ assert "vrouter_vnfd" in [vnfds[0].name, vnfds[1].name]
+
+ def test_onboard_ts_vnfd(self, logger, vnfd_proxy, ts_vnfd_package_file):
+ logger.info("Onboarding trafsink_vnfd package: %s", ts_vnfd_package_file)
+ trans_id = upload_descriptor(logger, ts_vnfd_package_file)
+ wait_unboard_transaction_finished(logger, trans_id)
+
+ catalog = vnfd_proxy.get_config('/vnfd-catalog')
+ vnfds = catalog.vnfd
+ assert len(vnfds) == 3, "There should be three vnfds"
+ assert "trafsink_vnfd" in [vnfds[0].name, vnfds[1].name, vnfds[2].name]
+
+ def test_onboard_tg_vrouter_ts_nsd(self, logger, nsd_proxy, tg_vrouter_ts_nsd_package_file):
+ logger.info("Onboarding tg_vrouter_ts nsd package: %s", tg_vrouter_ts_nsd_package_file)
+ trans_id = upload_descriptor(logger, tg_vrouter_ts_nsd_package_file)
+ wait_unboard_transaction_finished(logger, trans_id)
+
+ catalog = nsd_proxy.get_config('/nsd-catalog')
+ nsds = catalog.nsd
+ assert len(nsds) == 1, "There should only be a single nsd"
+ nsd = nsds[0]
+ assert nsd.name == "tg_vrouter_ts_nsd"
+
+ def test_instantiate_tg_vrouter_ts_nsr(self, logger, nsd_proxy, nsr_proxy, rwnsr_proxy, base_proxy):
+ catalog = nsd_proxy.get_config('/nsd-catalog')
+ nsd = catalog.nsd[0]
+
+ nsr = create_nsr_from_nsd_id(nsd.id)
+ nsr_proxy.merge_config('/ns-instance-config', nsr)
+
+ nsr_opdata = rwnsr_proxy.get('/ns-instance-opdata')
+ nsrs = nsr_opdata.nsr
+ assert len(nsrs) == 1
+ assert nsrs[0].ns_instance_config_ref == nsr.id
+
+
diff --git a/rwlaunchpad/test/racfg/lprecovery_test.racfg b/rwlaunchpad/test/racfg/lprecovery_test.racfg
new file mode 100644
index 0000000..43e07aa
--- /dev/null
+++ b/rwlaunchpad/test/racfg/lprecovery_test.racfg
@@ -0,0 +1,19 @@
+{
+ "test_name":"TC_LPRECOVERY_TEST",
+ "commandline":"./launchpad_recovery",
+ "target_vm":"VM",
+ "test_description":"Test targeting launchpad recovery feature",
+ "run_as_root": true,
+ "status":"broken",
+ "keywords":["nightly","smoke"],
+ "timelimit": 4800,
+ "networks":[],
+ "vms":[
+ {
+ "name": "VM",
+ "memory": 8192,
+ "cpus": 2
+ }
+ ]
+}
+
diff --git a/rwlaunchpad/test/tosca_ut.py b/rwlaunchpad/test/tosca_ut.py
new file mode 100755
index 0000000..40efe41
--- /dev/null
+++ b/rwlaunchpad/test/tosca_ut.py
@@ -0,0 +1,183 @@
+#!/usr/bin/env python3
+
+############################################################################
+# Copyright 2016 RIFT.io Inc #
+# #
+# Licensed under the Apache License, Version 2.0 (the "License"); #
+# you may not use this file except in compliance with the License. #
+# You may obtain a copy of the License at #
+# #
+# http://www.apache.org/licenses/LICENSE-2.0 #
+# #
+# Unless required by applicable law or agreed to in writing, software #
+# distributed under the License is distributed on an "AS IS" BASIS, #
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
+# See the License for the specific language governing permissions and #
+# limitations under the License. #
+############################################################################
+
+import argparse
+import logging
+import os
+import shutil
+import sys
+import tarfile
+import tempfile
+import unittest
+import xmlrunner
+
+import rift.mano.examples.ping_pong_nsd as ping_pong_nsd
+
+from rift.mano.utils.compare_desc import CompareDescShell
+
+from rift.tasklets.rwlaunchpad.tosca import ExportTosca
+from rift.tasklets.rwlaunchpad.tosca import ImportTosca
+
+from rift.package.package import TarPackageArchive
+
+class PingPongDescriptors(object):
+ def __init__(self):
+ ping_vnfd, pong_vnfd, nsd = \
+ ping_pong_nsd.generate_ping_pong_descriptors(
+ pingcount=1,
+ external_vlr_count=1,
+ internal_vlr_count=0,
+ num_vnf_vms=1,
+ ping_md5sum='1234567890abcdefg',
+ pong_md5sum='1234567890abcdefg',
+ mano_ut=False,
+ use_scale_group=True,
+ use_mon_params=True,
+ use_placement_group=False,
+ use_ns_init_conf=False,
+ )
+ self.ping_pong_nsd = nsd.descriptor.nsd[0]
+ self.ping_vnfd = ping_vnfd.descriptor.vnfd[0]
+ self.pong_vnfd = pong_vnfd.descriptor.vnfd[0]
+
+
+class ToscaTestCase(unittest.TestCase):
+ """ Unittest for YANG to TOSCA and back translations
+
+ This generates the Ping Pong descriptors using the script
+ in examples and then converts it to TOSCA and back to YANG.
+ """
+ default_timeout = 0
+ top_dir = __file__[:__file__.find('/modules/core/')]
+ log_level = logging.WARN
+ log = None
+
+ @classmethod
+ def setUpClass(cls):
+ fmt = logging.Formatter(
+ '%(asctime)-23s %(levelname)-5s (%(name)s@%(process)d:%(filename)s:%(lineno)d) - %(message)s')
+ stderr_handler = logging.StreamHandler(stream=sys.stderr)
+ stderr_handler.setFormatter(fmt)
+ logging.basicConfig(level=cls.log_level)
+ cls.log = logging.getLogger('tosca-ut')
+ cls.log.addHandler(stderr_handler)
+
+ def setUp(self):
+ """Run before each test method to initialize test environment."""
+
+ super(ToscaTestCase, self).setUp()
+ self.output_dir = tempfile.mkdtemp()
+
+ def compare_dict(self, gen_d, exp_d):
+ gen = "--generated="+str(gen_d)
+ exp = "--expected="+str(exp_d)
+ CompareDescShell.compare_dicts(gen, exp, log=self.log)
+
+ def yang_to_tosca(self, descs):
+ """Convert YANG model to TOSCA model"""
+ pkg = ExportTosca(self.log)
+ nsd_id = pkg.add_nsd(descs.ping_pong_nsd)
+ pkg.add_vnfd(nsd_id, descs.ping_vnfd)
+ pkg.add_vnfd(nsd_id, descs.pong_vnfd)
+
+ return pkg.create_archive('ping_pong_nsd', self.output_dir)
+
+ def tosca_to_yang(self, tosca_file):
+ """Convert TOSCA model to YANG model"""
+ if ImportTosca.is_tosca_package(tosca_file):
+ # This could be a tosca package, try processing
+ tosca = ImportTosca(self.log, tosca_file, out_dir=self.output_dir)
+ files = tosca.translate()
+ if files is None or len(files) < 3:
+ raise ValueError("Could not process as a "
+ "TOSCA package {}: {}".format(tosca_file, files))
+ else:
+ self.log.info("Tosca package was translated successfully")
+ return files
+ else:
+ raise ValueError("Not a valid TOSCA archive: {}".
+ format(tosca_file))
+
+ def compare_descs(self, descs, yang_files):
+ """Compare the descriptors generated with original"""
+ for yang_file in yang_files:
+ if tarfile.is_tarfile(yang_file):
+ with open(yang_file, "r+b") as tar:
+ archive = TarPackageArchive(self.log, tar)
+ pkg = archive.create_package()
+ desc_type = pkg.descriptor_type
+ if desc_type == 'nsd':
+ nsd_yang = pkg.descriptor_msg.as_dict()
+ self.compare_dict(nsd_yang,
+ descs.ping_pong_nsd.as_dict())
+ elif desc_type == 'vnfd':
+ vnfd_yang = pkg.descriptor_msg.as_dict()
+ if 'ping_vnfd' == vnfd_yang['name']:
+ self.compare_dict(vnfd_yang,
+ descs.ping_vnfd.as_dict())
+ elif 'pong_vnfd' == vnfd_yang['name']:
+ self.compare_dict(vnfd_yang,
+ descs.pong_vnfd.as_dict())
+ else:
+ raise Exception("Unknown descriptor type {} found: {}".
+ format(desc_type, pkg.files))
+ else:
+ raise Exception("Did not find a valid tar file for yang model: {}".
+ format(yang_file))
+
+ def test_output(self):
+ try:
+ # Generate the Ping Pong descriptors
+ descs = PingPongDescriptors()
+
+ # Translate the descriptors to TOSCA
+ tosca_file = self.yang_to_tosca(descs)
+
+ # Now translate back to YANG
+ yang_files = self.tosca_to_yang(tosca_file)
+
+ # Compare the generated YANG to original
+ self.compare_descs(descs, yang_files)
+
+ # Removing temp dir only on success to allow debug in case of failures
+ if self.output_dir is not None:
+ shutil.rmtree(self.output_dir)
+ self.output_dir = None
+
+ except Exception as e:
+ self.log.exception(e)
+ self.fail("Exception {}".format(e))
+
+
+
+def main():
+ parser = argparse.ArgumentParser()
+ parser.add_argument('-v', '--verbose', action='store_true')
+ parser.add_argument('-n', '--no-runner', action='store_true')
+ args, unittest_args = parser.parse_known_args()
+ if args.no_runner:
+ runner = None
+ else:
+ runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])
+
+ ToscaTestCase.log_level = logging.DEBUG if args.verbose else logging.WARN
+
+ unittest.main(testRunner=runner, argv=[sys.argv[0]] + unittest_args)
+
+if __name__ == '__main__':
+ main()
diff --git a/rwlaunchpad/test/utest_nsr_handler.py b/rwlaunchpad/test/utest_nsr_handler.py
new file mode 100755
index 0000000..ffab929
--- /dev/null
+++ b/rwlaunchpad/test/utest_nsr_handler.py
@@ -0,0 +1,485 @@
+#!/usr/bin/env python3
+
+#
+# Copyright 2016 RIFT.IO Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import argparse
+import asyncio
+import logging
+import os
+import sys
+import time
+import unittest
+import uuid
+
+import xmlrunner
+
+import gi.repository.RwDts as rwdts
+import gi.repository.RwNsmYang as rwnsmyang
+import gi.repository.NsrYang as NsrYang
+import gi.repository.RwNsrYang as RwNsrYang
+import gi.repository.RwTypes as RwTypes
+import gi.repository.ProtobufC as ProtobufC
+import gi.repository.RwResourceMgrYang as RwResourceMgrYang
+import gi.repository.RwLaunchpadYang as launchpadyang
+import rift.tasklets
+import rift.test.dts
+
+import mano_ut
+
+
+if sys.version_info < (3, 4, 4):
+ asyncio.ensure_future = asyncio.async
+
+
+class NsrDtsHandler(object):
+    """ The network service DTS handler.
+
+    Subscribes to the NSR configuration subtree over DTS and reacts to
+    create/update/delete events.  In this unit-test copy most calls into the
+    NS manager are commented out; only registration and the add/delete/update
+    bookkeeping remain active.
+    """
+    # Config xpaths for all NSRs and for their scaling-group instances.
+    NSR_XPATH = "C,/nsr:ns-instance-config/nsr:nsr"
+    SCALE_INSTANCE_XPATH = "C,/nsr:ns-instance-config/nsr:nsr/nsr:scaling-group/nsr:instance"
+
+    def __init__(self, dts, log, loop, nsm):
+        self._dts = dts
+        self._log = log
+        self._loop = loop
+        self._nsm = nsm
+
+        # DTS registration handles, populated by register().
+        self._nsr_regh = None
+        self._scale_regh = None
+
+    @property
+    def nsm(self):
+        """ Return the NS manager instance """
+        return self._nsm
+
+    def get_scale_group_instances(self, nsr_id, group_name):
+        """ Return the set of committed instance ids for one scaling group.
+
+        Walks the cached (non-transaction) elements of the scaling-group
+        registration and collects ids belonging to (nsr_id, group_name).
+        """
+        def nsr_id_from_keyspec(ks):
+            # Extract the nsr id key from the keyspec path entry.
+            nsr_path_entry = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr.schema().keyspec_to_entry(ks)
+            nsr_id = nsr_path_entry.key00.id
+            return nsr_id
+
+        def group_name_from_keyspec(ks):
+            # Extract the scaling-group-name-ref key from the keyspec.
+            group_path_entry = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup.schema().keyspec_to_entry(ks)
+            group_name = group_path_entry.key00.scaling_group_name_ref
+            return group_name
+
+
+        xact_ids = set()
+        for instance_cfg, keyspec in self._scale_regh.get_xact_elements(include_keyspec=True):
+            elem_nsr_id = nsr_id_from_keyspec(keyspec)
+            if elem_nsr_id != nsr_id:
+                continue
+
+            elem_group_name = group_name_from_keyspec(keyspec)
+            if elem_group_name != group_name:
+                continue
+
+            xact_ids.add(instance_cfg.id)
+
+        return xact_ids
+
+    @asyncio.coroutine
+    def register(self):
+        """ Register for Nsr create/update/delete/read requests from dts """
+
+        def nsr_id_from_keyspec(ks):
+            # Extract the nsr id key from the keyspec path entry.
+            nsr_path_entry = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr.schema().keyspec_to_entry(ks)
+            nsr_id = nsr_path_entry.key00.id
+            return nsr_id
+
+        def group_name_from_keyspec(ks):
+            # Extract the scaling-group-name-ref key from the keyspec.
+            group_path_entry = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup.schema().keyspec_to_entry(ks)
+            group_name = group_path_entry.key00.scaling_group_name_ref
+            return group_name
+
+        def is_instance_in_reg_elements(nsr_id, group_name, instance_id):
+            """ Return boolean indicating if scaling group instance was already committed previously.
+
+            By looking at the existing elements in this registration handle (elements not part
+            of this current xact), we can tell if the instance was configured previously without
+            keeping any application state.
+            """
+            for instance_cfg, keyspec in self._nsr_regh.get_xact_elements(include_keyspec=True):
+                elem_nsr_id = nsr_id_from_keyspec(keyspec)
+                elem_group_name = group_name_from_keyspec(keyspec)
+
+                if elem_nsr_id != nsr_id or group_name != elem_group_name:
+                    continue
+
+                if instance_cfg.id == instance_id:
+                    return True
+
+            return False
+
+        def get_scale_group_instance_delta(nsr_id, group_name, xact):
+
+            #1. Find all elements in the transaction add to the "added"
+            #2. Find matching elements in current elements, remove from "added".
+            #3. Find elements only in current, add to "deleted"
+
+            xact_ids = set()
+            for instance_cfg, keyspec in self._scale_regh.get_xact_elements(xact, include_keyspec=True):
+                elem_nsr_id = nsr_id_from_keyspec(keyspec)
+                if elem_nsr_id != nsr_id:
+                    continue
+
+                elem_group_name = group_name_from_keyspec(keyspec)
+                if elem_group_name != group_name:
+                    continue
+
+                xact_ids.add(instance_cfg.id)
+
+            current_ids = set()
+            for instance_cfg, keyspec in self._scale_regh.get_xact_elements(include_keyspec=True):
+                elem_nsr_id = nsr_id_from_keyspec(keyspec)
+                if elem_nsr_id != nsr_id:
+                    continue
+
+                elem_group_name = group_name_from_keyspec(keyspec)
+                if elem_group_name != group_name:
+                    continue
+
+                current_ids.add(instance_cfg.id)
+
+            delta = {
+                "added": xact_ids - current_ids,
+                "deleted": current_ids - xact_ids
+                }
+            return delta
+
+        def get_add_delete_update_cfgs(dts_member_reg, xact, key_name):
+            # Unfortunately, it is currently difficult to figure out what has exactly
+            # changed in this xact without Pbdelta support (RIFT-4916)
+            # As a workaround, we can fetch the pre and post xact elements and
+            # perform a comparison to figure out adds/deletes/updates
+            xact_cfgs = list(dts_member_reg.get_xact_elements(xact))
+            curr_cfgs = list(dts_member_reg.elements)
+
+            xact_key_map = {getattr(cfg, key_name): cfg for cfg in xact_cfgs}
+            curr_key_map = {getattr(cfg, key_name): cfg for cfg in curr_cfgs}
+
+            # Find Adds
+            added_keys = set(xact_key_map) - set(curr_key_map)
+            added_cfgs = [xact_key_map[key] for key in added_keys]
+
+            # Find Deletes
+            deleted_keys = set(curr_key_map) - set(xact_key_map)
+            deleted_cfgs = [curr_key_map[key] for key in deleted_keys]
+
+            # Find Updates
+            updated_keys = set(curr_key_map) & set(xact_key_map)
+            updated_cfgs = [xact_key_map[key] for key in updated_keys if xact_key_map[key] != curr_key_map[key]]
+
+            return added_cfgs, deleted_cfgs, updated_cfgs
+
+        def on_apply(dts, acg, xact, action, scratch):
+            """Apply the configuration"""
+            def handle_create_nsr(msg):
+                # Handle create nsr requests """
+                # Do some validations
+                if not msg.has_field("nsd_ref"):
+                    err = "NSD reference not provided"
+                    self._log.error(err)
+                    raise NetworkServiceRecordError(err)
+
+                self._log.info("Creating NetworkServiceRecord %s from nsd_id %s",
+                               msg.id, msg.nsd_ref)
+
+                #nsr = self.nsm.create_nsr(msg)
+                # NOTE(review): `nsr` is never assigned while the create call
+                # above is commented out -- invoking this function would raise
+                # NameError.  It is currently never called (see on_apply).
+                return nsr
+
+            def handle_delete_nsr(msg):
+                @asyncio.coroutine
+                def delete_instantiation(ns_id):
+                    """ Delete instantiation """
+                    pass
+                    #with self._dts.transaction() as xact:
+                    #yield from self._nsm.terminate_ns(ns_id, xact)
+
+                # Handle delete NSR requests
+                self._log.info("Delete req for NSR Id: %s received", msg.id)
+                # Terminate the NSR instance
+                #nsr = self._nsm.get_ns_by_nsr_id(msg.id)
+
+                #nsr.set_state(NetworkServiceRecordState.TERMINATE_RCVD)
+                #event_descr = "Terminate rcvd for NS Id:%s" % msg.id
+                #nsr.record_event("terminate-rcvd", event_descr)
+
+                #self._loop.create_task(delete_instantiation(msg.id))
+
+            @asyncio.coroutine
+            def begin_instantiation(nsr):
+                # Begin instantiation
+                pass
+                #self._log.info("Beginning NS instantiation: %s", nsr.id)
+                #yield from self._nsm.instantiate_ns(nsr.id, xact)
+
+            self._log.debug("Got nsr apply (xact: %s) (action: %s)(scr: %s)",
+                            xact, action, scratch)
+
+            # An INSTALL action without a transaction id means the cached
+            # config is being replayed; pass xact=None below so the cached
+            # (non-xact) elements are read instead.
+            if action == rwdts.AppconfAction.INSTALL and xact.id is None:
+                self._log.debug("No xact handle. Skipping apply config")
+                xact = None
+
+            (added_msgs, deleted_msgs, updated_msgs) = get_add_delete_update_cfgs(self._nsr_regh, xact, "id")
+
+            for msg in added_msgs:
+                self._log.info("Create NSR received in on_apply to instantiate NS:%s", msg.id)
+                #if msg.id not in self._nsm.nsrs:
+                #    self._log.info("Create NSR received in on_apply to instantiate NS:%s", msg.id)
+                #    nsr = handle_create_nsr(msg)
+                #    self._loop.create_task(begin_instantiation(nsr))
+
+            for msg in deleted_msgs:
+                self._log.info("Delete NSR received in on_apply to terminate NS:%s", msg.id)
+                try:
+                    handle_delete_nsr(msg)
+                except Exception:
+                    self._log.exception("Failed to terminate NS:%s", msg.id)
+
+            for msg in updated_msgs:
+                self._log.info("Update NSR received in on_apply to change scaling groups in NS:%s", msg.id)
+
+                for group in msg.scaling_group:
+                    instance_delta = get_scale_group_instance_delta(msg.id, group.scaling_group_name_ref, xact)
+                    self._log.debug("Got NSR:%s scale group instance delta: %s", msg.id, instance_delta)
+
+                    #for instance_id in instance_delta["added"]:
+                    #    self._nsm.scale_nsr_out(msg.id, group.scaling_group_name_ref, instance_id, xact)
+
+                    #for instance_id in instance_delta["deleted"]:
+                    #    self._nsm.scale_nsr_in(msg.id, group.scaling_group_name_ref, instance_id)
+
+
+            return RwTypes.RwStatus.SUCCESS
+
+        @asyncio.coroutine
+        def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
+            """ Prepare callback from DTS for NSR """
+
+            xpath = ks_path.to_xpath(NsrYang.get_schema())
+            action = xact_info.query_action
+            self._log.debug(
+                "Got Nsr prepare callback (xact: %s) (action: %s) (info: %s), %s:%s)",
+                xact, action, xact_info, xpath, msg
+                )
+
+            fref = ProtobufC.FieldReference.alloc()
+            fref.goto_whole_message(msg.to_pbcm())
+
+            if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]:
+                pass
+                # Ensure the Cloud account has been specified if this is an NSR create
+                #if msg.id not in self._nsm.nsrs:
+                #    if not msg.has_field("cloud_account"):
+                #        raise NsrInstantiationFailed("Cloud account not specified in NSR")
+
+                # We do not allow scaling actions to occur if the NS is not in running state
+                #elif msg.has_field("scaling_group"):
+                #    nsr = self._nsm.nsrs[msg.id]
+                #    if nsr.state != NetworkServiceRecordState.RUNNING:
+                #        raise ScalingOperationError("Unable to perform scaling action when NS is not in running state")
+
+                #    if len(msg.scaling_group) > 1:
+                #        raise ScalingOperationError("Only a single scaling group can be configured at a time")
+
+                #    for group_msg in msg.scaling_group:
+                #        num_new_group_instances = len(group_msg.instance)
+                #        if num_new_group_instances > 1:
+                #            raise ScalingOperationError("Only a single scaling instance can be created at a time")
+
+                #        elif num_new_group_instances == 1:
+                #            scale_group = nsr.scaling_groups[group_msg.scaling_group_name_ref]
+                #            if len(scale_group.instances) == scale_group.max_instance_count:
+                #                raise ScalingOperationError("Max instances for %s reached" % scale_group)
+
+
+            # Accept every prepare; the validation above is disabled here.
+            acg.handle.prepare_complete_ok(xact_info.handle)
+
+
+        self._log.debug("Registering for NSR config using xpath: %s",
+                        NsrDtsHandler.NSR_XPATH)
+
+        acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
+        with self._dts.appconf_group_create(handler=acg_hdl) as acg:
+            # CACHE is required so get_xact_elements() can see prior config.
+            self._nsr_regh = acg.register(xpath=NsrDtsHandler.NSR_XPATH,
+                                          flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
+                                          on_prepare=on_prepare)
+
+            self._scale_regh = acg.register(
+                xpath=NsrDtsHandler.SCALE_INSTANCE_XPATH,
+                flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
+                )
+
+
+class XPaths(object):
+    """Helpers that build config XPath strings for NSR queries."""
+
+    @staticmethod
+    def nsr_config(nsr_id=None):
+        """Return the NSR config xpath, keyed to nsr_id when given."""
+        return ("C,/nsr:ns-instance-config/nsr:nsr" +
+                ("[nsr:id='{}']".format(nsr_id) if nsr_id is not None else ""))
+
+    # Fixed: this was missing @staticmethod (unlike nsr_config), so calling
+    # it on an XPaths *instance* would have bound nsr_id to self.
+    @staticmethod
+    def scaling_group_instance(nsr_id, group_name, instance_id):
+        """Return the xpath of one scaling-group instance within an NSR."""
+        return ("C,/nsr:ns-instance-config/nsr:nsr" +
+                "[nsr:id='{}']".format(nsr_id) +
+                "/nsr:scaling-group" +
+                "[nsr:scaling-group-name-ref='{}']".format(group_name) +
+                "/nsr:instance" +
+                "[nsr:id='{}']".format(instance_id)
+                )
+
+
+class NsrHandlerTestCase(rift.test.dts.AbstractDTSTest):
+    """
+    DTS GI interface unittests.
+
+    Drives NsrDtsHandler through DTS client transactions and checks that
+    scaling-group instance bookkeeping stays scoped to the owning NSR.
+    """
+    @classmethod
+    def configure_schema(cls):
+        # Schema containing the nsr: config subtree used by the handler.
+        return NsrYang.get_schema()
+
+    @classmethod
+    def configure_timeout(cls):
+        # Generous timeout: the test sleeps repeatedly to let DTS settle.
+        return 240
+
+    def configure_test(self, loop, test_id):
+        self.log.debug("STARTING - %s", self.id())
+        self.tinfo = self.new_tinfo(self.id())
+        self.dts = rift.tasklets.DTS(self.tinfo, self.schema, self.loop)
+        # Handler under test; the NS manager is not exercised (None).
+        self.handler = NsrDtsHandler(self.dts, self.log, self.loop, None)
+
+        # Separate DTS member acting as the client issuing config changes.
+        self.tinfo_c = self.new_tinfo(self.id() + "_client")
+        self.dts_c = rift.tasklets.DTS(self.tinfo_c, self.schema, self.loop)
+
+    @rift.test.dts.async_test
+    def test_add_delete_ns(self):
+        # Fixed string ids keep logs/debugging reproducible across runs.
+        nsr1_uuid = "nsr1_uuid"  # str(uuid.uuid4())
+        nsr2_uuid = "nsr2_uuid"  # str(uuid.uuid4())
+
+        assert nsr1_uuid != nsr2_uuid
+
+        yield from self.handler.register()
+        yield from asyncio.sleep(.5, loop=self.loop)
+
+        self.log.debug("Creating NSR")
+        with self.dts_c.transaction() as xact:
+            block = xact.block_create()
+            block.add_query_update(
+                XPaths.nsr_config(nsr1_uuid),
+                NsrYang.YangData_Nsr_NsInstanceConfig_Nsr(id=nsr1_uuid, name="fu"),
+                flags=rwdts.XactFlag.ADVISE | rwdts.XactFlag.TRACE,
+                )
+            yield from block.execute(now=True)
+
+        yield from asyncio.sleep(.5, loop=self.loop)
+
+        # Add scaling-group instance 1234 to nsr1 ...
+        with self.dts_c.transaction() as xact:
+            block = xact.block_create()
+            block.add_query_update(
+                XPaths.scaling_group_instance(nsr1_uuid, "group", 1234),
+                NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup_Instance(id=1234),
+                flags=rwdts.XactFlag.ADVISE | rwdts.XactFlag.TRACE,
+                )
+            yield from block.execute(now=True)
+
+        yield from asyncio.sleep(.5, loop=self.loop)
+
+        # ... then remove it again.
+        with self.dts_c.transaction() as xact:
+            block = xact.block_create()
+            block.add_query_delete(
+                XPaths.scaling_group_instance(nsr1_uuid, "group", 1234),
+                flags=rwdts.XactFlag.ADVISE | rwdts.XactFlag.TRACE,
+                )
+            yield from block.execute(now=True)
+
+        yield from asyncio.sleep(.5, loop=self.loop)
+
+        # Add a different instance (12345); this one stays until the end.
+        with self.dts_c.transaction() as xact:
+            block = xact.block_create()
+            block.add_query_create(
+                XPaths.scaling_group_instance(nsr1_uuid, "group", 12345),
+                NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup_Instance(id=12345),
+                flags=rwdts.XactFlag.ADVISE | rwdts.XactFlag.TRACE,
+                )
+            yield from block.execute(now=True)
+
+        yield from asyncio.sleep(.5, loop=self.loop)
+
+        group_ids = self.handler.get_scale_group_instances(nsr2_uuid, "group")
+        self.log.debug("Got group ids in nsr2 after adding 12345 to nsr1: %s", group_ids)
+        group_ids = self.handler.get_scale_group_instances(nsr1_uuid, "group")
+        self.log.debug("Got group ids in nsr1 after adding 12345 to nsr1: %s", group_ids)
+        assert group_ids == {12345}
+
+        self.log.debug("\n\nADD A COMPLETELY DIFFERENT NSR\n")
+        with self.dts_c.transaction() as xact:
+            block = xact.block_create()
+            block.add_query_update(
+                XPaths.nsr_config(nsr2_uuid),
+                NsrYang.YangData_Nsr_NsInstanceConfig_Nsr(id=nsr2_uuid, name="fu2"),
+                flags=rwdts.XactFlag.ADVISE | rwdts.XactFlag.TRACE,
+                )
+            yield from block.execute(now=True)
+
+        yield from asyncio.sleep(.5, loop=self.loop)
+
+        # Adding nsr2 must not leak instances into or out of nsr1's group.
+        group_ids = self.handler.get_scale_group_instances(nsr2_uuid, "group")
+        self.log.debug("Got group ids in nsr2 after adding new nsr: %s", group_ids)
+        group_ids = self.handler.get_scale_group_instances(nsr1_uuid, "group")
+        self.log.debug("Got group ids in nsr1 after adding new nsr: %s", group_ids)
+        assert group_ids == {12345}
+
+        self.log.debug("\n\nDELETE A COMPLETELY DIFFERENT NSR\n")
+        with self.dts_c.transaction() as xact:
+            block = xact.block_create()
+            block.add_query_delete(
+                XPaths.nsr_config(nsr2_uuid),
+                flags=rwdts.XactFlag.ADVISE | rwdts.XactFlag.TRACE,
+                )
+            yield from block.execute(now=True)
+
+        yield from asyncio.sleep(.5, loop=self.loop)
+
+        # Deleting nsr2 must likewise leave nsr1's instances untouched.
+        group_ids = self.handler.get_scale_group_instances(nsr2_uuid, "group")
+        self.log.debug("Got group ids in nsr2 after deleting nsr2: %s", group_ids)
+        group_ids = self.handler.get_scale_group_instances(nsr1_uuid, "group")
+        self.log.debug("Got group ids in nsr1 after deleting nsr2: %s", group_ids)
+        assert group_ids == {12345}
+
+        # Final cleanup of the remaining scaling-group instance.
+        with self.dts_c.transaction() as xact:
+            block = xact.block_create()
+            block.add_query_delete(
+                XPaths.scaling_group_instance(nsr1_uuid, "group", 12345),
+                flags=rwdts.XactFlag.ADVISE | rwdts.XactFlag.TRACE,
+                )
+            yield from block.execute(now=True)
+
+        yield from asyncio.sleep(2, loop=self.loop)
+
+def main():
+    """Parse test options and run the NSR handler test suite."""
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-v', '--verbose', action='store_true')
+    parser.add_argument('-n', '--no-runner', action='store_true')
+    args, unittest_args = parser.parse_known_args()
+
+    # Fixed: the XML runner was built unconditionally before the arguments
+    # were parsed, so a missing RIFT_MODULE_TEST raised KeyError even when
+    # --no-runner was passed.  Build it only when actually wanted (matches
+    # the sibling test scripts).
+    if args.no_runner:
+        runner = None
+    else:
+        runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])
+
+    NsrHandlerTestCase.log_level = logging.DEBUG if args.verbose else logging.WARN
+
+    unittest.main(testRunner=runner, argv=[sys.argv[0]] + unittest_args)
+
+if __name__ == '__main__':
+    main()
diff --git a/rwlaunchpad/test/utest_ro_account.py b/rwlaunchpad/test/utest_ro_account.py
new file mode 100644
index 0000000..6e480d4
--- /dev/null
+++ b/rwlaunchpad/test/utest_ro_account.py
@@ -0,0 +1,153 @@
+
+#
+# Copyright 2016 RIFT.IO Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import asyncio
+import sys
+import types
+import unittest
+import uuid
+
+import rift.test.dts
+import rift.tasklets.rwnsmtasklet.cloud as cloud
+import rift.tasklets.rwnsmtasklet.openmano_nsm as openmano_nsm
+import rw_peas
+
+import gi
+gi.require_version('RwDtsYang', '1.0')
+from gi.repository import (
+ RwLaunchpadYang as launchpadyang,
+ RwDts as rwdts,
+ RwVnfdYang,
+ RwVnfrYang,
+ RwNsrYang,
+ RwNsdYang,
+ VnfrYang
+ )
+
+
+class DescriptorPublisher(object):
+    """ Test helper that publishes descriptor objects onto the DTS bus.
+
+    Each publish() registers as a publisher at a wildcard path and creates a
+    single element; unpublish_all() deregisters every registration again.
+    """
+    def __init__(self, log, dts, loop):
+        self.log = log
+        self.loop = loop
+        self.dts = dts
+
+        # Registration handles kept so unpublish_all() can deregister them.
+        self._registrations = []
+
+    @asyncio.coroutine
+    def publish(self, w_path, path, desc):
+        """ Register at w_path and create desc at path; return the reg handle.
+
+        Blocks (via ready_event) until DTS reports the registration ready
+        and the element has been created.
+        """
+        ready_event = asyncio.Event(loop=self.loop)
+
+        @asyncio.coroutine
+        def on_ready(regh, status):
+            # Called by DTS once the registration is live; create the
+            # element inside its own transaction, then release the waiter.
+            self.log.debug("Create element: %s, obj-type:%s obj:%s",
+                           path, type(desc), desc)
+            with self.dts.transaction() as xact:
+                regh.create_element(path, desc, xact.xact)
+            self.log.debug("Created element: %s, obj:%s", path, desc)
+            ready_event.set()
+
+        handler = rift.tasklets.DTS.RegistrationHandler(
+            on_ready=on_ready
+            )
+
+        self.log.debug("Registering path: %s, obj:%s", w_path, desc)
+        reg = yield from self.dts.register(
+            w_path,
+            handler,
+            flags=rwdts.Flag.PUBLISHER | rwdts.Flag.NO_PREP_READ
+            )
+        self._registrations.append(reg)
+        self.log.debug("Registered path : %s", w_path)
+        # Wait until on_ready has actually created the element.
+        yield from ready_event.wait()
+
+        return reg
+
+    def unpublish_all(self):
+        """ Deregister every registration made through publish(). """
+        self.log.debug("Deregistering all published descriptors")
+        for reg in self._registrations:
+            reg.deregister()
+
+class RoAccountDtsTestCase(rift.test.dts.AbstractDTSTest):
+    """ Verifies ROAccountPluginSelector swaps RO plugins on config changes.
+
+    Publishes resource-orchestrator configs over DTS and checks the active
+    plugin type after each change (default, rift-ro, openmano, delete).
+    """
+    @classmethod
+    def configure_schema(cls):
+        return launchpadyang.get_schema()
+
+    @classmethod
+    def configure_timeout(cls):
+        # Generous: the test sleeps to let config propagation settle.
+        return 240
+
+    def configure_test(self, loop, test_id):
+        self.log.debug("STARTING - %s", test_id)
+        self.tinfo = self.new_tinfo(str(test_id))
+        self.dts = rift.tasklets.DTS(self.tinfo, self.schema, self.loop)
+
+        self.tinfo_sub = self.new_tinfo(str(test_id) + "_sub")
+        self.dts_sub = rift.tasklets.DTS(self.tinfo_sub, self.schema, self.loop)
+
+        self.publisher = DescriptorPublisher(self.log, self.dts, self.loop)
+
+    def tearDown(self):
+        super().tearDown()
+
+    @rift.test.dts.async_test
+    def test_orch_account_create(self):
+        orch = cloud.ROAccountPluginSelector(self.dts, self.log, self.loop, None)
+
+        yield from orch.register()
+
+        # Test if we have a default plugin in case no RO is specified.
+        assert type(orch.ro_plugin) is cloud.RwNsPlugin
+        mock_orch_acc = launchpadyang.ResourceOrchestrator.from_dict(
+            {'name': 'rift-ro', 'account_type': 'rift_ro', 'rift_ro': {'rift_ro': True}})
+
+        # Test rift-ro plugin
+        w_xpath = "C,/rw-launchpad:resource-orchestrator"
+        xpath = w_xpath
+        yield from self.publisher.publish(w_xpath, xpath, mock_orch_acc)
+        yield from asyncio.sleep(5, loop=self.loop)
+
+        assert type(orch.ro_plugin) is cloud.RwNsPlugin
+
+        # Test Openmano plugin
+        mock_orch_acc = launchpadyang.ResourceOrchestrator.from_dict(
+            {'name': 'openmano',
+             'account_type': 'openmano',
+             'openmano': {'tenant_id': "abc"}})
+        yield from self.publisher.publish(w_xpath, xpath, mock_orch_acc)
+        yield from asyncio.sleep(5, loop=self.loop)
+
+        # Was a stray debug print(); route through the test logger instead.
+        self.log.debug("Active RO plugin: %s", type(orch.ro_plugin))
+        assert type(orch.ro_plugin) is openmano_nsm.OpenmanoNsPlugin
+
+        # Test delete: removing the config must clear the active plugin.
+        yield from self.dts.query_delete("C,/rw-launchpad:resource-orchestrator",
+                                         flags=rwdts.XactFlag.ADVISE)
+        # Fixed: identity comparison with None (was `== None`).
+        assert orch.ro_plugin is None
+
+
+def main(argv=sys.argv[1:]):
+    """Run the suite.
+
+    The unittest framework requires a program name, so supply this file's
+    name rather than forcing callers to pass a fake one.
+    """
+    unittest.main(
+        argv=[__file__, *argv],
+        testRunner=None#xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])
+    )
+
+if __name__ == '__main__':
+    main()
\ No newline at end of file
diff --git a/rwlaunchpad/test/utest_rwmonitor.py b/rwlaunchpad/test/utest_rwmonitor.py
new file mode 100755
index 0000000..46c33b3
--- /dev/null
+++ b/rwlaunchpad/test/utest_rwmonitor.py
@@ -0,0 +1,873 @@
+#!/usr/bin/env python3
+
+#
+# Copyright 2016 RIFT.IO Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+import argparse
+import asyncio
+import concurrent.futures
+import logging
+import os
+import sys
+import time
+import unittest
+import uuid
+import xmlrunner
+
+import gi
+gi.require_version('NsrYang', '1.0')
+gi.require_version('RwcalYang', '1.0')
+gi.require_version('RwmonYang', '1.0')
+gi.require_version('RwVnfrYang', '1.0')
+gi.require_version('RwTypes', '1.0')
+gi.require_version('RwMon', '1.0')
+
+from gi.repository import (
+ NsrYang,
+ RwTypes,
+ RwVnfrYang,
+ RwcalYang,
+ RwmonYang,
+ VnfrYang,
+ )
+
+from rift.tasklets.rwmonitor.core import (
+ AccountAlreadyRegisteredError,
+ AccountInUseError,
+ InstanceConfiguration,
+ Monitor,
+ NfviInterface,
+ NfviMetrics,
+ NfviMetricsCache,
+ NfviMetricsPluginManager,
+ PluginFactory,
+ PluginNotSupportedError,
+ PluginUnavailableError,
+ UnknownAccountError,
+ )
+import rw_peas
+
+
+class wait_for_pending_tasks(object):
+    """
+    Decorator that runs a coroutine and then waits (up to a timeout) for any
+    asyncio tasks the coroutine spawned as a side effect to complete.
+    """
+
+    def __init__(self, loop, timeout=1):
+        self.loop = loop
+        self.timeout = timeout
+
+    def __call__(self, coro):
+        @asyncio.coroutine
+        def impl():
+            before = self.pending_tasks()
+            result = yield from coro()
+
+            # Anything pending now that was not pending beforehand must have
+            # been started by the wrapped coroutine; let it finish.
+            spawned = self.pending_tasks() - before
+            if spawned:
+                yield from asyncio.wait(
+                    spawned,
+                    timeout=self.timeout,
+                    loop=self.loop,
+                )
+
+            return result
+
+        return impl
+
+    def pending_tasks(self):
+        # All not-yet-finished tasks currently known to the loop.
+        incomplete = set()
+        for task in asyncio.Task.all_tasks(loop=self.loop):
+            if not task.done():
+                incomplete.add(task)
+        return incomplete
+
+
+class MockTasklet(object):
+    """ Stand-in for the monitor tasklet: carries only the attributes the
+    monitor code reads (dts/log/loop/records, polling period, executor).
+    """
+    def __init__(self, dts, log, loop, records):
+        self.dts = dts
+        self.log = log
+        self.loop = loop
+        self.records = records
+        # Poll as fast as possible in tests.
+        self.polling_period = 0
+        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=16)
+
+
+def make_nsr(ns_instance_config_ref=None):
+    """Return a fresh NSR opdata record.
+
+    Fixed: the previous default (str(uuid.uuid4()) in the signature) was
+    evaluated once at import time, so every no-arg call shared the same ref.
+    Generate a new UUID per call instead.
+    """
+    if ns_instance_config_ref is None:
+        ns_instance_config_ref = str(uuid.uuid4())
+    nsr = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr()
+    nsr.ns_instance_config_ref = ns_instance_config_ref
+    return nsr
+
+def make_vnfr(id=None):
+    """Return a fresh VNF record with a unique id.
+
+    Fixed: the previous default (str(uuid.uuid4()) in the signature) was
+    evaluated once at import time, so every no-arg call shared the same id.
+    """
+    vnfr = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr()
+    vnfr.id = id if id is not None else str(uuid.uuid4())
+    return vnfr
+
+def make_vdur(id=None, vim_id=None):
+    """Return a fresh VDU record with unique ids.
+
+    Fixed: the previous defaults (str(uuid.uuid4()) in the signature) were
+    evaluated once at import time, so all no-arg calls shared the same
+    id/vim_id.  Generate new UUIDs per call instead.
+    """
+    vdur = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur()
+    vdur.id = id if id is not None else str(uuid.uuid4())
+    vdur.vim_id = vim_id if vim_id is not None else str(uuid.uuid4())
+    return vdur
+
+
+class TestNfviMetricsCache(unittest.TestCase):
+    """ Unit tests for NfviMetricsCache entry lifecycle and id mapping. """
+
+    class Plugin(object):
+        # Minimal NFVI plugin stub: always available, fixed 0.5 vcpu load.
+        def nfvi_metrics_available(self, cloud_account):
+            return True
+
+        def nfvi_metrics(self, account, vim_id):
+            metrics = RwmonYang.NfviMetrics()
+            metrics.vcpu.utilization = 0.5
+            return metrics
+
+    def setUp(self):
+        self.loop = asyncio.new_event_loop()
+        self.logger = logging.getLogger('test-logger')
+
+        self.account = RwcalYang.CloudAccount(
+            name='test-cloud-account',
+            account_type="mock",
+            )
+
+        # Register the mock plugin and swap in the stub implementation above.
+        self.plugin_manager = NfviMetricsPluginManager(self.logger)
+        self.plugin_manager.register(self.account, "mock")
+
+        mock = self.plugin_manager.plugin(self.account.name)
+        mock.set_impl(TestNfviMetricsCache.Plugin())
+
+        # Non-zero flavor values avoid division by zero in utilization math.
+        self.vdur = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur()
+        self.vdur.id = "test-vdur-id"
+        self.vdur.vim_id = "test-vim-id"
+        self.vdur.vm_flavor.vcpu_count = 4
+        self.vdur.vm_flavor.memory_mb = 1
+        self.vdur.vm_flavor.storage_gb = 1
+
+    def test_create_destroy_entry(self):
+        # An entry appears on create and disappears on destroy.
+        cache = NfviMetricsCache(self.logger, self.loop, self.plugin_manager)
+        self.assertEqual(len(cache._nfvi_metrics), 0)
+
+        cache.create_entry(self.account, self.vdur)
+        self.assertEqual(len(cache._nfvi_metrics), 1)
+
+        cache.destroy_entry(self.vdur.id)
+        self.assertEqual(len(cache._nfvi_metrics), 0)
+
+    def test_retrieve(self):
+        # Shorten the sampling interval so the test runs quickly.
+        NfviMetrics.SAMPLE_INTERVAL = 1
+
+        cache = NfviMetricsCache(self.logger, self.loop, self.plugin_manager)
+        cache.create_entry(self.account, self.vdur)
+
+        @wait_for_pending_tasks(self.loop)
+        @asyncio.coroutine
+        def retrieve_metrics():
+            # First retrieve returns the (zero) cached value and schedules a
+            # refresh; after one interval the plugin's 0.5 sample shows up.
+            metrics = cache.retrieve("test-vim-id")
+            self.assertEqual(metrics.vcpu.utilization, 0.0)
+
+            yield from asyncio.sleep(NfviMetrics.SAMPLE_INTERVAL, loop=self.loop)
+
+            metrics = cache.retrieve("test-vim-id")
+            self.assertEqual(metrics.vcpu.utilization, 0.5)
+
+        self.loop.run_until_complete(retrieve_metrics())
+
+    def test_id_mapping(self):
+        # vdur-id <-> vim-id mapping is maintained alongside the entry.
+        cache = NfviMetricsCache(self.logger, self.loop, self.plugin_manager)
+
+        cache.create_entry(self.account, self.vdur)
+
+        self.assertEqual(cache.to_vim_id(self.vdur.id), self.vdur.vim_id)
+        self.assertEqual(cache.to_vdur_id(self.vdur.vim_id), self.vdur.id)
+        self.assertTrue(cache.contains_vdur_id(self.vdur.id))
+        self.assertTrue(cache.contains_vim_id(self.vdur.vim_id))
+
+        cache.destroy_entry(self.vdur.id)
+
+        self.assertFalse(cache.contains_vdur_id(self.vdur.id))
+        self.assertFalse(cache.contains_vim_id(self.vdur.vim_id))
+
+
+class TestNfviMetrics(unittest.TestCase):
+    """ Unit tests for the NfviMetrics sampling/refresh behaviour. """
+
+    class Plugin(object):
+        # Stub plugin returning a fixed 0.5 vcpu utilization sample.
+        def nfvi_metrics_available(self, cloud_account):
+            return True
+
+        def nfvi_metrics(self, account, vim_id):
+            metrics = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_NfviMetrics()
+            metrics.vcpu.utilization = 0.5
+            return None, metrics
+
+    def setUp(self):
+        self.loop = asyncio.new_event_loop()
+        self.account = RwcalYang.CloudAccount(
+            name='test-cloud-account',
+            account_type="mock",
+            )
+
+        self.plugin = TestNfviMetrics.Plugin()
+        self.logger = logging.getLogger('test-logger')
+
+        # Non-zero flavor values avoid division by zero in utilization math.
+        self.vdur = make_vdur()
+        self.vdur.vm_flavor.vcpu_count = 4
+        self.vdur.vm_flavor.memory_mb = 100
+        self.vdur.vm_flavor.storage_gb = 2
+        self.vdur.vim_id = 'test-vim-id'
+
+    def test_update(self):
+        nfvi_metrics = NfviMetrics(
+            self.logger,
+            self.loop,
+            self.account,
+            self.plugin,
+            self.vdur,
+            )
+
+        # Reduce the SAMPLE_INTERVAL so that the test does not take a long time
+        nfvi_metrics.SAMPLE_INTERVAL = 1
+
+        # The metrics have never been retrieved so they should be updated
+        self.assertTrue(nfvi_metrics.should_update())
+
+        # The metrics return will be empty because the cache version is empty.
+        # However, this trigger an update to retrieve metrics from the plugin.
+        metrics = nfvi_metrics.retrieve()
+        self.assertEqual(metrics.vcpu.utilization, 0.0)
+
+        # An update has been trigger by the retrieve call so additional updates
+        # should not happen
+        self.assertFalse(nfvi_metrics.should_update())
+        self.assertFalse(nfvi_metrics._updating.done())
+
+        # Allow the event loop to run until the update is complete
+        @asyncio.coroutine
+        @wait_for_pending_tasks(self.loop)
+        def wait_for_update():
+            yield from asyncio.wait_for(
+                nfvi_metrics._updating,
+                timeout=2,
+                loop=self.loop,
+                )
+
+        self.loop.run_until_complete(wait_for_update())
+
+        # Check that we have a new metrics object
+        metrics = nfvi_metrics.retrieve()
+        self.assertEqual(metrics.vcpu.utilization, 0.5)
+
+        # We have just updated the metrics so it should be unnecessary to update
+        # right now
+        self.assertFalse(nfvi_metrics.should_update())
+        self.assertTrue(nfvi_metrics._updating.done())
+
+        # Wait an amount of time equal to the SAMPLE_INTERVAL. This ensures
+        # that the metrics that were just retrieved become stale...
+        time.sleep(NfviMetrics.SAMPLE_INTERVAL)
+
+        # ...now it is time to update again
+        self.assertTrue(nfvi_metrics.should_update())
+
+
+class TestNfviInterface(unittest.TestCase):
+    """ Unit tests for NfviInterface alarm creation/destruction. """
+
+    class NfviPluginImpl(object):
+        # Stub plugin that records alarm ids in an in-memory set.
+        def __init__(self):
+            self._alarms = set()
+
+        def nfvi_metrics(self, account, vm_id):
+            # NOTE(review): `rwmon` is not imported in this module, so calling
+            # this would raise NameError.  The alarm tests never invoke it.
+            return rwmon.NfviMetrics()
+
+        def nfvi_metrics_available(self, account):
+            return True
+
+        def alarm_create(self, account, vim_id, alarm):
+            # Assign a fresh id and remember it, mimicking a real backend.
+            alarm.alarm_id = str(uuid.uuid4())
+            self._alarms.add(alarm.alarm_id)
+            return RwTypes.RwStatus.SUCCESS
+
+        def alarm_delete(self, account, alarm_id):
+            self._alarms.remove(alarm_id)
+            return RwTypes.RwStatus.SUCCESS
+
+    def setUp(self):
+        self.loop = asyncio.new_event_loop()
+        self.logger = logging.getLogger('test-logger')
+
+        self.account = RwcalYang.CloudAccount(
+            name='test-cloud-account',
+            account_type="mock",
+            )
+
+        # Define the VDUR to avoid division by zero
+        self.vdur = make_vdur()
+        self.vdur.vm_flavor.vcpu_count = 4
+        self.vdur.vm_flavor.memory_mb = 100
+        self.vdur.vm_flavor.storage_gb = 2
+        self.vdur.vim_id = 'test-vim-id'
+
+        self.plugin_manager = NfviMetricsPluginManager(self.logger)
+        self.plugin_manager.register(self.account, "mock")
+
+        self.cache = NfviMetricsCache(
+            self.logger,
+            self.loop,
+            self.plugin_manager,
+            )
+
+        self.nfvi_interface = NfviInterface(
+            self.loop,
+            self.logger,
+            self.plugin_manager,
+            self.cache
+            )
+
+    def test_nfvi_metrics_available(self):
+        self.assertTrue(self.nfvi_interface.nfvi_metrics_available(self.account))
+
+    def test_retrieve(self):
+        # Metrics retrieval is covered by TestNfviMetricsCache/TestNfviMetrics.
+        pass
+
+    def test_alarm_create_and_destroy(self):
+        alarm = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_Alarms()
+        alarm.name = "test-alarm"
+        alarm.description = "test-description"
+        alarm.vdur_id = "test-vdur-id"
+        alarm.metric = "CPU_UTILIZATION"
+        alarm.statistic = "MINIMUM"
+        alarm.operation = "GT"
+        alarm.value = 0.1
+        alarm.period = 10
+        alarm.evaluations = 1
+
+        # Swap the stub implementation into the registered mock plugin.
+        plugin_impl = TestNfviInterface.NfviPluginImpl()
+        plugin = self.plugin_manager.plugin(self.account.name)
+        plugin.set_impl(plugin_impl)
+
+        self.assertEqual(len(plugin_impl._alarms), 0)
+
+        @asyncio.coroutine
+        @wait_for_pending_tasks(self.loop)
+        def wait_for_create():
+            coro = self.nfvi_interface.alarm_create(
+                self.account,
+                "test-vim-id",
+                alarm,
+                )
+            yield from asyncio.wait_for(
+                coro,
+                timeout=2,
+                loop=self.loop,
+                )
+
+        # Creating the alarm registers it with the plugin and fills alarm_id.
+        self.loop.run_until_complete(wait_for_create())
+        self.assertEqual(len(plugin_impl._alarms), 1)
+        self.assertTrue(alarm.alarm_id is not None)
+
+        @asyncio.coroutine
+        @wait_for_pending_tasks(self.loop)
+        def wait_for_destroy():
+            coro = self.nfvi_interface.alarm_destroy(
+                self.account,
+                alarm.alarm_id,
+                )
+            yield from asyncio.wait_for(
+                coro,
+                timeout=2,
+                loop=self.loop,
+                )
+
+        # Destroying the alarm removes it from the plugin again.
+        self.loop.run_until_complete(wait_for_destroy())
+        self.assertEqual(len(plugin_impl._alarms), 0)
+
+
+class TestVdurNfviMetrics(unittest.TestCase):
+ def setUp(self):
+ # Reduce the sample interval so that tests run quickly
+ NfviMetrics.SAMPLE_INTERVAL = 0.1
+
+ # Create a mock plugin to define the metrics retrieved. The plugin will
+ # return a VCPU utilization of 0.5.
+ class MockPlugin(object):
+ def __init__(self):
+ self.metrics = RwmonYang.NfviMetrics()
+
+ def nfvi_metrics(self, account, vim_id):
+ self.metrics.vcpu.utilization = 0.5
+ return self.metrics
+
+ self.loop = asyncio.get_event_loop()
+ self.logger = logging.getLogger('test-logger')
+
+ self.account = RwcalYang.CloudAccount(
+ name='test-cloud-account',
+ account_type="mock",
+ )
+
+ # Define the VDUR to avoid division by zero
+ vdur = make_vdur()
+ vdur.vm_flavor.vcpu_count = 4
+ vdur.vm_flavor.memory_mb = 100
+ vdur.vm_flavor.storage_gb = 2
+ vdur.vim_id = 'test-vim-id'
+
+ # Instantiate the mock plugin
+ self.plugin_manager = NfviMetricsPluginManager(self.logger)
+ self.plugin_manager.register(self.account, "mock")
+
+ self.plugin = self.plugin_manager.plugin(self.account.name)
+ self.plugin.set_impl(MockPlugin())
+
+ self.cache = NfviMetricsCache(
+ self.logger,
+ self.loop,
+ self.plugin_manager,
+ )
+
+ self.manager = NfviInterface(
+ self.loop,
+ self.logger,
+ self.plugin_manager,
+ self.cache,
+ )
+
+ self.metrics = NfviMetrics(
+ self.logger,
+ self.loop,
+ self.account,
+ self.plugin,
+ vdur,
+ )
+
+ def test_retrieval(self):
+ metrics_a = None
+ metrics_b = None
+
+ # Define a coroutine that can be added to the asyncio event loop
+ @asyncio.coroutine
+ def update():
+ # Output from the metrics calls will be written to these nonlocal
+ # variables
+ nonlocal metrics_a
+ nonlocal metrics_b
+
+ # This first call will return the current metrics values and
+ # schedule a request to the NFVI to retrieve metrics from the data
+ # source. All metrics will be zero at this point.
+ metrics_a = self.metrics.retrieve()
+
+ # Wait for the scheduled update to take effect
+ yield from asyncio.sleep(0.2, loop=self.loop)
+
+ # Retrieve the updated metrics
+ metrics_b = self.metrics.retrieve()
+
+ self.loop.run_until_complete(update())
+
+ # Check that the metrics returned indicate that the plugin was queried
+ # and returned the appropriate value, i.e. 0.5 utilization
+ self.assertEqual(0.0, metrics_a.vcpu.utilization)
+ self.assertEqual(0.5, metrics_b.vcpu.utilization)
+
+
+class TestNfviMetricsPluginManager(unittest.TestCase):
+ def setUp(self):
+ self.logger = logging.getLogger('test-logger')
+ self.plugins = NfviMetricsPluginManager(self.logger)
+ self.account = RwcalYang.CloudAccount(
+ name='test-cloud-account',
+ account_type="mock",
+ )
+
+ def test_mock_plugin(self):
+ # Register an account name with a mock plugin. If successful, the
+ # plugin manager should return a non-None object.
+ self.plugins.register(self.account, 'mock')
+ self.assertIsNotNone(self.plugins.plugin(self.account.name))
+
+ # Now unregister the cloud account
+ self.plugins.unregister(self.account.name)
+
+ # Trying to retrieve a plugin for a cloud account that has not been
+ # registered with the manager is expected to raise an exception.
+ with self.assertRaises(KeyError):
+ self.plugins.plugin(self.account.name)
+
+ def test_multiple_registration(self):
+ self.plugins.register(self.account, 'mock')
+
+ # Attempting to register the account with another type of plugin will
+ # also cause an exception to be raised.
+ with self.assertRaises(AccountAlreadyRegisteredError):
+ self.plugins.register(self.account, 'mock')
+
+ # Attempting to register the account with 'openstack' again with cause
+ # an exception to be raised.
+ with self.assertRaises(AccountAlreadyRegisteredError):
+ self.plugins.register(self.account, 'openstack')
+
+ def test_unsupported_plugin(self):
+ # If an attempt is made to register a cloud account with an unknown
+ # type of plugin, a PluginNotSupportedError should be raised.
+ with self.assertRaises(PluginNotSupportedError):
+ self.plugins.register(self.account, 'unsupported-plugin')
+
+ def test_anavailable_plugin(self):
+ # Create a factory that always raises PluginUnavailableError
+ class UnavailablePluginFactory(PluginFactory):
+ PLUGIN_NAME = "unavailable-plugin"
+
+ def create(self, cloud_account):
+ raise PluginUnavailableError()
+
+ # Register the factory
+ self.plugins.register_plugin_factory(UnavailablePluginFactory())
+
+ # Ensure that the correct exception propagates when the cloud account
+ # is registered.
+ with self.assertRaises(PluginUnavailableError):
+ self.plugins.register(self.account, "unavailable-plugin")
+
+
+class TestMonitor(unittest.TestCase):
+ """
+ The Monitor class is the implementation that is called by the
+ MonitorTasklet. It provides the unified interface for controlling and
+ querying the monitoring functionality.
+ """
+
+ def setUp(self):
+ # Reduce the sample interval so that test run quickly
+ NfviMetrics.SAMPLE_INTERVAL = 0.1
+
+ self.loop = asyncio.get_event_loop()
+ self.logger = logging.getLogger('test-logger')
+ self.config = InstanceConfiguration()
+ self.monitor = Monitor(self.loop, self.logger, self.config)
+
+ self.account = RwcalYang.CloudAccount(
+ name='test-cloud-account',
+ account_type="mock",
+ )
+
+ def test_instance_config(self):
+ """
+ Configuration data for an instance is passed to the Monitor when it is
+ created. The data is passed in the InstanceConfiguration object. This
+ object is typically shared between the tasklet and the monitor, and
+ provides a way for the tasklet to update the configuration of the
+ monitor.
+ """
+ self.assertTrue(hasattr(self.monitor._config, "polling_period"))
+ self.assertTrue(hasattr(self.monitor._config, "min_cache_lifetime"))
+ self.assertTrue(hasattr(self.monitor._config, "max_polling_frequency"))
+
+ def test_monitor_cloud_accounts(self):
+ """
+ This test checks the cloud accounts are correctly added and deleted,
+ and that the correct exceptions are raised on duplicate adds or
+ deletes.
+
+ """
+ # Add the cloud account to the monitor
+ self.monitor.add_cloud_account(self.account)
+ self.assertIn(self.account.name, self.monitor._cloud_accounts)
+
+ # Add the cloud account to the monitor again
+ with self.assertRaises(AccountAlreadyRegisteredError):
+ self.monitor.add_cloud_account(self.account)
+
+ # Delete the cloud account
+ self.monitor.remove_cloud_account(self.account.name)
+ self.assertNotIn(self.account.name, self.monitor._cloud_accounts)
+
+ # Delete the cloud account again
+ with self.assertRaises(UnknownAccountError):
+ self.monitor.remove_cloud_account(self.account.name)
+
+ def test_monitor_cloud_accounts_illegal_removal(self):
+ """
+ A cloud account may not be removed while there are plugins or records
+ that are associated with it. Attempting to delete such a cloud account
+ will raise an exception.
+ """
+ # Add the cloud account to the monitor
+ self.monitor.add_cloud_account(self.account)
+
+ # Create a VNFR associated with the cloud account
+ vnfr = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr()
+ vnfr.cloud_account = self.account.name
+ vnfr.id = 'test-vnfr-id'
+
+ # Add a VDUR to the VNFR
+ vdur = vnfr.vdur.add()
+ vdur.vim_id = 'test-vim-id-1'
+ vdur.id = 'test-vdur-id-1'
+
+ # Now add the VNFR to the monitor
+ self.monitor.add_vnfr(vnfr)
+
+ # Check that the monitor contains the VNFR, VDUR, and metrics
+ self.assertTrue(self.monitor.is_registered_vdur(vdur.id))
+ self.assertTrue(self.monitor.is_registered_vnfr(vnfr.id))
+ self.assertEqual(1, len(self.monitor.metrics))
+
+ # Deleting the cloud account now should raise an exception because the
+ # VNFR and VDUR are associated with the cloud account.
+ with self.assertRaises(AccountInUseError):
+ self.monitor.remove_cloud_account(self.account.name)
+
+ # Now remove the VNFR from the monitor
+ self.monitor.remove_vnfr(vnfr.id)
+ self.assertFalse(self.monitor.is_registered_vdur(vdur.id))
+ self.assertFalse(self.monitor.is_registered_vnfr(vnfr.id))
+ self.assertEqual(0, len(self.monitor.metrics))
+
+ # Safely delete the cloud account
+ self.monitor.remove_cloud_account(self.account.name)
+
+ def test_vdur_registration(self):
+ """
+ When a VDUR is registered with the Monitor it is registered with the
+ VdurNfviMetricsManager. Thus it is assigned a plugin that can be used
+ to retrieve the NFVI metrics associated with the VDU.
+ """
+ # Define the VDUR to be registered
+ vdur = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur()
+ vdur.vm_flavor.vcpu_count = 4
+ vdur.vm_flavor.memory_mb = 100
+ vdur.vm_flavor.storage_gb = 2
+ vdur.vim_id = 'test-vim-id'
+ vdur.id = 'test-vdur-id'
+
+ # Before registering the VDUR, the cloud account needs to be added to
+ # the monitor.
+ self.monitor.add_cloud_account(self.account)
+
+ # Register the VDUR with the monitor
+ self.monitor.add_vdur(self.account, vdur)
+ self.assertTrue(self.monitor.is_registered_vdur(vdur.id))
+
+ # Check that the VDUR has been added to the metrics cache
+ self.assertTrue(self.monitor.cache.contains_vdur_id(vdur.id))
+
+ # Unregister the VDUR
+ self.monitor.remove_vdur(vdur.id)
+ self.assertFalse(self.monitor.is_registered_vdur(vdur.id))
+
+ # Check that the VDUR has been removed from the metrics cache
+ self.assertFalse(self.monitor.cache.contains_vdur_id(vdur.id))
+
+ def test_vnfr_add_update_delete(self):
+ """
+ When a VNFR is added to the Monitor a record is created of the
+ relationship between the VNFR and any VDURs that it contains. Each VDUR
+ is then registered with the VdurNfviMetricsManager. A VNFR can also be
+ updated so that it contains more or less VDURs. Any VDURs that are
+ added to the VNFR are registered with the VdurNfviMetricsManager, and
+ any that are removed are unregistered. When a VNFR is deleted, all of
+ the VDURs contained in the VNFR are unregistered.
+ """
+ # Define the VDUR to be registered
+ vdur = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur()
+ vdur.vim_id = 'test-vim-id-1'
+ vdur.id = 'test-vdur-id-1'
+
+ vnfr = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr()
+ vnfr.cloud_account = self.account.name
+ vnfr.id = 'test-vnfr-id'
+
+ vnfr.vdur.append(vdur)
+
+ self.monitor.add_cloud_account(self.account)
+
+ # Add the VNFR to the monitor. This will also register VDURs contained
+ # in the VNFR with the monitor.
+ self.monitor.add_vnfr(vnfr)
+ self.assertTrue(self.monitor.is_registered_vdur('test-vdur-id-1'))
+
+ # Add another VDUR to the VNFR and update the monitor. Both VDURs
+ # should now be registered
+ vdur = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur()
+ vdur.vim_id = 'test-vim-id-2'
+ vdur.id = 'test-vdur-id-2'
+
+ vnfr.vdur.append(vdur)
+
+ self.monitor.update_vnfr(vnfr)
+ self.assertTrue(self.monitor.is_registered_vdur('test-vdur-id-1'))
+ self.assertTrue(self.monitor.is_registered_vdur('test-vdur-id-2'))
+
+ # Delete the VNFR from the monitor. This should remove the VNFR and all
+ # of the associated VDURs from the monitor.
+ self.monitor.remove_vnfr(vnfr.id)
+ self.assertFalse(self.monitor.is_registered_vnfr('test-vnfr-id'))
+ self.assertFalse(self.monitor.is_registered_vdur('test-vdur-id-1'))
+ self.assertFalse(self.monitor.is_registered_vdur('test-vdur-id-2'))
+
+ with self.assertRaises(KeyError):
+ self.monitor.retrieve_nfvi_metrics('test-vdur-id-1')
+
+ with self.assertRaises(KeyError):
+ self.monitor.retrieve_nfvi_metrics('test-vdur-id-2')
+
+ def test_complete(self):
+ """
+ This test simulates the addition of a VNFR to the Monitor (along with
+ updates), and retrieves NFVI metrics from the VDUR. The VNFR is then
+ deleted, which should result in a cleanup of all the data in the
+ Monitor.
+ """
+ # Create the VNFR
+ vnfr = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr()
+ vnfr.cloud_account = self.account.name
+ vnfr.id = 'test-vnfr-id'
+
+ # Create 2 VDURs
+ vdur = vnfr.vdur.add()
+ vdur.id = 'test-vdur-id-1'
+ vdur.vim_id = 'test-vim-id-1'
+ vdur.vm_flavor.vcpu_count = 4
+ vdur.vm_flavor.memory_mb = 100
+ vdur.vm_flavor.storage_gb = 2
+
+ vdur = vnfr.vdur.add()
+ vdur.id = 'test-vdur-id-2'
+ vdur.vim_id = 'test-vim-id-2'
+ vdur.vm_flavor.vcpu_count = 4
+ vdur.vm_flavor.memory_mb = 100
+ vdur.vm_flavor.storage_gb = 2
+
+ class MockPlugin(object):
+ def __init__(self):
+ self._metrics = dict()
+ self._metrics['test-vim-id-1'] = RwmonYang.NfviMetrics()
+ self._metrics['test-vim-id-2'] = RwmonYang.NfviMetrics()
+
+ def nfvi_metrics(self, account, vim_id):
+ metrics = self._metrics[vim_id]
+
+ if vim_id == 'test-vim-id-1':
+ metrics.memory.used += 1000
+ else:
+ metrics.memory.used += 2000
+
+ return metrics
+
+ class MockFactory(PluginFactory):
+ PLUGIN_NAME = "mock"
+
+ def create(self, cloud_account):
+ plugin = rw_peas.PeasPlugin("rwmon_mock", 'RwMon-1.0')
+ impl = plugin.get_interface("Monitoring")
+ impl.set_impl(MockPlugin())
+ return impl
+
+ # Modify the mock plugin factory
+ self.monitor._nfvi_plugins._factories["mock"] = MockFactory()
+
+ # Add the cloud account to the monitor
+ self.monitor.add_cloud_account(self.account)
+
+ # Add the VNFR to the monitor.
+ self.monitor.add_vnfr(vnfr)
+
+ @wait_for_pending_tasks(self.loop)
+ @asyncio.coroutine
+ def call1():
+ # call #1 (time = 0.00s)
+ # The metrics for these VDURs have not been populated yet so a
+ # default metrics object (all zeros) is returned, and a request is
+ # scheduled with the data source to retrieve the metrics.
+ metrics1 = self.monitor.retrieve_nfvi_metrics('test-vdur-id-1')
+ metrics2 = self.monitor.retrieve_nfvi_metrics('test-vdur-id-2')
+
+ self.assertEqual(0, metrics1.memory.used)
+ self.assertEqual(0, metrics2.memory.used)
+
+ self.loop.run_until_complete(call1())
+
+ @wait_for_pending_tasks(self.loop)
+ @asyncio.coroutine
+ def call2():
+ # call #2 (wait 0.05s)
+ # The metrics have been populated with data from the data source
+ # due to the request made during call #1.
+ yield from asyncio.sleep(0.05)
+
+ metrics1 = self.monitor.retrieve_nfvi_metrics('test-vdur-id-1')
+ metrics2 = self.monitor.retrieve_nfvi_metrics('test-vdur-id-2')
+
+ self.assertEqual(1000, metrics1.memory.used)
+ self.assertEqual(2000, metrics2.memory.used)
+
+ self.loop.run_until_complete(call2())
+
+ @wait_for_pending_tasks(self.loop)
+ @asyncio.coroutine
+ def call3():
+ # call #3 (wait 0.10s)
+ # This call exceeds 0.1s (the sample interval of the plugin)
+ # from when the data was retrieved. The cached metrics are
+ # immediately returned, but a request is made to the data source to
+ # refresh these metrics.
+ yield from asyncio.sleep(0.10)
+
+ metrics1 = self.monitor.retrieve_nfvi_metrics('test-vdur-id-1')
+ metrics2 = self.monitor.retrieve_nfvi_metrics('test-vdur-id-2')
+
+ self.assertEqual(1000, metrics1.memory.used)
+ self.assertEqual(2000, metrics2.memory.used)
+
+ self.loop.run_until_complete(call3())
+
+ @wait_for_pending_tasks(self.loop)
+ @asyncio.coroutine
+ def call4():
+ # call #4 (wait 0.10s)
+ # The metrics retrieved differ from those in call #3 because the
+ # cached metrics have been updated.
+ yield from asyncio.sleep(0.10)
+ metrics1 = self.monitor.retrieve_nfvi_metrics('test-vdur-id-1')
+ metrics2 = self.monitor.retrieve_nfvi_metrics('test-vdur-id-2')
+
+ self.assertEqual(2000, metrics1.memory.used)
+ self.assertEqual(4000, metrics2.memory.used)
+
+ self.loop.run_until_complete(call4())
+
+
+def main(argv=sys.argv[1:]):
+ logging.basicConfig(format='TEST %(message)s')
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument('-v', '--verbose', action='store_true')
+
+ args = parser.parse_args(argv)
+
+ # Set the global logging level
+ logging.getLogger().setLevel(logging.DEBUG if args.verbose else logging.ERROR)
+
+ # Set the logger in this test to use a null handler
+ logging.getLogger('test-logger').addHandler(logging.NullHandler())
+
+ # The unittest framework requires a program name, so use the name of this
+ # file instead (we do not want to have to pass a fake program name to main
+ # when this is called from the interpreter).
+ unittest.main(argv=[__file__] + argv,
+ testRunner=xmlrunner.XMLTestRunner(
+ output=os.environ["RIFT_MODULE_TEST"]))
+
+if __name__ == '__main__':
+ main()
diff --git a/rwlaunchpad/test/utest_rwnsm.py b/rwlaunchpad/test/utest_rwnsm.py
new file mode 100755
index 0000000..e125739
--- /dev/null
+++ b/rwlaunchpad/test/utest_rwnsm.py
@@ -0,0 +1,215 @@
+#!/usr/bin/env python3
+
+#
+# Copyright 2016 RIFT.IO Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+import argparse
+import logging
+import os
+import sys
+import unittest
+import uuid
+import xmlrunner
+
+from gi.repository import (
+ NsdYang,
+ NsrYang,
+ )
+
+logger = logging.getLogger('test-rwnsmtasklet')
+
+import rift.tasklets.rwnsmtasklet.rwnsmtasklet as rwnsmtasklet
+import rift.tasklets.rwnsmtasklet.xpath as rwxpath
+
+class TestGiXpath(unittest.TestCase):
+ def setUp(self):
+ rwxpath.reset_cache()
+
+ def test_nsd_elements(self):
+ """
+ Test that a particular element in a list is correctly retrieved. In
+ this case, we are trying to retrieve an NSD from the NSD catalog.
+
+ """
+ # Create the initial NSD catalog
+ nsd_catalog = NsdYang.YangData_Nsd_NsdCatalog()
+
+ # Create an NSD, set its 'id', and add it to the catalog
+ nsd_id = str(uuid.uuid4())
+ nsd_catalog.nsd.append(
+ NsdYang.YangData_Nsd_NsdCatalog_Nsd(
+ id=nsd_id,
+ )
+ )
+
+ # Retrieve the NSD using an xpath expression
+ xpath = '/nsd:nsd-catalog/nsd:nsd[nsd:id={}]'.format(nsd_id)
+ nsd = rwxpath.getxattr(nsd_catalog, xpath)
+
+ self.assertEqual(nsd_id, nsd.id)
+
+ # Modify the name of the NSD using an xpath expression
+ rwxpath.setxattr(nsd_catalog, xpath + "/nsd:name", "test-name")
+
+ name = rwxpath.getxattr(nsd_catalog, xpath + "/nsd:name")
+ self.assertEqual("test-name", name)
+
+ def test_nsd_scalar_fields(self):
+ """
+ Test that setxattr correctly sets the value specified by an xpath.
+
+ """
+ # Define a simple NSD
+ nsd = NsdYang.YangData_Nsd_NsdCatalog_Nsd()
+
+ # Check that the unset fields are in fact set to None
+ self.assertEqual(None, rwxpath.getxattr(nsd, "/nsd:nsd-catalog/nsd:nsd/nsd:name"))
+ self.assertEqual(None, rwxpath.getxattr(nsd, "/nsd:nsd-catalog/nsd:nsd/nsd:short-name"))
+
+ # Set the values of the 'name' and 'short-name' fields
+ rwxpath.setxattr(nsd, "/nsd:nsd-catalog/nsd:nsd/nsd:name", "test-name")
+ rwxpath.setxattr(nsd, "/nsd:nsd-catalog/nsd:nsd/nsd:short-name", "test-short-name")
+
+ # Check that the 'name' and 'short-name' fields are correctly set
+ self.assertEqual(nsd.name, rwxpath.getxattr(nsd, "/nsd:nsd-catalog/nsd:nsd/nsd:name"))
+ self.assertEqual(nsd.short_name, rwxpath.getxattr(nsd, "/nsd:nsd-catalog/nsd:nsd/nsd:short-name"))
+
+
+class TestInputParameterSubstitution(unittest.TestCase):
+ def setUp(self):
+ self.substitute_input_parameters = rwnsmtasklet.InputParameterSubstitution(logger)
+
+ def test_null_arguments(self):
+ """
+ If None is passed to the substitutor for either the NSD or the NSR
+ config, no exception should be raised.
+
+ """
+ nsd = NsdYang.YangData_Nsd_NsdCatalog_Nsd()
+ nsr_config = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr()
+
+ self.substitute_input_parameters(None, None)
+ self.substitute_input_parameters(nsd, None)
+ self.substitute_input_parameters(None, nsr_config)
+
+ def test_illegal_input_parameter(self):
+ """
+ In the NSD there is a list of the parameters that are allowed to be
+ substituted by input parameters. This test checks that when an input
+ parameter is provided in the NSR config that is not in the NSD, it is
+ not applied.
+
+ """
+ # Define the original NSD
+ nsd = NsdYang.YangData_Nsd_NsdCatalog_Nsd()
+ nsd.name = "robert"
+ nsd.short_name = "bob"
+
+ # Define which parameters may be modified
+ nsd.input_parameter_xpath.append(
+ NsdYang.YangData_Nsd_NsdCatalog_Nsd_InputParameterXpath(
+ xpath="/nsd:nsd-catalog/nsd:nsd/nsd:name",
+ label="NSD Name",
+ )
+ )
+
+ # Define the input parameters that are intended to be modified
+ nsr_config = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr()
+ nsr_config.input_parameter.extend([
+ NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_InputParameter(
+ xpath="/nsd:nsd-catalog/nsd:nsd/nsd:name",
+ value="alice",
+ ),
+ NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_InputParameter(
+ xpath="/nsd:nsd-catalog/nsd:nsd/nsd:short-name",
+ value="alice",
+ ),
+ ])
+
+ self.substitute_input_parameters(nsd, nsr_config)
+
+ # Verify that only the parameter in the input_parameter_xpath list is
+ # modified after the input parameters have been applied.
+ self.assertEqual("alice", nsd.name)
+ self.assertEqual("bob", nsd.short_name)
+
+ def test_substitution(self):
+ """
+ Test that substitution of input parameters occurs as expected.
+
+ """
+ # Define the original NSD
+ nsd = NsdYang.YangData_Nsd_NsdCatalog_Nsd()
+ nsd.name = "robert"
+ nsd.short_name = "bob"
+
+ # Define which parameters may be modified
+ nsd.input_parameter_xpath.extend([
+ NsdYang.YangData_Nsd_NsdCatalog_Nsd_InputParameterXpath(
+ xpath="/nsd:nsd-catalog/nsd:nsd/nsd:name",
+ label="NSD Name",
+ ),
+ NsdYang.YangData_Nsd_NsdCatalog_Nsd_InputParameterXpath(
+ xpath="/nsd:nsd-catalog/nsd:nsd/nsd:short-name",
+ label="NSD Short Name",
+ ),
+ ])
+
+ # Define the input parameters that are intended to be modified
+ nsr_config = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr()
+ nsr_config.input_parameter.extend([
+ NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_InputParameter(
+ xpath="/nsd:nsd-catalog/nsd:nsd/nsd:name",
+ value="robert",
+ ),
+ NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_InputParameter(
+ xpath="/nsd:nsd-catalog/nsd:nsd/nsd:short-name",
+ value="bob",
+ ),
+ ])
+
+ self.substitute_input_parameters(nsd, nsr_config)
+
+ # Verify that both the 'name' and 'short-name' fields are correctly
+ # replaced.
+ self.assertEqual("robert", nsd.name)
+ self.assertEqual("bob", nsd.short_name)
+
+
+def main(argv=sys.argv[1:]):
+ logging.basicConfig(format='TEST %(message)s')
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument('-v', '--verbose', action='store_true')
+
+ args = parser.parse_args(argv)
+
+ # Set the global logging level
+ logging.getLogger().setLevel(logging.DEBUG if args.verbose else logging.FATAL)
+
+ # Make the test logger very quiet
+ logger.addHandler(logging.NullHandler())
+
+ # The unittest framework requires a program name, so use the name of this
+ # file instead (we do not want to have to pass a fake program name to main
+ # when this is called from the interpreter).
+ unittest.main(argv=[__file__] + argv,
+ testRunner=xmlrunner.XMLTestRunner(
+ output=os.environ["RIFT_MODULE_TEST"]))
+
+if __name__ == '__main__':
+ main()
diff --git a/rwlaunchpad/test/utest_scaling_rpc.py b/rwlaunchpad/test/utest_scaling_rpc.py
new file mode 100644
index 0000000..b2290af
--- /dev/null
+++ b/rwlaunchpad/test/utest_scaling_rpc.py
@@ -0,0 +1,177 @@
+#!/usr/bin/env python3
+
+#
+# Copyright 2016 RIFT.IO Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+import asyncio
+import os
+import sys
+import unittest
+import uuid
+import xmlrunner
+import argparse
+import logging
+import time
+import types
+
+import gi
+gi.require_version('RwCloudYang', '1.0')
+gi.require_version('RwDts', '1.0')
+gi.require_version('RwNsmYang', '1.0')
+gi.require_version('RwLaunchpadYang', '1.0')
+gi.require_version('RwResourceMgrYang', '1.0')
+gi.require_version('RwcalYang', '1.0')
+gi.require_version('RwNsrYang', '1.0')
+gi.require_version('NsrYang', '1.0')
+gi.require_version('RwlogMgmtYang', '1.0')
+
+from gi.repository import (
+ RwCloudYang as rwcloudyang,
+ RwDts as rwdts,
+ RwLaunchpadYang as launchpadyang,
+ RwNsmYang as rwnsmyang,
+ RwNsrYang as rwnsryang,
+ NsrYang as nsryang,
+ RwResourceMgrYang as rmgryang,
+ RwcalYang as rwcalyang,
+ RwConfigAgentYang as rwcfg_agent,
+ RwlogMgmtYang
+)
+
+from gi.repository.RwTypes import RwStatus
+import rift.mano.examples.ping_pong_nsd as ping_pong_nsd
+import rift.tasklets
+import rift.test.dts
+import rw_peas
+
+
+
+
+class ManoTestCase(rift.test.dts.AbstractDTSTest):
+ """
+ DTS GI interface unittests
+
+ Note: Each tests uses a list of asyncio.Events for staging through the
+ test. These are required here because we are bring up each coroutine
+ ("tasklet") at the same time and are not implementing any re-try
+ mechanisms. For instance, this is used in numerous tests to make sure that
+ a publisher is up and ready before the subscriber sends queries. Such
+ event lists should not be used in production software.
+ """
+
+ @classmethod
+ def configure_suite(cls, rwmain):
+ nsm_dir = os.environ.get('NSM_DIR')
+
+ rwmain.add_tasklet(nsm_dir, 'rwnsmtasklet')
+
+ @classmethod
+ def configure_schema(cls):
+ return rwnsmyang.get_schema()
+
+ @classmethod
+ def configure_timeout(cls):
+ return 240
+
+ @staticmethod
+ def get_cal_account(account_type, account_name):
+ """
+ Create a RwcalYang cloud account object (NOTE(review): 'openstack_info' used below is not defined in this file — confirm it is provided elsewhere)
+ """
+ account = rwcloudyang.CloudAccount()
+ if account_type == 'mock':
+ account.name = account_name
+ account.account_type = "mock"
+ account.mock.username = "mock_user"
+ elif ((account_type == 'openstack_static') or (account_type == 'openstack_dynamic')):
+ account.name = account_name
+ account.account_type = 'openstack'
+ account.openstack.key = openstack_info['username']
+ account.openstack.secret = openstack_info['password']
+ account.openstack.auth_url = openstack_info['auth_url']
+ account.openstack.tenant = openstack_info['project_name']
+ account.openstack.mgmt_network = openstack_info['mgmt_network']
+ return account
+
+ @asyncio.coroutine
+ def configure_cloud_account(self, dts, cloud_type, cloud_name="cloud1"):
+ account = self.get_cal_account(cloud_type, cloud_name)
+ account_xpath = "C,/rw-cloud:cloud/rw-cloud:account[rw-cloud:name='{}']".format(cloud_name)
+ self.log.info("Configuring cloud-account: %s", account)
+ yield from dts.query_create(account_xpath,
+ rwdts.XactFlag.ADVISE,
+ account)
+
+ @asyncio.coroutine
+ def wait_tasklets(self):
+ yield from asyncio.sleep(5, loop=self.loop)
+
+ def configure_test(self, loop, test_id):
+ self.log.debug("STARTING - %s", self.id())
+ self.tinfo = self.new_tinfo(self.id())
+ self.dts = rift.tasklets.DTS(self.tinfo, self.schema, self.loop)
+
+ def test_create_nsr_record(self):
+
+ @asyncio.coroutine
+ def run_test():
+ yield from self.wait_tasklets()
+
+ cloud_type = "mock"
+ yield from self.configure_cloud_account(self.dts, cloud_type, "mock_account")
+
+
+ # Trigger an rpc
+ rpc_ip = nsryang.YangInput_Nsr_ExecScaleIn.from_dict({
+ 'nsr_id_ref': '1',
+ 'instance_id': "1",
+ 'scaling_group_name_ref': "foo"})
+
+ yield from self.dts.query_rpc("/nsr:exec-scale-in", 0, rpc_ip)
+
+ future = asyncio.ensure_future(run_test(), loop=self.loop)
+ self.run_until(future.done)
+ if future.exception() is not None:
+ self.log.error("Caught exception during test")
+ raise future.exception()
+
+
+def main():
+ top_dir = __file__[:__file__.find('/modules/core/')]
+ build_dir = os.path.join(top_dir, '.build/modules/core/rwvx/src/core_rwvx-build')
+ launchpad_build_dir = os.path.join(top_dir, '.build/modules/core/mc/core_mc-build/rwlaunchpad')
+
+ if 'NSM_DIR' not in os.environ:
+ os.environ['NSM_DIR'] = os.path.join(launchpad_build_dir, 'plugins/rwnsm')
+
+ runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument('-v', '--verbose', action='store_true')
+ parser.add_argument('-n', '--no-runner', action='store_true')
+ args, unittest_args = parser.parse_known_args()
+ if args.no_runner:
+ runner = None
+
+ ManoTestCase.log_level = logging.DEBUG if args.verbose else logging.WARN
+
+ unittest.main(testRunner=runner, argv=[sys.argv[0]] + unittest_args)
+
+if __name__ == '__main__':
+ main()
+
+# vim: sw=4