13 from concurrent
.futures
import CancelledError
14 from functools
import partial
15 from pathlib
import Path
17 import theblues
.charmstore
18 import theblues
.errors
22 from . import tag
, utils
23 from .client
import client
, connector
24 from .client
.client
import ConfigValue
25 from .client
.client
import Value
26 from .constraints
import parse
as parse_constraints
27 from .constraints
import normalize_key
28 from .delta
import get_entity_class
, get_entity_delta
29 from .errors
import JujuAPIError
, JujuError
30 from .exceptions
import DeadEntityException
31 from .placement
import parse
as parse_placement
32 from . import provisioner
35 log
= logging
.getLogger(__name__
)
39 """Wrapper around an observer callable.
41 This wrapper allows filter criteria to be associated with the
42 callable so that it's only called for changes that meet the criteria.
45 def __init__(self
, callable_
, entity_type
, action
, entity_id
, predicate
):
46 self
.callable_
= callable_
47 self
.entity_type
= entity_type
49 self
.entity_id
= entity_id
50 self
.predicate
= predicate
52 self
.entity_id
= str(self
.entity_id
)
53 if not self
.entity_id
.startswith('^'):
54 self
.entity_id
= '^' + self
.entity_id
55 if not self
.entity_id
.endswith('$'):
58 async def __call__(self
, delta
, old
, new
, model
):
59 await self
.callable_(delta
, old
, new
, model
)
61 def cares_about(self
, delta
):
62 """Return True if this observer "cares about" (i.e. wants to be
63 called) for a this delta.
66 if (self
.entity_id
and delta
.get_id() and
67 not re
.match(self
.entity_id
, str(delta
.get_id()))):
70 if self
.entity_type
and self
.entity_type
!= delta
.entity
:
73 if self
.action
and self
.action
!= delta
.type:
76 if self
.predicate
and not self
.predicate(delta
):
84 Base class for creating observers that react to changes in a model.
86 async def __call__(self
, delta
, old
, new
, model
):
87 handler_name
= 'on_{}_{}'.format(delta
.entity
, delta
.type)
88 method
= getattr(self
, handler_name
, self
.on_change
)
89 await method(delta
, old
, new
, model
)
91 async def on_change(self
, delta
, old
, new
, model
):
92 """Generic model-change handler.
94 This should be overridden in a subclass.
96 :param delta: :class:`juju.client.overrides.Delta`
97 :param old: :class:`juju.model.ModelEntity`
98 :param new: :class:`juju.model.ModelEntity`
99 :param model: :class:`juju.model.Model`
106 """Holds the state of the model, including the delta history of all
107 entities in the model.
110 def __init__(self
, model
):
114 def _live_entity_map(self
, entity_type
):
115 """Return an id:Entity map of all the living entities of
116 type ``entity_type``.
120 entity_id
: self
.get_entity(entity_type
, entity_id
)
121 for entity_id
, history
in self
.state
.get(entity_type
, {}).items()
122 if history
[-1] is not None
126 def applications(self
):
127 """Return a map of application-name:Application for all applications
128 currently in the model.
131 return self
._live
_entity
_map
('application')
135 """Return a map of machine-id:Machine for all machines currently in
139 return self
._live
_entity
_map
('machine')
143 """Return a map of unit-id:Unit for all units currently in
147 return self
._live
_entity
_map
('unit')
151 """Return a map of relation-id:Relation for all relations currently in
155 return self
._live
_entity
_map
('relation')
157 def entity_history(self
, entity_type
, entity_id
):
158 """Return the history deque for an entity.
161 return self
.state
[entity_type
][entity_id
]
163 def entity_data(self
, entity_type
, entity_id
, history_index
):
164 """Return the data dict for an entity at a specific index of its
168 return self
.entity_history(entity_type
, entity_id
)[history_index
]
170 def apply_delta(self
, delta
):
171 """Apply delta to our state and return a copy of the
172 affected object as it was before and after the update, e.g.:
174 old_obj, new_obj = self.apply_delta(delta)
176 old_obj may be None if the delta is for the creation of a new object,
177 e.g. a new application or unit is deployed.
179 new_obj will never be None, but may be dead (new_obj.dead == True)
180 if the object was deleted as a result of the delta being applied.
185 .setdefault(delta
.entity
, {})
186 .setdefault(delta
.get_id(), collections
.deque())
189 history
.append(delta
.data
)
190 if delta
.type == 'remove':
193 entity
= self
.get_entity(delta
.entity
, delta
.get_id())
194 return entity
.previous(), entity
197 self
, entity_type
, entity_id
, history_index
=-1, connected
=True):
198 """Return an object instance for the given entity_type and id.
200 By default the object state matches the most recent state from
201 Juju. To get an instance of the object in an older state, pass
202 history_index, an index into the history deque for the entity.
206 if history_index
< 0 and history_index
!= -1:
207 history_index
+= len(self
.entity_history(entity_type
, entity_id
))
208 if history_index
< 0:
212 self
.entity_data(entity_type
, entity_id
, history_index
)
216 entity_class
= get_entity_class(entity_type
)
218 entity_id
, self
.model
, history_index
=history_index
,
223 """An object in the Model tree"""
225 def __init__(self
, entity_id
, model
, history_index
=-1, connected
=True):
226 """Initialize a new entity
228 :param entity_id str: The unique id of the object in the model
229 :param model: The model instance in whose object tree this
231 :history_index int: The index of this object's state in the model's
232 history deque for this entity
233 :connected bool: Flag indicating whether this object gets live updates
237 self
.entity_id
= entity_id
239 self
._history
_index
= history_index
240 self
.connected
= connected
241 self
.connection
= model
.connection()
244 return '<{} entity_id="{}">'.format(type(self
).__name
__,
247 def __getattr__(self
, name
):
248 """Fetch object attributes from the underlying data dict held in the
253 return self
.safe_data
[name
]
255 name
= name
.replace('_', '-')
256 if name
in self
.safe_data
:
257 return self
.safe_data
[name
]
262 return bool(self
.data
)
264 def on_change(self
, callable_
):
265 """Add a change observer to this entity.
268 self
.model
.add_observer(
269 callable_
, self
.entity_type
, 'change', self
.entity_id
)
271 def on_remove(self
, callable_
):
272 """Add a remove observer to this entity.
275 self
.model
.add_observer(
276 callable_
, self
.entity_type
, 'remove', self
.entity_id
)
279 def entity_type(self
):
280 """A string identifying the entity type of this object, e.g.
281 'application' or 'unit', etc.
284 return self
.__class
__.__name
__.lower()
288 """Return True if this object represents the current state of the
289 entity in the underlying model.
291 This will be True except when the object represents an entity at a
292 non-latest state in history, e.g. if the object was obtained by calling
293 .previous() on another object.
296 return self
._history
_index
== -1
300 """Returns True if this entity no longer exists in the underlying
306 self
.model
.state
.entity_data(
307 self
.entity_type
, self
.entity_id
, -1) is None
312 """Returns True if this entity still exists in the underlying
320 """The data dictionary for this entity.
323 return self
.model
.state
.entity_data(
324 self
.entity_type
, self
.entity_id
, self
._history
_index
)
328 """The data dictionary for this entity.
330 If this `ModelEntity` points to the dead state, it will
331 raise `DeadEntityException`.
334 if self
.data
is None:
335 raise DeadEntityException(
336 "Entity {}:{} is dead - its attributes can no longer be "
337 "accessed. Use the .previous() method on this object to get "
338 "a copy of the object at its previous state.".format(
339 self
.entity_type
, self
.entity_id
))
343 """Return a copy of this object as was at its previous state in
346 Returns None if this object is new (and therefore has no history).
348 The returned object is always "disconnected", i.e. does not receive
352 return self
.model
.state
.get_entity(
353 self
.entity_type
, self
.entity_id
, self
._history
_index
- 1,
357 """Return a copy of this object at its next state in
360 Returns None if this object is already the latest.
362 The returned object is "disconnected", i.e. does not receive
363 live updates, unless it is current (latest).
366 if self
._history
_index
== -1:
369 new_index
= self
._history
_index
+ 1
371 new_index
== len(self
.model
.state
.entity_history(
372 self
.entity_type
, self
.entity_id
)) - 1
374 return self
.model
.state
.get_entity(
375 self
.entity_type
, self
.entity_id
, self
._history
_index
- 1,
379 """Return a copy of this object at its current state in the model.
381 Returns self if this object is already the latest.
383 The returned object is always "connected", i.e. receives
384 live updates from the model.
387 if self
._history
_index
== -1:
390 return self
.model
.state
.get_entity(self
.entity_type
, self
.entity_id
)
395 The main API for interacting with a Juju model.
404 """Instantiate a new Model.
406 The connect method will need to be called before this
407 object can be used for anything interesting.
409 If jujudata is None, jujudata.FileJujuData will be used.
411 :param loop: an asyncio event loop
412 :param max_frame_size: See
413 `juju.client.connection.Connection.MAX_FRAME_SIZE`
414 :param bakery_client httpbakery.Client: The bakery client to use
415 for macaroon authorization.
416 :param jujudata JujuData: The source for current controller information
418 self
._connector
= connector
.Connector(
420 max_frame_size
=max_frame_size
,
421 bakery_client
=bakery_client
,
424 self
._observers
= weakref
.WeakValueDictionary()
425 self
.state
= ModelState(self
)
427 self
._watch
_stopping
= asyncio
.Event(loop
=self
._connector
.loop
)
428 self
._watch
_stopped
= asyncio
.Event(loop
=self
._connector
.loop
)
429 self
._watch
_received
= asyncio
.Event(loop
=self
._connector
.loop
)
430 self
._watch
_stopped
.set()
431 self
._charmstore
= CharmStore(self
._connector
.loop
)
433 def is_connected(self
):
434 """Reports whether the Model is currently connected."""
435 return self
._connector
.is_connected()
439 return self
._connector
.loop
441 def connection(self
):
442 """Return the current Connection object. It raises an exception
443 if the Model is disconnected"""
444 return self
._connector
.connection()
446 async def get_controller(self
):
447 """Return a Controller instance for the currently connected model.
450 from juju
.controller
import Controller
451 controller
= Controller(jujudata
=self
._connector
.jujudata
)
452 kwargs
= self
.connection().connect_params()
454 await controller
._connect
_direct
(**kwargs
)
457 async def __aenter__(self
):
461 async def __aexit__(self
, exc_type
, exc
, tb
):
462 await self
.disconnect()
464 async def connect(self
, *args
, **kwargs
):
465 """Connect to a juju model.
467 This supports two calling conventions:
469 The model and (optionally) authentication information can be taken
470 from the data files created by the Juju CLI. This convention will
471 be used if a ``model_name`` is specified, or if the ``endpoint``
472 and ``uuid`` are not.
474 Otherwise, all of the ``endpoint``, ``uuid``, and authentication
475 information (``username`` and ``password``, or ``bakery_client`` and/or
476 ``macaroons``) are required.
478 If a single positional argument is given, it will be assumed to be
479 the ``model_name``. Otherwise, the first positional argument, if any,
480 must be the ``endpoint``.
482 Available parameters are:
484 :param model_name: Format [controller:][user/]model
485 :param str endpoint: The hostname:port of the controller to connect to.
486 :param str uuid: The model UUID to connect to.
487 :param str username: The username for controller-local users (or None
488 to use macaroon-based login.)
489 :param str password: The password for controller-local users.
490 :param str cacert: The CA certificate of the controller
492 :param httpbakery.Client bakery_client: The macaroon bakery client to
493 to use when performing macaroon-based login. Macaroon tokens
494 acquired when logging will be saved to bakery_client.cookies.
495 If this is None, a default bakery_client will be used.
496 :param list macaroons: List of macaroons to load into the
498 :param asyncio.BaseEventLoop loop: The event loop to use for async
500 :param int max_frame_size: The maximum websocket frame size to allow.
502 await self
.disconnect()
503 if 'endpoint' not in kwargs
and len(args
) < 2:
504 if args
and 'model_name' in kwargs
:
505 raise TypeError('connect() got multiple values for model_name')
509 model_name
= kwargs
.pop('model_name', None)
510 await self
._connector
.connect_model(model_name
, **kwargs
)
512 if 'model_name' in kwargs
:
513 raise TypeError('connect() got values for both '
514 'model_name and endpoint')
515 if args
and 'endpoint' in kwargs
:
516 raise TypeError('connect() got multiple values for endpoint')
517 if len(args
) < 2 and 'uuid' not in kwargs
:
518 raise TypeError('connect() missing value for uuid')
519 has_userpass
= (len(args
) >= 4 or
520 {'username', 'password'}.issubset(kwargs
))
521 has_macaroons
= (len(args
) >= 6 or not
522 {'bakery_client', 'macaroons'}.isdisjoint(kwargs
))
523 if not (has_userpass
or has_macaroons
):
524 raise TypeError('connect() missing auth params')
536 for i
, arg
in enumerate(args
):
537 kwargs
[arg_names
[i
]] = arg
538 if not {'endpoint', 'uuid'}.issubset(kwargs
):
539 raise ValueError('endpoint and uuid are required '
540 'if model_name not given')
541 if not ({'username', 'password'}.issubset(kwargs
) or
542 {'bakery_client', 'macaroons'}.intersection(kwargs
)):
543 raise ValueError('Authentication parameters are required '
544 'if model_name not given')
545 await self
._connector
.connect(**kwargs
)
546 await self
._after
_connect
()
548 async def connect_model(self
, model_name
):
550 .. deprecated:: 0.6.2
551 Use ``connect(model_name=model_name)`` instead.
553 return await self
.connect(model_name
=model_name
)
555 async def connect_current(self
):
557 .. deprecated:: 0.6.2
558 Use ``connect()`` instead.
560 return await self
.connect()
562 async def _connect_direct(self
, **kwargs
):
563 await self
.disconnect()
564 await self
._connector
.connect(**kwargs
)
565 await self
._after
_connect
()
567 async def _after_connect(self
):
570 # Wait for the first packet of data from the AllWatcher,
571 # which contains all information on the model.
572 # TODO this means that we can't do anything until
573 # we've received all the model data, which might be
574 # a whole load of unneeded data if all the client wants
575 # to do is make one RPC call.
576 await self
._watch
_received
.wait()
578 await self
.get_info()
580 async def disconnect(self
):
581 """Shut down the watcher task and close websockets.
584 if not self
._watch
_stopped
.is_set():
585 log
.debug('Stopping watcher task')
586 self
._watch
_stopping
.set()
587 await self
._watch
_stopped
.wait()
588 self
._watch
_stopping
.clear()
590 if self
.is_connected():
591 log
.debug('Closing model connection')
592 await self
._connector
.disconnect()
595 async def add_local_charm_dir(self
, charm_dir
, series
):
596 """Upload a local charm to the model.
598 This will automatically generate an archive from
601 :param charm_dir: Path to the charm directory
602 :param series: Charm series
605 fh
= tempfile
.NamedTemporaryFile()
606 CharmArchiveGenerator(charm_dir
).make_archive(fh
.name
)
609 self
.add_local_charm
, fh
, series
, os
.stat(fh
.name
).st_size
)
610 charm_url
= await self
._connector
.loop
.run_in_executor(None, func
)
612 log
.debug('Uploaded local charm: %s -> %s', charm_dir
, charm_url
)
615 def add_local_charm(self
, charm_file
, series
, size
=None):
616 """Upload a local charm archive to the model.
618 Returns the 'local:...' url that should be used to deploy the charm.
620 :param charm_file: Path to charm zip archive
621 :param series: Charm series
622 :param size: Size of the archive, in bytes
623 :return str: 'local:...' url for deploying the charm
624 :raises: :class:`JujuError` if the upload fails
626 Uses an https endpoint at the same host:port as the wss.
627 Supports large file uploads.
631 This method will block. Consider using :meth:`add_local_charm_dir`
635 conn
, headers
, path_prefix
= self
.connection().https_connection()
636 path
= "%s/charms?series=%s" % (path_prefix
, series
)
637 headers
['Content-Type'] = 'application/zip'
639 headers
['Content-Length'] = size
640 conn
.request("POST", path
, charm_file
, headers
)
641 response
= conn
.getresponse()
642 result
= response
.read().decode()
643 if not response
.status
== 200:
644 raise JujuError(result
)
645 result
= json
.loads(result
)
646 return result
['charm-url']
648 def all_units_idle(self
):
649 """Return True if all units are idle.
652 for unit
in self
.units
.values():
653 unit_status
= unit
.data
['agent-status']['current']
654 if unit_status
!= 'idle':
658 async def reset(self
, force
=False):
659 """Reset the model to a clean state.
661 :param bool force: Force-terminate machines.
663 This returns only after the model has reached a clean state. "Clean"
664 means no applications or machines exist in the model.
667 log
.debug('Resetting model')
668 for app
in self
.applications
.values():
670 for machine
in self
.machines
.values():
671 await machine
.destroy(force
=force
)
672 await self
.block_until(
673 lambda: len(self
.machines
) == 0
676 async def block_until(self
, *conditions
, timeout
=None, wait_period
=0.5):
677 """Return only after all conditions are true.
679 Raises `websockets.ConnectionClosed` if disconnected.
682 return not (self
.is_connected() and self
.connection().is_open
)
685 return _disconnected() or all(c() for c
in conditions
)
687 await utils
.block_until(done
,
689 wait_period
=wait_period
,
692 raise websockets
.ConnectionClosed(1006, 'no reason')
695 def applications(self
):
696 """Return a map of application-name:Application for all applications
697 currently in the model.
700 return self
.state
.applications
704 """Return a map of machine-id:Machine for all machines currently in
708 return self
.state
.machines
712 """Return a map of unit-id:Unit for all units currently in
716 return self
.state
.units
720 """Return a list of all Relations currently in the model.
723 return list(self
.state
.relations
.values())
725 async def get_info(self
):
726 """Return a client.ModelInfo object for this Model.
728 Retrieves latest info for this Model from the api server. The
729 return value is cached on the Model.info attribute so that the
730 valued may be accessed again without another api call, if
733 This method is called automatically when the Model is connected,
734 resulting in Model.info being initialized without requiring an
735 explicit call to this method.
738 facade
= client
.ClientFacade
.from_connection(self
.connection())
740 self
._info
= await facade
.ModelInfo()
741 log
.debug('Got ModelInfo: %s', vars(self
.info
))
747 """Return the cached client.ModelInfo object for this Model.
749 If Model.get_info() has not been called, this will return None.
754 self
, callable_
, entity_type
=None, action
=None, entity_id
=None,
756 """Register an "on-model-change" callback
758 Once the model is connected, ``callable_``
759 will be called each time the model changes. ``callable_`` should
760 be Awaitable and accept the following positional arguments:
762 delta - An instance of :class:`juju.delta.EntityDelta`
763 containing the raw delta data recv'd from the Juju
766 old_obj - If the delta modifies an existing object in the model,
767 old_obj will be a copy of that object, as it was before the
768 delta was applied. Will be None if the delta creates a new
771 new_obj - A copy of the new or updated object, after the delta
772 is applied. Will be None if the delta removes an entity
775 model - The :class:`Model` itself.
777 Events for which ``callable_`` is called can be specified by passing
778 entity_type, action, and/or entitiy_id filter criteria, e.g.::
782 entity_type='application', action='add', entity_id='ubuntu')
784 For more complex filtering conditions, pass a predicate function. It
785 will be called with a delta as its only argument. If the predicate
786 function returns True, the ``callable_`` will be called.
789 observer
= _Observer(
790 callable_
, entity_type
, action
, entity_id
, predicate
)
791 self
._observers
[observer
] = callable_
794 """Start an asynchronous watch against this model.
796 See :meth:`add_observer` to register an onchange callback.
799 async def _all_watcher():
801 allwatcher
= client
.AllWatcherFacade
.from_connection(
803 while not self
._watch
_stopping
.is_set():
805 results
= await utils
.run_with_interrupt(
807 self
._watch
_stopping
,
808 self
._connector
.loop
)
809 except JujuAPIError
as e
:
810 if 'watcher was stopped' not in str(e
):
812 if self
._watch
_stopping
.is_set():
813 # this shouldn't ever actually happen, because
814 # the event should trigger before the controller
815 # has a chance to tell us the watcher is stopped
816 # but handle it gracefully, just in case
818 # controller stopped our watcher for some reason
819 # but we're not actually stopping, so just restart it
821 'Watcher: watcher stopped, restarting')
824 except websockets
.ConnectionClosed
:
825 monitor
= self
.connection().monitor
826 if monitor
.status
== monitor
.ERROR
:
827 # closed unexpectedly, try to reopen
829 'Watcher: connection closed, reopening')
830 await self
.connection().reconnect()
831 if monitor
.status
!= monitor
.CONNECTED
:
832 # reconnect failed; abort and shutdown
833 log
.error('Watcher: automatic reconnect '
834 'failed; stopping watcher')
839 # closed on request, go ahead and shutdown
841 if self
._watch
_stopping
.is_set():
843 await allwatcher
.Stop()
844 except websockets
.ConnectionClosed
:
845 pass # can't stop on a closed conn
847 for delta
in results
.deltas
:
848 delta
= get_entity_delta(delta
)
849 old_obj
, new_obj
= self
.state
.apply_delta(delta
)
850 await self
._notify
_observers
(delta
, old_obj
, new_obj
)
851 self
._watch
_received
.set()
852 except CancelledError
:
855 log
.exception('Error in watcher')
858 self
._watch
_stopped
.set()
860 log
.debug('Starting watcher task')
861 self
._watch
_received
.clear()
862 self
._watch
_stopping
.clear()
863 self
._watch
_stopped
.clear()
864 self
._connector
.loop
.create_task(_all_watcher())
866 async def _notify_observers(self
, delta
, old_obj
, new_obj
):
867 """Call observing callbacks, notifying them of a change in model state
869 :param delta: The raw change from the watcher
870 (:class:`juju.client.overrides.Delta`)
871 :param old_obj: The object in the model that this delta updates.
873 :param new_obj: The object in the model that is created or updated
874 by applying this delta.
877 if new_obj
and not old_obj
:
881 'Model changed: %s %s %s',
882 delta
.entity
, delta
.type, delta
.get_id())
884 for o
in self
._observers
:
885 if o
.cares_about(delta
):
886 asyncio
.ensure_future(o(delta
, old_obj
, new_obj
, self
),
887 loop
=self
._connector
.loop
)
889 async def _wait(self
, entity_type
, entity_id
, action
, predicate
=None):
891 Block the calling routine until a given action has happened to the
894 :param entity_type: The entity's type.
895 :param entity_id: The entity's id.
896 :param action: the type of action (e.g., 'add', 'change', or 'remove')
897 :param predicate: optional callable that must take as an
898 argument a delta, and must return a boolean, indicating
899 whether the delta contains the specific action we're looking
900 for. For example, you might check to see whether a 'change'
901 has a 'completed' status. See the _Observer class for details.
904 q
= asyncio
.Queue(loop
=self
._connector
.loop
)
906 async def callback(delta
, old
, new
, model
):
907 await q
.put(delta
.get_id())
909 self
.add_observer(callback
, entity_type
, action
, entity_id
, predicate
)
910 entity_id
= await q
.get()
911 # object might not be in the entity_map if we were waiting for a
913 return self
.state
._live
_entity
_map
(entity_type
).get(entity_id
)
915 async def _wait_for_new(self
, entity_type
, entity_id
):
916 """Wait for a new object to appear in the Model and return it.
918 Waits for an object of type ``entity_type`` with id ``entity_id``
919 to appear in the model. This is similar to watching for the
920 object using ``block_until``, but uses the watcher rather than
924 # if the entity is already in the model, just return it
925 if entity_id
in self
.state
._live
_entity
_map
(entity_type
):
926 return self
.state
._live
_entity
_map
(entity_type
)[entity_id
]
927 return await self
._wait
(entity_type
, entity_id
, None)
929 async def wait_for_action(self
, action_id
):
930 """Given an action, wait for it to complete."""
932 if action_id
.startswith("action-"):
933 # if we've been passed action.tag, transform it into the
934 # id that the api deltas will use.
935 action_id
= action_id
[7:]
937 def predicate(delta
):
938 return delta
.data
['status'] in ('completed', 'failed')
940 return await self
._wait
('action', action_id
, None, predicate
)
942 async def add_machine(
943 self
, spec
=None, constraints
=None, disks
=None, series
=None):
944 """Start a new, empty machine and optionally a container, or add a
945 container to a machine.
947 :param str spec: Machine specification
950 (None) - starts a new machine
951 'lxd' - starts a new machine with one lxd container
952 'lxd:4' - starts a new lxd container on machine 4
953 'ssh:user@10.10.0.3:/path/to/private/key' - manually provision
954 a machine with ssh and the private key used for authentication
955 'zone=us-east-1a' - starts a machine in zone us-east-1s on AWS
956 'maas2.name' - acquire machine maas2.name on MAAS
958 :param dict constraints: Machine constraints, which can contain the
979 :param list disks: List of disk constraint dictionaries, which can
980 contain the following keys::
994 :param str series: Series, e.g. 'xenial'
996 Supported container types are: lxd, kvm
998 When deploying a container to an existing machine, constraints cannot
1002 params
= client
.AddMachineParams()
1005 if spec
.startswith("ssh:"):
1006 placement
, target
, private_key_path
= spec
.split(":")
1007 user
, host
= target
.split("@")
1009 sshProvisioner
= provisioner
.SSHProvisioner(
1012 private_key_path
=private_key_path
,
1015 params
= sshProvisioner
.provision_machine()
1017 placement
= parse_placement(spec
)
1019 params
.placement
= placement
[0]
1021 params
.jobs
= ['JobHostUnits']
1024 params
.constraints
= client
.Value
.from_json(constraints
)
1028 client
.Constraints
.from_json(o
) for o
in disks
]
1031 params
.series
= series
1033 # Submit the request.
1034 client_facade
= client
.ClientFacade
.from_connection(self
.connection())
1035 results
= await client_facade
.AddMachines([params
])
1036 error
= results
.machines
[0].error
1038 raise ValueError("Error adding machine: %s" % error
.message
)
1039 machine_id
= results
.machines
[0].machine
1042 if spec
.startswith("ssh:"):
1043 # Need to run this after AddMachines has been called,
1044 # as we need the machine_id
1045 await sshProvisioner
.install_agent(
1051 log
.debug('Added new machine %s', machine_id
)
1052 return await self
._wait
_for
_new
('machine', machine_id
)
1054 async def add_relation(self
, relation1
, relation2
):
1055 """Add a relation between two applications.
1057 :param str relation1: '<application>[:<relation_name>]'
1058 :param str relation2: '<application>[:<relation_name>]'
1061 connection
= self
.connection()
1062 app_facade
= client
.ApplicationFacade
.from_connection(connection
)
1065 'Adding relation %s <-> %s', relation1
, relation2
)
1067 def _find_relation(*specs
):
1068 for rel
in self
.relations
:
1069 if rel
.matches(*specs
):
1074 result
= await app_facade
.AddRelation([relation1
, relation2
])
1075 except JujuAPIError
as e
:
1076 if 'relation already exists' not in e
.message
:
1078 rel
= _find_relation(relation1
, relation2
)
1081 raise JujuError('Relation {} {} exists but not in model'.format(
1082 relation1
, relation2
))
1084 specs
= ['{}:{}'.format(app
, data
['name'])
1085 for app
, data
in result
.endpoints
.items()]
1087 await self
.block_until(lambda: _find_relation(*specs
) is not None)
1088 return _find_relation(*specs
)
1090 def add_space(self
, name
, *cidrs
):
1091 """Add a new network space.
1093 Adds a new space with the given name and associates the given
1094 (optional) list of existing subnet CIDRs with it.
1096 :param str name: Name of the space
1097 :param \*cidrs: Optional list of existing subnet CIDRs
1100 raise NotImplementedError()
1102 async def add_ssh_key(self
, user
, key
):
1103 """Add a public SSH key to this model.
1105 :param str user: The username of the user
1106 :param str key: The public ssh key
1109 key_facade
= client
.KeyManagerFacade
.from_connection(self
.connection())
1110 return await key_facade
.AddKeys([key
], user
)
1111 add_ssh_keys
= add_ssh_key
1113 def add_subnet(self
, cidr_or_id
, space
, *zones
):
1114 """Add an existing subnet to this model.
1116 :param str cidr_or_id: CIDR or provider ID of the existing subnet
1117 :param str space: Network space with which to associate
1118 :param str \*zones: Zone(s) in which the subnet resides
1121 raise NotImplementedError()
1123 def get_backups(self
):
1124 """Retrieve metadata for backups in this model.
1127 raise NotImplementedError()
1129 def block(self
, *commands
):
1130 """Add a new block to this model.
1132 :param str \*commands: The commands to block. Valid values are
1133 'all-changes', 'destroy-model', 'remove-object'
1136 raise NotImplementedError()
1138 def get_blocks(self
):
1139 """List blocks for this model.
1142 raise NotImplementedError()
1144 def get_cached_images(self
, arch
=None, kind
=None, series
=None):
1145 """Return a list of cached OS images.
1147 :param str arch: Filter by image architecture
1148 :param str kind: Filter by image kind, e.g. 'lxd'
1149 :param str series: Filter by image series, e.g. 'xenial'
1152 raise NotImplementedError()
1154 def create_backup(self
, note
=None, no_download
=False):
1155 """Create a backup of this model.
1157 :param str note: A note to store with the backup
1158 :param bool no_download: Do not download the backup archive
1159 :return str: Path to downloaded archive
1162 raise NotImplementedError()
1164 def create_storage_pool(self
, name
, provider_type
, **pool_config
):
1165 """Create or define a storage pool.
1167 :param str name: Name to give the storage pool
1168 :param str provider_type: Pool provider type
1169 :param \*\*pool_config: key/value pool configuration pairs
1172 raise NotImplementedError()
1175 self
, no_tail
=False, exclude_module
=None, include_module
=None,
1176 include
=None, level
=None, limit
=0, lines
=10, replay
=False,
1178 """Get log messages for this model.
1180 :param bool no_tail: Stop after returning existing log messages
1181 :param list exclude_module: Do not show log messages for these logging
1183 :param list include_module: Only show log messages for these logging
1185 :param list include: Only show log messages for these entities
1186 :param str level: Log level to show, valid options are 'TRACE',
1187 'DEBUG', 'INFO', 'WARNING', 'ERROR,
1188 :param int limit: Return this many of the most recent (possibly
1189 filtered) lines are shown
1190 :param int lines: Yield this many of the most recent lines, and keep
1192 :param bool replay: Yield the entire log, and keep yielding
1193 :param list exclude: Do not show log messages for these entities
1196 raise NotImplementedError()
1198 def _get_series(self
, entity_url
, entity
):
1199 # try to get the series from the provided charm URL
1200 if entity_url
.startswith('cs:'):
1201 parts
= entity_url
[3:].split('/')
1203 parts
= entity_url
.split('/')
1204 if parts
[0].startswith('~'):
1207 # series was specified in the URL
1209 # series was not supplied at all, so use the newest
1210 # supported series according to the charm store
1211 ss
= entity
['Meta']['supported-series']
1212 return ss
['SupportedSeries'][0]
1215 self
, entity_url
, application_name
=None, bind
=None, budget
=None,
1216 channel
=None, config
=None, constraints
=None, force
=False,
1217 num_units
=1, plan
=None, resources
=None, series
=None, storage
=None,
1219 """Deploy a new service or bundle.
1221 :param str entity_url: Charm or bundle url
1222 :param str application_name: Name to give the service
1223 :param dict bind: <charm endpoint>:<network space> pairs
1224 :param dict budget: <budget name>:<limit> pairs
1225 :param str channel: Charm store channel from which to retrieve
1226 the charm or bundle, e.g. 'edge'
1227 :param dict config: Charm configuration dictionary
1228 :param constraints: Service constraints
1229 :type constraints: :class:`juju.Constraints`
1230 :param bool force: Allow charm to be deployed to a machine running
1231 an unsupported series
1232 :param int num_units: Number of units to deploy
1233 :param str plan: Plan under which to deploy charm
1234 :param dict resources: <resource name>:<file path> pairs
1235 :param str series: Series on which to deploy
1236 :param dict storage: Storage constraints TODO how do these look?
1237 :param to: Placement directive as a string. For example:
1239 '23' - place on machine 23
1240 'lxd:7' - place in new lxd container on machine 7
1241 '24/lxd/3' - place in container 3 on machine 24
1243 If None, a new machine is provisioned.
1248 - support local resources
1253 k
: client
.Constraints(**v
)
1254 for k
, v
in storage
.items()
1257 entity_path
= Path(entity_url
.replace('local:', ''))
1258 bundle_path
= entity_path
/ 'bundle.yaml'
1259 metadata_path
= entity_path
/ 'metadata.yaml'
1262 entity_url
.startswith('local:') or
1263 entity_path
.is_dir() or
1264 entity_path
.is_file()
1267 entity_id
= entity_url
.replace('local:', '')
1269 entity
= await self
.charmstore
.entity(entity_url
, channel
=channel
)
1270 entity_id
= entity
['Id']
1272 client_facade
= client
.ClientFacade
.from_connection(self
.connection())
1274 is_bundle
= ((is_local
and
1275 (entity_id
.endswith('.yaml') and entity_path
.exists()) or
1276 bundle_path
.exists()) or
1277 (not is_local
and 'bundle/' in entity_id
))
1280 handler
= BundleHandler(self
)
1281 await handler
.fetch_plan(entity_id
)
1282 await handler
.execute_plan()
1283 extant_apps
= {app
for app
in self
.applications
}
1284 pending_apps
= set(handler
.applications
) - extant_apps
1286 # new apps will usually be in the model by now, but if some
1287 # haven't made it yet we'll need to wait on them to be added
1288 await asyncio
.gather(*[
1289 asyncio
.ensure_future(
1290 self
._wait
_for
_new
('application', app_name
),
1291 loop
=self
._connector
.loop
)
1292 for app_name
in pending_apps
1293 ], loop
=self
._connector
.loop
)
1294 return [app
for name
, app
in self
.applications
.items()
1295 if name
in handler
.applications
]
1298 if not application_name
:
1299 application_name
= entity
['Meta']['charm-metadata']['Name']
1301 series
= self
._get
_series
(entity_url
, entity
)
1302 await client_facade
.AddCharm(channel
, entity_id
)
1303 # XXX: we're dropping local resources here, but we don't
1304 # actually support them yet anyway
1305 resources
= await self
._add
_store
_resources
(application_name
,
1309 if not application_name
:
1310 metadata
= yaml
.load(metadata_path
.read_text())
1311 application_name
= metadata
['name']
1312 # We have a local charm dir that needs to be uploaded
1313 charm_dir
= os
.path
.abspath(
1314 os
.path
.expanduser(entity_id
))
1315 series
= series
or get_charm_series(charm_dir
)
1318 "Couldn't determine series for charm at {}. "
1319 "Pass a 'series' kwarg to Model.deploy().".format(
1321 entity_id
= await self
.add_local_charm_dir(charm_dir
, series
)
1322 return await self
._deploy
(
1323 charm_url
=entity_id
,
1324 application
=application_name
,
1326 config
=config
or {},
1327 constraints
=constraints
,
1328 endpoint_bindings
=bind
,
1329 resources
=resources
,
1332 num_units
=num_units
,
1333 placement
=parse_placement(to
)
1336 async def _add_store_resources(self
, application
, entity_url
, entity
=None):
1338 # avoid extra charm store call if one was already made
1339 entity
= await self
.charmstore
.entity(entity_url
)
1342 'description': resource
['Description'],
1343 'fingerprint': resource
['Fingerprint'],
1344 'name': resource
['Name'],
1345 'path': resource
['Path'],
1346 'revision': resource
['Revision'],
1347 'size': resource
['Size'],
1348 'type_': resource
['Type'],
1350 } for resource
in entity
['Meta']['resources']
1356 resources_facade
= client
.ResourcesFacade
.from_connection(
1358 response
= await resources_facade
.AddPendingResources(
1359 tag
.application(application
),
1361 [client
.CharmResource(**resource
) for resource
in resources
])
1362 resource_map
= {resource
['name']: pid
1364 in zip(resources
, response
.pending_ids
)}
1367 async def _deploy(self
, charm_url
, application
, series
, config
,
1368 constraints
, endpoint_bindings
, resources
, storage
,
1369 channel
=None, num_units
=None, placement
=None):
1370 """Logic shared between `Model.deploy` and `BundleHandler.deploy`.
1372 log
.info('Deploying %s', charm_url
)
1374 # stringify all config values for API, and convert to YAML
1375 config
= {k
: str(v
) for k
, v
in config
.items()}
1376 config
= yaml
.dump({application
: config
},
1377 default_flow_style
=False)
1379 app_facade
= client
.ApplicationFacade
.from_connection(
1382 app
= client
.ApplicationDeploy(
1383 charm_url
=charm_url
,
1384 application
=application
,
1388 constraints
=parse_constraints(constraints
),
1389 endpoint_bindings
=endpoint_bindings
,
1390 num_units
=num_units
,
1391 resources
=resources
,
1395 result
= await app_facade
.Deploy([app
])
1396 errors
= [r
.error
.message
for r
in result
.results
if r
.error
]
1398 raise JujuError('\n'.join(errors
))
1399 return await self
._wait
_for
_new
('application', application
)
1401 async def destroy(self
):
1402 """Terminate all machines and resources for this model.
1403 Is already implemented in controller.py.
1405 raise NotImplementedError()
1407 async def destroy_unit(self
, *unit_names
):
1408 """Destroy units by name.
1411 connection
= self
.connection()
1412 app_facade
= client
.ApplicationFacade
.from_connection(connection
)
1415 'Destroying unit%s %s',
1416 's' if len(unit_names
) == 1 else '',
1417 ' '.join(unit_names
))
1419 return await app_facade
.DestroyUnits(list(unit_names
))
1420 destroy_units
= destroy_unit
1422 def get_backup(self
, archive_id
):
1423 """Download a backup archive file.
1425 :param str archive_id: The id of the archive to download
1426 :return str: Path to the archive file
1429 raise NotImplementedError()
1432 self
, num_controllers
=0, constraints
=None, series
=None, to
=None):
1433 """Ensure sufficient controllers exist to provide redundancy.
1435 :param int num_controllers: Number of controllers to make available
1436 :param constraints: Constraints to apply to the controller machines
1437 :type constraints: :class:`juju.Constraints`
1438 :param str series: Series of the controller machines
1439 :param list to: Placement directives for controller machines, e.g.::
1442 'lxc:7' - new lxc container on machine 7
1443 '24/lxc/3' - lxc container 3 or machine 24
1445 If None, a new machine is provisioned.
1448 raise NotImplementedError()
1450 async def get_config(self
):
1451 """Return the configuration settings for this model.
1453 :returns: A ``dict`` mapping keys to `ConfigValue` instances,
1454 which have `source` and `value` attributes.
1456 config_facade
= client
.ModelConfigFacade
.from_connection(
1459 result
= await config_facade
.ModelGet()
1460 config
= result
.config
1461 for key
, value
in config
.items():
1462 config
[key
] = ConfigValue
.from_json(value
)
1465 async def get_constraints(self
):
1466 """Return the machine constraints for this model.
1468 :returns: A ``dict`` of constraints.
1471 client_facade
= client
.ClientFacade
.from_connection(self
.connection())
1472 result
= await client_facade
.GetModelConstraints()
1474 # GetModelConstraints returns GetConstraintsResults which has a 'constraints'
1475 # attribute. If no constraints have been set GetConstraintsResults.constraints
1476 # is None. Otherwise GetConstraintsResults.constraints has an attribute for each
1477 # possible constraint, each of these in turn will be None if they have not been
1479 if result
.constraints
:
1480 constraint_types
= [a
for a
in dir(result
.constraints
)
1481 if a
in Value
._toSchema
.keys()]
1482 for constraint
in constraint_types
:
1483 value
= getattr(result
.constraints
, constraint
)
1484 if value
is not None:
1485 constraints
[constraint
] = getattr(result
.constraints
, constraint
)
1488 def import_ssh_key(self
, identity
):
1489 """Add a public SSH key from a trusted indentity source to this model.
1491 :param str identity: User identity in the form <lp|gh>:<username>
1494 raise NotImplementedError()
1495 import_ssh_keys
= import_ssh_key
1497 async def get_machines(self
):
1498 """Return list of machines in this model.
1501 return list(self
.state
.machines
.keys())
1503 def get_shares(self
):
1504 """Return list of all users with access to this model.
1507 raise NotImplementedError()
1509 def get_spaces(self
):
1510 """Return list of all known spaces, including associated subnets.
1513 raise NotImplementedError()
1515 async def get_ssh_key(self
, raw_ssh
=False):
1516 """Return known SSH keys for this model.
1517 :param bool raw_ssh: if True, returns the raw ssh key,
1518 else it's fingerprint
1521 key_facade
= client
.KeyManagerFacade
.from_connection(self
.connection())
1522 entity
= {'tag': tag
.model(self
.info
.uuid
)}
1523 entities
= client
.Entities([entity
])
1524 return await key_facade
.ListKeys(entities
, raw_ssh
)
1525 get_ssh_keys
= get_ssh_key
1527 def get_storage(self
, filesystem
=False, volume
=False):
1528 """Return details of storage instances.
1530 :param bool filesystem: Include filesystem storage
1531 :param bool volume: Include volume storage
1534 raise NotImplementedError()
1536 def get_storage_pools(self
, names
=None, providers
=None):
1537 """Return list of storage pools.
1539 :param list names: Only include pools with these names
1540 :param list providers: Only include pools for these providers
1543 raise NotImplementedError()
1545 def get_subnets(self
, space
=None, zone
=None):
1546 """Return list of known subnets.
1548 :param str space: Only include subnets in this space
1549 :param str zone: Only include subnets in this zone
1552 raise NotImplementedError()
1554 def remove_blocks(self
):
1555 """Remove all blocks from this model.
1558 raise NotImplementedError()
1560 def remove_backup(self
, backup_id
):
1563 :param str backup_id: The id of the backup to remove
1566 raise NotImplementedError()
1568 def remove_cached_images(self
, arch
=None, kind
=None, series
=None):
1569 """Remove cached OS images.
1571 :param str arch: Architecture of the images to remove
1572 :param str kind: Image kind to remove, e.g. 'lxd'
1573 :param str series: Image series to remove, e.g. 'xenial'
1576 raise NotImplementedError()
1578 def remove_machine(self
, *machine_ids
):
1579 """Remove a machine from this model.
1581 :param str \*machine_ids: Ids of the machines to remove
1584 raise NotImplementedError()
1585 remove_machines
= remove_machine
1587 async def remove_ssh_key(self
, user
, key
):
1588 """Remove a public SSH key(s) from this model.
1590 :param str key: Full ssh key
1591 :param str user: Juju user to which the key is registered
1594 key_facade
= client
.KeyManagerFacade
.from_connection(self
.connection())
1595 key
= base64
.b64decode(bytes(key
.strip().split()[1].encode('ascii')))
1596 key
= hashlib
.md5(key
).hexdigest()
1597 key
= ':'.join(a
+ b
for a
, b
in zip(key
[::2], key
[1::2]))
1598 await key_facade
.DeleteKeys([key
], user
)
1599 remove_ssh_keys
= remove_ssh_key
1602 self
, bootstrap
=False, constraints
=None, archive
=None,
1603 backup_id
=None, upload_tools
=False):
1604 """Restore a backup archive to a new controller.
1606 :param bool bootstrap: Bootstrap a new state machine
1607 :param constraints: Model constraints
1608 :type constraints: :class:`juju.Constraints`
1609 :param str archive: Path to backup archive to restore
1610 :param str backup_id: Id of backup to restore
1611 :param bool upload_tools: Upload tools if bootstrapping a new machine
1614 raise NotImplementedError()
1616 def retry_provisioning(self
):
1617 """Retry provisioning for failed machines.
1620 raise NotImplementedError()
1622 def run(self
, command
, timeout
=None):
1623 """Run command on all machines in this model.
1625 :param str command: The command to run
1626 :param int timeout: Time to wait before command is considered failed
1629 raise NotImplementedError()
1631 async def set_config(self
, config
):
1632 """Set configuration keys on this model.
1634 :param dict config: Mapping of config keys to either string values or
1635 `ConfigValue` instances, as returned by `get_config`.
1637 config_facade
= client
.ModelConfigFacade
.from_connection(
1640 for key
, value
in config
.items():
1641 if isinstance(value
, ConfigValue
):
1642 config
[key
] = value
.value
1643 await config_facade
.ModelSet(config
)
1645 async def set_constraints(self
, constraints
):
1646 """Set machine constraints on this model.
1648 :param dict config: Mapping of model constraints
1650 client_facade
= client
.ClientFacade
.from_connection(self
.connection())
1651 await client_facade
.SetModelConstraints(
1653 constraints
=constraints
)
1655 async def get_action_output(self
, action_uuid
, wait
=None):
1656 """Get the results of an action by ID.
1658 :param str action_uuid: Id of the action
1659 :param int wait: Time in seconds to wait for action to complete.
1660 :return dict: Output from action
1661 :raises: :class:`JujuError` if invalid action_uuid
1663 action_facade
= client
.ActionFacade
.from_connection(
1666 entity
= [{'tag': tag
.action(action_uuid
)}]
1667 # Cannot use self.wait_for_action as the action event has probably
1668 # already happened and self.wait_for_action works by processing
1669 # model deltas and checking if they match our type. If the action
1670 # has already occured then the delta has gone.
1672 async def _wait_for_action_status():
1674 action_output
= await action_facade
.Actions(entity
)
1675 if action_output
.results
[0].status
in ('completed', 'failed'):
1678 await asyncio
.sleep(1)
1679 await asyncio
.wait_for(
1680 _wait_for_action_status(),
1682 action_output
= await action_facade
.Actions(entity
)
1683 # ActionResult.output is None if the action produced no output
1684 if action_output
.results
[0].output
is None:
1687 output
= action_output
.results
[0].output
1690 async def get_action_status(self
, uuid_or_prefix
=None, name
=None):
1691 """Get the status of all actions, filtered by ID, ID prefix, or name.
1693 :param str uuid_or_prefix: Filter by action uuid or prefix
1694 :param str name: Filter by action name
1699 action_facade
= client
.ActionFacade
.from_connection(
1703 name_results
= await action_facade
.FindActionsByNames([name
])
1704 action_results
.extend(name_results
.actions
[0].actions
)
1706 # Collect list of actions matching uuid or prefix
1707 matching_actions
= await action_facade
.FindActionTagsByPrefix(
1710 for actions
in matching_actions
.matches
.values():
1711 entities
= [{'tag': a
.tag
} for a
in actions
]
1712 # Get action results matching action tags
1713 uuid_results
= await action_facade
.Actions(entities
)
1714 action_results
.extend(uuid_results
.results
)
1715 for a
in action_results
:
1716 results
[tag
.untag('action-', a
.action
.tag
)] = a
.status
1719 def get_budget(self
, budget_name
):
1720 """Get budget usage info.
1722 :param str budget_name: Name of budget
1725 raise NotImplementedError()
1727 async def get_status(self
, filters
=None, utc
=False):
1728 """Return the status of the model.
1730 :param str filters: Optional list of applications, units, or machines
1731 to include, which can use wildcards ('*').
1732 :param bool utc: Display time as UTC in RFC3339 format
1735 client_facade
= client
.ClientFacade
.from_connection(self
.connection())
1736 return await client_facade
.FullStatus(filters
)
1739 self
, all_
=False, destination
=None, dry_run
=False, public
=False,
1740 source
=None, stream
=None, version
=None):
1741 """Copy Juju tools into this model.
1743 :param bool all_: Copy all versions, not just the latest
1744 :param str destination: Path to local destination directory
1745 :param bool dry_run: Don't do the actual copy
1746 :param bool public: Tools are for a public cloud, so generate mirrors
1748 :param str source: Path to local source directory
1749 :param str stream: Simplestreams stream for which to sync metadata
1750 :param str version: Copy a specific major.minor version
1753 raise NotImplementedError()
1755 def unblock(self
, *commands
):
1756 """Unblock an operation that would alter this model.
1758 :param str \*commands: The commands to unblock. Valid values are
1759 'all-changes', 'destroy-model', 'remove-object'
1762 raise NotImplementedError()
1764 def unset_config(self
, *keys
):
1765 """Unset configuration on this model.
1767 :param str \*keys: The keys to unset
1770 raise NotImplementedError()
1772 def upgrade_gui(self
):
1773 """Upgrade the Juju GUI for this model.
1776 raise NotImplementedError()
1779 self
, dry_run
=False, reset_previous_upgrade
=False,
1780 upload_tools
=False, version
=None):
1781 """Upgrade Juju on all machines in a model.
1783 :param bool dry_run: Don't do the actual upgrade
1784 :param bool reset_previous_upgrade: Clear the previous (incomplete)
1786 :param bool upload_tools: Upload local version of tools
1787 :param str version: Upgrade to a specific version
1790 raise NotImplementedError()
1792 def upload_backup(self
, archive_path
):
1793 """Store a backup archive remotely in Juju.
1795 :param str archive_path: Path to local archive
1798 raise NotImplementedError()
1801 def charmstore(self
):
1802 return self
._charmstore
1804 async def get_metrics(self
, *tags
):
1805 """Retrieve metrics.
1807 :param str \*tags: Tags of entities from which to retrieve metrics.
1808 No tags retrieves the metrics of all units in the model.
1809 :return: Dictionary of unit_name:metrics
1812 log
.debug("Retrieving metrics for %s",
1813 ', '.join(tags
) if tags
else "all units")
1815 metrics_facade
= client
.MetricsDebugFacade
.from_connection(
1818 entities
= [client
.Entity(tag
) for tag
in tags
]
1819 metrics_result
= await metrics_facade
.GetMetrics(entities
)
1821 metrics
= collections
.defaultdict(list)
1823 for entity_metrics
in metrics_result
.results
:
1824 error
= entity_metrics
.error
1826 if "is not a valid tag" in error
:
1827 raise ValueError(error
.message
)
1829 raise Exception(error
.message
)
1831 for metric
in entity_metrics
.metrics
:
1832 metrics
[metric
.unit
].append(vars(metric
))
1837 def get_charm_series(path
):
1838 """Inspects the charm directory at ``path`` and returns a default
1839 series from its metadata.yaml (the first item in the 'series' list).
1841 Returns None if no series can be determined.
1844 md
= Path(path
) / "metadata.yaml"
1847 data
= yaml
.load(md
.open())
1848 series
= data
.get('series')
1849 return series
[0] if series
else None
1852 class BundleHandler
:
1854 Handle bundles by using the API to translate bundle YAML into a plan of
1855 steps and then dispatching each of those using the API.
1857 def __init__(self
, model
):
1859 self
.charmstore
= model
.charmstore
1861 self
.references
= {}
1862 self
._units
_by
_app
= {}
1863 for unit_name
, unit
in model
.units
.items():
1864 app_units
= self
._units
_by
_app
.setdefault(unit
.application
, [])
1865 app_units
.append(unit_name
)
1866 self
.client_facade
= client
.ClientFacade
.from_connection(
1868 self
.app_facade
= client
.ApplicationFacade
.from_connection(
1870 self
.ann_facade
= client
.AnnotationsFacade
.from_connection(
1873 async def _handle_local_charms(self
, bundle
):
1874 """Search for references to local charms (i.e. filesystem paths)
1875 in the bundle. Upload the local charms to the model, and replace
1876 the filesystem paths with appropriate 'local:' paths in the bundle.
1878 Return the modified bundle.
1880 :param dict bundle: Bundle dictionary
1881 :return: Modified bundle dictionary
1886 default_series
= bundle
.get('series')
1887 apps_dict
= bundle
.get('applications', bundle
.get('services', {}))
1888 for app_name
in self
.applications
:
1889 app_dict
= apps_dict
[app_name
]
1890 charm_dir
= os
.path
.abspath(os
.path
.expanduser(app_dict
['charm']))
1891 if not os
.path
.isdir(charm_dir
):
1894 app_dict
.get('series') or
1896 get_charm_series(charm_dir
)
1900 "Couldn't determine series for charm at {}. "
1901 "Add a 'series' key to the bundle.".format(charm_dir
))
1903 # Keep track of what we need to update. We keep a list of apps
1904 # that need to be updated, and a corresponding list of args
1905 # needed to update those apps.
1906 apps
.append(app_name
)
1907 args
.append((charm_dir
, series
))
1910 # If we have apps to update, spawn all the coroutines concurrently
1911 # and wait for them to finish.
1912 charm_urls
= await asyncio
.gather(*[
1913 self
.model
.add_local_charm_dir(*params
)
1915 ], loop
=self
.model
.loop
)
1916 # Update the 'charm:' entry for each app with the new 'local:' url.
1917 for app_name
, charm_url
in zip(apps
, charm_urls
):
1918 apps_dict
[app_name
]['charm'] = charm_url
1922 async def fetch_plan(self
, entity_id
):
1923 is_local
= not entity_id
.startswith('cs:')
1925 if is_local
and os
.path
.isfile(entity_id
):
1926 bundle_yaml
= Path(entity_id
).read_text()
1927 elif is_local
and os
.path
.isdir(entity_id
):
1928 bundle_yaml
= (Path(entity_id
) / "bundle.yaml").read_text()
1930 bundle_yaml
= await self
.charmstore
.files(entity_id
,
1931 filename
='bundle.yaml',
1933 self
.bundle
= yaml
.safe_load(bundle_yaml
)
1934 self
.bundle
= await self
._handle
_local
_charms
(self
.bundle
)
1936 self
.plan
= await self
.client_facade
.GetBundleChanges(
1937 yaml
.dump(self
.bundle
))
1939 if self
.plan
.errors
:
1940 raise JujuError(self
.plan
.errors
)
1942 async def execute_plan(self
):
1943 for step
in self
.plan
.changes
:
1944 method
= getattr(self
, step
.method
)
1945 result
= await method(*step
.args
)
1946 self
.references
[step
.id_
] = result
1949 def applications(self
):
1950 apps_dict
= self
.bundle
.get('applications',
1951 self
.bundle
.get('services', {}))
1952 return list(apps_dict
.keys())
1954 def resolve(self
, reference
):
1955 if reference
and reference
.startswith('$'):
1956 reference
= self
.references
[reference
[1:]]
1959 async def addCharm(self
, charm
, series
):
1961 :param charm string:
1962 Charm holds the URL of the charm to be added.
1964 :param series string:
1965 Series holds the series of the charm to be added
1966 if the charm default is not sufficient.
1968 # We don't add local charms because they've already been added
1969 # by self._handle_local_charms
1970 if charm
.startswith('local:'):
1973 entity_id
= await self
.charmstore
.entityId(charm
)
1974 log
.debug('Adding %s', entity_id
)
1975 await self
.client_facade
.AddCharm(None, entity_id
)
1978 async def addMachines(self
, params
=None):
1981 Dictionary specifying the machine to add. All keys are optional.
1984 series: string specifying the machine OS series.
1986 constraints: string holding machine constraints, if any. We'll
1987 parse this into the json friendly dict that the juju api
1990 container_type: string holding the type of the container (for
1991 instance ""lxd" or kvm"). It is not specified for top level
1994 parent_id: string holding a placeholder pointing to another
1995 machine change or to a unit change. This value is only
1996 specified in the case this machine is a container, in
1997 which case also ContainerType is set.
2000 params
= params
or {}
2003 params
= {normalize_key(k
): params
[k
] for k
in params
.keys()}
2005 # Fix up values, as necessary.
2006 if 'parent_id' in params
:
2007 if params
['parent_id'].startswith('$addUnit'):
2008 unit
= self
.resolve(params
['parent_id'])[0]
2009 params
['parent_id'] = unit
.machine
.entity_id
2011 params
['parent_id'] = self
.resolve(params
['parent_id'])
2013 params
['constraints'] = parse_constraints(
2014 params
.get('constraints'))
2015 params
['jobs'] = params
.get('jobs', ['JobHostUnits'])
2017 if params
.get('container_type') == 'lxc':
2018 log
.warning('Juju 2.0 does not support lxc containers. '
2019 'Converting containers to lxd.')
2020 params
['container_type'] = 'lxd'
2022 # Submit the request.
2023 params
= client
.AddMachineParams(**params
)
2024 results
= await self
.client_facade
.AddMachines([params
])
2025 error
= results
.machines
[0].error
2027 raise ValueError("Error adding machine: %s" % error
.message
)
2028 machine
= results
.machines
[0].machine
2029 log
.debug('Added new machine %s', machine
)
2032 async def addRelation(self
, endpoint1
, endpoint2
):
2034 :param endpoint1 string:
2035 :param endpoint2 string:
2036 Endpoint1 and Endpoint2 hold relation endpoints in the
2037 "application:interface" form, where the application is always a
2038 placeholder pointing to an application change, and the interface is
2039 optional. Examples are "$deploy-42:web" or just "$deploy-42".
2041 endpoints
= [endpoint1
, endpoint2
]
2042 # resolve indirect references
2043 for i
in range(len(endpoints
)):
2044 parts
= endpoints
[i
].split(':')
2045 parts
[0] = self
.resolve(parts
[0])
2046 endpoints
[i
] = ':'.join(parts
)
2048 log
.info('Relating %s <-> %s', *endpoints
)
2049 return await self
.model
.add_relation(*endpoints
)
2051 async def deploy(self
, charm
, series
, application
, options
, constraints
,
2052 storage
, endpoint_bindings
, resources
):
2054 :param charm string:
2055 Charm holds the URL of the charm to be used to deploy this
2058 :param series string:
2059 Series holds the series of the application to be deployed
2060 if the charm default is not sufficient.
2062 :param application string:
2063 Application holds the application name.
2065 :param options map[string]interface{}:
2066 Options holds application options.
2068 :param constraints string:
2069 Constraints holds the optional application constraints.
2071 :param storage map[string]string:
2072 Storage holds the optional storage constraints.
2074 :param endpoint_bindings map[string]string:
2075 EndpointBindings holds the optional endpoint bindings
2077 :param resources map[string]int:
2078 Resources identifies the revision to use for each resource
2079 of the application's charm.
2081 # resolve indirect references
2082 charm
= self
.resolve(charm
)
2083 # the bundle plan doesn't actually do anything with resources, even
2084 # though it ostensibly gives us something (None) for that param
2085 if not charm
.startswith('local:'):
2086 resources
= await self
.model
._add
_store
_resources
(application
,
2088 await self
.model
._deploy
(
2090 application
=application
,
2093 constraints
=constraints
,
2094 endpoint_bindings
=endpoint_bindings
,
2095 resources
=resources
,
2100 async def addUnit(self
, application
, to
):
2102 :param application string:
2103 Application holds the application placeholder name for which a unit
2107 To holds the optional location where to add the unit, as a
2108 placeholder pointing to another unit change or to a machine change.
2110 application
= self
.resolve(application
)
2111 placement
= self
.resolve(to
)
2112 if self
._units
_by
_app
.get(application
):
2113 # enough units for this application already exist;
2114 # claim one, and carry on
2115 # NB: this should probably honor placement, but the juju client
2116 # doesn't, so we're not bothering, either
2117 unit_name
= self
._units
_by
_app
[application
].pop()
2118 log
.debug('Reusing unit %s for %s', unit_name
, application
)
2119 return self
.model
.units
[unit_name
]
2121 log
.debug('Adding new unit for %s%s', application
,
2122 ' to %s' % placement
if placement
else '')
2123 return await self
.model
.applications
[application
].add_unit(
2128 async def expose(self
, application
):
2130 :param application string:
2131 Application holds the placeholder name of the application that must
2134 application
= self
.resolve(application
)
2135 log
.info('Exposing %s', application
)
2136 return await self
.model
.applications
[application
].expose()
2138 async def setAnnotations(self
, id_
, entity_type
, annotations
):
2141 Id is the placeholder for the application or machine change
2142 corresponding to the entity to be annotated.
2144 :param entity_type EntityType:
2145 EntityType holds the type of the entity, "application" or
2148 :param annotations map[string]string:
2149 Annotations holds the annotations as key/value pairs.
2151 entity_id
= self
.resolve(id_
)
2153 entity
= self
.model
.state
.get_entity(entity_type
, entity_id
)
2155 entity
= await self
.model
._wait
_for
_new
(entity_type
, entity_id
)
2156 return await entity
.set_annotations(annotations
)
2161 Async wrapper around theblues.charmstore.CharmStore
2163 def __init__(self
, loop
):
2165 self
._cs
= theblues
.charmstore
.CharmStore(timeout
=5)
2167 def __getattr__(self
, name
):
2169 Wrap method calls in coroutines that use run_in_executor to make them
2172 attr
= getattr(self
._cs
, name
)
2173 if not callable(attr
):
2174 wrapper
= partial(getattr, self
._cs
, name
)
2175 setattr(self
, name
, wrapper
)
2177 async def coro(*args
, **kwargs
):
2178 method
= partial(attr
, *args
, **kwargs
)
2179 for attempt
in range(1, 4):
2181 return await self
.loop
.run_in_executor(None, method
)
2182 except theblues
.errors
.ServerError
:
2185 await asyncio
.sleep(1, loop
=self
.loop
)
2186 setattr(self
, name
, coro
)
2191 class CharmArchiveGenerator
:
2193 Create a Zip archive of a local charm directory for upload to a controller.
2195 This is used automatically by
2196 `Model.add_local_charm_dir <#juju.model.Model.add_local_charm_dir>`_.
2198 def __init__(self
, path
):
2199 self
.path
= os
.path
.abspath(os
.path
.expanduser(path
))
2201 def make_archive(self
, path
):
2202 """Create archive of directory and write to ``path``.
2204 :param path: Path to archive
2208 * build/\* - This is used for packing the charm itself and any
2210 * \*/.\* - Hidden files are all ignored for now. This will most
2211 likely be changed into a specific ignore list
2215 zf
= zipfile
.ZipFile(path
, 'w', zipfile
.ZIP_DEFLATED
)
2216 for dirpath
, dirnames
, filenames
in os
.walk(self
.path
):
2217 relative_path
= dirpath
[len(self
.path
) + 1:]
2218 if relative_path
and not self
._ignore
(relative_path
):
2219 zf
.write(dirpath
, relative_path
)
2220 for name
in filenames
:
2221 archive_name
= os
.path
.join(relative_path
, name
)
2222 if not self
._ignore
(archive_name
):
2223 real_path
= os
.path
.join(dirpath
, name
)
2224 self
._check
_type
(real_path
)
2225 if os
.path
.islink(real_path
):
2226 self
._check
_link
(real_path
)
2227 self
._write
_symlink
(
2228 zf
, os
.readlink(real_path
), archive_name
)
2230 zf
.write(real_path
, archive_name
)
2234 def _check_type(self
, path
):
2238 if stat
.S_ISDIR(s
.st_mode
) or stat
.S_ISREG(s
.st_mode
):
2240 raise ValueError("Invalid Charm at % %s" % (
2241 path
, "Invalid file type for a charm"))
2243 def _check_link(self
, path
):
2244 link_path
= os
.readlink(path
)
2245 if link_path
[0] == "/":
2247 "Invalid Charm at %s: %s" % (
2248 path
, "Absolute links are invalid"))
2249 path_dir
= os
.path
.dirname(path
)
2250 link_path
= os
.path
.join(path_dir
, link_path
)
2251 if not link_path
.startswith(os
.path
.abspath(self
.path
)):
2253 "Invalid charm at %s %s" % (
2254 path
, "Only internal symlinks are allowed"))
2256 def _write_symlink(self
, zf
, link_target
, link_path
):
2257 """Package symlinks with appropriate zipfile metadata."""
2258 info
= zipfile
.ZipInfo()
2259 info
.filename
= link_path
2260 info
.create_system
= 3
2261 # Magic code for symlinks / py2/3 compat
2262 # 27166663808 = (stat.S_IFLNK | 0755) << 16
2263 info
.external_attr
= 2716663808
2264 zf
.writestr(info
, link_target
)
2266 def _ignore(self
, path
):
2267 if path
== "build" or path
.startswith("build/"):
2269 if path
.startswith('.'):