4 # Copyright 2016 RIFT.IO Inc
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
19 @file test_launchpad.py
20 @author Paul Laidler (Paul.Laidler@riftio.com)
22 @brief High-availability system test that runs ping pong workflow
36 from contextlib
import contextmanager
39 import rift
.auto
.session
40 import rift
.auto
.descriptor
42 gi
.require_version('RwVnfrYang', '1.0')
43 from gi
.repository
import (
52 gi
.require_version('RwKeyspec', '1.0')
53 from gi
.repository
.RwKeyspec
import quoted_key
# Module-level logger used by every test and by HASession below.
logger = logging.getLogger(__name__)
# Root logging is configured at DEBUG so the HA kill/retry messages are emitted.
logging.basicConfig(level=logging.DEBUG)
58 @pytest.mark
.setup('seed_random')
def test_seed_random(self, random_seed):
    '''Seed the module-level random number generator with random_seed

    The seed is logged first so a run can be reproduced from its log.
    '''
    seed_message = "Seeding number generator with seed {}".format(random_seed)
    logger.info(seed_message)
    random.seed(random_seed)
class MaxRetriesExceededException(Exception):
    '''Raised when an operation has used up its maximum allowed number of retries'''
class HAVerifyException(Exception):
    '''Raised when correct HA behaviour could not be verified'''
76 ''' Wrapper around management session, which kills off system components
77 in order to trigger HA functionality
# Fallback used by set_config when no recovery_timeout is supplied: the time
# allowed for the system to recover after a component is killed
# (presumably seconds - TODO confirm).
DEFAULT_RECOVERY_TIMEOUT = 120
def __init__(self, session):
    ''' Create a new HASession instance

    Arguments:
        session - management session to wrap; kept as self.session

    Returns:
        instance of HASession
    '''
    self.session = session
    # Populate attempts/min_delay/max_delay/ha_frequency/recovery_timeout
    # with their defaults so get_config(), config() and call() can be used
    # on a freshly constructed instance.
    self.set_config()
@contextmanager
def config(self, *args, **kwargs):
    ''' Context manager to allow HASession to temporarily have its config modified

    Arguments:
        args, kwargs - forwarded unchanged to set_config

    The config captured on entry is put back on exit, including when the
    body of the with-statement raises.
    '''
    current_config = self.get_config()
    self.set_config(*args, **kwargs)
    try:
        yield
    finally:
        # Restore even on failure so one failing test cannot leak its
        # temporary HA settings into the tests that follow.
        self.set_config(*current_config)
def get_config(self):
    ''' Returns the current HA session config

    The tuple is ordered like set_config's parameters, so it can be
    splatted straight back into set_config.
    '''
    snapshot = (
        self.attempts,
        self.min_delay,
        self.max_delay,
        self.ha_frequency,
        self.recovery_timeout,
    )
    return snapshot
def set_config(self, attempts=None, min_delay=None, max_delay=None, ha_frequency=None, recovery_timeout=None):
    ''' Set the HA session config, set default values for all config options not provided

    Arguments:
        attempts - Number of times to attempt an operation before failing
        min_delay - minimum time that must elapse before session is allowed to kill a component
        max_delay - maximum time that may elapse before killing a component
        ha_frequency - frequency at which operations are tested for ha
        recovery_timeout - time allowed for system to recover after a component is killed

    Any option that is omitted or falsy (None, 0) falls back to the
    matching HASession.DEFAULT_* value.
    '''
    # Only fall back to a default when the caller did not supply a value;
    # otherwise overrides such as config(attempts=2) would be discarded.
    if not attempts:
        attempts = HASession.DEFAULT_ATTEMPTS
    if not min_delay:
        min_delay = HASession.DEFAULT_MIN_DELAY
    if not max_delay:
        max_delay = HASession.DEFAULT_MAX_DELAY
    if not ha_frequency:
        ha_frequency = HASession.DEFAULT_FREQUENCY
    if not recovery_timeout:
        recovery_timeout = HASession.DEFAULT_RECOVERY_TIMEOUT

    self.attempts = attempts
    self.min_delay = min_delay
    self.max_delay = max_delay
    self.ha_frequency = ha_frequency
    self.recovery_timeout = recovery_timeout
def call(self, operation, *args, **kwargs):
    ''' Call an operation using the wrapped management session, then
    kill off a system component, and verify the operation still succeeds

    Arguments:
        operation - operation to be invoked
        args, kwargs - passed through to operation unchanged

    Returns:
        the value returned by operation

    Raises:
        MaxRetriesExceededException - operation raised on every one of self.attempts tries
        HAVerifyException - the chosen component still reports the pid it had before the kill
    '''
    # Choose to make the normal session call or do the HA test
    if random.choice(range(0, int(1/self.ha_frequency))) != 0:
        return operation(*args, **kwargs)

    # Make sure we're starting from a running system
    rift.vcs.vcs.wait_until_system_started(self.session)

    def choose_restartable_tasklet(vcs_info):
        # Only tasklets whose recovery action is RESTART are candidates.
        restartable_tasklets = [
            component_info.component_name
            for component_info in vcs_info.components.component_info
            if component_info.recovery_action == 'RESTART'
            and component_info.component_type == 'RWTASKLET'
        ]
        return random.choice(restartable_tasklets)

    vcs_info = self.session.proxy(RwBaseYang).get('/vcs/info')
    component_name = choose_restartable_tasklet(vcs_info)

    ssh_cmd = 'ssh {} -o StrictHostKeyChecking=no -o BatchMode=yes'.format(self.session.host)

    def get_component_process_pid(component_name):
        # Returns the pid column of the matching `ps` line(s) as a string.
        cmd = '{} -- \'ps -ef | grep -v "grep" | grep rwmain | grep "{}" | tr -s " " | cut -d " " -f 2\''.format(ssh_cmd, component_name)
        logger.info("Finding component [{}] pid using cmd: {}".format(component_name, cmd))
        output = subprocess.check_output(cmd, shell=True)
        return output.decode('ascii').strip()

    process_pid = get_component_process_pid(component_name)
    logger.info('{} has pid {}'.format(component_name, process_pid))

    # Kick off a background process to kill the tasklet after some delay
    delay = self.min_delay + (self.max_delay-self.min_delay)*random.random()
    logger.info("Killing {} [{}] in {}".format(component_name, process_pid, delay))
    cmd = '(sleep {} && {} -- "sudo kill -9 {}") &'.format(delay, ssh_cmd, process_pid)
    # The trailing '&' backgrounds the sleep+kill inside the shell, so this
    # returns straight away and the kill fires while the operation runs.
    subprocess.call(cmd, shell=True)

    # Invoke session operation
    started = time.time()
    result = None
    attempt = 0
    while attempt < self.attempts:
        try:
            result = operation(*args, **kwargs)
            # Possible improvement: implement optional verify step here
            break
        except Exception:
            logger.error('operation failed - {}'.format(operation))
            attempt += 1

        # If the operation failed, wait until recovery occurs to re-attempt
        rift.vcs.vcs.wait_until_system_started(self.session)

    if attempt >= self.attempts:
        raise MaxRetriesExceededException(
            "Killed {} [{}] - Subsequently failed operation : {} {} {}".format(
                component_name, process_pid, operation, args, kwargs
            )
        )

    # Wait until kill has definitely happened
    elapsed = time.time() - started
    remaining = delay - elapsed
    if remaining > 0:
        # time.sleep rejects negative values, so only sleep when the
        # scheduled kill is still in the future.
        time.sleep(remaining)

    # Verify system reaches running status again
    rift.vcs.vcs.wait_until_system_started(self.session)

    # Verify the tasklet process was actually restarted (got a new pid).
    # Pids are the strings returned by get_component_process_pid.
    new_pid = get_component_process_pid(component_name)
    if process_pid == new_pid:
        raise HAVerifyException(
            "Process pid unchanged : {} == {} ~ didn't die?".format(process_pid, new_pid)
        )

    return result
def ha_session(mgmt_session):
    '''Wrap the given management session in an HASession and return it'''
    wrapped_session = HASession(mgmt_session)
    return wrapped_session
@pytest.mark.depends('seed_random')
@pytest.mark.setup('launchpad')
@pytest.mark.incremental
class TestLaunchpadSetup:
    def test_create_cloud_accounts(self, ha_session, mgmt_session, cloud_module, cloud_xpath, cloud_accounts):
        '''Configure cloud accounts

        Each account is written with replace_config and read back with get,
        both routed through ha_session.call.

        Asserts:
            Cloud name and cloud type details
        '''
        xpath_template = "{cloud_xpath}[name={cloud_account_name}]"
        for account in cloud_accounts:
            account_xpath = xpath_template.format(
                cloud_xpath=cloud_xpath,
                cloud_account_name=quoted_key(account.name),
            )
            ha_session.call(mgmt_session.proxy(cloud_module).replace_config, account_xpath, account)
            stored = ha_session.call(mgmt_session.proxy(cloud_module).get, account_xpath)
            assert stored.name == account.name
            assert stored.account_type == account.account_type
@pytest.mark.teardown('launchpad')
@pytest.mark.incremental
class TestLaunchpadTeardown:
    def test_delete_cloud_accounts(self, ha_session, mgmt_session, cloud_module, cloud_xpath, cloud_accounts):
        '''Unconfigure cloud_account

        Deletes the config of every account in cloud_accounts, with each
        delete routed through ha_session.call.
        '''
        xpath_template = "{cloud_xpath}[name={cloud_account_name}]"
        for account in cloud_accounts:
            account_xpath = xpath_template.format(
                cloud_xpath=cloud_xpath,
                cloud_account_name=quoted_key(account.name),
            )
            ha_session.call(mgmt_session.proxy(cloud_module).delete_config, account_xpath)
@pytest.mark.setup('pingpong')
@pytest.mark.depends('launchpad')
@pytest.mark.incremental
class TestSetupPingpong(object):
    def test_onboard(self, ha_session, mgmt_session, descriptors):
        '''Onboard every descriptor through ha_session.call

        max_delay is set to 15 for the duration of each onboard call.
        '''
        for descriptor in descriptors:
            with ha_session.config(max_delay=15):
                ha_session.call(rift.auto.descriptor.onboard, mgmt_session, descriptor)

    def test_instantiate(self, ha_session, mgmt_session, cloud_account_name):
        '''Create the "pingpong_1" network service record from the onboarded NSD'''
        catalog = ha_session.call(mgmt_session.proxy(RwProjectNsdYang).get_config, '/nsd-catalog')
        # NOTE(review): `nsd` was referenced without ever being bound. This
        # assumes the catalog exposes its descriptors as `nsd` and that the
        # ping-pong NSD is the first entry - confirm against the yang model.
        nsd = catalog.nsd[0]
        nsr = rift.auto.descriptor.create_nsr(cloud_account_name, "pingpong_1", nsd)
        ha_session.call(mgmt_session.proxy(RwNsrYang).create_config, '/ns-instance-config/nsr', nsr)
@pytest.mark.depends('pingpong')
@pytest.mark.teardown('pingpong')
@pytest.mark.incremental
class TestTeardownPingpong(object):
    def test_teardown(self, ha_session, mgmt_session):
        '''Delete every configured NSR, then check that no VNFRs are left

        Asserts:
            The vnfr catalog is None or has an empty vnfr list
        '''
        configured = ha_session.call(mgmt_session.proxy(RwNsrYang).get_config, '/ns-instance-config')
        for record in configured.nsr:
            record_xpath = "/ns-instance-config/nsr[id={}]".format(quoted_key(record.id))
            ha_session.call(mgmt_session.proxy(RwNsrYang).delete_config, record_xpath)

        remaining_vnfrs = ha_session.call(mgmt_session.proxy(RwVnfrYang).get, '/vnfr-catalog')
        assert remaining_vnfrs is None or len(remaining_vnfrs.vnfr) == 0
280 @pytest.mark
.depends('launchpad')
281 @pytest.mark
.incremental
283 def test_account_connection_status(self
, ha_session
, mgmt_session
, cloud_module
, cloud_xpath
, cloud_accounts
):
284 '''Verify connection status on each cloud account
287 Cloud account is successfully connected
289 for cloud_account
in cloud_accounts
:
290 with ha_session
.config(attempts
=2):
292 mgmt_session
.proxy(cloud_module
).wait_for
,
293 '{}[name={}]/connection-status/status'.format(cloud_xpath
, quoted_key(cloud_account
.name
)),
299 @pytest.mark
.depends('pingpong')
300 @pytest.mark
.incremental
def test_service_started(self, ha_session, mgmt_session):
    '''Wait for every NSR in the operational data to reach operational-status "running"

    Each wait goes through ha_session.call with attempts=2 and max_delay=60,
    fails early on 'failed' and times out after 300.
    '''
    nsr_opdata = ha_session.call(mgmt_session.proxy(RwNsrYang).get, '/ns-instance-opdata')
    nsrs = nsr_opdata.nsr

    for nsr in nsrs:
        xpath = (
            "/ns-instance-opdata/nsr[ns-instance-config-ref={ns_instance_config_ref}]/operational-status"
        ).format(
            ns_instance_config_ref=quoted_key(nsr.ns_instance_config_ref)
        )

        with ha_session.config(attempts=2, max_delay=60):
            ha_session.call(mgmt_session.proxy(RwNsrYang).wait_for, xpath, "running", fail_on=['failed'], timeout=300)
def test_service_configured(self, ha_session, mgmt_session):
    '''Wait for every NSR in the operational data to reach config-status "configured"

    Each wait goes through ha_session.call with attempts=2 and max_delay=60,
    fails early on 'failed' and times out after 300.
    '''
    nsr_opdata = ha_session.call(mgmt_session.proxy(RwNsrYang).get, '/ns-instance-opdata')
    nsrs = nsr_opdata.nsr

    for nsr in nsrs:
        xpath = (
            "/ns-instance-opdata/nsr[ns-instance-config-ref={}]/config-status"
        ).format(
            quoted_key(nsr.ns_instance_config_ref)
        )

        with ha_session.config(attempts=2, max_delay=60):
            ha_session.call(mgmt_session.proxy(RwNsrYang).wait_for, xpath, "configured", fail_on=['failed'], timeout=300)