4 # Copyright 2016 RIFT.IO Inc
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
30 import gi
.repository
.CF
as cf
31 import gi
.repository
.RwDts
as rwdts
32 import gi
.repository
.RwMain
as rwmain
33 import gi
.repository
.RwManifestYang
as rwmanifest
34 import gi
.repository
.RwConmanYang
as conmanY
35 import gi
.repository
.RwLaunchpadYang
as launchpadyang
# asyncio.ensure_future() only exists from Python 3.4.4 onwards. On older
# interpreters, alias it to the function it superseded so the rest of this
# module can call asyncio.ensure_future() unconditionally.
if sys.version_info < (3, 4, 4):
    # Look the legacy name up with getattr(): ``async`` is a reserved keyword
    # since Python 3.7, so writing the attribute access literally turns this
    # whole file into a SyntaxError on modern interpreters, even though this
    # branch would never execute there.
    asyncio.ensure_future = getattr(asyncio, "async")
43 class RWSOTestCase(unittest
.TestCase
):
45 DTS GI interface unittests
47 Note: Each test uses a list of asyncio.Events for staging through the
48 test. These are required here because we are bringing up each coroutine
49 ("tasklet") at the same time and are not implementing any retry
50 mechanisms. For instance, this is used in numerous tests to make sure that
51 a publisher is up and ready before the subscriber sends queries. Such
52 event lists should not be used in production software.
61 msgbroker_dir
= os
.environ
.get('MESSAGE_BROKER_DIR')
62 router_dir
= os
.environ
.get('ROUTER_DIR')
63 cm_dir
= os
.environ
.get('SO_DIR')
65 manifest
= rwmanifest
.Manifest()
66 manifest
.init_phase
.settings
.rwdtsrouter
.single_dtsrouter
.enable
= True
68 cls
.rwmain
= rwmain
.Gi
.new(manifest
)
69 cls
.tinfo
= cls
.rwmain
.get_tasklet_info()
71 # Run router in mainq. Eliminates some ill-diagnosed bootstrap races.
72 os
.environ
['RWDTS_ROUTER_MAINQ']='1'
73 cls
.rwmain
.add_tasklet(msgbroker_dir
, 'rwmsgbroker-c')
74 cls
.rwmain
.add_tasklet(router_dir
, 'rwdtsrouter-c')
75 cls
.rwmain
.add_tasklet(cm_dir
, 'rwconmantasklet')
77 cls
.log
= rift
.tasklets
.logger_from_tasklet_info(cls
.tinfo
)
78 cls
.log
.setLevel(logging
.DEBUG
)
80 stderr_handler
= logging
.StreamHandler(stream
=sys
.stderr
)
81 fmt
= logging
.Formatter(
82 '%(asctime)-23s %(levelname)-5s (%(name)s@%(process)d:%(filename)s:%(lineno)d) - %(message)s')
83 stderr_handler
.setFormatter(fmt
)
84 cls
.log
.addHandler(stderr_handler
)
85 cls
.schema
= conmanY
.get_schema()
def scheduler_tick(self, *args):
    """
    Ask the loop to stop by queueing its own stop() through call_soon().

    This function is bound onto the asyncio loop object with
    types.MethodType, so ``self`` here is the loop, not the test case.

    @param args - extra positional arguments; accepted and ignored
    """
    schedule = self.call_soon
    schedule(self.stop)
92 self
.loop
= asyncio
.new_event_loop()
93 self
.loop
.scheduler_tick
= types
.MethodType(scheduler_tick
, self
.loop
)
94 self
.loop
.set_debug(True)
95 os
.environ
["PYTHONASYNCIODEBUG"] = "1"
96 asyncio_logger
= logging
.getLogger("asyncio")
97 asyncio_logger
.setLevel(logging
.DEBUG
)
99 self
.asyncio_timer
= None
100 self
.stop_timer
= None
def wait_tasklets(self):
    """
    Generator-based coroutine that sleeps for one second on self.loop.

    NOTE(review): presumably this gives the tasklets added during class
    setup time to come up before a test starts issuing queries - confirm.
    """
    pause = asyncio.sleep(1, loop=self.loop)
    yield from pause
107 def run_until(self
, test_done
, timeout
=30):
109 Attach the current asyncio event loop to rwsched and then run the
110 scheduler until the test_done function returns True or timeout seconds
113 @param test_done - function which should return True once the test is
114 complete and the scheduler no longer needs to run.
115 @param timeout - maximum number of seconds to run the test.
119 self
.log
.debug('Shutting down loop due to timeout')
121 if self
.asyncio_timer
is not None:
122 self
.tinfo
.rwsched_tasklet
.CFRunLoopTimerRelease(self
.asyncio_timer
)
123 self
.asyncio_timer
= None
125 if self
.stop_timer
is not None:
126 self
.tinfo
.rwsched_tasklet
.CFRunLoopTimerRelease(self
.stop_timer
)
127 self
.stop_timer
= None
129 self
.tinfo
.rwsched_instance
.CFRunLoopStop()
132 self
.loop
.call_later(0.1, self
.loop
.stop
)
133 self
.loop
.run_forever()
137 self
.asyncio_timer
= self
.tinfo
.rwsched_tasklet
.CFRunLoopTimer(
138 cf
.CFAbsoluteTimeGetCurrent(),
143 self
.stop_timer
= self
.tinfo
.rwsched_tasklet
.CFRunLoopTimer(
144 cf
.CFAbsoluteTimeGetCurrent() + timeout
,
149 self
.tinfo
.rwsched_tasklet
.CFRunLoopAddTimer(
150 self
.tinfo
.rwsched_tasklet
.CFRunLoopGetCurrent(),
152 self
.tinfo
.rwsched_instance
.CFRunLoopGetMainMode())
154 self
.tinfo
.rwsched_tasklet
.CFRunLoopAddTimer(
155 self
.tinfo
.rwsched_tasklet
.CFRunLoopGetCurrent(),
157 self
.tinfo
.rwsched_instance
.CFRunLoopGetMainMode())
159 self
.tinfo
.rwsched_instance
.CFRunLoopRun()
161 self
.assertTrue(test_done())
163 def new_tinfo(self
, name
):
165 Create a new tasklet info instance with a unique instance_id per test.
166 It is up to each test to use unique names if more than one tasklet info
169 @param name - name of the "tasklet"
170 @return - new tasklet info instance
172 ret
= self
.rwmain
.new_tasklet_info(name
, RWSOTestCase
.id_cnt
)
174 log
= rift
.tasklets
.logger_from_tasklet_info(ret
)
175 log
.setLevel(logging
.DEBUG
)
177 stderr_handler
= logging
.StreamHandler(stream
=sys
.stderr
)
178 fmt
= logging
.Formatter(
179 '%(asctime)-23s %(levelname)-5s (%(name)s@%(process)d:%(filename)s:%(lineno)d) - %(message)s')
180 stderr_handler
.setFormatter(fmt
)
181 log
.addHandler(stderr_handler
)
185 def get_cloud_account_msg(self
):
186 cloud_account
= launchpadyang
.CloudAccount()
187 cloud_account
.name
= "cloudy"
188 cloud_account
.account_type
= "mock"
189 cloud_account
.mock
.username
= "rainy"
192 def get_compute_pool_msg(self
, name
, pool_type
):
193 pool_config
= rmgryang
.ResourcePools()
194 pool
= pool_config
.pools
.add()
196 pool
.resource_type
= "compute"
197 if pool_type
== "static":
198 # Need to query CAL for resource
204 def get_network_pool_msg(self
, name
, pool_type
):
205 pool_config
= rmgryang
.ResourcePools()
206 pool
= pool_config
.pools
.add()
208 pool
.resource_type
= "network"
209 if pool_type
== "static":
210 # Need to query CAL for resource
def get_network_reserve_msg(self, xpath):
    """
    Build a virtual-link reservation request tagged with a fresh event id.

    @param xpath - format string with one ``{}`` placeholder that receives
                   the generated event id
    @return - tuple of (VirtualLinkEventData message, formatted xpath)
    """
    # A random UUID, rendered as a string, identifies this request.
    new_event_id = str(uuid.uuid4())

    # NOTE(review): ``rmgryang`` is not among the imports visible in this
    # part of the file - confirm it is actually in scope.
    request = rmgryang.VirtualLinkEventData()
    request.event_id = new_event_id
    request.request_info.name = "mynet"
    request.request_info.subnet = "1.1.1.0/24"

    event_xpath = xpath.format(new_event_id)
    return request, event_xpath
225 def get_compute_reserve_msg(self
,xpath
):
226 event_id
= str(uuid
.uuid4())
227 msg
= rmgryang
.VDUEventData()
228 msg
.event_id
= event_id
229 msg
.request_info
.name
= "mynet"
230 msg
.request_info
.image_id
= "This is a image_id"
231 msg
.request_info
.vm_flavor
.vcpu_count
= 4
232 msg
.request_info
.vm_flavor
.memory_mb
= 8192*2
233 msg
.request_info
.vm_flavor
.storage_gb
= 40
234 c1
= msg
.request_info
.connection_points
.add()
236 c1
.virtual_link_id
= "This is a network_id"
237 return msg
, xpath
.format(event_id
)
239 def test_create_resource_pools(self
):
240 self
.log
.debug("STARTING - test_create_resource_pools")
241 tinfo
= self
.new_tinfo('poolconfig')
242 dts
= rift
.tasklets
.DTS(tinfo
, self
.schema
, self
.loop
)
243 pool_xpath
= "C,/rw-resource-mgr:resource-mgr-config/rw-resource-mgr:resource-pools"
244 pool_records_xpath
= "D,/rw-resource-mgr:resource-pool-records"
245 account_xpath
= "C,/rw-launchpad:cloud-account"
246 compute_xpath
= "D,/rw-resource-mgr:resource-mgmt/vdu-event/vdu-event-data[event-id='{}']"
247 network_xpath
= "D,/rw-resource-mgr:resource-mgmt/vlink-event/vlink-event-data[event-id='{}']"
250 def configure_cloud_account():
251 msg
= self
.get_cloud_account_msg()
252 self
.log
.info("Configuring cloud-account: %s",msg
)
253 yield from dts
.query_create(account_xpath
,
254 rwdts
.XactFlag
.ADVISE
,
256 yield from asyncio
.sleep(3, loop
=self
.loop
)
259 def configure_compute_resource_pools():
260 msg
= self
.get_compute_pool_msg("virtual-compute", "dynamic")
261 self
.log
.info("Configuring compute-resource-pool: %s",msg
)
262 yield from dts
.query_create(pool_xpath
,
263 rwdts
.XactFlag
.ADVISE
,
265 yield from asyncio
.sleep(3, loop
=self
.loop
)
269 def configure_network_resource_pools():
270 msg
= self
.get_network_pool_msg("virtual-network", "dynamic")
271 self
.log
.info("Configuring network-resource-pool: %s",msg
)
272 yield from dts
.query_create(pool_xpath
,
273 rwdts
.XactFlag
.ADVISE
,
275 yield from asyncio
.sleep(3, loop
=self
.loop
)
279 def verify_resource_pools():
280 self
.log
.debug("Verifying test_create_resource_pools results")
281 res_iter
= yield from dts
.query_read(pool_records_xpath
,)
282 for result
in res_iter
:
283 response
= yield from result
284 records
= response
.result
.records
285 #self.assertEqual(len(records), 2)
286 #names = [i.name for i in records]
287 #self.assertTrue('virtual-compute' in names)
288 #self.assertTrue('virtual-network' in names)
289 for record
in records
:
290 self
.log
.debug("Received Pool Record, Name: %s, Resource Type: %s, Pool Status: %s, Pool Size: %d, Busy Resources: %d",
292 record
.resource_type
,
295 record
.busy_resources
)
def reserve_network_resources():
    """
    Create, then delete, one virtual-link reservation event through DTS.

    The message and the concrete xpath both come from
    self.get_network_reserve_msg() applied to ``network_xpath``.
    """
    msg, xpath = self.get_network_reserve_msg(network_xpath)
    # Pass the values as logger arguments (as the configure_* coroutines in
    # this test already do) so the message is only rendered when the record
    # is actually emitted.
    self.log.debug("Sending create event to network-event xpath %s with msg: %s", xpath, msg)
    yield from dts.query_create(xpath, rwdts.XactFlag.TRACE, msg)
    # NOTE(review): the fixed 3 second pause presumably lets the create be
    # processed before the delete is issued - confirm.
    yield from asyncio.sleep(3, loop=self.loop)
    yield from dts.query_delete(xpath, rwdts.XactFlag.TRACE)
def reserve_compute_resources():
    """
    Create, then delete, one VDU reservation event through DTS.

    The message and the concrete xpath both come from
    self.get_compute_reserve_msg() applied to ``compute_xpath``.
    """
    msg, xpath = self.get_compute_reserve_msg(compute_xpath)
    # Pass the values as logger arguments (as the configure_* coroutines in
    # this test already do) so the message is only rendered when the record
    # is actually emitted.
    self.log.debug("Sending create event to compute-event xpath %s with msg: %s", xpath, msg)
    yield from dts.query_create(xpath, rwdts.XactFlag.TRACE, msg)
    # NOTE(review): the fixed 3 second pause presumably lets the create be
    # processed before the delete is issued - confirm.
    yield from asyncio.sleep(3, loop=self.loop)
    yield from dts.query_delete(xpath, rwdts.XactFlag.TRACE)
314 yield from self
.wait_tasklets()
315 yield from configure_cloud_account()
316 yield from configure_compute_resource_pools()
317 yield from configure_network_resource_pools()
318 yield from verify_resource_pools()
319 yield from reserve_network_resources()
320 yield from reserve_compute_resources()
322 future
= asyncio
.ensure_future(run_test(), loop
=self
.loop
)
323 self
.run_until(future
.done
)
324 if future
.exception() is not None:
325 self
.log
.error("Caught exception during test")
326 raise future
.exception()
328 self
.log
.debug("DONE - test_create_resource_pools")
332 plugin_dir
= os
.path
.join(os
.environ
["RIFT_INSTALL"], "usr/lib/rift/plugins")
334 if 'MESSAGE_BROKER_DIR' not in os
.environ
:
335 os
.environ
['MESSAGE_BROKER_DIR'] = os
.path
.join(plugin_dir
, 'rwmsgbroker-c')
337 if 'ROUTER_DIR' not in os
.environ
:
338 os
.environ
['ROUTER_DIR'] = os
.path
.join(plugin_dir
, 'rwdtsrouter-c')
340 if 'SO_DIR' not in os
.environ
:
341 os
.environ
['SO_DIR'] = os
.path
.join(plugin_dir
, 'rwconmantasklet')
343 runner
= xmlrunner
.XMLTestRunner(output
=os
.environ
["RIFT_MODULE_TEST"])
344 unittest
.main(testRunner
=runner
)
346 if __name__
== '__main__':