Clarified that params are optional.
[osm/N2VC.git] / juju / model.py
1 import asyncio
2 import collections
3 import logging
4 import re
5 import weakref
6 from concurrent.futures import CancelledError
7 from functools import partial
8
9 import yaml
10 from theblues import charmstore
11
12 from .client import client
13 from .client import watcher
14 from .client import connection
15 from .constraints import parse as parse_constraints
16 from .delta import get_entity_delta
17 from .delta import get_entity_class
18 from .exceptions import DeadEntityException
19 from .errors import JujuAPIError
20
21 log = logging.getLogger(__name__)
22
23
24 class _Observer(object):
25 """Wrapper around an observer callable.
26
27 This wrapper allows filter criteria to be associated with the
28 callable so that it's only called for changes that meet the criteria.
29
30 """
31 def __init__(self, callable_, entity_type, action, entity_id, predicate):
32 self.callable_ = callable_
33 self.entity_type = entity_type
34 self.action = action
35 self.entity_id = entity_id
36 self.predicate = predicate
37 if self.entity_id:
38 self.entity_id = str(self.entity_id)
39 if not self.entity_id.startswith('^'):
40 self.entity_id = '^' + self.entity_id
41 if not self.entity_id.endswith('$'):
42 self.entity_id += '$'
43
44 async def __call__(self, delta, old, new, model):
45 await self.callable_(delta, old, new, model)
46
47 def cares_about(self, delta):
48 """Return True if this observer "cares about" (i.e. wants to be
49 called) for a this delta.
50
51 """
52 if (self.entity_id and delta.get_id() and
53 not re.match(self.entity_id, str(delta.get_id()))):
54 return False
55
56 if self.entity_type and self.entity_type != delta.entity:
57 return False
58
59 if self.action and self.action != delta.type:
60 return False
61
62 if self.predicate and not self.predicate(delta):
63 return False
64
65 return True
66
67
68 class ModelObserver(object):
69 async def __call__(self, delta, old, new, model):
70 handler_name = 'on_{}_{}'.format(delta.entity, delta.type)
71 method = getattr(self, handler_name, self.on_change)
72 await method(delta, old, new, model)
73
74 async def on_change(self, delta, old, new, model):
75 pass
76
77
78 class ModelState(object):
79 """Holds the state of the model, including the delta history of all
80 entities in the model.
81
82 """
83 def __init__(self, model):
84 self.model = model
85 self.state = dict()
86
87 def _live_entity_map(self, entity_type):
88 """Return an id:Entity map of all the living entities of
89 type ``entity_type``.
90
91 """
92 return {
93 entity_id: self.get_entity(entity_type, entity_id)
94 for entity_id, history in self.state.get(entity_type, {}).items()
95 if history[-1] is not None
96 }
97
98 @property
99 def applications(self):
100 """Return a map of application-name:Application for all applications
101 currently in the model.
102
103 """
104 return self._live_entity_map('application')
105
106 @property
107 def machines(self):
108 """Return a map of machine-id:Machine for all machines currently in
109 the model.
110
111 """
112 return self._live_entity_map('machine')
113
114 @property
115 def units(self):
116 """Return a map of unit-id:Unit for all units currently in
117 the model.
118
119 """
120 return self._live_entity_map('unit')
121
122 def entity_history(self, entity_type, entity_id):
123 """Return the history deque for an entity.
124
125 """
126 return self.state[entity_type][entity_id]
127
128 def entity_data(self, entity_type, entity_id, history_index):
129 """Return the data dict for an entity at a specific index of its
130 history.
131
132 """
133 return self.entity_history(entity_type, entity_id)[history_index]
134
135 def apply_delta(self, delta):
136 """Apply delta to our state and return a copy of the
137 affected object as it was before and after the update, e.g.:
138
139 old_obj, new_obj = self.apply_delta(delta)
140
141 old_obj may be None if the delta is for the creation of a new object,
142 e.g. a new application or unit is deployed.
143
144 new_obj will never be None, but may be dead (new_obj.dead == True)
145 if the object was deleted as a result of the delta being applied.
146
147 """
148 history = (
149 self.state
150 .setdefault(delta.entity, {})
151 .setdefault(delta.get_id(), collections.deque())
152 )
153
154 history.append(delta.data)
155 if delta.type == 'remove':
156 history.append(None)
157
158 entity = self.get_entity(delta.entity, delta.get_id())
159 return entity.previous(), entity
160
161 def get_entity(
162 self, entity_type, entity_id, history_index=-1, connected=True):
163 """Return an object instance for the given entity_type and id.
164
165 By default the object state matches the most recent state from
166 Juju. To get an instance of the object in an older state, pass
167 history_index, an index into the history deque for the entity.
168
169 """
170
171 if history_index < 0 and history_index != -1:
172 history_index += len(self.entity_history(entity_type, entity_id))
173 if history_index < 0:
174 return None
175
176 try:
177 self.entity_data(entity_type, entity_id, history_index)
178 except IndexError:
179 return None
180
181 entity_class = get_entity_class(entity_type)
182 return entity_class(
183 entity_id, self.model, history_index=history_index,
184 connected=connected)
185
186
187 class ModelEntity(object):
188 """An object in the Model tree"""
189
190 def __init__(self, entity_id, model, history_index=-1, connected=True):
191 """Initialize a new entity
192
193 :param entity_id str: The unique id of the object in the model
194 :param model: The model instance in whose object tree this
195 entity resides
196 :history_index int: The index of this object's state in the model's
197 history deque for this entity
198 :connected bool: Flag indicating whether this object gets live updates
199 from the model.
200
201 """
202 self.entity_id = entity_id
203 self.model = model
204 self._history_index = history_index
205 self.connected = connected
206 self.connection = model.connection
207
208 def __getattr__(self, name):
209 """Fetch object attributes from the underlying data dict held in the
210 model.
211
212 """
213 if self.data is None:
214 raise DeadEntityException(
215 "Entity {}:{} is dead - its attributes can no longer be "
216 "accessed. Use the .previous() method on this object to get "
217 "a copy of the object at its previous state.".format(
218 self.entity_type, self.entity_id))
219 return self.data[name]
220
221 def __bool__(self):
222 return bool(self.data)
223
224 def on_change(self, callable_):
225 """Add a change observer to this entity.
226
227 """
228 self.model.add_observer(
229 callable_, self.entity_type, 'change', self.entity_id)
230
231 def on_remove(self, callable_):
232 """Add a remove observer to this entity.
233
234 """
235 self.model.add_observer(
236 callable_, self.entity_type, 'remove', self.entity_id)
237
238 @property
239 def entity_type(self):
240 """A string identifying the entity type of this object, e.g.
241 'application' or 'unit', etc.
242
243 """
244 return self.__class__.__name__.lower()
245
246 @property
247 def current(self):
248 """Return True if this object represents the current state of the
249 entity in the underlying model.
250
251 This will be True except when the object represents an entity at a
252 non-latest state in history, e.g. if the object was obtained by calling
253 .previous() on another object.
254
255 """
256 return self._history_index == -1
257
258 @property
259 def dead(self):
260 """Returns True if this entity no longer exists in the underlying
261 model.
262
263 """
264 return (
265 self.data is None or
266 self.model.state.entity_data(
267 self.entity_type, self.entity_id, -1) is None
268 )
269
270 @property
271 def alive(self):
272 """Returns True if this entity still exists in the underlying
273 model.
274
275 """
276 return not self.dead
277
278 @property
279 def data(self):
280 """The data dictionary for this entity.
281
282 """
283 return self.model.state.entity_data(
284 self.entity_type, self.entity_id, self._history_index)
285
286 def previous(self):
287 """Return a copy of this object as was at its previous state in
288 history.
289
290 Returns None if this object is new (and therefore has no history).
291
292 The returned object is always "disconnected", i.e. does not receive
293 live updates.
294
295 """
296 return self.model.state.get_entity(
297 self.entity_type, self.entity_id, self._history_index - 1,
298 connected=False)
299
300 def next(self):
301 """Return a copy of this object at its next state in
302 history.
303
304 Returns None if this object is already the latest.
305
306 The returned object is "disconnected", i.e. does not receive
307 live updates, unless it is current (latest).
308
309 """
310 if self._history_index == -1:
311 return None
312
313 new_index = self._history_index + 1
314 connected = (
315 new_index == len(self.model.state.entity_history(
316 self.entity_type, self.entity_id)) - 1
317 )
318 return self.model.state.get_entity(
319 self.entity_type, self.entity_id, self._history_index - 1,
320 connected=connected)
321
322 def latest(self):
323 """Return a copy of this object at its current state in the model.
324
325 Returns self if this object is already the latest.
326
327 The returned object is always "connected", i.e. receives
328 live updates from the model.
329
330 """
331 if self._history_index == -1:
332 return self
333
334 return self.model.state.get_entity(self.entity_type, self.entity_id)
335
336
337 class Model(object):
338 def __init__(self, loop=None):
339 """Instantiate a new connected Model.
340
341 :param loop: an asyncio event loop
342
343 """
344 self.loop = loop or asyncio.get_event_loop()
345 self.connection = None
346 self.observers = weakref.WeakValueDictionary()
347 self.state = ModelState(self)
348 self._watcher_task = None
349 self._watch_shutdown = asyncio.Event(loop=loop)
350 self._watch_received = asyncio.Event(loop=loop)
351 self._charmstore = CharmStore(self.loop)
352
353 async def connect(self, *args, **kw):
354 """Connect to an arbitrary Juju model.
355
356 args and kw are passed through to Connection.connect()
357
358 """
359 self.connection = await connection.Connection.connect(*args, **kw)
360 self._watch()
361 await self._watch_received.wait()
362
363 async def connect_current(self):
364 """Connect to the current Juju model.
365
366 """
367 self.connection = await connection.Connection.connect_current()
368 self._watch()
369 await self._watch_received.wait()
370
371 async def connect_model(self, arg):
372 """Connect to a specific Juju model.
373 :param arg: <controller>:<user/model>
374
375 """
376 self.connection = await connection.Connection.connect_model(arg)
377 self._watch()
378 await self._watch_received.wait()
379
380 async def disconnect(self):
381 """Shut down the watcher task and close websockets.
382
383 """
384 self._stop_watching()
385 if self.connection and self.connection.is_open:
386 await self._watch_shutdown.wait()
387 log.debug('Closing model connection')
388 await self.connection.close()
389 self.connection = None
390
391 def all_units_idle(self):
392 """Return True if all units are idle.
393
394 """
395 for unit in self.units.values():
396 unit_status = unit.data['agent-status']['current']
397 if unit_status != 'idle':
398 return False
399 return True
400
401 async def reset(self, force=False):
402 """Reset the model to a clean state.
403
404 :param bool force: Force-terminate machines.
405
406 This returns only after the model has reached a clean state. "Clean"
407 means no applications or machines exist in the model.
408
409 """
410 log.debug('Resetting model')
411 for app in self.applications.values():
412 await app.destroy()
413 for machine in self.machines.values():
414 await machine.destroy(force=force)
415 await self.block_until(
416 lambda: len(self.machines) == 0
417 )
418
419 async def block_until(self, *conditions, timeout=None):
420 """Return only after all conditions are true.
421
422 """
423 async def _block():
424 while not all(c() for c in conditions):
425 await asyncio.sleep(0)
426 await asyncio.wait_for(_block(), timeout)
427
428 @property
429 def applications(self):
430 """Return a map of application-name:Application for all applications
431 currently in the model.
432
433 """
434 return self.state.applications
435
436 @property
437 def machines(self):
438 """Return a map of machine-id:Machine for all machines currently in
439 the model.
440
441 """
442 return self.state.machines
443
444 @property
445 def units(self):
446 """Return a map of unit-id:Unit for all units currently in
447 the model.
448
449 """
450 return self.state.units
451
452 def add_observer(
453 self, callable_, entity_type=None, action=None, entity_id=None,
454 predicate=None):
455 """Register an "on-model-change" callback
456
457 Once the model is connected, ``callable_``
458 will be called each time the model changes. callable_ should
459 be Awaitable and accept the following positional arguments:
460
461 delta - An instance of :class:`juju.delta.EntityDelta`
462 containing the raw delta data recv'd from the Juju
463 websocket.
464
465 old_obj - If the delta modifies an existing object in the model,
466 old_obj will be a copy of that object, as it was before the
467 delta was applied. Will be None if the delta creates a new
468 entity in the model.
469
470 new_obj - A copy of the new or updated object, after the delta
471 is applied. Will be None if the delta removes an entity
472 from the model.
473
474 model - The :class:`Model` itself.
475
476 Events for which ``callable_`` is called can be specified by passing
477 entity_type, action, and/or id_ filter criteria, e.g.:
478
479 add_observer(
480 myfunc, entity_type='application', action='add', id_='ubuntu')
481
482 For more complex filtering conditions, pass a predicate function. It
483 will be called with a delta as its only argument. If the predicate
484 function returns True, the callable_ will be called.
485
486 """
487 observer = _Observer(
488 callable_, entity_type, action, entity_id, predicate)
489 self.observers[observer] = callable_
490
491 def _watch(self):
492 """Start an asynchronous watch against this model.
493
494 See :meth:`add_observer` to register an onchange callback.
495
496 """
497 async def _start_watch():
498 self._watch_shutdown.clear()
499 try:
500 allwatcher = watcher.AllWatcher()
501 self._watch_conn = await self.connection.clone()
502 allwatcher.connect(self._watch_conn)
503 while True:
504 results = await allwatcher.Next()
505 for delta in results.deltas:
506 delta = get_entity_delta(delta)
507 old_obj, new_obj = self.state.apply_delta(delta)
508 # XXX: Might not want to shield at this level
509 # We are shielding because when the watcher is
510 # canceled (on disconnect()), we don't want all of
511 # its children (every observer callback) to be
512 # canceled with it. So we shield them. But this means
513 # they can *never* be canceled.
514 await asyncio.shield(
515 self._notify_observers(delta, old_obj, new_obj))
516 self._watch_received.set()
517 except CancelledError:
518 log.debug('Closing watcher connection')
519 await self._watch_conn.close()
520 self._watch_shutdown.set()
521 self._watch_conn = None
522
523 log.debug('Starting watcher task')
524 self._watcher_task = self.loop.create_task(_start_watch())
525
526 def _stop_watching(self):
527 """Stop the asynchronous watch against this model.
528
529 """
530 log.debug('Stopping watcher task')
531 if self._watcher_task:
532 self._watcher_task.cancel()
533
534 async def _notify_observers(self, delta, old_obj, new_obj):
535 """Call observing callbacks, notifying them of a change in model state
536
537 :param delta: The raw change from the watcher
538 (:class:`juju.client.overrides.Delta`)
539 :param old_obj: The object in the model that this delta updates.
540 May be None.
541 :param new_obj: The object in the model that is created or updated
542 by applying this delta.
543
544 """
545 if new_obj and not old_obj:
546 delta.type = 'add'
547
548 log.debug(
549 'Model changed: %s %s %s',
550 delta.entity, delta.type, delta.get_id())
551
552 for o in self.observers:
553 if o.cares_about(delta):
554 asyncio.ensure_future(o(delta, old_obj, new_obj, self))
555
556 async def _wait(self, entity_type, entity_id, action, predicate=None):
557 """
558 Block the calling routine until a given action has happened to the
559 given entity
560
561 :param entity_type: The entity's type.
562 :param entity_id: The entity's id.
563 :param action: the type of action (e.g., 'add' or 'change')
564 :param predicate: optional callable that must take as an
565 argument a delta, and must return a boolean, indicating
566 whether the delta contains the specific action we're looking
567 for. For example, you might check to see whether a 'change'
568 has a 'completed' status. See the _Observer class for details.
569
570 """
571 q = asyncio.Queue(loop=self.loop)
572
573 async def callback(delta, old, new, model):
574 await q.put(delta.get_id())
575
576 self.add_observer(callback, entity_type, action, entity_id, predicate)
577 entity_id = await q.get()
578 return self.state._live_entity_map(entity_type)[entity_id]
579
580 async def _wait_for_new(self, entity_type, entity_id, predicate=None):
581 """Wait for a new object to appear in the Model and return it.
582
583 Waits for an object of type ``entity_type`` with id ``entity_id``.
584
585 This coroutine blocks until the new object appears in the model.
586
587 """
588 return await self._wait(entity_type, entity_id, 'add', predicate)
589
590 async def wait_for_action(self, action_id):
591 """Given an action, wait for it to complete."""
592
593 if action_id.startswith("action-"):
594 # if we've been passed action.tag, transform it into the
595 # id that the api deltas will use.
596 action_id = action_id[7:]
597
598 def predicate(delta):
599 return delta.data['status'] in ('completed', 'failed')
600
601 return await self._wait('action', action_id, 'change', predicate)
602
603 def add_machine(
604 self, spec=None, constraints=None, disks=None, series=None,
605 count=1):
606 """Start a new, empty machine and optionally a container, or add a
607 container to a machine.
608
609 :param str spec: Machine specification
610 Examples::
611
612 (None) - starts a new machine
613 'lxc' - starts a new machine with on lxc container
614 'lxc:4' - starts a new lxc container on machine 4
615 'ssh:user@10.10.0.3' - manually provisions a machine with ssh
616 'zone=us-east-1a' - starts a machine in zone us-east-1s on AWS
617 'maas2.name' - acquire machine maas2.name on MAAS
618 :param constraints: Machine constraints
619 :type constraints: :class:`juju.Constraints`
620 :param list disks: List of disk :class:`constraints <juju.Constraints>`
621 :param str series: Series
622 :param int count: Number of machines to deploy
623
624 Supported container types are: lxc, lxd, kvm
625
626 When deploying a container to an existing machine, constraints cannot
627 be used.
628
629 """
630 pass
631 add_machines = add_machine
632
633 async def add_relation(self, relation1, relation2):
634 """Add a relation between two applications.
635
636 :param str relation1: '<application>[:<relation_name>]'
637 :param str relation2: '<application>[:<relation_name>]'
638
639 """
640 app_facade = client.ApplicationFacade()
641 app_facade.connect(self.connection)
642
643 log.debug(
644 'Adding relation %s <-> %s', relation1, relation2)
645
646 try:
647 result = await app_facade.AddRelation([relation1, relation2])
648 except JujuAPIError as e:
649 if 'relation already exists' not in e.message:
650 raise
651 log.debug(
652 'Relation %s <-> %s already exists', relation1, relation2)
653 # TODO: if relation already exists we should return the
654 # Relation ModelEntity here
655 return None
656
657 def predicate(delta):
658 endpoints = {}
659 for endpoint in delta.data['endpoints']:
660 endpoints[endpoint['application-name']] = endpoint['relation']
661 return endpoints == result.endpoints
662
663 return await self._wait_for_new('relation', None, predicate)
664
665 def add_space(self, name, *cidrs):
666 """Add a new network space.
667
668 Adds a new space with the given name and associates the given
669 (optional) list of existing subnet CIDRs with it.
670
671 :param str name: Name of the space
672 :param \*cidrs: Optional list of existing subnet CIDRs
673
674 """
675 pass
676
677 def add_ssh_key(self, key):
678 """Add a public SSH key to this model.
679
680 :param str key: The public ssh key
681
682 """
683 pass
684 add_ssh_keys = add_ssh_key
685
686 def add_subnet(self, cidr_or_id, space, *zones):
687 """Add an existing subnet to this model.
688
689 :param str cidr_or_id: CIDR or provider ID of the existing subnet
690 :param str space: Network space with which to associate
691 :param str \*zones: Zone(s) in which the subnet resides
692
693 """
694 pass
695
696 def get_backups(self):
697 """Retrieve metadata for backups in this model.
698
699 """
700 pass
701
702 def block(self, *commands):
703 """Add a new block to this model.
704
705 :param str \*commands: The commands to block. Valid values are
706 'all-changes', 'destroy-model', 'remove-object'
707
708 """
709 pass
710
711 def get_blocks(self):
712 """List blocks for this model.
713
714 """
715 pass
716
717 def get_cached_images(self, arch=None, kind=None, series=None):
718 """Return a list of cached OS images.
719
720 :param str arch: Filter by image architecture
721 :param str kind: Filter by image kind, e.g. 'lxd'
722 :param str series: Filter by image series, e.g. 'xenial'
723
724 """
725 pass
726
727 def create_backup(self, note=None, no_download=False):
728 """Create a backup of this model.
729
730 :param str note: A note to store with the backup
731 :param bool no_download: Do not download the backup archive
732 :return str: Path to downloaded archive
733
734 """
735 pass
736
737 def create_storage_pool(self, name, provider_type, **pool_config):
738 """Create or define a storage pool.
739
740 :param str name: Name to give the storage pool
741 :param str provider_type: Pool provider type
742 :param \*\*pool_config: key/value pool configuration pairs
743
744 """
745 pass
746
747 def debug_log(
748 self, no_tail=False, exclude_module=None, include_module=None,
749 include=None, level=None, limit=0, lines=10, replay=False,
750 exclude=None):
751 """Get log messages for this model.
752
753 :param bool no_tail: Stop after returning existing log messages
754 :param list exclude_module: Do not show log messages for these logging
755 modules
756 :param list include_module: Only show log messages for these logging
757 modules
758 :param list include: Only show log messages for these entities
759 :param str level: Log level to show, valid options are 'TRACE',
760 'DEBUG', 'INFO', 'WARNING', 'ERROR,
761 :param int limit: Return this many of the most recent (possibly
762 filtered) lines are shown
763 :param int lines: Yield this many of the most recent lines, and keep
764 yielding
765 :param bool replay: Yield the entire log, and keep yielding
766 :param list exclude: Do not show log messages for these entities
767
768 """
769 pass
770
771 async def deploy(
772 self, entity_url, service_name=None, bind=None, budget=None,
773 channel=None, config=None, constraints=None, force=False,
774 num_units=1, plan=None, resources=None, series=None, storage=None,
775 to=None):
776 """Deploy a new service or bundle.
777
778 :param str entity_url: Charm or bundle url
779 :param str service_name: Name to give the service
780 :param dict bind: <charm endpoint>:<network space> pairs
781 :param dict budget: <budget name>:<limit> pairs
782 :param str channel: Charm store channel from which to retrieve
783 the charm or bundle, e.g. 'development'
784 :param dict config: Charm configuration dictionary
785 :param constraints: Service constraints
786 :type constraints: :class:`juju.Constraints`
787 :param bool force: Allow charm to be deployed to a machine running
788 an unsupported series
789 :param int num_units: Number of units to deploy
790 :param str plan: Plan under which to deploy charm
791 :param dict resources: <resource name>:<file path> pairs
792 :param str series: Series on which to deploy
793 :param dict storage: Storage constraints TODO how do these look?
794 :param str to: Placement directive, e.g.::
795
796 '23' - machine 23
797 'lxc:7' - new lxc container on machine 7
798 '24/lxc/3' - lxc container 3 or machine 24
799
800 If None, a new machine is provisioned.
801
802
803 TODO::
804
805 - service_name is required; fill this in automatically if not
806 provided by caller
807 - series is required; how do we pick a default?
808
809 """
810 if to:
811 placement = [
812 client.Placement(**p) for p in to
813 ]
814 else:
815 placement = []
816
817 if storage:
818 storage = {
819 k: client.Constraints(**v)
820 for k, v in storage.items()
821 }
822
823 entity_id = await self.charmstore.entityId(entity_url)
824
825 app_facade = client.ApplicationFacade()
826 client_facade = client.ClientFacade()
827 app_facade.connect(self.connection)
828 client_facade.connect(self.connection)
829
830 if 'bundle/' in entity_id:
831 handler = BundleHandler(self)
832 await handler.fetch_plan(entity_id)
833 await handler.execute_plan()
834 extant_apps = {app for app in self.applications}
835 pending_apps = set(handler.applications) - extant_apps
836 if pending_apps:
837 # new apps will usually be in the model by now, but if some
838 # haven't made it yet we'll need to wait on them to be added
839 await asyncio.gather(*[
840 asyncio.ensure_future(
841 self.model._wait_for_new('application', app_name))
842 for app_name in pending_apps
843 ])
844 return [app for name, app in self.applications.items()
845 if name in handler.applications]
846 else:
847 log.debug(
848 'Deploying %s', entity_id)
849
850 await client_facade.AddCharm(channel, entity_id)
851 app = client.ApplicationDeploy(
852 application=service_name,
853 channel=channel,
854 charm_url=entity_id,
855 config=config,
856 constraints=constraints,
857 endpoint_bindings=bind,
858 num_units=num_units,
859 placement=placement,
860 resources=resources,
861 series=series,
862 storage=storage,
863 )
864
865 await app_facade.Deploy([app])
866 return await self._wait_for_new('application', service_name)
867
868 def destroy(self):
869 """Terminate all machines and resources for this model.
870
871 """
872 pass
873
874 async def destroy_unit(self, *unit_names):
875 """Destroy units by name.
876
877 """
878 app_facade = client.ApplicationFacade()
879 app_facade.connect(self.connection)
880
881 log.debug(
882 'Destroying unit%s %s',
883 's' if len(unit_names) == 1 else '',
884 ' '.join(unit_names))
885
886 return await app_facade.Destroy(self.name)
887 destroy_units = destroy_unit
888
889 def get_backup(self, archive_id):
890 """Download a backup archive file.
891
892 :param str archive_id: The id of the archive to download
893 :return str: Path to the archive file
894
895 """
896 pass
897
898 def enable_ha(
899 self, num_controllers=0, constraints=None, series=None, to=None):
900 """Ensure sufficient controllers exist to provide redundancy.
901
902 :param int num_controllers: Number of controllers to make available
903 :param constraints: Constraints to apply to the controller machines
904 :type constraints: :class:`juju.Constraints`
905 :param str series: Series of the controller machines
906 :param list to: Placement directives for controller machines, e.g.::
907
908 '23' - machine 23
909 'lxc:7' - new lxc container on machine 7
910 '24/lxc/3' - lxc container 3 or machine 24
911
912 If None, a new machine is provisioned.
913
914 """
915 pass
916
917 def get_config(self):
918 """Return the configuration settings for this model.
919
920 """
921 pass
922
923 def get_constraints(self):
924 """Return the machine constraints for this model.
925
926 """
927 pass
928
929 def grant(self, username, acl='read'):
930 """Grant a user access to this model.
931
932 :param str username: Username
933 :param str acl: Access control ('read' or 'write')
934
935 """
936 pass
937
938 def import_ssh_key(self, identity):
939 """Add a public SSH key from a trusted indentity source to this model.
940
941 :param str identity: User identity in the form <lp|gh>:<username>
942
943 """
944 pass
945 import_ssh_keys = import_ssh_key
946
947 def get_machines(self, machine, utc=False):
948 """Return list of machines in this model.
949
950 :param str machine: Machine id, e.g. '0'
951 :param bool utc: Display time as UTC in RFC3339 format
952
953 """
954 pass
955
956 def get_shares(self):
957 """Return list of all users with access to this model.
958
959 """
960 pass
961
962 def get_spaces(self):
963 """Return list of all known spaces, including associated subnets.
964
965 """
966 pass
967
968 def get_ssh_key(self):
969 """Return known SSH keys for this model.
970
971 """
972 pass
973 get_ssh_keys = get_ssh_key
974
975 def get_storage(self, filesystem=False, volume=False):
976 """Return details of storage instances.
977
978 :param bool filesystem: Include filesystem storage
979 :param bool volume: Include volume storage
980
981 """
982 pass
983
984 def get_storage_pools(self, names=None, providers=None):
985 """Return list of storage pools.
986
987 :param list names: Only include pools with these names
988 :param list providers: Only include pools for these providers
989
990 """
991 pass
992
993 def get_subnets(self, space=None, zone=None):
994 """Return list of known subnets.
995
996 :param str space: Only include subnets in this space
997 :param str zone: Only include subnets in this zone
998
999 """
1000 pass
1001
1002 def remove_blocks(self):
1003 """Remove all blocks from this model.
1004
1005 """
1006 pass
1007
1008 def remove_backup(self, backup_id):
1009 """Delete a backup.
1010
1011 :param str backup_id: The id of the backup to remove
1012
1013 """
1014 pass
1015
1016 def remove_cached_images(self, arch=None, kind=None, series=None):
1017 """Remove cached OS images.
1018
1019 :param str arch: Architecture of the images to remove
1020 :param str kind: Image kind to remove, e.g. 'lxd'
1021 :param str series: Image series to remove, e.g. 'xenial'
1022
1023 """
1024 pass
1025
1026 def remove_machine(self, *machine_ids):
1027 """Remove a machine from this model.
1028
1029 :param str \*machine_ids: Ids of the machines to remove
1030
1031 """
1032 pass
1033 remove_machines = remove_machine
1034
1035 def remove_ssh_key(self, *keys):
1036 """Remove a public SSH key(s) from this model.
1037
1038 :param str \*keys: Keys to remove
1039
1040 """
1041 pass
1042 remove_ssh_keys = remove_ssh_key
1043
1044 def restore_backup(
1045 self, bootstrap=False, constraints=None, archive=None,
1046 backup_id=None, upload_tools=False):
1047 """Restore a backup archive to a new controller.
1048
1049 :param bool bootstrap: Bootstrap a new state machine
1050 :param constraints: Model constraints
1051 :type constraints: :class:`juju.Constraints`
1052 :param str archive: Path to backup archive to restore
1053 :param str backup_id: Id of backup to restore
1054 :param bool upload_tools: Upload tools if bootstrapping a new machine
1055
1056 """
1057 pass
1058
1059 def retry_provisioning(self):
1060 """Retry provisioning for failed machines.
1061
1062 """
1063 pass
1064
1065 def revoke(self, username, acl='read'):
1066 """Revoke a user's access to this model.
1067
1068 :param str username: Username to revoke
1069 :param str acl: Access control ('read' or 'write')
1070
1071 """
1072 pass
1073
1074 def run(self, command, timeout=None):
1075 """Run command on all machines in this model.
1076
1077 :param str command: The command to run
1078 :param int timeout: Time to wait before command is considered failed
1079
1080 """
1081 pass
1082
1083 def set_config(self, **config):
1084 """Set configuration keys on this model.
1085
1086 :param \*\*config: Config key/values
1087
1088 """
1089 pass
1090
1091 def set_constraints(self, constraints):
1092 """Set machine constraints on this model.
1093
1094 :param :class:`juju.Constraints` constraints: Machine constraints
1095
1096 """
1097 pass
1098
1099 def get_action_output(self, action_uuid, wait=-1):
1100 """Get the results of an action by ID.
1101
1102 :param str action_uuid: Id of the action
1103 :param int wait: Time in seconds to wait for action to complete
1104
1105 """
1106 pass
1107
1108 def get_action_status(self, uuid_or_prefix=None, name=None):
1109 """Get the status of all actions, filtered by ID, ID prefix, or action name.
1110
1111 :param str uuid_or_prefix: Filter by action uuid or prefix
1112 :param str name: Filter by action name
1113
1114 """
1115 pass
1116
1117 def get_budget(self, budget_name):
1118 """Get budget usage info.
1119
1120 :param str budget_name: Name of budget
1121
1122 """
1123 pass
1124
1125 def get_status(self, filter_=None, utc=False):
1126 """Return the status of the model.
1127
1128 :param str filter_: Service or unit name or wildcard ('*')
1129 :param bool utc: Display time as UTC in RFC3339 format
1130
1131 """
1132 pass
1133 status = get_status
1134
1135 def sync_tools(
1136 self, all_=False, destination=None, dry_run=False, public=False,
1137 source=None, stream=None, version=None):
1138 """Copy Juju tools into this model.
1139
1140 :param bool all_: Copy all versions, not just the latest
1141 :param str destination: Path to local destination directory
1142 :param bool dry_run: Don't do the actual copy
1143 :param bool public: Tools are for a public cloud, so generate mirrors
1144 information
1145 :param str source: Path to local source directory
1146 :param str stream: Simplestreams stream for which to sync metadata
1147 :param str version: Copy a specific major.minor version
1148
1149 """
1150 pass
1151
1152 def unblock(self, *commands):
1153 """Unblock an operation that would alter this model.
1154
1155 :param str \*commands: The commands to unblock. Valid values are
1156 'all-changes', 'destroy-model', 'remove-object'
1157
1158 """
1159 pass
1160
1161 def unset_config(self, *keys):
1162 """Unset configuration on this model.
1163
1164 :param str \*keys: The keys to unset
1165
1166 """
1167 pass
1168
1169 def upgrade_gui(self):
1170 """Upgrade the Juju GUI for this model.
1171
1172 """
1173 pass
1174
1175 def upgrade_juju(
1176 self, dry_run=False, reset_previous_upgrade=False,
1177 upload_tools=False, version=None):
1178 """Upgrade Juju on all machines in a model.
1179
1180 :param bool dry_run: Don't do the actual upgrade
1181 :param bool reset_previous_upgrade: Clear the previous (incomplete)
1182 upgrade status
1183 :param bool upload_tools: Upload local version of tools
1184 :param str version: Upgrade to a specific version
1185
1186 """
1187 pass
1188
1189 def upload_backup(self, archive_path):
1190 """Store a backup archive remotely in Juju.
1191
1192 :param str archive_path: Path to local archive
1193
1194 """
1195 pass
1196
1197 @property
1198 def charmstore(self):
1199 return self._charmstore
1200
1201
1202 class BundleHandler(object):
1203 """
1204 Handle bundles by using the API to translate bundle YAML into a plan of
1205 steps and then dispatching each of those using the API.
1206 """
1207 def __init__(self, model):
1208 self.model = model
1209 self.charmstore = model.charmstore
1210 self.plan = []
1211 self.references = {}
1212 self._units_by_app = {}
1213 for unit_name, unit in model.units.items():
1214 app_units = self._units_by_app.setdefault(unit.application, [])
1215 app_units.append(unit_name)
1216 self.client_facade = client.ClientFacade()
1217 self.client_facade.connect(model.connection)
1218 self.app_facade = client.ApplicationFacade()
1219 self.app_facade.connect(model.connection)
1220 self.ann_facade = client.AnnotationsFacade()
1221 self.ann_facade.connect(model.connection)
1222
1223 async def fetch_plan(self, entity_id):
1224 bundle_yaml = await self.charmstore.files(entity_id,
1225 filename='bundle.yaml',
1226 read_file=True)
1227 self.bundle = yaml.safe_load(bundle_yaml)
1228 self.plan = await self.client_facade.GetBundleChanges(bundle_yaml)
1229
1230 async def execute_plan(self):
1231 for step in self.plan.changes:
1232 method = getattr(self, step.method)
1233 result = await method(*step.args)
1234 self.references[step.id_] = result
1235
1236 @property
1237 def applications(self):
1238 return list(self.bundle['services'].keys())
1239
1240 def resolve(self, reference):
1241 if reference and reference.startswith('$'):
1242 reference = self.references[reference[1:]]
1243 return reference
1244
1245 async def addCharm(self, charm, series):
1246 """
1247 :param charm string:
1248 Charm holds the URL of the charm to be added.
1249
1250 :param series string:
1251 Series holds the series of the charm to be added
1252 if the charm default is not sufficient.
1253 """
1254 entity_id = await self.charmstore.entityId(charm)
1255 log.debug('Adding %s', entity_id)
1256 await self.client_facade.AddCharm(None, entity_id)
1257 return entity_id
1258
1259 async def addMachines(self, params=None):
1260 """
1261 :param params dict:
1262 Dictionary specifying the machine to add. All keys are optional.
1263 Keys include:
1264
1265 series: string specifying the machine OS series.
1266 constraints: string holding machine constraints, if any. We'll
1267 parse this into the json friendly dict that the juju api
1268 expects.
1269 Container_type: string holding the type of the container (for
1270 instance ""lxc" or kvm"). It is not specified for top level
1271 machines.
1272 parent_id: string holding a placeholder pointing to another
1273 machine change or to a unit change. This value is only
1274 specified in the case this machine is a container, in
1275 which case also ContainerType is set.
1276 """
1277 params = params or {}
1278
1279 if 'parent_id' in params:
1280 params['parent_id'] = self.resolve(params['parent_id'])
1281
1282 params['constraints'] = parse_constraints(
1283 params.get('constraints'))
1284 params['jobs'] = params.get('jobs', ['JobHostUnits'])
1285
1286 params = client.AddMachineParams(**params)
1287 results = await self.client_facade.AddMachines([params])
1288 error = results.machines[0].error
1289 if error:
1290 raise ValueError("Error adding machine: %s", error.message)
1291 machine = results.machines[0].machine
1292 log.debug('Added new machine %s', machine)
1293 return machine
1294
1295 async def addRelation(self, endpoint1, endpoint2):
1296 """
1297 :param endpoint1 string:
1298 :param endpoint2 string:
1299 Endpoint1 and Endpoint2 hold relation endpoints in the
1300 "application:interface" form, where the application is always a
1301 placeholder pointing to an application change, and the interface is
1302 optional. Examples are "$deploy-42:web" or just "$deploy-42".
1303 """
1304 endpoints = [endpoint1, endpoint2]
1305 # resolve indirect references
1306 for i in range(len(endpoints)):
1307 parts = endpoints[i].split(':')
1308 parts[0] = self.resolve(parts[0])
1309 endpoints[i] = ':'.join(parts)
1310
1311 log.info('Relating %s <-> %s', *endpoints)
1312 return await self.model.add_relation(*endpoints)
1313
1314 async def deploy(self, charm, series, application, options, constraints,
1315 storage, endpoint_bindings, resources):
1316 """
1317 :param charm string:
1318 Charm holds the URL of the charm to be used to deploy this
1319 application.
1320
1321 :param series string:
1322 Series holds the series of the application to be deployed
1323 if the charm default is not sufficient.
1324
1325 :param application string:
1326 Application holds the application name.
1327
1328 :param options map[string]interface{}:
1329 Options holds application options.
1330
1331 :param constraints string:
1332 Constraints holds the optional application constraints.
1333
1334 :param storage map[string]string:
1335 Storage holds the optional storage constraints.
1336
1337 :param endpoint_bindings map[string]string:
1338 EndpointBindings holds the optional endpoint bindings
1339
1340 :param resources map[string]int:
1341 Resources identifies the revision to use for each resource
1342 of the application's charm.
1343 """
1344 # resolve indirect references
1345 charm = self.resolve(charm)
1346 # stringify all config values for API
1347 options = {k: str(v) for k, v in options.items()}
1348 # build param object
1349 app = client.ApplicationDeploy(
1350 charm_url=charm,
1351 series=series,
1352 application=application,
1353 config=options,
1354 constraints=constraints,
1355 storage=storage,
1356 endpoint_bindings=endpoint_bindings,
1357 resources=resources,
1358 )
1359 # do the do
1360 log.info('Deploying %s', charm)
1361 await self.app_facade.Deploy([app])
1362 return application
1363
1364 async def addUnit(self, application, to):
1365 """
1366 :param application string:
1367 Application holds the application placeholder name for which a unit
1368 is added.
1369
1370 :param to string:
1371 To holds the optional location where to add the unit, as a
1372 placeholder pointing to another unit change or to a machine change.
1373 """
1374 application = self.resolve(application)
1375 placement = self.resolve(to)
1376 if self._units_by_app.get(application):
1377 # enough units for this application already exist;
1378 # claim one, and carry on
1379 # NB: this should probably honor placement, but the juju client
1380 # doesn't, so we're not bothering, either
1381 unit_name = self._units_by_app[application].pop()
1382 log.debug('Reusing unit %s for %s', unit_name, application)
1383 return self.model.units[unit_name]
1384
1385 log.debug('Adding new unit for %s%s', application,
1386 ' to %s' % placement if placement else '')
1387 return await self.model.applications[application].add_unit(
1388 count=1,
1389 to=placement,
1390 )
1391
1392 async def expose(self, application):
1393 """
1394 :param application string:
1395 Application holds the placeholder name of the application that must
1396 be exposed.
1397 """
1398 application = self.resolve(application)
1399 log.info('Exposing %s', application)
1400 return await self.model.applications[application].expose()
1401
1402 async def setAnnotations(self, id_, entity_type, annotations):
1403 """
1404 :param id_ string:
1405 Id is the placeholder for the application or machine change
1406 corresponding to the entity to be annotated.
1407
1408 :param entity_type EntityType:
1409 EntityType holds the type of the entity, "application" or
1410 "machine".
1411
1412 :param annotations map[string]string:
1413 Annotations holds the annotations as key/value pairs.
1414 """
1415 entity_id = self.resolve(id_)
1416 try:
1417 entity = self.model.state.get_entity(entity_type, entity_id)
1418 except KeyError:
1419 entity = await self.model._wait_for_new(entity_type, entity_id)
1420 return await entity.set_annotations(annotations)
1421
1422
1423 class CharmStore(object):
1424 """
1425 Async wrapper around theblues.charmstore.CharmStore
1426 """
1427 def __init__(self, loop):
1428 self.loop = loop
1429 self._cs = charmstore.CharmStore()
1430
1431 def __getattr__(self, name):
1432 """
1433 Wrap method calls in coroutines that use run_in_executor to make them
1434 async.
1435 """
1436 attr = getattr(self._cs, name)
1437 if not callable(attr):
1438 wrapper = partial(getattr, self._cs, name)
1439 setattr(self, name, wrapper)
1440 else:
1441 async def coro(*args, **kwargs):
1442 method = partial(attr, *args, **kwargs)
1443 return await self.loop.run_in_executor(None, method)
1444 setattr(self, name, coro)
1445 wrapper = coro
1446 return wrapper