add function to create datacenter at RO in test_cirros
[osm/devops.git] / test / test_cirros.py
1 #!/usr/bin/env python3
2 # Copyright 2016 RIFT.IO Inc
3 # Copyright 2016 Telefónica Investigación y Desarrollo S.A.U.
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
17 import json
18 import logging
19 import pytest
20 import requests
21 import subprocess
22 import time
23
24 import certs
25
26 logger = logging.getLogger(__name__)
27
28
29 def chomp_json(string):
30 return ''.join([line.strip() for line in string.splitlines()])
31
32
33 @pytest.fixture(scope='session')
34 def session(request, so_user, so_pass):
35 client_session = requests.session()
36 client_session.auth = (so_user, so_pass)
37 client_session.headers = {
38 "Accept": "application/vnd.yang.data+json",
39 "Content-Type": "application/vnd.yang.data+json",
40 }
41 client_session.verify = False
42 _, cert, key = certs.get_bootstrap_cert_and_key()
43 client_session.cert = (cert, key)
44 return client_session
45
46
47 @pytest.fixture(scope='session')
48 def fetch_packages():
49 """ Fetch NSD/VNFD packages"""
50 wget_command = 'wget --no-parent -r https://osm-download.etsi.org/ftp/osm-1.0-one/vnf-packages'
51 subprocess.check_call(wget_command, shell=True)
52
53
54 @pytest.fixture(scope='session')
55 def rest_endpoint(so_host, so_port):
56 return 'https://{host}:{port}'.format(host=so_host, port=so_port)
57
58
59 def test_so_started(session, rest_endpoint):
60 ''' Get contents of /vcs/info/components and verify all components have started successfully
61 '''
62 uri = "{endpoint}/api/operational/vcs/info/components".format(endpoint=rest_endpoint)
63 response = session.request("GET", uri)
64 vcs_info = json.loads(response.text)
65 for component in vcs_info['rw-base:components']['component_info']:
66 assert component['state'] == 'RUNNING'
67
68
69 @pytest.fixture(scope='session')
70 def cirros_vnfd_pkg(package_location):
71 '''cirros vnfd package location'''
72 return "%s/cirros_vnf.tar.gz" % (package_location)
73
74
75 @pytest.fixture(scope='session')
76 def cirros_nsd_pkg(package_location):
77 '''cirros nsd package location'''
78 return "%s/cirros_2vnf_ns.tar.gz" % (package_location)
79
80
81 def test_onboard_cirros_descriptors(session, so_host, cirros_vnfd_pkg,
82 cirros_nsd_pkg, rest_endpoint):
83 ''' Onboard Cirros NSD/VNFD descriptors
84 '''
85 onboard_command = 'onboard_pkg -s {host} -u {cirros_vnfd_pkg}'.format(
86 host=so_host,
87 cirros_vnfd_pkg=cirros_vnfd_pkg,
88 )
89 subprocess.check_call(onboard_command, shell=True)
90
91 onboard_command = 'onboard_pkg -s {host} -u {cirros_nsd_pkg}'.format(
92 host=so_host,
93 cirros_nsd_pkg=cirros_nsd_pkg,
94 )
95 subprocess.check_call(onboard_command, shell=True)
96
97
98 def test_instantiate_cirros(session, so_host, data_center_id, rest_endpoint):
99 ''' Instantiate an instance of cirros from descriptors
100 '''
101 uri = "{endpoint}/api/operational/nsd-catalog".format(endpoint=rest_endpoint)
102 response = session.request("GET", uri)
103 catalog = json.loads(response.text)
104 cirros_nsd = None
105 for nsd in catalog['nsd:nsd-catalog']['nsd']:
106 if nsd['name'] == 'cirros_2vnf_nsd':
107 cirros_nsd = nsd
108 break
109 assert cirros_nsd is not None
110
111 instantiate_command = 'onboard_pkg -s {host} -i instance-0 -d {nsd_id} -D {data_center_id}'.format(
112 host=so_host,
113 nsd_id=cirros_nsd['id'],
114 data_center_id=data_center_id
115 )
116 subprocess.check_call(instantiate_command, shell=True)
117
118 def wait_for_cirros_ns(instance_name, timeout=600, retry_interval=5):
119 start_time = time.time()
120 while True:
121 uri = "{endpoint}/api/operational/ns-instance-opdata".format(endpoint=rest_endpoint)
122 response = session.request("GET", uri)
123 print(response, response.text)
124 opdata = json.loads(response.text)
125
126 nsr = None
127 for instance in opdata['nsr:ns-instance-opdata']['nsr']:
128 if instance['name-ref'] == instance_name:
129 nsr = instance
130
131 assert nsr is not None, response.text
132 assert nsr['operational-status'] not in ['failed']
133 assert nsr['config-status'] not in ['failed']
134
135 if nsr['operational-status'] in ['running'] and nsr['config-status'] in ['configured']:
136 break
137
138 time_elapsed = time.time() - start_time
139 time_remaining = timeout - time_elapsed
140 assert time_remaining > 0
141 time.sleep(min(time_remaining, retry_interval))
142
143 wait_for_cirros_ns('instance-0')
144
145
146 @pytest.fixture(scope='session')
147 def test_add_datacenter(name, url, vim_type, tenant_id=None, tenant_name=None, user=None, password=None,
148 description=None, config=None):
149 ''' Add a datacenter to RO
150 '''
151 onboard_command = \
152 'lxc exec RO --env OPENMANO_TENANT=osm -- openmano datacenter-create "{name}" "{url}" --type={vimtype}'.format(
153 name=name, url=url, vimtype=vim_type)
154 if description:
155 onboard_command += ' --description="{}"'.format(description)
156 out = subprocess.check_output(onboard_command, shell=True)
157 assert out
158 datacenter_id = out.split()[0]
159
160 onboard_command = 'lxc exec RO --env OPENMANO_TENANT=osm -- openmano datacenter-attach {id}'.format(
161 id=datacenter_id)
162 if tenant_id:
163 onboard_command += " --vim_tenant_id=" + tenant_id
164 if tenant_name:
165 onboard_command += " --vim_tenant_name=" + tenant_name
166 if user:
167 onboard_command += " --user=" + user
168 if password:
169 onboard_command += " --password=" + password
170 if config:
171 onboard_command += " --config=" + yaml.safe_dump(config)
172
173 subprocess.check_call(onboard_command, shell=True)
174 return datacenter_id
175
176 @pytest.fixture(scope='session')
177 def get_datacenter_id(name):
178 ''' Get the id of a datacenter
179 '''
180 onboard_command = \
181 'lxc exec RO --env OPENMANO_TENANT=osm -- openmano datacenter-list {name}'.format(name=name)
182 out = subprocess.check_output(onboard_command, shell=True)
183 assert(out)
184 datacenter_id = out.split()[0]
185 return datacenter_id