RIFT OSM R1 Initial Submission
[osm/SO.git] / rwcm / test / rwso_test.py
diff --git a/rwcm/test/rwso_test.py b/rwcm/test/rwso_test.py
new file mode 100755 (executable)
index 0000000..e0c5011
--- /dev/null
@@ -0,0 +1,349 @@
+#!/usr/bin/env python3
+
+# 
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+
+
+import asyncio
+import logging
+import os
+import sys
+import types
+import unittest
+import uuid
+
+import xmlrunner
+
+import gi.repository.CF as cf
+import gi.repository.RwDts as rwdts
+import gi.repository.RwMain as rwmain
+import gi.repository.RwManifestYang as rwmanifest
+import gi.repository.RwConmanYang as conmanY
+import gi.repository.RwLaunchpadYang as launchpadyang
+
+import rift.tasklets
+
+if sys.version_info < (3, 4, 4):
+    asyncio.ensure_future = asyncio.async
+
+
+class RWSOTestCase(unittest.TestCase):
+    """
+    DTS GI interface unittests
+
+    Note:  Each tests uses a list of asyncio.Events for staging through the
+    test.  These are required here because we are bring up each coroutine
+    ("tasklet") at the same time and are not implementing any re-try
+    mechanisms.  For instance, this is used in numerous tests to make sure that
+    a publisher is up and ready before the subscriber sends queries.  Such
+    event lists should not be used in production software.
+    """
+    rwmain = None
+    tinfo = None
+    schema = None
+    id_cnt = 0
+
+    @classmethod
+    def setUpClass(cls):
+        msgbroker_dir = os.environ.get('MESSAGE_BROKER_DIR')
+        router_dir = os.environ.get('ROUTER_DIR')
+        cm_dir = os.environ.get('SO_DIR')
+
+        manifest = rwmanifest.Manifest()
+        manifest.init_phase.settings.rwdtsrouter.single_dtsrouter.enable = True
+
+        cls.rwmain = rwmain.Gi.new(manifest)
+        cls.tinfo = cls.rwmain.get_tasklet_info()
+
+        # Run router in mainq.  Eliminates some ill-diagnosed bootstrap races.
+        os.environ['RWDTS_ROUTER_MAINQ']='1'
+        cls.rwmain.add_tasklet(msgbroker_dir, 'rwmsgbroker-c')
+        cls.rwmain.add_tasklet(router_dir, 'rwdtsrouter-c')
+        cls.rwmain.add_tasklet(cm_dir, 'rwconmantasklet')
+
+        cls.log = rift.tasklets.logger_from_tasklet_info(cls.tinfo)
+        cls.log.setLevel(logging.DEBUG)
+
+        stderr_handler = logging.StreamHandler(stream=sys.stderr)
+        fmt = logging.Formatter(
+                '%(asctime)-23s %(levelname)-5s  (%(name)s@%(process)d:%(filename)s:%(lineno)d) - %(message)s')
+        stderr_handler.setFormatter(fmt)
+        cls.log.addHandler(stderr_handler)
+        cls.schema = conmanY.get_schema()
+
+    def setUp(self):
+        def scheduler_tick(self, *args):
+            self.call_soon(self.stop)
+            self.run_forever()
+
+        self.loop = asyncio.new_event_loop()
+        self.loop.scheduler_tick = types.MethodType(scheduler_tick, self.loop)
+        self.loop.set_debug(True)
+        os.environ["PYTHONASYNCIODEBUG"] = "1"
+        asyncio_logger = logging.getLogger("asyncio")
+        asyncio_logger.setLevel(logging.DEBUG)
+
+        self.asyncio_timer = None
+        self.stop_timer = None
+        self.id_cnt += 1
+
+    @asyncio.coroutine
+    def wait_tasklets(self):
+        yield from asyncio.sleep(1, loop=self.loop)
+
+    def run_until(self, test_done, timeout=30):
+        """
+        Attach the current asyncio event loop to rwsched and then run the
+        scheduler until the test_done function returns True or timeout seconds
+        pass.
+
+        @param test_done  - function which should return True once the test is
+                            complete and the scheduler no longer needs to run.
+        @param timeout    - maximum number of seconds to run the test.
+        """
+        def shutdown(*args):
+            if args:
+                self.log.debug('Shutting down loop due to timeout')
+
+            if self.asyncio_timer is not None:
+                self.tinfo.rwsched_tasklet.CFRunLoopTimerRelease(self.asyncio_timer)
+                self.asyncio_timer = None
+
+            if self.stop_timer is not None:
+                self.tinfo.rwsched_tasklet.CFRunLoopTimerRelease(self.stop_timer)
+                self.stop_timer = None
+
+            self.tinfo.rwsched_instance.CFRunLoopStop()
+
+        def tick(*args):
+            self.loop.call_later(0.1, self.loop.stop)
+            self.loop.run_forever()
+            if test_done():
+                shutdown()
+
+        self.asyncio_timer = self.tinfo.rwsched_tasklet.CFRunLoopTimer(
+            cf.CFAbsoluteTimeGetCurrent(),
+            0.1,
+            tick,
+            None)
+
+        self.stop_timer = self.tinfo.rwsched_tasklet.CFRunLoopTimer(
+            cf.CFAbsoluteTimeGetCurrent() + timeout,
+            0,
+            shutdown,
+            None)
+
+        self.tinfo.rwsched_tasklet.CFRunLoopAddTimer(
+            self.tinfo.rwsched_tasklet.CFRunLoopGetCurrent(),
+            self.stop_timer,
+            self.tinfo.rwsched_instance.CFRunLoopGetMainMode())
+
+        self.tinfo.rwsched_tasklet.CFRunLoopAddTimer(
+            self.tinfo.rwsched_tasklet.CFRunLoopGetCurrent(),
+            self.asyncio_timer,
+            self.tinfo.rwsched_instance.CFRunLoopGetMainMode())
+
+        self.tinfo.rwsched_instance.CFRunLoopRun()
+
+        self.assertTrue(test_done())
+
+    def new_tinfo(self, name):
+        """
+        Create a new tasklet info instance with a unique instance_id per test.
+        It is up to each test to use unique names if more that one tasklet info
+        instance is needed.
+
+        @param name - name of the "tasklet"
+        @return     - new tasklet info instance
+        """
+        ret = self.rwmain.new_tasklet_info(name, RWSOTestCase.id_cnt)
+
+        log = rift.tasklets.logger_from_tasklet_info(ret)
+        log.setLevel(logging.DEBUG)
+
+        stderr_handler = logging.StreamHandler(stream=sys.stderr)
+        fmt = logging.Formatter(
+                '%(asctime)-23s %(levelname)-5s  (%(name)s@%(process)d:%(filename)s:%(lineno)d) - %(message)s')
+        stderr_handler.setFormatter(fmt)
+        log.addHandler(stderr_handler)
+
+        return ret
+
+    def get_cloud_account_msg(self):
+        cloud_account = launchpadyang.CloudAccount()
+        cloud_account.name = "cloudy"
+        cloud_account.account_type = "mock"
+        cloud_account.mock.username = "rainy"
+        return cloud_account
+
+    def get_compute_pool_msg(self, name, pool_type):
+        pool_config = rmgryang.ResourcePools()
+        pool = pool_config.pools.add()
+        pool.name = name
+        pool.resource_type = "compute"
+        if pool_type == "static":
+            # Need to query CAL for resource
+            pass
+        else:
+            pool.max_size = 10
+        return pool_config
+
+    def get_network_pool_msg(self, name, pool_type):
+        pool_config = rmgryang.ResourcePools()
+        pool = pool_config.pools.add()
+        pool.name = name
+        pool.resource_type = "network"
+        if pool_type == "static":
+            # Need to query CAL for resource
+            pass
+        else:
+            pool.max_size = 4
+        return pool_config
+
+
+    def get_network_reserve_msg(self, xpath):
+        event_id = str(uuid.uuid4())
+        msg = rmgryang.VirtualLinkEventData()
+        msg.event_id = event_id
+        msg.request_info.name = "mynet"
+        msg.request_info.subnet = "1.1.1.0/24"
+        return msg, xpath.format(event_id)
+
+    def get_compute_reserve_msg(self,xpath):
+        event_id = str(uuid.uuid4())
+        msg = rmgryang.VDUEventData()
+        msg.event_id = event_id
+        msg.request_info.name = "mynet"
+        msg.request_info.image_id  = "This is a image_id"
+        msg.request_info.vm_flavor.vcpu_count = 4
+        msg.request_info.vm_flavor.memory_mb = 8192*2
+        msg.request_info.vm_flavor.storage_gb = 40
+        c1 = msg.request_info.connection_points.add()
+        c1.name = "myport1"
+        c1.virtual_link_id = "This is a network_id"
+        return msg, xpath.format(event_id)
+
+    def test_create_resource_pools(self):
+        self.log.debug("STARTING - test_create_resource_pools")
+        tinfo = self.new_tinfo('poolconfig')
+        dts = rift.tasklets.DTS(tinfo, self.schema, self.loop)
+        pool_xpath = "C,/rw-resource-mgr:resource-mgr-config/rw-resource-mgr:resource-pools"
+        pool_records_xpath = "D,/rw-resource-mgr:resource-pool-records"
+        account_xpath = "C,/rw-launchpad:cloud-account"
+        compute_xpath = "D,/rw-resource-mgr:resource-mgmt/vdu-event/vdu-event-data[event-id='{}']"
+        network_xpath = "D,/rw-resource-mgr:resource-mgmt/vlink-event/vlink-event-data[event-id='{}']"
+
+        @asyncio.coroutine
+        def configure_cloud_account():
+            msg = self.get_cloud_account_msg()
+            self.log.info("Configuring cloud-account: %s",msg)
+            yield from dts.query_create(account_xpath,
+                                        rwdts.XactFlag.ADVISE,
+                                        msg)
+            yield from asyncio.sleep(3, loop=self.loop)
+
+        @asyncio.coroutine
+        def configure_compute_resource_pools():
+            msg = self.get_compute_pool_msg("virtual-compute", "dynamic")
+            self.log.info("Configuring compute-resource-pool: %s",msg)
+            yield from dts.query_create(pool_xpath,
+                                        rwdts.XactFlag.ADVISE,
+                                        msg)
+            yield from asyncio.sleep(3, loop=self.loop)
+
+
+        @asyncio.coroutine
+        def configure_network_resource_pools():
+            msg = self.get_network_pool_msg("virtual-network", "dynamic")
+            self.log.info("Configuring network-resource-pool: %s",msg)
+            yield from dts.query_create(pool_xpath,
+                                        rwdts.XactFlag.ADVISE,
+                                        msg)
+            yield from asyncio.sleep(3, loop=self.loop)
+
+
+        @asyncio.coroutine
+        def verify_resource_pools():
+            self.log.debug("Verifying test_create_resource_pools results")
+            res_iter = yield from dts.query_read(pool_records_xpath,)
+            for result in res_iter:
+                response = yield from result
+                records = response.result.records
+                #self.assertEqual(len(records), 2)
+                #names = [i.name for i in records]
+                #self.assertTrue('virtual-compute' in names)
+                #self.assertTrue('virtual-network' in names)
+                for record in records:
+                    self.log.debug("Received Pool Record, Name: %s, Resource Type: %s, Pool Status: %s, Pool Size: %d, Busy Resources: %d",
+                                   record.name,
+                                   record.resource_type,
+                                   record.pool_status,
+                                   record.max_size,
+                                   record.busy_resources)
+        @asyncio.coroutine
+        def reserve_network_resources():
+            msg,xpath = self.get_network_reserve_msg(network_xpath)
+            self.log.debug("Sending create event to network-event xpath %s with msg: %s" % (xpath, msg))
+            yield from dts.query_create(xpath, rwdts.XactFlag.TRACE, msg)
+            yield from asyncio.sleep(3, loop=self.loop)
+            yield from dts.query_delete(xpath, rwdts.XactFlag.TRACE)
+
+        @asyncio.coroutine
+        def reserve_compute_resources():
+            msg,xpath = self.get_compute_reserve_msg(compute_xpath)
+            self.log.debug("Sending create event to compute-event xpath %s with msg: %s" % (xpath, msg))
+            yield from dts.query_create(xpath, rwdts.XactFlag.TRACE, msg)
+            yield from asyncio.sleep(3, loop=self.loop)
+            yield from dts.query_delete(xpath, rwdts.XactFlag.TRACE)
+
+        @asyncio.coroutine
+        def run_test():
+            yield from self.wait_tasklets()
+            yield from configure_cloud_account()
+            yield from configure_compute_resource_pools()
+            yield from configure_network_resource_pools()
+            yield from verify_resource_pools()
+            yield from reserve_network_resources()
+            yield from reserve_compute_resources()
+
+        future = asyncio.ensure_future(run_test(), loop=self.loop)
+        self.run_until(future.done)
+        if future.exception() is not None:
+            self.log.error("Caught exception during test")
+            raise future.exception()
+
+        self.log.debug("DONE - test_create_resource_pools")
+
+
+def main():
+    plugin_dir = os.path.join(os.environ["RIFT_INSTALL"], "usr/lib/rift/plugins")
+
+    if 'MESSAGE_BROKER_DIR' not in os.environ:
+        os.environ['MESSAGE_BROKER_DIR'] = os.path.join(plugin_dir, 'rwmsgbroker-c')
+
+    if 'ROUTER_DIR' not in os.environ:
+        os.environ['ROUTER_DIR'] = os.path.join(plugin_dir, 'rwdtsrouter-c')
+
+    if 'SO_DIR' not in os.environ:
+        os.environ['SO_DIR'] = os.path.join(plugin_dir, 'rwconmantasklet')
+
+    runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])
+    unittest.main(testRunner=runner)
+
+if __name__ == '__main__':
+    main()
+
+# vim: sw=4