X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=rwcm%2Ftest%2Frwso_test.py;fp=rwcm%2Ftest%2Frwso_test.py;h=e0c5011b963b1976ce93ba28cc27d83b03863557;hb=6f07e6f33f751ab4ffe624f6037f887b243bece2;hp=0000000000000000000000000000000000000000;hpb=72a563886272088feb7cb52e4aafbe6d2c580ff9;p=osm%2FSO.git diff --git a/rwcm/test/rwso_test.py b/rwcm/test/rwso_test.py new file mode 100755 index 00000000..e0c5011b --- /dev/null +++ b/rwcm/test/rwso_test.py @@ -0,0 +1,349 @@ +#!/usr/bin/env python3 + +# +# Copyright 2016 RIFT.IO Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +import asyncio +import logging +import os +import sys +import types +import unittest +import uuid + +import xmlrunner + +import gi.repository.CF as cf +import gi.repository.RwDts as rwdts +import gi.repository.RwMain as rwmain +import gi.repository.RwManifestYang as rwmanifest +import gi.repository.RwConmanYang as conmanY +import gi.repository.RwLaunchpadYang as launchpadyang + +import rift.tasklets + +if sys.version_info < (3, 4, 4): + asyncio.ensure_future = asyncio.async + + +class RWSOTestCase(unittest.TestCase): + """ + DTS GI interface unittests + + Note: Each tests uses a list of asyncio.Events for staging through the + test. These are required here because we are bring up each coroutine + ("tasklet") at the same time and are not implementing any re-try + mechanisms. For instance, this is used in numerous tests to make sure that + a publisher is up and ready before the subscriber sends queries. Such + event lists should not be used in production software. + """ + rwmain = None + tinfo = None + schema = None + id_cnt = 0 + + @classmethod + def setUpClass(cls): + msgbroker_dir = os.environ.get('MESSAGE_BROKER_DIR') + router_dir = os.environ.get('ROUTER_DIR') + cm_dir = os.environ.get('SO_DIR') + + manifest = rwmanifest.Manifest() + manifest.init_phase.settings.rwdtsrouter.single_dtsrouter.enable = True + + cls.rwmain = rwmain.Gi.new(manifest) + cls.tinfo = cls.rwmain.get_tasklet_info() + + # Run router in mainq. Eliminates some ill-diagnosed bootstrap races. + os.environ['RWDTS_ROUTER_MAINQ']='1' + cls.rwmain.add_tasklet(msgbroker_dir, 'rwmsgbroker-c') + cls.rwmain.add_tasklet(router_dir, 'rwdtsrouter-c') + cls.rwmain.add_tasklet(cm_dir, 'rwconmantasklet') + + cls.log = rift.tasklets.logger_from_tasklet_info(cls.tinfo) + cls.log.setLevel(logging.DEBUG) + + stderr_handler = logging.StreamHandler(stream=sys.stderr) + fmt = logging.Formatter( + '%(asctime)-23s %(levelname)-5s (%(name)s@%(process)d:%(filename)s:%(lineno)d) - %(message)s') + stderr_handler.setFormatter(fmt) + cls.log.addHandler(stderr_handler) + cls.schema = conmanY.get_schema() + + def setUp(self): + def scheduler_tick(self, *args): + self.call_soon(self.stop) + self.run_forever() + + self.loop = asyncio.new_event_loop() + self.loop.scheduler_tick = types.MethodType(scheduler_tick, self.loop) + self.loop.set_debug(True) + os.environ["PYTHONASYNCIODEBUG"] = "1" + asyncio_logger = logging.getLogger("asyncio") + asyncio_logger.setLevel(logging.DEBUG) + + self.asyncio_timer = None + self.stop_timer = None + self.id_cnt += 1 + + @asyncio.coroutine + def wait_tasklets(self): + yield from asyncio.sleep(1, loop=self.loop) + + def run_until(self, test_done, timeout=30): + """ + Attach the current asyncio event loop to rwsched and then run the + scheduler until the test_done function returns True or timeout seconds + pass. + + @param test_done - function which should return True once the test is + complete and the scheduler no longer needs to run. + @param timeout - maximum number of seconds to run the test. + """ + def shutdown(*args): + if args: + self.log.debug('Shutting down loop due to timeout') + + if self.asyncio_timer is not None: + self.tinfo.rwsched_tasklet.CFRunLoopTimerRelease(self.asyncio_timer) + self.asyncio_timer = None + + if self.stop_timer is not None: + self.tinfo.rwsched_tasklet.CFRunLoopTimerRelease(self.stop_timer) + self.stop_timer = None + + self.tinfo.rwsched_instance.CFRunLoopStop() + + def tick(*args): + self.loop.call_later(0.1, self.loop.stop) + self.loop.run_forever() + if test_done(): + shutdown() + + self.asyncio_timer = self.tinfo.rwsched_tasklet.CFRunLoopTimer( + cf.CFAbsoluteTimeGetCurrent(), + 0.1, + tick, + None) + + self.stop_timer = self.tinfo.rwsched_tasklet.CFRunLoopTimer( + cf.CFAbsoluteTimeGetCurrent() + timeout, + 0, + shutdown, + None) + + self.tinfo.rwsched_tasklet.CFRunLoopAddTimer( + self.tinfo.rwsched_tasklet.CFRunLoopGetCurrent(), + self.stop_timer, + self.tinfo.rwsched_instance.CFRunLoopGetMainMode()) + + self.tinfo.rwsched_tasklet.CFRunLoopAddTimer( + self.tinfo.rwsched_tasklet.CFRunLoopGetCurrent(), + self.asyncio_timer, + self.tinfo.rwsched_instance.CFRunLoopGetMainMode()) + + self.tinfo.rwsched_instance.CFRunLoopRun() + + self.assertTrue(test_done()) + + def new_tinfo(self, name): + """ + Create a new tasklet info instance with a unique instance_id per test. + It is up to each test to use unique names if more that one tasklet info + instance is needed. + + @param name - name of the "tasklet" + @return - new tasklet info instance + """ + ret = self.rwmain.new_tasklet_info(name, RWSOTestCase.id_cnt) + + log = rift.tasklets.logger_from_tasklet_info(ret) + log.setLevel(logging.DEBUG) + + stderr_handler = logging.StreamHandler(stream=sys.stderr) + fmt = logging.Formatter( + '%(asctime)-23s %(levelname)-5s (%(name)s@%(process)d:%(filename)s:%(lineno)d) - %(message)s') + stderr_handler.setFormatter(fmt) + log.addHandler(stderr_handler) + + return ret + + def get_cloud_account_msg(self): + cloud_account = launchpadyang.CloudAccount() + cloud_account.name = "cloudy" + cloud_account.account_type = "mock" + cloud_account.mock.username = "rainy" + return cloud_account + + def get_compute_pool_msg(self, name, pool_type): + pool_config = rmgryang.ResourcePools() + pool = pool_config.pools.add() + pool.name = name + pool.resource_type = "compute" + if pool_type == "static": + # Need to query CAL for resource + pass + else: + pool.max_size = 10 + return pool_config + + def get_network_pool_msg(self, name, pool_type): + pool_config = rmgryang.ResourcePools() + pool = pool_config.pools.add() + pool.name = name + pool.resource_type = "network" + if pool_type == "static": + # Need to query CAL for resource + pass + else: + pool.max_size = 4 + return pool_config + + + def get_network_reserve_msg(self, xpath): + event_id = str(uuid.uuid4()) + msg = rmgryang.VirtualLinkEventData() + msg.event_id = event_id + msg.request_info.name = "mynet" + msg.request_info.subnet = "1.1.1.0/24" + return msg, xpath.format(event_id) + + def get_compute_reserve_msg(self,xpath): + event_id = str(uuid.uuid4()) + msg = rmgryang.VDUEventData() + msg.event_id = event_id + msg.request_info.name = "mynet" + msg.request_info.image_id = "This is a image_id" + msg.request_info.vm_flavor.vcpu_count = 4 + msg.request_info.vm_flavor.memory_mb = 8192*2 + msg.request_info.vm_flavor.storage_gb = 40 + c1 = msg.request_info.connection_points.add() + c1.name = "myport1" + c1.virtual_link_id = "This is a network_id" + return msg, xpath.format(event_id) + + def test_create_resource_pools(self): + self.log.debug("STARTING - test_create_resource_pools") + tinfo = self.new_tinfo('poolconfig') + dts = rift.tasklets.DTS(tinfo, self.schema, self.loop) + pool_xpath = "C,/rw-resource-mgr:resource-mgr-config/rw-resource-mgr:resource-pools" + pool_records_xpath = "D,/rw-resource-mgr:resource-pool-records" + account_xpath = "C,/rw-launchpad:cloud-account" + compute_xpath = "D,/rw-resource-mgr:resource-mgmt/vdu-event/vdu-event-data[event-id='{}']" + network_xpath = "D,/rw-resource-mgr:resource-mgmt/vlink-event/vlink-event-data[event-id='{}']" + + @asyncio.coroutine + def configure_cloud_account(): + msg = self.get_cloud_account_msg() + self.log.info("Configuring cloud-account: %s",msg) + yield from dts.query_create(account_xpath, + rwdts.XactFlag.ADVISE, + msg) + yield from asyncio.sleep(3, loop=self.loop) + + @asyncio.coroutine + def configure_compute_resource_pools(): + msg = self.get_compute_pool_msg("virtual-compute", "dynamic") + self.log.info("Configuring compute-resource-pool: %s",msg) + yield from dts.query_create(pool_xpath, + rwdts.XactFlag.ADVISE, + msg) + yield from asyncio.sleep(3, loop=self.loop) + + + @asyncio.coroutine + def configure_network_resource_pools(): + msg = self.get_network_pool_msg("virtual-network", "dynamic") + self.log.info("Configuring network-resource-pool: %s",msg) + yield from dts.query_create(pool_xpath, + rwdts.XactFlag.ADVISE, + msg) + yield from asyncio.sleep(3, loop=self.loop) + + + @asyncio.coroutine + def verify_resource_pools(): + self.log.debug("Verifying test_create_resource_pools results") + res_iter = yield from dts.query_read(pool_records_xpath,) + for result in res_iter: + response = yield from result + records = response.result.records + #self.assertEqual(len(records), 2) + #names = [i.name for i in records] + #self.assertTrue('virtual-compute' in names) + #self.assertTrue('virtual-network' in names) + for record in records: + self.log.debug("Received Pool Record, Name: %s, Resource Type: %s, Pool Status: %s, Pool Size: %d, Busy Resources: %d", + record.name, + record.resource_type, + record.pool_status, + record.max_size, + record.busy_resources) + @asyncio.coroutine + def reserve_network_resources(): + msg,xpath = self.get_network_reserve_msg(network_xpath) + self.log.debug("Sending create event to network-event xpath %s with msg: %s" % (xpath, msg)) + yield from dts.query_create(xpath, rwdts.XactFlag.TRACE, msg) + yield from asyncio.sleep(3, loop=self.loop) + yield from dts.query_delete(xpath, rwdts.XactFlag.TRACE) + + @asyncio.coroutine + def reserve_compute_resources(): + msg,xpath = self.get_compute_reserve_msg(compute_xpath) + self.log.debug("Sending create event to compute-event xpath %s with msg: %s" % (xpath, msg)) + yield from dts.query_create(xpath, rwdts.XactFlag.TRACE, msg) + yield from asyncio.sleep(3, loop=self.loop) + yield from dts.query_delete(xpath, rwdts.XactFlag.TRACE) + + @asyncio.coroutine + def run_test(): + yield from self.wait_tasklets() + yield from configure_cloud_account() + yield from configure_compute_resource_pools() + yield from configure_network_resource_pools() + yield from verify_resource_pools() + yield from reserve_network_resources() + yield from reserve_compute_resources() + + future = asyncio.ensure_future(run_test(), loop=self.loop) + self.run_until(future.done) + if future.exception() is not None: + self.log.error("Caught exception during test") + raise future.exception() + + self.log.debug("DONE - test_create_resource_pools") + + +def main(): + plugin_dir = os.path.join(os.environ["RIFT_INSTALL"], "usr/lib/rift/plugins") + + if 'MESSAGE_BROKER_DIR' not in os.environ: + os.environ['MESSAGE_BROKER_DIR'] = os.path.join(plugin_dir, 'rwmsgbroker-c') + + if 'ROUTER_DIR' not in os.environ: + os.environ['ROUTER_DIR'] = os.path.join(plugin_dir, 'rwdtsrouter-c') + + if 'SO_DIR' not in os.environ: + os.environ['SO_DIR'] = os.path.join(plugin_dir, 'rwconmantasklet') + + runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"]) + unittest.main(testRunner=runner) + +if __name__ == '__main__': + main() + +# vim: sw=4