| #!/usr/bin/env python3 |
| |
| # |
| # Copyright 2016 RIFT.IO Inc |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| |
| import asyncio |
| import logging |
| import os |
| import sys |
| import types |
| import unittest |
| import uuid |
| |
| import xmlrunner |
| |
| import gi.repository.CF as cf |
| import gi.repository.RwDts as rwdts |
| import gi.repository.RwMain as rwmain |
| import gi.repository.RwManifestYang as rwmanifest |
| import gi.repository.RwConmanYang as conmanY |
| import gi.repository.RwLaunchpadYang as launchpadyang |
| |
| import rift.tasklets |
| |
| if sys.version_info < (3, 4, 4): |
| asyncio.ensure_future = asyncio.async |
| |
| |
| class RWSOTestCase(unittest.TestCase): |
| """ |
| DTS GI interface unittests |
| |
| Note: Each tests uses a list of asyncio.Events for staging through the |
| test. These are required here because we are bring up each coroutine |
| ("tasklet") at the same time and are not implementing any re-try |
| mechanisms. For instance, this is used in numerous tests to make sure that |
| a publisher is up and ready before the subscriber sends queries. Such |
| event lists should not be used in production software. |
| """ |
| rwmain = None |
| tinfo = None |
| schema = None |
| id_cnt = 0 |
| |
| @classmethod |
| def setUpClass(cls): |
| msgbroker_dir = os.environ.get('MESSAGE_BROKER_DIR') |
| router_dir = os.environ.get('ROUTER_DIR') |
| cm_dir = os.environ.get('SO_DIR') |
| |
| manifest = rwmanifest.Manifest() |
| manifest.init_phase.settings.rwdtsrouter.single_dtsrouter.enable = True |
| |
| cls.rwmain = rwmain.Gi.new(manifest) |
| cls.tinfo = cls.rwmain.get_tasklet_info() |
| |
| # Run router in mainq. Eliminates some ill-diagnosed bootstrap races. |
| os.environ['RWDTS_ROUTER_MAINQ']='1' |
| cls.rwmain.add_tasklet(msgbroker_dir, 'rwmsgbroker-c') |
| cls.rwmain.add_tasklet(router_dir, 'rwdtsrouter-c') |
| cls.rwmain.add_tasklet(cm_dir, 'rwconmantasklet') |
| |
| cls.log = rift.tasklets.logger_from_tasklet_info(cls.tinfo) |
| cls.log.setLevel(logging.DEBUG) |
| |
| stderr_handler = logging.StreamHandler(stream=sys.stderr) |
| fmt = logging.Formatter( |
| '%(asctime)-23s %(levelname)-5s (%(name)s@%(process)d:%(filename)s:%(lineno)d) - %(message)s') |
| stderr_handler.setFormatter(fmt) |
| cls.log.addHandler(stderr_handler) |
| cls.schema = conmanY.get_schema() |
| |
| def setUp(self): |
| def scheduler_tick(self, *args): |
| self.call_soon(self.stop) |
| self.run_forever() |
| |
| self.loop = asyncio.new_event_loop() |
| self.loop.scheduler_tick = types.MethodType(scheduler_tick, self.loop) |
| self.loop.set_debug(True) |
| os.environ["PYTHONASYNCIODEBUG"] = "1" |
| asyncio_logger = logging.getLogger("asyncio") |
| asyncio_logger.setLevel(logging.DEBUG) |
| |
| self.asyncio_timer = None |
| self.stop_timer = None |
| self.id_cnt += 1 |
| |
| @asyncio.coroutine |
| def wait_tasklets(self): |
| yield from asyncio.sleep(1, loop=self.loop) |
| |
| def run_until(self, test_done, timeout=30): |
| """ |
| Attach the current asyncio event loop to rwsched and then run the |
| scheduler until the test_done function returns True or timeout seconds |
| pass. |
| |
| @param test_done - function which should return True once the test is |
| complete and the scheduler no longer needs to run. |
| @param timeout - maximum number of seconds to run the test. |
| """ |
| def shutdown(*args): |
| if args: |
| self.log.debug('Shutting down loop due to timeout') |
| |
| if self.asyncio_timer is not None: |
| self.tinfo.rwsched_tasklet.CFRunLoopTimerRelease(self.asyncio_timer) |
| self.asyncio_timer = None |
| |
| if self.stop_timer is not None: |
| self.tinfo.rwsched_tasklet.CFRunLoopTimerRelease(self.stop_timer) |
| self.stop_timer = None |
| |
| self.tinfo.rwsched_instance.CFRunLoopStop() |
| |
| def tick(*args): |
| self.loop.call_later(0.1, self.loop.stop) |
| self.loop.run_forever() |
| if test_done(): |
| shutdown() |
| |
| self.asyncio_timer = self.tinfo.rwsched_tasklet.CFRunLoopTimer( |
| cf.CFAbsoluteTimeGetCurrent(), |
| 0.1, |
| tick, |
| None) |
| |
| self.stop_timer = self.tinfo.rwsched_tasklet.CFRunLoopTimer( |
| cf.CFAbsoluteTimeGetCurrent() + timeout, |
| 0, |
| shutdown, |
| None) |
| |
| self.tinfo.rwsched_tasklet.CFRunLoopAddTimer( |
| self.tinfo.rwsched_tasklet.CFRunLoopGetCurrent(), |
| self.stop_timer, |
| self.tinfo.rwsched_instance.CFRunLoopGetMainMode()) |
| |
| self.tinfo.rwsched_tasklet.CFRunLoopAddTimer( |
| self.tinfo.rwsched_tasklet.CFRunLoopGetCurrent(), |
| self.asyncio_timer, |
| self.tinfo.rwsched_instance.CFRunLoopGetMainMode()) |
| |
| self.tinfo.rwsched_instance.CFRunLoopRun() |
| |
| self.assertTrue(test_done()) |
| |
| def new_tinfo(self, name): |
| """ |
| Create a new tasklet info instance with a unique instance_id per test. |
| It is up to each test to use unique names if more that one tasklet info |
| instance is needed. |
| |
| @param name - name of the "tasklet" |
| @return - new tasklet info instance |
| """ |
| ret = self.rwmain.new_tasklet_info(name, RWSOTestCase.id_cnt) |
| |
| log = rift.tasklets.logger_from_tasklet_info(ret) |
| log.setLevel(logging.DEBUG) |
| |
| stderr_handler = logging.StreamHandler(stream=sys.stderr) |
| fmt = logging.Formatter( |
| '%(asctime)-23s %(levelname)-5s (%(name)s@%(process)d:%(filename)s:%(lineno)d) - %(message)s') |
| stderr_handler.setFormatter(fmt) |
| log.addHandler(stderr_handler) |
| |
| return ret |
| |
| def get_cloud_account_msg(self): |
| cloud_account = launchpadyang.CloudAccount() |
| cloud_account.name = "cloudy" |
| cloud_account.account_type = "mock" |
| cloud_account.mock.username = "rainy" |
| return cloud_account |
| |
| def get_compute_pool_msg(self, name, pool_type): |
| pool_config = rmgryang.ResourcePools() |
| pool = pool_config.pools.add() |
| pool.name = name |
| pool.resource_type = "compute" |
| if pool_type == "static": |
| # Need to query CAL for resource |
| pass |
| else: |
| pool.max_size = 10 |
| return pool_config |
| |
| def get_network_pool_msg(self, name, pool_type): |
| pool_config = rmgryang.ResourcePools() |
| pool = pool_config.pools.add() |
| pool.name = name |
| pool.resource_type = "network" |
| if pool_type == "static": |
| # Need to query CAL for resource |
| pass |
| else: |
| pool.max_size = 4 |
| return pool_config |
| |
| |
| def get_network_reserve_msg(self, xpath): |
| event_id = str(uuid.uuid4()) |
| msg = rmgryang.VirtualLinkEventData() |
| msg.event_id = event_id |
| msg.request_info.name = "mynet" |
| msg.request_info.subnet = "1.1.1.0/24" |
| return msg, xpath.format(event_id) |
| |
| def get_compute_reserve_msg(self,xpath): |
| event_id = str(uuid.uuid4()) |
| msg = rmgryang.VDUEventData() |
| msg.event_id = event_id |
| msg.request_info.name = "mynet" |
| msg.request_info.image_id = "This is a image_id" |
| msg.request_info.vm_flavor.vcpu_count = 4 |
| msg.request_info.vm_flavor.memory_mb = 8192*2 |
| msg.request_info.vm_flavor.storage_gb = 40 |
| c1 = msg.request_info.connection_points.add() |
| c1.name = "myport1" |
| c1.virtual_link_id = "This is a network_id" |
| return msg, xpath.format(event_id) |
| |
| def test_create_resource_pools(self): |
| self.log.debug("STARTING - test_create_resource_pools") |
| tinfo = self.new_tinfo('poolconfig') |
| dts = rift.tasklets.DTS(tinfo, self.schema, self.loop) |
| pool_xpath = "C,/rw-project:project/rw-resource-mgr:resource-mgr-config/rw-resource-mgr:resource-pools" |
| pool_records_xpath = "D,/rw-project:project/rw-resource-mgr:resource-pool-records" |
| account_xpath = "C,/rw-launchpad:cloud-account" |
| compute_xpath = "D,/rw-project:project/rw-resource-mgr:resource-mgmt/vdu-event/vdu-event-data[event-id='{}']" |
| network_xpath = "D,/rw-project:project/rw-resource-mgr:resource-mgmt/vlink-event/vlink-event-data[event-id='{}']" |
| |
| @asyncio.coroutine |
| def configure_cloud_account(): |
| msg = self.get_cloud_account_msg() |
| self.log.info("Configuring cloud-account: %s",msg) |
| yield from dts.query_create(account_xpath, |
| rwdts.XactFlag.ADVISE, |
| msg) |
| yield from asyncio.sleep(3, loop=self.loop) |
| |
| @asyncio.coroutine |
| def configure_compute_resource_pools(): |
| msg = self.get_compute_pool_msg("virtual-compute", "dynamic") |
| self.log.info("Configuring compute-resource-pool: %s",msg) |
| yield from dts.query_create(pool_xpath, |
| rwdts.XactFlag.ADVISE, |
| msg) |
| yield from asyncio.sleep(3, loop=self.loop) |
| |
| |
| @asyncio.coroutine |
| def configure_network_resource_pools(): |
| msg = self.get_network_pool_msg("virtual-network", "dynamic") |
| self.log.info("Configuring network-resource-pool: %s",msg) |
| yield from dts.query_create(pool_xpath, |
| rwdts.XactFlag.ADVISE, |
| msg) |
| yield from asyncio.sleep(3, loop=self.loop) |
| |
| |
| @asyncio.coroutine |
| def verify_resource_pools(): |
| self.log.debug("Verifying test_create_resource_pools results") |
| res_iter = yield from dts.query_read(pool_records_xpath,) |
| for result in res_iter: |
| response = yield from result |
| records = response.result.records |
| #self.assertEqual(len(records), 2) |
| #names = [i.name for i in records] |
| #self.assertTrue('virtual-compute' in names) |
| #self.assertTrue('virtual-network' in names) |
| for record in records: |
| self.log.debug("Received Pool Record, Name: %s, Resource Type: %s, Pool Status: %s, Pool Size: %d, Busy Resources: %d", |
| record.name, |
| record.resource_type, |
| record.pool_status, |
| record.max_size, |
| record.busy_resources) |
| @asyncio.coroutine |
| def reserve_network_resources(): |
| msg,xpath = self.get_network_reserve_msg(network_xpath) |
| self.log.debug("Sending create event to network-event xpath %s with msg: %s" % (xpath, msg)) |
| yield from dts.query_create(xpath, rwdts.XactFlag.TRACE, msg) |
| yield from asyncio.sleep(3, loop=self.loop) |
| yield from dts.query_delete(xpath, rwdts.XactFlag.TRACE) |
| |
| @asyncio.coroutine |
| def reserve_compute_resources(): |
| msg,xpath = self.get_compute_reserve_msg(compute_xpath) |
| self.log.debug("Sending create event to compute-event xpath %s with msg: %s" % (xpath, msg)) |
| yield from dts.query_create(xpath, rwdts.XactFlag.TRACE, msg) |
| yield from asyncio.sleep(3, loop=self.loop) |
| yield from dts.query_delete(xpath, rwdts.XactFlag.TRACE) |
| |
| @asyncio.coroutine |
| def run_test(): |
| yield from self.wait_tasklets() |
| yield from configure_cloud_account() |
| yield from configure_compute_resource_pools() |
| yield from configure_network_resource_pools() |
| yield from verify_resource_pools() |
| yield from reserve_network_resources() |
| yield from reserve_compute_resources() |
| |
| future = asyncio.ensure_future(run_test(), loop=self.loop) |
| self.run_until(future.done) |
| if future.exception() is not None: |
| self.log.error("Caught exception during test") |
| raise future.exception() |
| |
| self.log.debug("DONE - test_create_resource_pools") |
| |
| |
| def main(): |
| plugin_dir = os.path.join(os.environ["RIFT_INSTALL"], "usr/lib/rift/plugins") |
| |
| if 'MESSAGE_BROKER_DIR' not in os.environ: |
| os.environ['MESSAGE_BROKER_DIR'] = os.path.join(plugin_dir, 'rwmsgbroker-c') |
| |
| if 'ROUTER_DIR' not in os.environ: |
| os.environ['ROUTER_DIR'] = os.path.join(plugin_dir, 'rwdtsrouter-c') |
| |
| if 'SO_DIR' not in os.environ: |
| os.environ['SO_DIR'] = os.path.join(plugin_dir, 'rwconmantasklet') |
| |
| runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"]) |
| unittest.main(testRunner=runner) |
| |
| if __name__ == '__main__': |
| main() |
| |
| # vim: sw=4 |