# Copyright 2016 RIFT.IO Inc
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
30 import gi
.repository
.CF
as cf
31 import gi
.repository
.RwDts
as rwdts
32 import gi
.repository
.RwMain
as rwmain
33 import gi
.repository
.RwManifestYang
as rwmanifest
34 import gi
.repository
.RwConmanYang
as conmanY
35 import gi
.repository
.RwLaunchpadYang
as launchpadyang
36 gi
.require_version('RwKeyspec', '1.0')
37 from gi
.repository
.RwKeyspec
import quoted_key
# asyncio.ensure_future() is only assumed to exist from Python 3.4.4 onwards;
# on older interpreters alias it to its predecessor, asyncio.async().
#
# The attribute is fetched with getattr() rather than written as
# "asyncio.async": "async" became a reserved keyword in Python 3.7, so the
# dotted spelling is a SyntaxError there and prevents this module from being
# imported at all, even though the branch itself would never run.
if sys.version_info < (3, 4, 4):
    asyncio.ensure_future = getattr(asyncio, "async")
45 class RWSOTestCase(unittest
.TestCase
):
DTS GI interface unittests

Note: Each test uses a list of asyncio.Events for staging through the
test. These are required here because we are bringing up each coroutine
("tasklet") at the same time and are not implementing any retry
mechanisms. For instance, this is used in numerous tests to make sure that
a publisher is up and ready before the subscriber sends queries. Such
event lists should not be used in production software.
63 msgbroker_dir
= os
.environ
.get('MESSAGE_BROKER_DIR')
64 router_dir
= os
.environ
.get('ROUTER_DIR')
65 cm_dir
= os
.environ
.get('SO_DIR')
67 manifest
= rwmanifest
.Manifest()
68 manifest
.init_phase
.settings
.rwdtsrouter
.single_dtsrouter
.enable
= True
70 cls
.rwmain
= rwmain
.Gi
.new(manifest
)
71 cls
.tinfo
= cls
.rwmain
.get_tasklet_info()
73 # Run router in mainq. Eliminates some ill-diagnosed bootstrap races.
74 os
.environ
['RWDTS_ROUTER_MAINQ']='1'
75 cls
.rwmain
.add_tasklet(msgbroker_dir
, 'rwmsgbroker-c')
76 cls
.rwmain
.add_tasklet(router_dir
, 'rwdtsrouter-c')
77 cls
.rwmain
.add_tasklet(cm_dir
, 'rwconmantasklet')
79 cls
.log
= rift
.tasklets
.logger_from_tasklet_info(cls
.tinfo
)
80 cls
.log
.setLevel(logging
.DEBUG
)
82 stderr_handler
= logging
.StreamHandler(stream
=sys
.stderr
)
83 fmt
= logging
.Formatter(
84 '%(asctime)-23s %(levelname)-5s (%(name)s@%(process)d:%(filename)s:%(lineno)d) - %(message)s')
85 stderr_handler
.setFormatter(fmt
)
86 cls
.log
.addHandler(stderr_handler
)
87 cls
.schema
= conmanY
.get_schema()
90 def scheduler_tick(self
, *args
):
91 self
.call_soon(self
.stop
)
94 self
.loop
= asyncio
.new_event_loop()
95 self
.loop
.scheduler_tick
= types
.MethodType(scheduler_tick
, self
.loop
)
96 self
.loop
.set_debug(True)
97 os
.environ
["PYTHONASYNCIODEBUG"] = "1"
98 asyncio_logger
= logging
.getLogger("asyncio")
99 asyncio_logger
.setLevel(logging
.DEBUG
)
101 self
.asyncio_timer
= None
102 self
.stop_timer
= None
def wait_tasklets(self):
    """
    Pause for one second on this test's event loop (self.loop).

    Written as a generator so that callers can drive it with "yield from".
    NOTE(review): presumably this gives the tasklets time to come up before
    a test starts issuing queries - confirm against the callers.
    """
    settle_delay = asyncio.sleep(1, loop=self.loop)
    yield from settle_delay
109 def run_until(self
, test_done
, timeout
=30):
Attach the current asyncio event loop to rwsched and then run the
scheduler until the test_done function returns True or timeout seconds
pass.

@param test_done - function which should return True once the test is
                   complete and the scheduler no longer needs to run.
@param timeout   - maximum number of seconds to run the test.
121 self
.log
.debug('Shutting down loop due to timeout')
123 if self
.asyncio_timer
is not None:
124 self
.tinfo
.rwsched_tasklet
.CFRunLoopTimerRelease(self
.asyncio_timer
)
125 self
.asyncio_timer
= None
127 if self
.stop_timer
is not None:
128 self
.tinfo
.rwsched_tasklet
.CFRunLoopTimerRelease(self
.stop_timer
)
129 self
.stop_timer
= None
131 self
.tinfo
.rwsched_instance
.CFRunLoopStop()
134 self
.loop
.call_later(0.1, self
.loop
.stop
)
135 self
.loop
.run_forever()
139 self
.asyncio_timer
= self
.tinfo
.rwsched_tasklet
.CFRunLoopTimer(
140 cf
.CFAbsoluteTimeGetCurrent(),
145 self
.stop_timer
= self
.tinfo
.rwsched_tasklet
.CFRunLoopTimer(
146 cf
.CFAbsoluteTimeGetCurrent() + timeout
,
151 self
.tinfo
.rwsched_tasklet
.CFRunLoopAddTimer(
152 self
.tinfo
.rwsched_tasklet
.CFRunLoopGetCurrent(),
154 self
.tinfo
.rwsched_instance
.CFRunLoopGetMainMode())
156 self
.tinfo
.rwsched_tasklet
.CFRunLoopAddTimer(
157 self
.tinfo
.rwsched_tasklet
.CFRunLoopGetCurrent(),
159 self
.tinfo
.rwsched_instance
.CFRunLoopGetMainMode())
161 self
.tinfo
.rwsched_instance
.CFRunLoopRun()
163 self
.assertTrue(test_done())
165 def new_tinfo(self
, name
):
Create a new tasklet info instance with a unique instance_id per test.
It is up to each test to use unique names if more than one tasklet info
instance is needed.

@param name - name of the "tasklet"
@return     - new tasklet info instance
174 ret
= self
.rwmain
.new_tasklet_info(name
, RWSOTestCase
.id_cnt
)
176 log
= rift
.tasklets
.logger_from_tasklet_info(ret
)
177 log
.setLevel(logging
.DEBUG
)
179 stderr_handler
= logging
.StreamHandler(stream
=sys
.stderr
)
180 fmt
= logging
.Formatter(
181 '%(asctime)-23s %(levelname)-5s (%(name)s@%(process)d:%(filename)s:%(lineno)d) - %(message)s')
182 stderr_handler
.setFormatter(fmt
)
183 log
.addHandler(stderr_handler
)
187 def get_cloud_account_msg(self
):
188 cloud_account
= launchpadyang
.YangData_RwProject_Project_CloudAccounts_CloudAccountList()
189 cloud_account
.name
= "cloudy"
190 cloud_account
.account_type
= "mock"
191 cloud_account
.mock
.username
= "rainy"
194 def get_compute_pool_msg(self
, name
, pool_type
):
195 pool_config
= rmgryang
.YangData_RwProject_Project_ResourceMgrConfig_ResourcePools()
196 pool
= pool_config
.pools
.add()
198 pool
.resource_type
= "compute"
199 if pool_type
== "static":
200 # Need to query CAL for resource
206 def get_network_pool_msg(self
, name
, pool_type
):
207 pool_config
= rmgryang
.YangData_RwProject_Project_ResourceMgrConfig_ResourcePools()
208 pool
= pool_config
.pools
.add()
210 pool
.resource_type
= "network"
211 if pool_type
== "static":
212 # Need to query CAL for resource
def get_network_reserve_msg(self, xpath):
    """
    Build a virtual-link reservation event and the xpath that addresses it.

    A freshly generated UUID string is used as the event id. It is stored
    on the message and, after being passed through quoted_key(), is
    substituted into the "{}" placeholder of the supplied xpath.

    @param xpath - xpath template with one "{}" placeholder for the event id
    @return      - tuple of (vlink event message, formatted xpath)
    """
    new_event_id = str(uuid.uuid4())

    vlink_event = rmgryang.YangData_RwProject_Project_ResourceMgmt_VlinkEvent_VlinkEventData()
    vlink_event.event_id = new_event_id

    # Fixed request parameters used by the test.
    vlink_event.request_info.name = "mynet"
    vlink_event.request_info.subnet = "1.1.1.0/24"

    event_xpath = xpath.format(quoted_key(new_event_id))
    return vlink_event, event_xpath
def get_compute_reserve_msg(self, xpath):
    """
    Build a VDU (compute) reservation event and the xpath that addresses it.

    A freshly generated UUID string is used as the event id. It is stored
    on the message and, after being passed through quoted_key(), is
    substituted into the "{}" placeholder of the supplied xpath. The
    request asks for 4 vCPUs, 8192*2 MB of memory and 40 GB of storage, and
    carries one connection point.

    @param xpath - xpath template with one "{}" placeholder for the event id
    @return      - tuple of (vdu event message, formatted xpath)
    """
    new_event_id = str(uuid.uuid4())

    vdu_event = rmgryang.YangData_RwProject_Project_ResourceMgmt_VduEvent_VduEventData()
    vdu_event.event_id = new_event_id

    # Fixed request parameters used by the test.
    vdu_event.request_info.name = "mynet"
    vdu_event.request_info.image_id = "This is a image_id"

    # Requested VM flavor.
    vdu_event.request_info.vm_flavor.vcpu_count = 4
    vdu_event.request_info.vm_flavor.memory_mb = 8192 * 2
    vdu_event.request_info.vm_flavor.storage_gb = 40

    # Single connection point attached to the placeholder virtual link.
    conn_point = vdu_event.request_info.connection_points.add()
    conn_point.virtual_link_id = "This is a network_id"

    event_xpath = xpath.format(quoted_key(new_event_id))
    return vdu_event, event_xpath
241 def test_create_resource_pools(self
):
242 self
.log
.debug("STARTING - test_create_resource_pools")
243 tinfo
= self
.new_tinfo('poolconfig')
244 dts
= rift
.tasklets
.DTS(tinfo
, self
.schema
, self
.loop
)
245 pool_xpath
= "C,/rw-project:project/rw-resource-mgr:resource-mgr-config/rw-resource-mgr:resource-pools"
246 pool_records_xpath
= "D,/rw-project:project/rw-resource-mgr:resource-pool-records"
247 account_xpath
= "C,/rw-launchpad:cloud-account"
248 compute_xpath
= "D,/rw-project:project/rw-resource-mgr:resource-mgmt/vdu-event/vdu-event-data[event-id={}]"
249 network_xpath
= "D,/rw-project:project/rw-resource-mgr:resource-mgmt/vlink-event/vlink-event-data[event-id={}]"
252 def configure_cloud_account():
253 msg
= self
.get_cloud_account_msg()
254 self
.log
.info("Configuring cloud-account: %s",msg
)
255 yield from dts
.query_create(account_xpath
,
256 rwdts
.XactFlag
.ADVISE
,
258 yield from asyncio
.sleep(3, loop
=self
.loop
)
261 def configure_compute_resource_pools():
262 msg
= self
.get_compute_pool_msg("virtual-compute", "dynamic")
263 self
.log
.info("Configuring compute-resource-pool: %s",msg
)
264 yield from dts
.query_create(pool_xpath
,
265 rwdts
.XactFlag
.ADVISE
,
267 yield from asyncio
.sleep(3, loop
=self
.loop
)
271 def configure_network_resource_pools():
272 msg
= self
.get_network_pool_msg("virtual-network", "dynamic")
273 self
.log
.info("Configuring network-resource-pool: %s",msg
)
274 yield from dts
.query_create(pool_xpath
,
275 rwdts
.XactFlag
.ADVISE
,
277 yield from asyncio
.sleep(3, loop
=self
.loop
)
281 def verify_resource_pools():
282 self
.log
.debug("Verifying test_create_resource_pools results")
283 res_iter
= yield from dts
.query_read(pool_records_xpath
,)
284 for result
in res_iter
:
285 response
= yield from result
286 records
= response
.result
.records
287 #self.assertEqual(len(records), 2)
288 #names = [i.name for i in records]
289 #self.assertTrue('virtual-compute' in names)
290 #self.assertTrue('virtual-network' in names)
291 for record
in records
:
292 self
.log
.debug("Received Pool Record, Name: %s, Resource Type: %s, Pool Status: %s, Pool Size: %d, Busy Resources: %d",
294 record
.resource_type
,
297 record
.busy_resources
)
299 def reserve_network_resources():
300 msg
,xpath
= self
.get_network_reserve_msg(network_xpath
)
301 self
.log
.debug("Sending create event to network-event xpath %s with msg: %s" % (xpath
, msg
))
302 yield from dts
.query_create(xpath
, rwdts
.XactFlag
.TRACE
, msg
)
303 yield from asyncio
.sleep(3, loop
=self
.loop
)
304 yield from dts
.query_delete(xpath
, rwdts
.XactFlag
.TRACE
)
307 def reserve_compute_resources():
308 msg
,xpath
= self
.get_compute_reserve_msg(compute_xpath
)
309 self
.log
.debug("Sending create event to compute-event xpath %s with msg: %s" % (xpath
, msg
))
310 yield from dts
.query_create(xpath
, rwdts
.XactFlag
.TRACE
, msg
)
311 yield from asyncio
.sleep(3, loop
=self
.loop
)
312 yield from dts
.query_delete(xpath
, rwdts
.XactFlag
.TRACE
)
316 yield from self
.wait_tasklets()
317 yield from configure_cloud_account()
318 yield from configure_compute_resource_pools()
319 yield from configure_network_resource_pools()
320 yield from verify_resource_pools()
321 yield from reserve_network_resources()
322 yield from reserve_compute_resources()
324 future
= asyncio
.ensure_future(run_test(), loop
=self
.loop
)
325 self
.run_until(future
.done
)
326 if future
.exception() is not None:
327 self
.log
.error("Caught exception during test")
328 raise future
.exception()
330 self
.log
.debug("DONE - test_create_resource_pools")
334 plugin_dir
= os
.path
.join(os
.environ
["RIFT_INSTALL"], "usr/lib/rift/plugins")
336 if 'MESSAGE_BROKER_DIR' not in os
.environ
:
337 os
.environ
['MESSAGE_BROKER_DIR'] = os
.path
.join(plugin_dir
, 'rwmsgbroker-c')
339 if 'ROUTER_DIR' not in os
.environ
:
340 os
.environ
['ROUTER_DIR'] = os
.path
.join(plugin_dir
, 'rwdtsrouter-c')
342 if 'SO_DIR' not in os
.environ
:
343 os
.environ
['SO_DIR'] = os
.path
.join(plugin_dir
, 'rwconmantasklet')
345 runner
= xmlrunner
.XMLTestRunner(output
=os
.environ
["RIFT_MODULE_TEST"])
346 unittest
.main(testRunner
=runner
)
348 if __name__
== '__main__':