update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwcm / test / rwso_test.py
1 #!/usr/bin/env python3
2
3 #
4 # Copyright 2016 RIFT.IO Inc
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 #
18
19
import asyncio
import logging
import os
import sys
import types
import unittest
import uuid

import gi
import xmlrunner

import gi.repository.CF as cf
import gi.repository.RwDts as rwdts
import gi.repository.RwMain as rwmain
import gi.repository.RwManifestYang as rwmanifest
import gi.repository.RwConmanYang as conmanY
import gi.repository.RwLaunchpadYang as launchpadyang
import gi.repository.RwResourceMgrYang as rmgryang
gi.require_version('RwKeyspec', '1.0')
from gi.repository.RwKeyspec import quoted_key

import rift.tasklets
40
# asyncio.ensure_future() only exists from Python 3.4.4 on; older interpreters
# expose the same functionality under the name asyncio.async().  The legacy
# name is fetched with getattr() because ``async`` is a reserved keyword since
# Python 3.7: writing it as a plain attribute access would make this whole
# module a SyntaxError on modern interpreters, even though the branch itself
# is never taken there.
if sys.version_info < (3, 4, 4):
    asyncio.ensure_future = getattr(asyncio, 'async')
43
44
class RWSOTestCase(unittest.TestCase):
    """
    DTS GI interface unittests

    Note: Each test uses a list of asyncio.Events for staging through the
    test.  These are required here because we are bringing up each coroutine
    ("tasklet") at the same time and are not implementing any re-try
    mechanisms.  For instance, this is used in numerous tests to make sure
    that a publisher is up and ready before the subscriber sends queries.
    Such event lists should not be used in production software.
    """
    # Shared fixtures, populated once by setUpClass().  Inside methods the bare
    # name ``rwmain`` still refers to the imported module; the class attribute
    # is only reachable as ``cls.rwmain`` / ``self.rwmain``.
    rwmain = None
    tinfo = None
    schema = None
    # Counter bumped once per test in setUp() and passed to new_tasklet_info()
    # as the instance id by new_tinfo().
    id_cnt = 0

    # Format shared by every stderr log handler installed by this test case.
    _LOG_FORMAT = ('%(asctime)-23s %(levelname)-5s '
                   '(%(name)s@%(process)d:%(filename)s:%(lineno)d) - %(message)s')

    @staticmethod
    def _log_to_stderr(log):
        """
        Switch ``log`` to DEBUG level and attach a stderr stream handler that
        formats records with _LOG_FORMAT.

        @param log - logger to configure
        """
        log.setLevel(logging.DEBUG)
        handler = logging.StreamHandler(stream=sys.stderr)
        handler.setFormatter(logging.Formatter(RWSOTestCase._LOG_FORMAT))
        log.addHandler(handler)

    @classmethod
    def setUpClass(cls):
        """
        Create the rwmain instance, register the message broker, DTS router
        and rwconmantasklet tasklets with it, and set up the class logger and
        schema.

        The plugin directories are read from the MESSAGE_BROKER_DIR,
        ROUTER_DIR and SO_DIR environment variables (main() fills in defaults
        when the file is run as a script).
        """
        msgbroker_dir = os.environ.get('MESSAGE_BROKER_DIR')
        router_dir = os.environ.get('ROUTER_DIR')
        cm_dir = os.environ.get('SO_DIR')

        manifest = rwmanifest.Manifest()
        manifest.init_phase.settings.rwdtsrouter.single_dtsrouter.enable = True

        cls.rwmain = rwmain.Gi.new(manifest)
        cls.tinfo = cls.rwmain.get_tasklet_info()

        # Run router in mainq. Eliminates some ill-diagnosed bootstrap races.
        os.environ['RWDTS_ROUTER_MAINQ'] = '1'
        cls.rwmain.add_tasklet(msgbroker_dir, 'rwmsgbroker-c')
        cls.rwmain.add_tasklet(router_dir, 'rwdtsrouter-c')
        cls.rwmain.add_tasklet(cm_dir, 'rwconmantasklet')

        cls.log = rift.tasklets.logger_from_tasklet_info(cls.tinfo)
        cls._log_to_stderr(cls.log)
        cls.schema = conmanY.get_schema()

    def setUp(self):
        """
        Give each test a fresh asyncio event loop in debug mode, reset the
        timer handles used by run_until(), and advance the per-test id
        counter.
        """
        def scheduler_tick(self, *args):
            # Bound to the loop below: run exactly one iteration of the loop.
            self.call_soon(self.stop)
            self.run_forever()

        self.loop = asyncio.new_event_loop()
        self.loop.scheduler_tick = types.MethodType(scheduler_tick, self.loop)
        self.loop.set_debug(True)
        os.environ["PYTHONASYNCIODEBUG"] = "1"
        asyncio_logger = logging.getLogger("asyncio")
        asyncio_logger.setLevel(logging.DEBUG)

        self.asyncio_timer = None
        self.stop_timer = None

        # Increment the counter on the class.  ``self.id_cnt += 1`` would only
        # create an instance attribute and leave RWSOTestCase.id_cnt (the
        # value new_tinfo() reads) stuck at 0, so ids would never differ
        # between tests.
        RWSOTestCase.id_cnt += 1

    @asyncio.coroutine
    def wait_tasklets(self):
        """
        Coroutine that sleeps for one second on the test's event loop; the
        test runs it first, before sending any query.
        """
        yield from asyncio.sleep(1, loop=self.loop)

    def run_until(self, test_done, timeout=30):
        """
        Attach the current asyncio event loop to rwsched and then run the
        scheduler until the test_done function returns True or timeout seconds
        pass.

        Asserts that test_done() is True once the scheduler has stopped, so a
        timeout shows up as a test failure.

        @param test_done - function which should return True once the test is
                           complete and the scheduler no longer needs to run.
        @param timeout   - maximum number of seconds to run the test.
        """
        def shutdown(*args):
            # The stop timer invokes this with arguments; tick() calls it with
            # none, which is how the two paths are told apart for logging.
            if args:
                self.log.debug('Shutting down loop due to timeout')

            if self.asyncio_timer is not None:
                self.tinfo.rwsched_tasklet.CFRunLoopTimerRelease(self.asyncio_timer)
                self.asyncio_timer = None

            if self.stop_timer is not None:
                self.tinfo.rwsched_tasklet.CFRunLoopTimerRelease(self.stop_timer)
                self.stop_timer = None

            self.tinfo.rwsched_instance.CFRunLoopStop()

        def tick(*args):
            # Let the asyncio loop run for a 0.1 s slice, then hand control
            # back to the CF run loop and check whether the test is finished.
            self.loop.call_later(0.1, self.loop.stop)
            self.loop.run_forever()
            if test_done():
                shutdown()

        # Timer created with an interval argument of 0.1: drives tick().
        self.asyncio_timer = self.tinfo.rwsched_tasklet.CFRunLoopTimer(
            cf.CFAbsoluteTimeGetCurrent(),
            0.1,
            tick,
            None)

        # Timer created with fire time now + timeout and interval argument 0:
        # drives shutdown().
        self.stop_timer = self.tinfo.rwsched_tasklet.CFRunLoopTimer(
            cf.CFAbsoluteTimeGetCurrent() + timeout,
            0,
            shutdown,
            None)

        self.tinfo.rwsched_tasklet.CFRunLoopAddTimer(
            self.tinfo.rwsched_tasklet.CFRunLoopGetCurrent(),
            self.stop_timer,
            self.tinfo.rwsched_instance.CFRunLoopGetMainMode())

        self.tinfo.rwsched_tasklet.CFRunLoopAddTimer(
            self.tinfo.rwsched_tasklet.CFRunLoopGetCurrent(),
            self.asyncio_timer,
            self.tinfo.rwsched_instance.CFRunLoopGetMainMode())

        self.tinfo.rwsched_instance.CFRunLoopRun()

        self.assertTrue(test_done())

    def new_tinfo(self, name):
        """
        Create a new tasklet info instance with a unique instance_id per test.
        It is up to each test to use unique names if more than one tasklet
        info instance is needed.

        The logger derived from the new tasklet info is set to DEBUG and
        given a stderr handler.

        @param name - name of the "tasklet"
        @return     - new tasklet info instance
        """
        ret = self.rwmain.new_tasklet_info(name, RWSOTestCase.id_cnt)
        self._log_to_stderr(rift.tasklets.logger_from_tasklet_info(ret))
        return ret

    def get_cloud_account_msg(self):
        """
        Build the cloud-account message used by the test: a "mock" account
        named "cloudy" with mock username "rainy".

        @return - populated cloud account message
        """
        cloud_account = launchpadyang.YangData_RwProject_Project_CloudAccounts_CloudAccountList()
        cloud_account.name = "cloudy"
        cloud_account.account_type = "mock"
        cloud_account.mock.username = "rainy"
        return cloud_account

    def _get_pool_msg(self, name, pool_type, resource_type, max_size):
        """
        Build a resource-pools config message holding a single pool.

        Relies on the module-level name ``rmgryang``.

        @param name          - pool name
        @param pool_type     - "static" leaves max_size unset; any other value
                               sets it to max_size
        @param resource_type - value stored in the pool's resource_type
        @param max_size      - pool size applied to non-static pools
        @return              - populated resource-pools config message
        """
        pool_config = rmgryang.YangData_RwProject_Project_ResourceMgrConfig_ResourcePools()
        pool = pool_config.pools.add()
        pool.name = name
        pool.resource_type = resource_type
        # Static pools: need to query CAL for resource, so nothing is set.
        if pool_type != "static":
            pool.max_size = max_size
        return pool_config

    def get_compute_pool_msg(self, name, pool_type):
        """
        Build a "compute" resource-pool config message.

        @param name      - pool name
        @param pool_type - "static", or anything else for a pool of size 10
        @return          - populated resource-pools config message
        """
        return self._get_pool_msg(name, pool_type, "compute", 10)

    def get_network_pool_msg(self, name, pool_type):
        """
        Build a "network" resource-pool config message.

        @param name      - pool name
        @param pool_type - "static", or anything else for a pool of size 4
        @return          - populated resource-pools config message
        """
        return self._get_pool_msg(name, pool_type, "network", 4)

    def get_network_reserve_msg(self, xpath):
        """
        Build a vlink-event request with a freshly generated event id.

        @param xpath - xpath template with one ``{}`` placeholder for the
                       quoted event id
        @return      - (message, xpath with the event id filled in)
        """
        event_id = str(uuid.uuid4())
        msg = rmgryang.YangData_RwProject_Project_ResourceMgmt_VlinkEvent_VlinkEventData()
        msg.event_id = event_id
        msg.request_info.name = "mynet"
        msg.request_info.subnet = "1.1.1.0/24"
        return msg, xpath.format(quoted_key(event_id))

    def get_compute_reserve_msg(self, xpath):
        """
        Build a vdu-event request with a freshly generated event id, a
        4 vCPU / 16384 MB / 40 GB flavor and one connection point.

        @param xpath - xpath template with one ``{}`` placeholder for the
                       quoted event id
        @return      - (message, xpath with the event id filled in)
        """
        event_id = str(uuid.uuid4())
        msg = rmgryang.YangData_RwProject_Project_ResourceMgmt_VduEvent_VduEventData()
        msg.event_id = event_id
        msg.request_info.name = "mynet"
        msg.request_info.image_id = "This is a image_id"
        msg.request_info.vm_flavor.vcpu_count = 4
        msg.request_info.vm_flavor.memory_mb = 8192*2
        msg.request_info.vm_flavor.storage_gb = 40
        c1 = msg.request_info.connection_points.add()
        c1.name = "myport1"
        c1.virtual_link_id = "This is a network_id"
        return msg, xpath.format(quoted_key(event_id))

    def test_create_resource_pools(self):
        # Scenario: configure a cloud account plus compute and network pools,
        # read back and log the pool records, then create and delete one
        # network and one compute reservation.
        # (Kept as a comment rather than a docstring so the description that
        # unittest reports for this test does not change.)
        self.log.debug("STARTING - test_create_resource_pools")
        tinfo = self.new_tinfo('poolconfig')
        dts = rift.tasklets.DTS(tinfo, self.schema, self.loop)
        pool_xpath = "C,/rw-project:project/rw-resource-mgr:resource-mgr-config/rw-resource-mgr:resource-pools"
        pool_records_xpath = "D,/rw-project:project/rw-resource-mgr:resource-pool-records"
        account_xpath = "C,/rw-launchpad:cloud-account"
        compute_xpath = "D,/rw-project:project/rw-resource-mgr:resource-mgmt/vdu-event/vdu-event-data[event-id={}]"
        network_xpath = "D,/rw-project:project/rw-resource-mgr:resource-mgmt/vlink-event/vlink-event-data[event-id={}]"

        @asyncio.coroutine
        def configure_cloud_account():
            msg = self.get_cloud_account_msg()
            self.log.info("Configuring cloud-account: %s", msg)
            yield from dts.query_create(account_xpath,
                                        rwdts.XactFlag.ADVISE,
                                        msg)
            yield from asyncio.sleep(3, loop=self.loop)

        @asyncio.coroutine
        def configure_compute_resource_pools():
            msg = self.get_compute_pool_msg("virtual-compute", "dynamic")
            self.log.info("Configuring compute-resource-pool: %s", msg)
            yield from dts.query_create(pool_xpath,
                                        rwdts.XactFlag.ADVISE,
                                        msg)
            yield from asyncio.sleep(3, loop=self.loop)

        @asyncio.coroutine
        def configure_network_resource_pools():
            msg = self.get_network_pool_msg("virtual-network", "dynamic")
            self.log.info("Configuring network-resource-pool: %s", msg)
            yield from dts.query_create(pool_xpath,
                                        rwdts.XactFlag.ADVISE,
                                        msg)
            yield from asyncio.sleep(3, loop=self.loop)

        @asyncio.coroutine
        def verify_resource_pools():
            self.log.debug("Verifying test_create_resource_pools results")
            res_iter = yield from dts.query_read(pool_records_xpath,)
            for result in res_iter:
                response = yield from result
                records = response.result.records
                # NOTE: these checks were already disabled in the original
                # test; the records are only logged.
                #self.assertEqual(len(records), 2)
                #names = [i.name for i in records]
                #self.assertTrue('virtual-compute' in names)
                #self.assertTrue('virtual-network' in names)
                for record in records:
                    self.log.debug("Received Pool Record, Name: %s, Resource Type: %s, Pool Status: %s, Pool Size: %d, Busy Resources: %d",
                                   record.name,
                                   record.resource_type,
                                   record.pool_status,
                                   record.max_size,
                                   record.busy_resources)

        @asyncio.coroutine
        def reserve_network_resources():
            msg, xpath = self.get_network_reserve_msg(network_xpath)
            # Lazy logging arguments: formatting happens only if DEBUG is on.
            self.log.debug("Sending create event to network-event xpath %s with msg: %s", xpath, msg)
            yield from dts.query_create(xpath, rwdts.XactFlag.TRACE, msg)
            yield from asyncio.sleep(3, loop=self.loop)
            yield from dts.query_delete(xpath, rwdts.XactFlag.TRACE)

        @asyncio.coroutine
        def reserve_compute_resources():
            msg, xpath = self.get_compute_reserve_msg(compute_xpath)
            self.log.debug("Sending create event to compute-event xpath %s with msg: %s", xpath, msg)
            yield from dts.query_create(xpath, rwdts.XactFlag.TRACE, msg)
            yield from asyncio.sleep(3, loop=self.loop)
            yield from dts.query_delete(xpath, rwdts.XactFlag.TRACE)

        @asyncio.coroutine
        def run_test():
            yield from self.wait_tasklets()
            yield from configure_cloud_account()
            yield from configure_compute_resource_pools()
            yield from configure_network_resource_pools()
            yield from verify_resource_pools()
            yield from reserve_network_resources()
            yield from reserve_compute_resources()

        future = asyncio.ensure_future(run_test(), loop=self.loop)
        # run_until() asserts that the future completed, so exception() below
        # is only reached for a finished future.
        self.run_until(future.done)
        error = future.exception()
        if error is not None:
            self.log.error("Caught exception during test")
            raise error

        self.log.debug("DONE - test_create_resource_pools")
331
332
def main():
    """
    Script entry point.

    Fills in MESSAGE_BROKER_DIR, ROUTER_DIR and SO_DIR with paths under
    $RIFT_INSTALL/usr/lib/rift/plugins when they are not already set, then
    runs the test suite through an XML test runner writing to
    $RIFT_MODULE_TEST.

    Raises KeyError if RIFT_INSTALL or RIFT_MODULE_TEST is not set.
    """
    plugin_root = os.path.join(os.environ["RIFT_INSTALL"], "usr/lib/rift/plugins")

    # (environment variable, plugin sub-directory) pairs; values already
    # present in the environment are left untouched.
    plugin_defaults = (
        ('MESSAGE_BROKER_DIR', 'rwmsgbroker-c'),
        ('ROUTER_DIR', 'rwdtsrouter-c'),
        ('SO_DIR', 'rwconmantasklet'),
    )
    for env_name, plugin_name in plugin_defaults:
        os.environ.setdefault(env_name, os.path.join(plugin_root, plugin_name))

    xml_runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])
    unittest.main(testRunner=xml_runner)
347
# Run the suite only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
350
351 # vim: sw=4