13 # Disable InsecureRequestWarning w/LXD
15 urllib3
.disable_warnings()
17 here
= os
.path
.dirname(os
.path
.realpath(__file__
))
21 return "{}/charms".format(here
)
25 return "{}/charms/layers".format(here
)
28 def parse_metrics(application
, results
):
29 """Parse the returned metrics into a dict."""
31 # We'll receive the results for all units, to look for the one we want
32 # Caveat: we're grabbing results from the first unit of the application,
33 # which is enough for testing, since we're only deploying a single unit.
36 if unit
.startswith(application
):
37 for result
in results
[unit
]:
38 retval
[result
['key']] = result
['value']
41 def collect_metrics(application
):
42 """Invoke Juju's metrics collector.
44 Caveat: this shells out to the `juju collect-metrics` command, rather than
45 making an API call. At the time of writing, that API is not exposed through
50 logging
.debug("Collecting metrics")
51 subprocess
.check_call(['juju', 'collect-metrics', application
])
52 except subprocess
.CalledProcessError
as e
:
53 raise Exception("Unable to collect metrics: {}".format(e
))
56 def build_charm(charm
):
57 """Build a test charm.
59 Builds one of the charms in tests/charms/layers and returns the path
60 to the compiled charm. The calling test is responsible for removing
61 the charm artifact during cleanup.
63 # stream_handler = logging.StreamHandler(sys.stdout)
64 # log.addHandler(stream_handler)
66 # Make sure the charm snap is installed
68 logging
.debug("Looking for charm-tools")
69 subprocess
.check_call(['which', 'charm'])
70 except subprocess
.CalledProcessError
as e
:
71 raise Exception("charm snap not installed.")
74 builds
= get_charm_path()
76 cmd
= "charm build {}/{} -o {}/".format(
81 subprocess
.check_call(shlex
.split(cmd
))
82 return "{}/{}".format(builds
, charm
)
83 except subprocess
.CalledProcessError
as e
:
84 raise Exception("charm build failed: {}.".format(e
))
89 def get_descriptor(descriptor
):
92 tmp
= yaml
.load(descriptor
)
95 root
= list(tmp
.keys())[0]
96 if root
== "nsd:nsd-catalog":
97 desc
= tmp
['nsd:nsd-catalog']['nsd'][0]
98 elif root
== "vnfd:vnfd-catalog":
99 desc
= tmp
['vnfd:vnfd-catalog']['vnfd'][0]
105 """Return an instance of N2VC.VNF."""
106 log
= logging
.getLogger()
107 log
.level
= logging
.DEBUG
109 # Extract parameters from the environment in order to run our test
110 vca_host
= os
.getenv('VCA_HOST', '127.0.0.1')
111 vca_port
= os
.getenv('VCA_PORT', 17070)
112 vca_user
= os
.getenv('VCA_USER', 'admin')
113 vca_charms
= os
.getenv('VCA_CHARMS', None)
114 vca_secret
= os
.getenv('VCA_SECRET', None)
115 client
= n2vc
.vnf
.N2VC(
121 artifacts
=vca_charms
,
125 def create_lxd_container(public_key
=None):
127 Returns a container object
129 If public_key isn't set, we'll use the Juju ssh key
132 client
= get_lxd_client()
133 test_machine
= "test-{}-add-manual-machine-ssh".format(
134 uuid
.uuid4().hex[-4:]
137 private_key_path
, public_key_path
= find_juju_ssh_keys()
138 # private_key_path = os.path.expanduser(
139 # "~/.local/share/juju/ssh/juju_id_rsa"
141 # public_key_path = os.path.expanduser(
142 # "~/.local/share/juju/ssh/juju_id_rsa.pub"
145 # Use the self-signed cert generated by lxc on first run
146 crt
= os
.path
.expanduser('~/snap/lxd/current/.config/lxc/client.crt')
147 assert os
.path
.exists(crt
)
149 key
= os
.path
.expanduser('~/snap/lxd/current/.config/lxc/client.key')
150 assert os
.path
.exists(key
)
152 # create profile w/cloud-init and juju ssh key
155 with
open(public_key_path
, "r") as f
:
156 public_key
= f
.readline()
158 profile
= client
.profiles
.create(
160 config
={'user.user-data': '#cloud-config\nssh_authorized_keys:\n- {}'.format(public_key
)},
162 'root': {'path': '/', 'pool': 'default', 'type': 'disk'},
164 'nictype': 'bridged',
173 'name': test_machine
,
178 'protocol': 'simplestreams',
179 'server': 'https://cloud-images.ubuntu.com/releases',
181 'profiles': [test_machine
],
183 container
= client
.containers
.create(config
, wait
=True)
184 container
.start(wait
=True)
186 def wait_for_network(container
, timeout
=30):
187 """Wait for eth0 to have an ipv4 address."""
188 starttime
= time
.time()
189 while(time
.time() < starttime
+ timeout
):
191 if 'eth0' in container
.state().network
:
192 addresses
= container
.state().network
['eth0']['addresses']
193 if len(addresses
) > 0:
194 if addresses
[0]['family'] == 'inet':
198 host
= wait_for_network(container
)
200 # HACK: We need to give sshd a chance to bind to the interface,
201 # and pylxd's container.execute seems to be broken and fails and/or
202 # hangs trying to properly check if the service is up.
208 def destroy_lxd_container(container
):
209 """Stop and delete a LXD container."""
210 container
.stop(wait
=True)
214 def find_lxd_config():
215 """Find the LXD configuration directory."""
217 paths
.append(os
.path
.expanduser("~/.config/lxc"))
218 paths
.append(os
.path
.expanduser("~/snap/lxd/current/.config/lxc"))
221 if os
.path
.exists(path
):
222 crt
= os
.path
.expanduser("{}/client.crt".format(path
))
223 key
= os
.path
.expanduser("{}/client.key".format(path
))
224 if os
.path
.exists(crt
) and os
.path
.exists(key
):
229 def find_juju_ssh_keys():
230 """Find the Juju ssh keys."""
233 paths
.append(os
.path
.expanduser("~/.local/share/juju/ssh/"))
236 if os
.path
.exists(path
):
237 private
= os
.path
.expanduser("{}/juju_id_rsa".format(path
))
238 public
= os
.path
.expanduser("{}/juju_id_rsa.pub".format(path
))
239 if os
.path
.exists(private
) and os
.path
.exists(public
):
240 return (private
, public
)
244 def get_juju_private_key():
245 keys
= find_juju_ssh_keys()
249 def get_lxd_client(host
="127.0.0.1", port
="8443", verify
=False):
250 """ Get the LXD client."""
252 (crt
, key
) = find_lxd_config()
255 client
= pylxd
.Client(
256 endpoint
="https://{}:{}".format(host
, port
),