9f9000efab1a315e619bca14de515b23caaea46c
11 # Disable InsecureRequestWarning w/LXD
13 urllib3
.disable_warnings()
16 def get_descriptor(descriptor
):
19 tmp
= yaml
.load(descriptor
)
22 root
= list(tmp
.keys())[0]
23 if root
== "nsd:nsd-catalog":
24 desc
= tmp
['nsd:nsd-catalog']['nsd'][0]
25 elif root
== "vnfd:vnfd-catalog":
26 desc
= tmp
['vnfd:vnfd-catalog']['vnfd'][0]
32 """Return an instance of N2VC.VNF."""
33 log
= logging
.getLogger()
34 log
.level
= logging
.DEBUG
36 # Extract parameters from the environment in order to run our test
37 vca_host
= os
.getenv('VCA_HOST', '127.0.0.1')
38 vca_port
= os
.getenv('VCA_PORT', 17070)
39 vca_user
= os
.getenv('VCA_USER', 'admin')
40 vca_charms
= os
.getenv('VCA_CHARMS', None)
41 vca_secret
= os
.getenv('VCA_SECRET', None)
42 client
= n2vc
.vnf
.N2VC(
52 def create_lxd_container(public_key
=None):
54 Returns a container object
56 If public_key isn't set, we'll use the Juju ssh key
59 client
= get_lxd_client()
60 test_machine
= "test-{}-add-manual-machine-ssh".format(
64 private_key_path
, public_key_path
= find_juju_ssh_keys()
65 # private_key_path = os.path.expanduser(
66 # "~/.local/share/juju/ssh/juju_id_rsa"
68 # public_key_path = os.path.expanduser(
69 # "~/.local/share/juju/ssh/juju_id_rsa.pub"
72 # Use the self-signed cert generated by lxc on first run
73 crt
= os
.path
.expanduser('~/snap/lxd/current/.config/lxc/client.crt')
74 assert os
.path
.exists(crt
)
76 key
= os
.path
.expanduser('~/snap/lxd/current/.config/lxc/client.key')
77 assert os
.path
.exists(key
)
79 # create profile w/cloud-init and juju ssh key
82 with
open(public_key_path
, "r") as f
:
83 public_key
= f
.readline()
85 profile
= client
.profiles
.create(
87 config
={'user.user-data': '#cloud-config\nssh_authorized_keys:\n- {}'.format(public_key
)},
89 'root': {'path': '/', 'pool': 'default', 'type': 'disk'},
100 'name': test_machine
,
105 'protocol': 'simplestreams',
106 'server': 'https://cloud-images.ubuntu.com/releases',
108 'profiles': [test_machine
],
110 container
= client
.containers
.create(config
, wait
=True)
111 container
.start(wait
=True)
113 def wait_for_network(container
, timeout
=30):
114 """Wait for eth0 to have an ipv4 address."""
115 starttime
= time
.time()
116 while(time
.time() < starttime
+ timeout
):
118 if 'eth0' in container
.state().network
:
119 addresses
= container
.state().network
['eth0']['addresses']
120 if len(addresses
) > 0:
121 if addresses
[0]['family'] == 'inet':
125 host
= wait_for_network(container
)
127 # HACK: We need to give sshd a chance to bind to the interface,
128 # and pylxd's container.execute seems to be broken and fails and/or
129 # hangs trying to properly check if the service is up.
135 def destroy_lxd_container(container
):
136 """Stop and delete a LXD container."""
137 container
.stop(wait
=True)
141 def find_lxd_config():
142 """Find the LXD configuration directory."""
144 paths
.append(os
.path
.expanduser("~/.config/lxc"))
145 paths
.append(os
.path
.expanduser("~/snap/lxd/current/.config/lxc"))
148 if os
.path
.exists(path
):
149 crt
= os
.path
.expanduser("{}/client.crt".format(path
))
150 key
= os
.path
.expanduser("{}/client.key".format(path
))
151 if os
.path
.exists(crt
) and os
.path
.exists(key
):
156 def find_juju_ssh_keys():
157 """Find the Juju ssh keys."""
160 paths
.append(os
.path
.expanduser("~/.local/share/juju/ssh/"))
163 if os
.path
.exists(path
):
164 private
= os
.path
.expanduser("{}/juju_id_rsa".format(path
))
165 public
= os
.path
.expanduser("{}/juju_id_rsa.pub".format(path
))
166 if os
.path
.exists(private
) and os
.path
.exists(public
):
167 return (private
, public
)
171 def get_juju_private_key():
172 keys
= find_juju_ssh_keys()
176 def get_lxd_client(host
="127.0.0.1", port
="8443", verify
=False):
177 """ Get the LXD client."""
179 (crt
, key
) = find_lxd_config()
182 client
= pylxd
.Client(
183 endpoint
="https://{}:{}".format(host
, port
),