blob: 94028eb31432d9905e0df6d938891ece9805d9d8 [file] [log] [blame]
Jeremy Mordkoff6f07e6f2016-09-07 18:56:51 -04001#!/usr/bin/env python3
2
3#
4# Copyright 2016 RIFT.IO Inc
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19
import asyncio
import gi
import logging
import os
import sys
import types
import unittest
import uuid
import xmlrunner

import gi.repository.CF as cf
import gi.repository.RwDts as rwdts
import gi.repository.RwMain as rwmain
import gi.repository.RwManifestYang as rwmanifest
import gi.repository.RwConmanYang as conmanY
import gi.repository.RwLaunchpadYang as launchpadyang
import gi.repository.RwResourceMgrYang as rmgryang
gi.require_version('RwKeyspec', '1.0')
from gi.repository.RwKeyspec import quoted_key

import rift.tasklets
40
# Compatibility shim: interpreters older than 3.4.4 expose the function under
# the name ``asyncio.async``.  ``async`` is a reserved keyword from Python 3.7
# on, so spelling the attribute literally makes this whole module a
# SyntaxError there; look it up by name instead.
if sys.version_info < (3, 4, 4):
    asyncio.ensure_future = getattr(asyncio, 'async')
43
44
class RWSOTestCase(unittest.TestCase):
    """
    DTS GI interface unittests

    setUpClass starts the message broker, DTS router and conman tasklets
    inside a single rwmain instance; each test then drives its own asyncio
    event loop from the rwsched CFRunLoop (see run_until).

    Note: the test coroutines are sequenced with fixed asyncio.sleep delays
    instead of any re-try mechanism, so that one step has time to settle
    before the next one issues its queries.  Such fixed delays should not be
    used in production software.
    """
    # Populated once per class by setUpClass.
    rwmain = None
    tinfo = None
    schema = None
    # Class-wide counter, bumped once per test in setUp and used by
    # new_tinfo as the tasklet instance id.
    id_cnt = 0

    # Format shared by every stderr handler this class installs.
    _LOG_FORMAT = (
        '%(asctime)-23s %(levelname)-5s '
        '(%(name)s@%(process)d:%(filename)s:%(lineno)d) - %(message)s')

    @staticmethod
    def _enable_stderr_debug(log):
        """
        Set a logger to DEBUG and attach a formatted stderr stream handler.

        @param log - logger to configure
        """
        log.setLevel(logging.DEBUG)

        stderr_handler = logging.StreamHandler(stream=sys.stderr)
        stderr_handler.setFormatter(logging.Formatter(RWSOTestCase._LOG_FORMAT))
        log.addHandler(stderr_handler)

    @classmethod
    def setUpClass(cls):
        """
        Create the rwmain instance, add the broker, router and conman
        tasklets to it, and set up class-level logging and the yang schema.

        Plugin directories are taken from the MESSAGE_BROKER_DIR, ROUTER_DIR
        and SO_DIR environment variables (None is passed on if one is unset).
        """
        msgbroker_dir = os.environ.get('MESSAGE_BROKER_DIR')
        router_dir = os.environ.get('ROUTER_DIR')
        cm_dir = os.environ.get('SO_DIR')

        manifest = rwmanifest.Manifest()
        manifest.init_phase.settings.rwdtsrouter.single_dtsrouter.enable = True

        cls.rwmain = rwmain.Gi.new(manifest)
        cls.tinfo = cls.rwmain.get_tasklet_info()

        # Run router in mainq. Eliminates some ill-diagnosed bootstrap races.
        os.environ['RWDTS_ROUTER_MAINQ'] = '1'
        cls.rwmain.add_tasklet(msgbroker_dir, 'rwmsgbroker-c')
        cls.rwmain.add_tasklet(router_dir, 'rwdtsrouter-c')
        cls.rwmain.add_tasklet(cm_dir, 'rwconmantasklet')

        cls.log = rift.tasklets.logger_from_tasklet_info(cls.tinfo)
        cls._enable_stderr_debug(cls.log)
        cls.schema = conmanY.get_schema()

    def setUp(self):
        """
        Give the test a fresh asyncio loop in debug mode, reset the timer
        handles used by run_until, and advance the class-wide id counter.
        """
        def scheduler_tick(loop, *args):
            # Run whatever is already scheduled on the loop, then return.
            loop.call_soon(loop.stop)
            loop.run_forever()

        self.loop = asyncio.new_event_loop()
        self.loop.scheduler_tick = types.MethodType(scheduler_tick, self.loop)
        self.loop.set_debug(True)
        os.environ["PYTHONASYNCIODEBUG"] = "1"
        logging.getLogger("asyncio").setLevel(logging.DEBUG)

        self.asyncio_timer = None
        self.stop_timer = None

        # Increment the *class* attribute: new_tinfo reads
        # RWSOTestCase.id_cnt, and "self.id_cnt += 1" would only create an
        # instance attribute, leaving the class counter stuck at 0.
        RWSOTestCase.id_cnt += 1

    @asyncio.coroutine
    def wait_tasklets(self):
        """Sleep one second on the test loop before the test issues queries."""
        yield from asyncio.sleep(1, loop=self.loop)

    def run_until(self, test_done, timeout=30):
        """
        Attach the current asyncio event loop to rwsched and then run the
        scheduler until the test_done function returns True or timeout seconds
        pass.  Asserts that test_done() is True once the scheduler stops.

        @param test_done - function which should return True once the test is
                           complete and the scheduler no longer needs to run.
        @param timeout   - maximum number of seconds to run the test.
        """
        sched_tasklet = self.tinfo.rwsched_tasklet
        sched_instance = self.tinfo.rwsched_instance

        def shutdown(*args):
            # Called with arguments only as the stop-timer callback, i.e. on
            # timeout; tick() calls it with none.
            if args:
                self.log.debug('Shutting down loop due to timeout')

            if self.asyncio_timer is not None:
                sched_tasklet.CFRunLoopTimerRelease(self.asyncio_timer)
                self.asyncio_timer = None

            if self.stop_timer is not None:
                sched_tasklet.CFRunLoopTimerRelease(self.stop_timer)
                self.stop_timer = None

            sched_instance.CFRunLoopStop()

        def tick(*args):
            # Give the asyncio loop a 0.1 second slice, then check for
            # completion.
            self.loop.call_later(0.1, self.loop.stop)
            self.loop.run_forever()
            if test_done():
                shutdown()

        self.asyncio_timer = sched_tasklet.CFRunLoopTimer(
            cf.CFAbsoluteTimeGetCurrent(),
            0.1,
            tick,
            None)

        self.stop_timer = sched_tasklet.CFRunLoopTimer(
            cf.CFAbsoluteTimeGetCurrent() + timeout,
            0,
            shutdown,
            None)

        sched_tasklet.CFRunLoopAddTimer(
            sched_tasklet.CFRunLoopGetCurrent(),
            self.stop_timer,
            sched_instance.CFRunLoopGetMainMode())

        sched_tasklet.CFRunLoopAddTimer(
            sched_tasklet.CFRunLoopGetCurrent(),
            self.asyncio_timer,
            sched_instance.CFRunLoopGetMainMode())

        sched_instance.CFRunLoopRun()

        self.assertTrue(test_done())

    def new_tinfo(self, name):
        """
        Create a new tasklet info instance with a unique instance_id per test.
        It is up to each test to use unique names if more than one tasklet
        info instance is needed.

        @param name - name of the "tasklet"
        @return     - new tasklet info instance
        """
        ret = self.rwmain.new_tasklet_info(name, RWSOTestCase.id_cnt)
        self._enable_stderr_debug(rift.tasklets.logger_from_tasklet_info(ret))
        return ret

    def get_cloud_account_msg(self):
        """
        @return - cloud account message named "cloudy" of type "mock"
        """
        cloud_account = launchpadyang.YangData_RwProject_Project_CloudAccounts_CloudAccountList()
        cloud_account.name = "cloudy"
        cloud_account.account_type = "mock"
        cloud_account.mock.username = "rainy"
        return cloud_account

    def _build_pool_msg(self, name, pool_type, resource_type, dynamic_max_size):
        """
        Build a resource-pools config message holding a single pool.

        @param name             - pool name
        @param pool_type        - "static" leaves max_size unset; any other
                                  value sets it to dynamic_max_size
        @param resource_type    - value for the pool's resource_type field
        @param dynamic_max_size - max_size used for non-static pools
        @return                 - the pool config message
        """
        pool_config = rmgryang.YangData_RwProject_Project_ResourceMgrConfig_ResourcePools()
        pool = pool_config.pools.add()
        pool.name = name
        pool.resource_type = resource_type
        # A static pool would need to query CAL for its resources; that is
        # not implemented, so only non-static pools get a size.
        if pool_type != "static":
            pool.max_size = dynamic_max_size
        return pool_config

    def get_compute_pool_msg(self, name, pool_type):
        """
        @param name      - pool name
        @param pool_type - "static" or anything else (treated as dynamic)
        @return          - compute pool config message (max_size 10 unless
                           static)
        """
        return self._build_pool_msg(name, pool_type, "compute", 10)

    def get_network_pool_msg(self, name, pool_type):
        """
        @param name      - pool name
        @param pool_type - "static" or anything else (treated as dynamic)
        @return          - network pool config message (max_size 4 unless
                           static)
        """
        return self._build_pool_msg(name, pool_type, "network", 4)

    def get_network_reserve_msg(self, xpath):
        """
        @param xpath - xpath template with one "{}" slot for the event id
        @return      - (vlink event message, xpath with the quoted event id
                       substituted in)
        """
        event_id = str(uuid.uuid4())
        msg = rmgryang.YangData_RwProject_Project_ResourceMgmt_VlinkEvent_VlinkEventData()
        msg.event_id = event_id
        msg.request_info.name = "mynet"
        msg.request_info.subnet = "1.1.1.0/24"
        return msg, xpath.format(quoted_key(event_id))

    def get_compute_reserve_msg(self, xpath):
        """
        @param xpath - xpath template with one "{}" slot for the event id
        @return      - (vdu event message, xpath with the quoted event id
                       substituted in)
        """
        event_id = str(uuid.uuid4())
        msg = rmgryang.YangData_RwProject_Project_ResourceMgmt_VduEvent_VduEventData()
        msg.event_id = event_id
        msg.request_info.name = "mynet"
        msg.request_info.image_id = "This is a image_id"
        msg.request_info.vm_flavor.vcpu_count = 4
        msg.request_info.vm_flavor.memory_mb = 8192*2
        msg.request_info.vm_flavor.storage_gb = 40
        c1 = msg.request_info.connection_points.add()
        c1.name = "myport1"
        c1.virtual_link_id = "This is a network_id"
        return msg, xpath.format(quoted_key(event_id))

    def test_create_resource_pools(self):
        """
        Configure a cloud account plus compute and network pools, log the
        resulting pool records, then create and delete one network and one
        compute reservation event.  Fails if any step raises or if the whole
        sequence does not finish within run_until's timeout.
        """
        self.log.debug("STARTING - test_create_resource_pools")
        tinfo = self.new_tinfo('poolconfig')
        dts = rift.tasklets.DTS(tinfo, self.schema, self.loop)
        pool_xpath = "C,/rw-project:project/rw-resource-mgr:resource-mgr-config/rw-resource-mgr:resource-pools"
        pool_records_xpath = "D,/rw-project:project/rw-resource-mgr:resource-pool-records"
        account_xpath = "C,/rw-launchpad:cloud-account"
        compute_xpath = "D,/rw-project:project/rw-resource-mgr:resource-mgmt/vdu-event/vdu-event-data[event-id={}]"
        network_xpath = "D,/rw-project:project/rw-resource-mgr:resource-mgmt/vlink-event/vlink-event-data[event-id={}]"

        # Seconds to wait after each create before moving on.
        settle_secs = 3

        @asyncio.coroutine
        def create_config(label, xpath, msg):
            # Push one config message and give it time to settle.
            self.log.info("Configuring %s: %s", label, msg)
            yield from dts.query_create(xpath, rwdts.XactFlag.ADVISE, msg)
            yield from asyncio.sleep(settle_secs, loop=self.loop)

        @asyncio.coroutine
        def verify_resource_pools():
            # NOTE: this step only logs the records it receives; it makes no
            # assertions about how many pools exist or what they are called.
            self.log.debug("Verifying test_create_resource_pools results")
            res_iter = yield from dts.query_read(pool_records_xpath,)
            for result in res_iter:
                response = yield from result
                for record in response.result.records:
                    self.log.debug("Received Pool Record, Name: %s, Resource Type: %s, Pool Status: %s, Pool Size: %d, Busy Resources: %d",
                                   record.name,
                                   record.resource_type,
                                   record.pool_status,
                                   record.max_size,
                                   record.busy_resources)

        @asyncio.coroutine
        def reserve(kind, build_msg, xpath_template):
            # Create a reservation event, let it settle, then delete it.
            msg, xpath = build_msg(xpath_template)
            self.log.debug("Sending create event to %s-event xpath %s with msg: %s",
                           kind, xpath, msg)
            yield from dts.query_create(xpath, rwdts.XactFlag.TRACE, msg)
            yield from asyncio.sleep(settle_secs, loop=self.loop)
            yield from dts.query_delete(xpath, rwdts.XactFlag.TRACE)

        @asyncio.coroutine
        def run_test():
            yield from self.wait_tasklets()
            yield from create_config(
                "cloud-account", account_xpath,
                self.get_cloud_account_msg())
            yield from create_config(
                "compute-resource-pool", pool_xpath,
                self.get_compute_pool_msg("virtual-compute", "dynamic"))
            yield from create_config(
                "network-resource-pool", pool_xpath,
                self.get_network_pool_msg("virtual-network", "dynamic"))
            yield from verify_resource_pools()
            yield from reserve("network", self.get_network_reserve_msg, network_xpath)
            yield from reserve("compute", self.get_compute_reserve_msg, compute_xpath)

        future = asyncio.ensure_future(run_test(), loop=self.loop)
        # run_until asserts that the future is done, so exception() below
        # is only reached for a completed future.
        self.run_until(future.done)
        exc = future.exception()
        if exc is not None:
            self.log.error("Caught exception during test")
            raise exc

        self.log.debug("DONE - test_create_resource_pools")
331
332
def main():
    """
    Entry point: default the plugin directory environment variables and run
    the tests under the XML test runner.

    MESSAGE_BROKER_DIR, ROUTER_DIR and SO_DIR are only set when not already
    present in the environment; each default lives under
    $RIFT_INSTALL/usr/lib/rift/plugins.  Raises KeyError if RIFT_INSTALL or
    RIFT_MODULE_TEST is not set.
    """
    plugin_dir = os.path.join(os.environ["RIFT_INSTALL"], "usr/lib/rift/plugins")

    # (environment variable, plugin sub-directory) pairs.
    plugin_defaults = (
        ('MESSAGE_BROKER_DIR', 'rwmsgbroker-c'),
        ('ROUTER_DIR', 'rwdtsrouter-c'),
        ('SO_DIR', 'rwconmantasklet'),
    )
    for env_name, plugin_name in plugin_defaults:
        # setdefault leaves a value supplied by the caller untouched.
        os.environ.setdefault(env_name, os.path.join(plugin_dir, plugin_name))

    # Test results are written where RIFT_MODULE_TEST points.
    unittest.main(
        testRunner=xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"]))
347
# Run the test suite only when executed as a script, not on import.
if __name__ == "__main__":
    main()
350
351# vim: sw=4