e0c5011b963b1976ce93ba28cc27d83b03863557
[osm/SO.git] / rwcm / test / rwso_test.py
1 #!/usr/bin/env python3
2
3 #
4 # Copyright 2016 RIFT.IO Inc
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 #
18
19
20 import asyncio
21 import logging
22 import os
23 import sys
24 import types
25 import unittest
26 import uuid
27
28 import xmlrunner
29
30 import gi.repository.CF as cf
31 import gi.repository.RwDts as rwdts
32 import gi.repository.RwMain as rwmain
33 import gi.repository.RwManifestYang as rwmanifest
34 import gi.repository.RwConmanYang as conmanY
35 import gi.repository.RwLaunchpadYang as launchpadyang
36
37 import rift.tasklets
38
39 if sys.version_info < (3, 4, 4):
40 asyncio.ensure_future = asyncio.async
41
42
43 class RWSOTestCase(unittest.TestCase):
44 """
45 DTS GI interface unittests
46
47 Note: Each tests uses a list of asyncio.Events for staging through the
48 test. These are required here because we are bring up each coroutine
49 ("tasklet") at the same time and are not implementing any re-try
50 mechanisms. For instance, this is used in numerous tests to make sure that
51 a publisher is up and ready before the subscriber sends queries. Such
52 event lists should not be used in production software.
53 """
54 rwmain = None
55 tinfo = None
56 schema = None
57 id_cnt = 0
58
59 @classmethod
60 def setUpClass(cls):
61 msgbroker_dir = os.environ.get('MESSAGE_BROKER_DIR')
62 router_dir = os.environ.get('ROUTER_DIR')
63 cm_dir = os.environ.get('SO_DIR')
64
65 manifest = rwmanifest.Manifest()
66 manifest.init_phase.settings.rwdtsrouter.single_dtsrouter.enable = True
67
68 cls.rwmain = rwmain.Gi.new(manifest)
69 cls.tinfo = cls.rwmain.get_tasklet_info()
70
71 # Run router in mainq. Eliminates some ill-diagnosed bootstrap races.
72 os.environ['RWDTS_ROUTER_MAINQ']='1'
73 cls.rwmain.add_tasklet(msgbroker_dir, 'rwmsgbroker-c')
74 cls.rwmain.add_tasklet(router_dir, 'rwdtsrouter-c')
75 cls.rwmain.add_tasklet(cm_dir, 'rwconmantasklet')
76
77 cls.log = rift.tasklets.logger_from_tasklet_info(cls.tinfo)
78 cls.log.setLevel(logging.DEBUG)
79
80 stderr_handler = logging.StreamHandler(stream=sys.stderr)
81 fmt = logging.Formatter(
82 '%(asctime)-23s %(levelname)-5s (%(name)s@%(process)d:%(filename)s:%(lineno)d) - %(message)s')
83 stderr_handler.setFormatter(fmt)
84 cls.log.addHandler(stderr_handler)
85 cls.schema = conmanY.get_schema()
86
87 def setUp(self):
88 def scheduler_tick(self, *args):
89 self.call_soon(self.stop)
90 self.run_forever()
91
92 self.loop = asyncio.new_event_loop()
93 self.loop.scheduler_tick = types.MethodType(scheduler_tick, self.loop)
94 self.loop.set_debug(True)
95 os.environ["PYTHONASYNCIODEBUG"] = "1"
96 asyncio_logger = logging.getLogger("asyncio")
97 asyncio_logger.setLevel(logging.DEBUG)
98
99 self.asyncio_timer = None
100 self.stop_timer = None
101 self.id_cnt += 1
102
103 @asyncio.coroutine
104 def wait_tasklets(self):
105 yield from asyncio.sleep(1, loop=self.loop)
106
107 def run_until(self, test_done, timeout=30):
108 """
109 Attach the current asyncio event loop to rwsched and then run the
110 scheduler until the test_done function returns True or timeout seconds
111 pass.
112
113 @param test_done - function which should return True once the test is
114 complete and the scheduler no longer needs to run.
115 @param timeout - maximum number of seconds to run the test.
116 """
117 def shutdown(*args):
118 if args:
119 self.log.debug('Shutting down loop due to timeout')
120
121 if self.asyncio_timer is not None:
122 self.tinfo.rwsched_tasklet.CFRunLoopTimerRelease(self.asyncio_timer)
123 self.asyncio_timer = None
124
125 if self.stop_timer is not None:
126 self.tinfo.rwsched_tasklet.CFRunLoopTimerRelease(self.stop_timer)
127 self.stop_timer = None
128
129 self.tinfo.rwsched_instance.CFRunLoopStop()
130
131 def tick(*args):
132 self.loop.call_later(0.1, self.loop.stop)
133 self.loop.run_forever()
134 if test_done():
135 shutdown()
136
137 self.asyncio_timer = self.tinfo.rwsched_tasklet.CFRunLoopTimer(
138 cf.CFAbsoluteTimeGetCurrent(),
139 0.1,
140 tick,
141 None)
142
143 self.stop_timer = self.tinfo.rwsched_tasklet.CFRunLoopTimer(
144 cf.CFAbsoluteTimeGetCurrent() + timeout,
145 0,
146 shutdown,
147 None)
148
149 self.tinfo.rwsched_tasklet.CFRunLoopAddTimer(
150 self.tinfo.rwsched_tasklet.CFRunLoopGetCurrent(),
151 self.stop_timer,
152 self.tinfo.rwsched_instance.CFRunLoopGetMainMode())
153
154 self.tinfo.rwsched_tasklet.CFRunLoopAddTimer(
155 self.tinfo.rwsched_tasklet.CFRunLoopGetCurrent(),
156 self.asyncio_timer,
157 self.tinfo.rwsched_instance.CFRunLoopGetMainMode())
158
159 self.tinfo.rwsched_instance.CFRunLoopRun()
160
161 self.assertTrue(test_done())
162
163 def new_tinfo(self, name):
164 """
165 Create a new tasklet info instance with a unique instance_id per test.
166 It is up to each test to use unique names if more that one tasklet info
167 instance is needed.
168
169 @param name - name of the "tasklet"
170 @return - new tasklet info instance
171 """
172 ret = self.rwmain.new_tasklet_info(name, RWSOTestCase.id_cnt)
173
174 log = rift.tasklets.logger_from_tasklet_info(ret)
175 log.setLevel(logging.DEBUG)
176
177 stderr_handler = logging.StreamHandler(stream=sys.stderr)
178 fmt = logging.Formatter(
179 '%(asctime)-23s %(levelname)-5s (%(name)s@%(process)d:%(filename)s:%(lineno)d) - %(message)s')
180 stderr_handler.setFormatter(fmt)
181 log.addHandler(stderr_handler)
182
183 return ret
184
185 def get_cloud_account_msg(self):
186 cloud_account = launchpadyang.CloudAccount()
187 cloud_account.name = "cloudy"
188 cloud_account.account_type = "mock"
189 cloud_account.mock.username = "rainy"
190 return cloud_account
191
192 def get_compute_pool_msg(self, name, pool_type):
193 pool_config = rmgryang.ResourcePools()
194 pool = pool_config.pools.add()
195 pool.name = name
196 pool.resource_type = "compute"
197 if pool_type == "static":
198 # Need to query CAL for resource
199 pass
200 else:
201 pool.max_size = 10
202 return pool_config
203
204 def get_network_pool_msg(self, name, pool_type):
205 pool_config = rmgryang.ResourcePools()
206 pool = pool_config.pools.add()
207 pool.name = name
208 pool.resource_type = "network"
209 if pool_type == "static":
210 # Need to query CAL for resource
211 pass
212 else:
213 pool.max_size = 4
214 return pool_config
215
216
217 def get_network_reserve_msg(self, xpath):
218 event_id = str(uuid.uuid4())
219 msg = rmgryang.VirtualLinkEventData()
220 msg.event_id = event_id
221 msg.request_info.name = "mynet"
222 msg.request_info.subnet = "1.1.1.0/24"
223 return msg, xpath.format(event_id)
224
225 def get_compute_reserve_msg(self,xpath):
226 event_id = str(uuid.uuid4())
227 msg = rmgryang.VDUEventData()
228 msg.event_id = event_id
229 msg.request_info.name = "mynet"
230 msg.request_info.image_id = "This is a image_id"
231 msg.request_info.vm_flavor.vcpu_count = 4
232 msg.request_info.vm_flavor.memory_mb = 8192*2
233 msg.request_info.vm_flavor.storage_gb = 40
234 c1 = msg.request_info.connection_points.add()
235 c1.name = "myport1"
236 c1.virtual_link_id = "This is a network_id"
237 return msg, xpath.format(event_id)
238
239 def test_create_resource_pools(self):
240 self.log.debug("STARTING - test_create_resource_pools")
241 tinfo = self.new_tinfo('poolconfig')
242 dts = rift.tasklets.DTS(tinfo, self.schema, self.loop)
243 pool_xpath = "C,/rw-resource-mgr:resource-mgr-config/rw-resource-mgr:resource-pools"
244 pool_records_xpath = "D,/rw-resource-mgr:resource-pool-records"
245 account_xpath = "C,/rw-launchpad:cloud-account"
246 compute_xpath = "D,/rw-resource-mgr:resource-mgmt/vdu-event/vdu-event-data[event-id='{}']"
247 network_xpath = "D,/rw-resource-mgr:resource-mgmt/vlink-event/vlink-event-data[event-id='{}']"
248
249 @asyncio.coroutine
250 def configure_cloud_account():
251 msg = self.get_cloud_account_msg()
252 self.log.info("Configuring cloud-account: %s",msg)
253 yield from dts.query_create(account_xpath,
254 rwdts.XactFlag.ADVISE,
255 msg)
256 yield from asyncio.sleep(3, loop=self.loop)
257
258 @asyncio.coroutine
259 def configure_compute_resource_pools():
260 msg = self.get_compute_pool_msg("virtual-compute", "dynamic")
261 self.log.info("Configuring compute-resource-pool: %s",msg)
262 yield from dts.query_create(pool_xpath,
263 rwdts.XactFlag.ADVISE,
264 msg)
265 yield from asyncio.sleep(3, loop=self.loop)
266
267
268 @asyncio.coroutine
269 def configure_network_resource_pools():
270 msg = self.get_network_pool_msg("virtual-network", "dynamic")
271 self.log.info("Configuring network-resource-pool: %s",msg)
272 yield from dts.query_create(pool_xpath,
273 rwdts.XactFlag.ADVISE,
274 msg)
275 yield from asyncio.sleep(3, loop=self.loop)
276
277
278 @asyncio.coroutine
279 def verify_resource_pools():
280 self.log.debug("Verifying test_create_resource_pools results")
281 res_iter = yield from dts.query_read(pool_records_xpath,)
282 for result in res_iter:
283 response = yield from result
284 records = response.result.records
285 #self.assertEqual(len(records), 2)
286 #names = [i.name for i in records]
287 #self.assertTrue('virtual-compute' in names)
288 #self.assertTrue('virtual-network' in names)
289 for record in records:
290 self.log.debug("Received Pool Record, Name: %s, Resource Type: %s, Pool Status: %s, Pool Size: %d, Busy Resources: %d",
291 record.name,
292 record.resource_type,
293 record.pool_status,
294 record.max_size,
295 record.busy_resources)
296 @asyncio.coroutine
297 def reserve_network_resources():
298 msg,xpath = self.get_network_reserve_msg(network_xpath)
299 self.log.debug("Sending create event to network-event xpath %s with msg: %s" % (xpath, msg))
300 yield from dts.query_create(xpath, rwdts.XactFlag.TRACE, msg)
301 yield from asyncio.sleep(3, loop=self.loop)
302 yield from dts.query_delete(xpath, rwdts.XactFlag.TRACE)
303
304 @asyncio.coroutine
305 def reserve_compute_resources():
306 msg,xpath = self.get_compute_reserve_msg(compute_xpath)
307 self.log.debug("Sending create event to compute-event xpath %s with msg: %s" % (xpath, msg))
308 yield from dts.query_create(xpath, rwdts.XactFlag.TRACE, msg)
309 yield from asyncio.sleep(3, loop=self.loop)
310 yield from dts.query_delete(xpath, rwdts.XactFlag.TRACE)
311
312 @asyncio.coroutine
313 def run_test():
314 yield from self.wait_tasklets()
315 yield from configure_cloud_account()
316 yield from configure_compute_resource_pools()
317 yield from configure_network_resource_pools()
318 yield from verify_resource_pools()
319 yield from reserve_network_resources()
320 yield from reserve_compute_resources()
321
322 future = asyncio.ensure_future(run_test(), loop=self.loop)
323 self.run_until(future.done)
324 if future.exception() is not None:
325 self.log.error("Caught exception during test")
326 raise future.exception()
327
328 self.log.debug("DONE - test_create_resource_pools")
329
330
331 def main():
332 plugin_dir = os.path.join(os.environ["RIFT_INSTALL"], "usr/lib/rift/plugins")
333
334 if 'MESSAGE_BROKER_DIR' not in os.environ:
335 os.environ['MESSAGE_BROKER_DIR'] = os.path.join(plugin_dir, 'rwmsgbroker-c')
336
337 if 'ROUTER_DIR' not in os.environ:
338 os.environ['ROUTER_DIR'] = os.path.join(plugin_dir, 'rwdtsrouter-c')
339
340 if 'SO_DIR' not in os.environ:
341 os.environ['SO_DIR'] = os.path.join(plugin_dir, 'rwconmantasklet')
342
343 runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])
344 unittest.main(testRunner=runner)
345
346 if __name__ == '__main__':
347 main()
348
349 # vim: sw=4