Implement Model.add_machine()
[osm/N2VC.git] / juju / model.py
index 4a8bc03..ef6bc85 100644 (file)
@@ -1,9 +1,13 @@
 import asyncio
 import collections
+import json
 import logging
 import os
 import re
+import stat
+import tempfile
 import weakref
+import zipfile
 from concurrent.futures import CancelledError
 from functools import partial
 from pathlib import Path
@@ -18,7 +22,8 @@ from .constraints import parse as parse_constraints, normalize_key
 from .delta import get_entity_delta
 from .delta import get_entity_class
 from .exceptions import DeadEntityException
-from .errors import JujuAPIError
+from .errors import JujuError, JujuAPIError
+from .placement import parse as parse_placement
 
 log = logging.getLogger(__name__)
 
@@ -74,6 +79,14 @@ class ModelObserver(object):
         await method(delta, old, new, model)
 
     async def on_change(self, delta, old, new, model):
+        """Generic model-change handler.
+
+        :param delta: :class:`juju.client.overrides.Delta`
+        :param old: :class:`juju.model.ModelEntity`
+        :param new: :class:`juju.model.ModelEntity`
+        :param model: :class:`juju.model.Model`
+
+        """
         pass
 
 
@@ -411,6 +424,59 @@ class Model(object):
             await self.connection.close()
             self.connection = None
 
+    async def add_local_charm_dir(self, charm_dir, series):
+        """Upload a local charm to the model.
+
+        This will automatically generate an archive from
+        the charm dir.
+
+        :param charm_dir: Path to the charm directory
+        :param series: Charm series
+
+        """
+        fh = tempfile.NamedTemporaryFile()
+        CharmArchiveGenerator(charm_dir).make_archive(fh.name)
+        with fh:
+            func = partial(
+                self.add_local_charm, fh, series, os.stat(fh.name).st_size)
+            charm_url = await self.loop.run_in_executor(None, func)
+
+        log.debug('Uploaded local charm: %s -> %s', charm_dir, charm_url)
+        return charm_url
+
+    def add_local_charm(self, charm_file, series, size=None):
+        """Upload a local charm archive to the model.
+
+        Returns the 'local:...' url that should be used to deploy the charm.
+
+        :param charm_file: Path to charm zip archive
+        :param series: Charm series
+        :param size: Size of the archive, in bytes
+        :return str: 'local:...' url for deploying the charm
+        :raises: :class:`JujuError` if the upload fails
+
+        Uses an https endpoint at the same host:port as the wss.
+        Supports large file uploads.
+
+        .. warning::
+
+           This method will block. Consider using :meth:`add_local_charm_dir`
+           instead.
+
+        """
+        conn, headers, path_prefix = self.connection.https_connection()
+        path = "%s/charms?series=%s" % (path_prefix, series)
+        headers['Content-Type'] = 'application/zip'
+        if size:
+            headers['Content-Length'] = size
+        conn.request("POST", path, charm_file, headers)
+        response = conn.getresponse()
+        result = response.read().decode()
+        if not response.status == 200:
+            raise JujuError(result)
+        result = json.loads(result)
+        return result['charm-url']
+
     def all_units_idle(self):
         """Return True if all units are idle.
 
@@ -654,9 +720,8 @@ class Model(object):
 
         return await self._wait('action', action_id, 'change', predicate)
 
-    def add_machine(
-            self, spec=None, constraints=None, disks=None, series=None,
-            count=1):
+    async def add_machine(
+            self, spec=None, constraints=None, disks=None, series=None):
         """Start a new, empty machine and optionally a container, or add a
         container to a machine.
 
@@ -664,25 +729,64 @@ class Model(object):
             Examples::
 
                 (None) - starts a new machine
-                'lxc' - starts a new machine with on lxc container
-                'lxc:4' - starts a new lxc container on machine 4
+                'lxd' - starts a new machine with one lxd container
+                'lxd:4' - starts a new lxd container on machine 4
                 'ssh:user@10.10.0.3' - manually provisions a machine with ssh
                 'zone=us-east-1a' - starts a machine in zone us-east-1s on AWS
                 'maas2.name' - acquire machine maas2.name on MAAS
-        :param constraints: Machine constraints
-        :type constraints: :class:`juju.Constraints`
-        :param list disks: List of disk :class:`constraints <juju.Constraints>`
-        :param str series: Series
-        :param int count: Number of machines to deploy
 
-        Supported container types are: lxc, lxd, kvm
+        :param dict constraints: Machine constraints
+            Example::
+
+                constraints={
+                    'mem': 256 * MB,
+                }
+
+        :param list disks: List of disk constraint dictionaries
+            Example::
+
+                disks=[{
+                    'pool': 'rootfs',
+                    'size': 10 * GB,
+                    'count': 1,
+                }]
+
+        :param str series: Series, e.g. 'xenial'
+
+        Supported container types are: lxd, kvm
 
         When deploying a container to an existing machine, constraints cannot
         be used.
 
         """
-        pass
-    add_machines = add_machine
+        params = client.AddMachineParams()
+        params.jobs = ['JobHostUnits']
+
+        if spec:
+            placement = parse_placement(spec)
+            if placement:
+                params.placement = placement[0]
+
+        if constraints:
+            params.constraints = client.Value.from_json(constraints)
+
+        if disks:
+            params.disks = [
+                client.Constraints.from_json(o) for o in disks]
+
+        if series:
+            params.series = series
+
+        # Submit the request.
+        client_facade = client.ClientFacade()
+        client_facade.connect(self.connection)
+        results = await client_facade.AddMachines([params])
+        error = results.machines[0].error
+        if error:
+            raise ValueError("Error adding machine: %s", error.message)
+        machine_id = results.machines[0].machine
+        log.debug('Added new machine %s', machine_id)
+        return await self._wait_for_new('machine', machine_id)
 
     async def add_relation(self, relation1, relation2):
         """Add a relation between two applications.
@@ -845,11 +949,11 @@ class Model(object):
         :param dict resources: <resource name>:<file path> pairs
         :param str series: Series on which to deploy
         :param dict storage: Storage constraints TODO how do these look?
-        :param str to: Placement directive, e.g.::
+        :param to: Placement directive as a string. For example:
 
-            '23' - machine 23
-            'lxc:7' - new lxc container on machine 7
-            '24/lxc/3' - lxc container 3 or machine 24
+            '23' - place on machine 23
+            'lxd:7' - place in new lxd container on machine 7
+            '24/lxd/3' - place in container 3 on machine 24
 
             If None, a new machine is provisioned.
 
@@ -862,9 +966,7 @@ class Model(object):
 
         """
         if to:
-            placement = [
-                client.Placement(**p) for p in to
-            ]
+            placement = parse_placement(to)
         else:
             placement = []
 
@@ -874,8 +976,10 @@ class Model(object):
                 for k, v in storage.items()
             }
 
-        is_local = not entity_url.startswith('cs:') and \
+        is_local = (
+            entity_url.startswith('local:') or
             os.path.isdir(entity_url)
+        )
         entity_id = await self.charmstore.entityId(entity_url) \
             if not is_local else entity_url
 
@@ -908,7 +1012,20 @@ class Model(object):
             log.debug(
                 'Deploying %s', entity_id)
 
-            await client_facade.AddCharm(channel, entity_id)
+            if not is_local:
+                await client_facade.AddCharm(channel, entity_id)
+            elif not entity_id.startswith('local:'):
+                # We have a local charm dir that needs to be uploaded
+                charm_dir = os.path.abspath(
+                    os.path.expanduser(entity_id))
+                series = series or get_charm_series(charm_dir)
+                if not series:
+                    raise JujuError(
+                        "Couldn't determine series for charm at {}. "
+                        "Pass a 'series' kwarg to Model.deploy().".format(
+                            charm_dir))
+                entity_id = await self.add_local_charm_dir(charm_dir, series)
+
             app = client.ApplicationDeploy(
                 application=application_name,
                 channel=channel,
@@ -917,13 +1034,16 @@ class Model(object):
                 constraints=parse_constraints(constraints),
                 endpoint_bindings=bind,
                 num_units=num_units,
-                placement=placement,
                 resources=resources,
                 series=series,
                 storage=storage,
             )
+            app.placement = placement
 
-            await app_facade.Deploy([app])
+            result = await app_facade.Deploy([app])
+            errors = [r.error.message for r in result.results if r.error]
+            if errors:
+                raise JujuError('\n'.join(errors))
             return await self._wait_for_new('application', application_name)
 
     def destroy(self):
@@ -944,7 +1064,7 @@ class Model(object):
             's' if len(unit_names) == 1 else '',
             ' '.join(unit_names))
 
-        return await app_facade.Destroy(self.name)
+        return await app_facade.DestroyUnits(list(unit_names))
     destroy_units = destroy_unit
 
     def get_backup(self, archive_id):
@@ -1264,6 +1384,8 @@ class Model(object):
 
         :param str \*tags: Tags of entities from which to retrieve metrics.
             No tags retrieves the metrics of all units in the model.
+        :return: Dictionary of unit_name:metrics
+
         """
         log.debug("Retrieving metrics for %s",
                   ', '.join(tags) if tags else "all units")
@@ -1290,6 +1412,21 @@ class Model(object):
         return metrics
 
 
+def get_charm_series(path):
+    """Inspects the charm directory at ``path`` and returns a default
+    series from its metadata.yaml (the first item in the 'series' list).
+
+    Returns None if no series can be determined.
+
+    """
+    md = Path(path) / "metadata.yaml"
+    if not md.exists():
+        return None
+    data = yaml.load(md.open())
+    series = data.get('series')
+    return series[0] if series else None
+
+
 class BundleHandler(object):
     """
     Handle bundles by using the API to translate bundle YAML into a plan of
@@ -1311,6 +1448,54 @@ class BundleHandler(object):
         self.ann_facade = client.AnnotationsFacade()
         self.ann_facade.connect(model.connection)
 
+    async def _handle_local_charms(self, bundle):
+        """Search for references to local charms (i.e. filesystem paths)
+        in the bundle. Upload the local charms to the model, and replace
+        the filesystem paths with appropriate 'local:' paths in the bundle.
+
+        Return the modified bundle.
+
+        :param dict bundle: Bundle dictionary
+        :return: Modified bundle dictionary
+
+        """
+        apps, args = [], []
+
+        default_series = bundle.get('series')
+        for app_name in self.applications:
+            app_dict = bundle['services'][app_name]
+            charm_dir = os.path.abspath(os.path.expanduser(app_dict['charm']))
+            if not os.path.isdir(charm_dir):
+                continue
+            series = (
+                app_dict.get('series') or
+                default_series or
+                get_charm_series(charm_dir)
+            )
+            if not series:
+                raise JujuError(
+                    "Couldn't determine series for charm at {}. "
+                    "Add a 'series' key to the bundle.".format(charm_dir))
+
+            # Keep track of what we need to update. We keep a list of apps
+            # that need to be updated, and a corresponding list of args
+            # needed to update those apps.
+            apps.append(app_name)
+            args.append((charm_dir, series))
+
+        if apps:
+            # If we have apps to update, spawn all the coroutines concurrently
+            # and wait for them to finish.
+            charm_urls = await asyncio.gather(*[
+                self.model.add_local_charm_dir(*params)
+                for params in args
+            ])
+            # Update the 'charm:' entry for each app with the new 'local:' url.
+            for app_name, charm_url in zip(apps, charm_urls):
+                bundle['services'][app_name]['charm'] = charm_url
+
+        return bundle
+
     async def fetch_plan(self, entity_id):
         is_local = not entity_id.startswith('cs:') and os.path.isdir(entity_id)
         if is_local:
@@ -1320,7 +1505,13 @@ class BundleHandler(object):
                                                       filename='bundle.yaml',
                                                       read_file=True)
         self.bundle = yaml.safe_load(bundle_yaml)
-        self.plan = await self.client_facade.GetBundleChanges(bundle_yaml)
+        self.bundle = await self._handle_local_charms(self.bundle)
+
+        self.plan = await self.client_facade.GetBundleChanges(
+            yaml.dump(self.bundle))
+
+        if self.plan.errors:
+            raise JujuError('\n'.join(self.plan.errors))
 
     async def execute_plan(self):
         for step in self.plan.changes:
@@ -1346,6 +1537,11 @@ class BundleHandler(object):
             Series holds the series of the charm to be added
             if the charm default is not sufficient.
         """
+        # We don't add local charms because they've already been added
+        # by self._handle_local_charms
+        if charm.startswith('local:'):
+            return charm
+
         entity_id = await self.charmstore.entityId(charm)
         log.debug('Adding %s', entity_id)
         await self.client_facade.AddCharm(None, entity_id)
@@ -1555,3 +1751,79 @@ class CharmStore(object):
             setattr(self, name, coro)
             wrapper = coro
         return wrapper
+
+
+class CharmArchiveGenerator(object):
+    def __init__(self, path):
+        self.path = os.path.abspath(os.path.expanduser(path))
+
+    def make_archive(self, path):
+        """Create archive of directory and write to ``path``.
+
+        :param path: Path to archive
+
+        Ignored::
+
+            * build/\* - This is used for packing the charm itself and any
+                          similar tasks.
+            * \*/.\*    - Hidden files are all ignored for now.  This will most
+                          likely be changed into a specific ignore list
+                          (.bzr, etc)
+
+        """
+        zf = zipfile.ZipFile(path, 'w', zipfile.ZIP_DEFLATED)
+        for dirpath, dirnames, filenames in os.walk(self.path):
+            relative_path = dirpath[len(self.path) + 1:]
+            if relative_path and not self._ignore(relative_path):
+                zf.write(dirpath, relative_path)
+            for name in filenames:
+                archive_name = os.path.join(relative_path, name)
+                if not self._ignore(archive_name):
+                    real_path = os.path.join(dirpath, name)
+                    self._check_type(real_path)
+                    if os.path.islink(real_path):
+                        self._check_link(real_path)
+                        self._write_symlink(
+                            zf, os.readlink(real_path), archive_name)
+                    else:
+                        zf.write(real_path, archive_name)
+        zf.close()
+        return path
+
+    def _check_type(self, path):
+        """Check the path
+        """
+        s = os.stat(path)
+        if stat.S_ISDIR(s.st_mode) or stat.S_ISREG(s.st_mode):
+            return path
+        raise ValueError("Invalid Charm at % %s" % (
+            path, "Invalid file type for a charm"))
+
+    def _check_link(self, path):
+        link_path = os.readlink(path)
+        if link_path[0] == "/":
+            raise ValueError(
+                "Invalid Charm at %s: %s" % (
+                    path, "Absolute links are invalid"))
+        path_dir = os.path.dirname(path)
+        link_path = os.path.join(path_dir, link_path)
+        if not link_path.startswith(os.path.abspath(self.path)):
+            raise ValueError(
+                "Invalid charm at %s %s" % (
+                    path, "Only internal symlinks are allowed"))
+
+    def _write_symlink(self, zf, link_target, link_path):
+        """Package symlinks with appropriate zipfile metadata."""
+        info = zipfile.ZipInfo()
+        info.filename = link_path
+        info.create_system = 3
+        # Magic code for symlinks / py2/3 compat
+        # 27166663808 = (stat.S_IFLNK | 0755) << 16
+        info.external_attr = 2716663808
+        zf.writestr(info, link_target)
+
+    def _ignore(self, path):
+        if path == "build" or path.startswith("build/"):
+            return True
+        if path.startswith('.'):
+            return True