4 # Copyright 2016 RIFT.IO Inc
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
19 @file test_failover.py
20 @brief System test of stopping launchpad on master and
21 validating configuration on standby
30 from gi
.repository
import RwVnfdYang
31 from gi
.repository
import RwVnfrYang
33 import rift
.auto
.proxy
34 from rift
.auto
.session
import NetconfSession
36 def yield_vnfd_vnfr_pairs(proxy
, nsr
=None):
38 Yields tuples of vnfd & vnfr entries.
41 proxy (callable): Launchpad proxy
42 nsr (optional): If specified, only the vnfr & vnfd records of the NSR
46 Tuple: VNFD and its corresponding VNFR entry
48 def get_vnfd(vnfd_id
):
49 xpath
= "/vnfd-catalog/vnfd[id='{}']".format(vnfd_id
)
50 return proxy(RwVnfdYang
).get(xpath
)
52 vnfr
= "/vnfr-catalog/vnfr"
54 vnfrs
= proxy(RwVnfrYang
).get(vnfr
, list_obj
=True)
56 for vnfr
in vnfrs
.vnfr
:
59 const_vnfr_ids
= [const_vnfr
.vnfr_id
for const_vnfr
in nsr
.constituent_vnfr_ref
]
60 if vnfr
.id not in const_vnfr_ids
:
63 vnfd
= get_vnfd(vnfr
.vnfd
.id)
66 def check_configuration_on_standby(standby_ip
):
67 print ("Start- check_configuration_on_standby")
68 mgmt_session
= NetconfSession(standby_ip
)
69 mgmt_session
.connect()
70 print ("Connected to proxy")
72 vnf_tuple
= list(yield_vnfd_vnfr_pairs(mgmt_session
.proxy
))
73 assert len(vnf_tuple
) == 2
75 if __name__
== "__main__":
76 parser
= argparse
.ArgumentParser(description
='Test launchpad failover')
77 parser
.add_argument("--master-ip", action
="store", dest
="master_ip")
78 parser
.add_argument("--standby-ip", action
="store", dest
="standby_ip")
80 args
= parser
.parse_args()
82 # 60 seconds should be more than enough time for Agent to be able
83 # to make confd as the new Master
85 print ("Try fetching configuration from the old standby or the new Master\n")
86 check_configuration_on_standby(args
.standby_ip
)