Merge "Fix bug #502"
[osm/N2VC.git] / tests / utils.py
1 #!/usr/bin/env python3
2
3 import logging
4 import n2vc.vnf
5 import pylxd
6 import os
7 import time
8 import uuid
9 import yaml
10
11 # Disable InsecureRequestWarning w/LXD
12 import urllib3
13 urllib3.disable_warnings()
14
15
16 def get_descriptor(descriptor):
17 desc = None
18 try:
19 tmp = yaml.load(descriptor)
20
21 # Remove the envelope
22 root = list(tmp.keys())[0]
23 if root == "nsd:nsd-catalog":
24 desc = tmp['nsd:nsd-catalog']['nsd'][0]
25 elif root == "vnfd:vnfd-catalog":
26 desc = tmp['vnfd:vnfd-catalog']['vnfd'][0]
27 except ValueError:
28 assert False
29 return desc
30
31 def get_n2vc():
32 """Return an instance of N2VC.VNF."""
33 log = logging.getLogger()
34 log.level = logging.DEBUG
35
36 # Extract parameters from the environment in order to run our test
37 vca_host = os.getenv('VCA_HOST', '127.0.0.1')
38 vca_port = os.getenv('VCA_PORT', 17070)
39 vca_user = os.getenv('VCA_USER', 'admin')
40 vca_charms = os.getenv('VCA_CHARMS', None)
41 vca_secret = os.getenv('VCA_SECRET', None)
42 client = n2vc.vnf.N2VC(
43 log=log,
44 server=vca_host,
45 port=vca_port,
46 user=vca_user,
47 secret=vca_secret,
48 artifacts=vca_charms,
49 )
50 return client
51
52 def create_lxd_container(public_key=None):
53 """
54 Returns a container object
55
56 If public_key isn't set, we'll use the Juju ssh key
57 """
58
59 client = get_lxd_client()
60 test_machine = "test-{}-add-manual-machine-ssh".format(
61 uuid.uuid4().hex[-4:]
62 )
63
64 private_key_path, public_key_path = find_juju_ssh_keys()
65 # private_key_path = os.path.expanduser(
66 # "~/.local/share/juju/ssh/juju_id_rsa"
67 # )
68 # public_key_path = os.path.expanduser(
69 # "~/.local/share/juju/ssh/juju_id_rsa.pub"
70 # )
71
72 # Use the self-signed cert generated by lxc on first run
73 crt = os.path.expanduser('~/snap/lxd/current/.config/lxc/client.crt')
74 assert os.path.exists(crt)
75
76 key = os.path.expanduser('~/snap/lxd/current/.config/lxc/client.key')
77 assert os.path.exists(key)
78
79 # create profile w/cloud-init and juju ssh key
80 if not public_key:
81 public_key = ""
82 with open(public_key_path, "r") as f:
83 public_key = f.readline()
84
85 profile = client.profiles.create(
86 test_machine,
87 config={'user.user-data': '#cloud-config\nssh_authorized_keys:\n- {}'.format(public_key)},
88 devices={
89 'root': {'path': '/', 'pool': 'default', 'type': 'disk'},
90 'eth0': {
91 'nictype': 'bridged',
92 'parent': 'lxdbr0',
93 'type': 'nic'
94 }
95 }
96 )
97
98 # create lxc machine
99 config = {
100 'name': test_machine,
101 'source': {
102 'type': 'image',
103 'alias': 'xenial',
104 'mode': 'pull',
105 'protocol': 'simplestreams',
106 'server': 'https://cloud-images.ubuntu.com/releases',
107 },
108 'profiles': [test_machine],
109 }
110 container = client.containers.create(config, wait=True)
111 container.start(wait=True)
112
113 def wait_for_network(container, timeout=30):
114 """Wait for eth0 to have an ipv4 address."""
115 starttime = time.time()
116 while(time.time() < starttime + timeout):
117 time.sleep(1)
118 if 'eth0' in container.state().network:
119 addresses = container.state().network['eth0']['addresses']
120 if len(addresses) > 0:
121 if addresses[0]['family'] == 'inet':
122 return addresses[0]
123 return None
124
125 host = wait_for_network(container)
126
127 # HACK: We need to give sshd a chance to bind to the interface,
128 # and pylxd's container.execute seems to be broken and fails and/or
129 # hangs trying to properly check if the service is up.
130 time.sleep(5)
131
132 return container
133
134
135 def destroy_lxd_container(container):
136 """Stop and delete a LXD container."""
137 container.stop(wait=True)
138 container.delete()
139
140
141 def find_lxd_config():
142 """Find the LXD configuration directory."""
143 paths = []
144 paths.append(os.path.expanduser("~/.config/lxc"))
145 paths.append(os.path.expanduser("~/snap/lxd/current/.config/lxc"))
146
147 for path in paths:
148 if os.path.exists(path):
149 crt = os.path.expanduser("{}/client.crt".format(path))
150 key = os.path.expanduser("{}/client.key".format(path))
151 if os.path.exists(crt) and os.path.exists(key):
152 return (crt, key)
153 return (None, None)
154
155
156 def find_juju_ssh_keys():
157 """Find the Juju ssh keys."""
158
159 paths = []
160 paths.append(os.path.expanduser("~/.local/share/juju/ssh/"))
161
162 for path in paths:
163 if os.path.exists(path):
164 private = os.path.expanduser("{}/juju_id_rsa".format(path))
165 public = os.path.expanduser("{}/juju_id_rsa.pub".format(path))
166 if os.path.exists(private) and os.path.exists(public):
167 return (private, public)
168 return (None, None)
169
170
171 def get_juju_private_key():
172 keys = find_juju_ssh_keys()
173 return keys[0]
174
175
176 def get_lxd_client(host="127.0.0.1", port="8443", verify=False):
177 """ Get the LXD client."""
178 client = None
179 (crt, key) = find_lxd_config()
180
181 if crt and key:
182 client = pylxd.Client(
183 endpoint="https://{}:{}".format(host, port),
184 cert=(crt, key),
185 verify=verify,
186 )
187
188 return client