5d436fd8209acc14cd89447ae42647d0d098e058
[osm/N2VC.git] / juju / model.py
1 import asyncio
2 import collections
3 import logging
4 import re
5 import weakref
6 from concurrent.futures import CancelledError
7 from functools import partial
8
9 from theblues import charmstore
10
11 from .client import client
12 from .client import watcher
13 from .client import connection
14 from .delta import get_entity_delta
15 from .delta import get_entity_class
16 from .exceptions import DeadEntityException
17 from .errors import JujuAPIError
18
# Module-level logger, named after this module, shared by every class below.
log = logging.getLogger(__name__)
20
21
class _Observer(object):
    """Pair an observer callable with the filter criteria that gate it.

    The criteria (entity type, action and an entity-id regular expression)
    let the model decide, via :meth:`cares_about`, whether the wrapped
    callable should be invoked for a given change.

    """
    def __init__(self, callable_, entity_type, action, entity_id):
        self.callable_ = callable_
        self.entity_type = entity_type
        self.action = action

        # A non-empty id filter is treated as a regex and anchored at both
        # ends so that it must match the whole entity id.
        pattern = entity_id
        if pattern:
            prefix = '' if pattern.startswith('^') else '^'
            suffix = '' if pattern.endswith('$') else '$'
            pattern = prefix + pattern + suffix
        self.entity_id = pattern

    async def __call__(self, delta, old, new, model):
        """Forward the change notification to the wrapped callable."""
        await self.callable_(delta, old, new, model)

    def cares_about(self, entity_type, action, entity_id):
        """Tell whether this observer wants to be called for a change.

        A criterion left empty on the observer matches anything; the id
        filter is additionally skipped when the change carries no id.

        """
        if self.entity_id and entity_id:
            if re.match(self.entity_id, str(entity_id)) is None:
                return False

        type_matches = (
            not self.entity_type or self.entity_type == entity_type)
        action_matches = not self.action or self.action == action
        return type_matches and action_matches
60
61
class ModelObserver(object):
    """Observer base class that dispatches each change to a named handler.

    A change is routed to the coroutine method ``on_<entity>_<type>`` when
    the instance defines one, and to :meth:`on_change` otherwise.

    """
    async def __call__(self, delta, old, new, model):
        handler = getattr(
            self,
            'on_{}_{}'.format(delta.entity, delta.type),
            self.on_change)
        await handler(delta, old, new, model)

    async def on_change(self, delta, old, new, model):
        """Fallback handler; does nothing unless overridden."""
        pass
70
71
class ModelState(object):
    """In-memory record of the model: for every entity, the ordered history
    of the data carried by the deltas applied to it.

    ``self.state`` maps entity type -> entity id -> history deque. A
    trailing ``None`` in a history marks the entity as removed.

    """
    def __init__(self, model):
        self.model = model
        self.state = dict()

    def clear(self):
        """Forget every entity and its history."""
        self.state.clear()

    def _live_entity_map(self, entity_type):
        """Build an id:Entity map of the entities of type ``entity_type``
        whose latest history item is not the removal marker.

        """
        live = {}
        for entity_id, history in self.state.get(entity_type, {}).items():
            if history[-1] is None:
                continue
            live[entity_id] = self.get_entity(entity_type, entity_id)
        return live

    @property
    def applications(self):
        """Map of application-name:Application for the applications
        currently in the model.

        """
        return self._live_entity_map('application')

    @property
    def machines(self):
        """Map of machine-id:Machine for the machines currently in the
        model.

        """
        return self._live_entity_map('machine')

    @property
    def units(self):
        """Map of unit-id:Unit for the units currently in the model.

        """
        return self._live_entity_map('unit')

    def entity_history(self, entity_type, entity_id):
        """Return the history deque for an entity.

        Raises KeyError when the type or id has never been seen.

        """
        histories = self.state[entity_type]
        return histories[entity_id]

    def entity_data(self, entity_type, entity_id, history_index):
        """Return the data dict stored at ``history_index`` of an entity's
        history.

        """
        history = self.entity_history(entity_type, entity_id)
        return history[history_index]

    def apply_delta(self, delta):
        """Record ``delta`` in the entity's history and return the entity
        as it was before and after, e.g.::

            old_obj, new_obj = self.apply_delta(delta)

        old_obj is None when there is no earlier history item, e.g. for a
        newly deployed application or unit.

        new_obj is never None, but is dead (new_obj.dead == True) when the
        delta removed the entity.

        """
        entity_type = delta.entity
        entity_id = delta.get_id()

        histories = self.state.setdefault(entity_type, {})
        history = histories.setdefault(entity_id, collections.deque())

        history.append(delta.data)
        if delta.type == 'remove':
            # Removal marker: the entity has no data from here on.
            history.append(None)

        current = self.get_entity(entity_type, entity_id)
        return current.previous(), current

    def get_entity(
            self, entity_type, entity_id, history_index=-1, connected=True):
        """Return an object representing the entity at ``history_index`` of
        its history, or None when that index does not exist.

        """
        # Negative indexes other than -1 ("latest") are converted to
        # absolute positions; one that falls before the start has no entity.
        if history_index != -1 and history_index < 0:
            history_index += len(self.entity_history(entity_type, entity_id))
            if history_index < 0:
                return None

        # Probe the history only to find out whether the index is valid.
        try:
            self.entity_data(entity_type, entity_id, history_index)
        except IndexError:
            return None

        cls = get_entity_class(entity_type)
        return cls(
            entity_id, self.model,
            history_index=history_index, connected=connected)
184
185
class ModelEntity(object):
    """An object in the Model tree.

    An entity is a thin view onto one item of the history that the model's
    state keeps for it; its data is looked up in the model on every access
    rather than copied.

    """

    def __init__(self, entity_id, model, history_index=-1, connected=True):
        """Initialize a new entity

        :param entity_id str: The unique id of the object in the model
        :param model: The model instance in whose object tree this
            entity resides
        :param history_index int: The index of this object's state in the
            model's history deque for this entity
        :param connected bool: Flag indicating whether this object gets live
            updates from the model.

        """
        self.entity_id = entity_id
        self.model = model
        self._history_index = history_index
        self.connected = connected
        self.connection = model.connection

    def __getattr__(self, name):
        """Fetch object attributes from the underlying data dict held in the
        model.

        Only invoked when normal attribute lookup fails.

        :raises DeadEntityException: if this entity has no data (it was
            removed from the model)
        :raises KeyError: if the data dict has no key ``name``

        """
        if self.data is None:
            raise DeadEntityException(
                "Entity {}:{} is dead - its attributes can no longer be "
                "accessed. Use the .previous() method on this object to get "
                "a copy of the object at its previous state.".format(
                    self.entity_type, self.entity_id))
        return self.data[name]

    def __bool__(self):
        """An entity is truthy only while it has (non-empty) data."""
        return bool(self.data)

    def on_change(self, callable_):
        """Add a change observer to this entity.

        """
        self.model.add_observer(
            callable_, self.entity_type, 'change', self.entity_id)

    def on_remove(self, callable_):
        """Add a remove observer to this entity.

        """
        self.model.add_observer(
            callable_, self.entity_type, 'remove', self.entity_id)

    @property
    def entity_type(self):
        """A string identifying the entity type of this object, e.g.
        'application' or 'unit', etc.

        Derived from the lower-cased class name.

        """
        return self.__class__.__name__.lower()

    @property
    def current(self):
        """Return True if this object represents the current state of the
        entity in the underlying model.

        This will be True except when the object represents an entity at a
        non-latest state in history, e.g. if the object was obtained by
        calling .previous() on another object.

        """
        return self._history_index == -1

    @property
    def dead(self):
        """Returns True if this entity no longer exists in the underlying
        model.

        That is the case when either this object's own history item or the
        latest history item is the removal marker (None).

        """
        return (
            self.data is None or
            self.model.state.entity_data(
                self.entity_type, self.entity_id, -1) is None
        )

    @property
    def alive(self):
        """Returns True if this entity still exists in the underlying
        model.

        """
        return not self.dead

    @property
    def data(self):
        """The data dictionary for this entity.

        """
        return self.model.state.entity_data(
            self.entity_type, self.entity_id, self._history_index)

    def previous(self):
        """Return a copy of this object as was at its previous state in
        history.

        Returns None if this object is new (and therefore has no history).

        The returned object is always "disconnected", i.e. does not receive
        live updates.

        """
        return self.model.state.get_entity(
            self.entity_type, self.entity_id, self._history_index - 1,
            connected=False)

    def next(self):
        """Return a copy of this object at its next state in
        history.

        Returns None if this object is already the latest.

        The returned object is "disconnected", i.e. does not receive
        live updates, unless it is current (latest).

        """
        if self._history_index == -1:
            return None

        new_index = self._history_index + 1
        history = self.model.state.entity_history(
            self.entity_type, self.entity_id)
        # Only the last item of the history is the live one.
        connected = new_index == len(history) - 1
        # Step forward to new_index; stepping to ``_history_index - 1``
        # would return the previous state instead of the next one.
        return self.model.state.get_entity(
            self.entity_type, self.entity_id, new_index,
            connected=connected)

    def latest(self):
        """Return a copy of this object at its current state in the model.

        Returns self if this object is already the latest.

        The returned object is always "connected", i.e. receives
        live updates from the model.

        """
        if self._history_index == -1:
            return self

        return self.model.state.get_entity(self.entity_type, self.entity_id)
334
335
class Model(object):
    """Client-side view of a Juju model.

    Holds the API connection, a :class:`ModelState` that is kept up to date
    by a background watcher task, and the observers to notify on changes.

    """
    def __init__(self, loop=None):
        """Instantiate a new connected Model.

        :param loop: an asyncio event loop

        """
        self.loop = loop or asyncio.get_event_loop()
        self.connection = None
        # Values (the callables) are held weakly: an observer is dropped as
        # soon as nothing else references its callable.
        self.observers = weakref.WeakValueDictionary()
        self.state = ModelState(self)
        self._watcher_task = None
        # Set by the watcher task once it has cloned the connection.
        self._watch_conn = None
        self._watch_shutdown = self._new_event()
        self._watch_received = self._new_event()
        self._charmstore = CharmStore(self.loop)

    def _new_event(self):
        """Create an :class:`asyncio.Event` for use on this model's loop."""
        try:
            return asyncio.Event(loop=self.loop)
        except TypeError:
            # Python 3.10+ no longer accepts ``loop``; there the event
            # binds to the running loop when it is first awaited.
            return asyncio.Event()

    async def connect_current(self):
        """Connect to the current Juju model.

        Returns once the watcher has processed its first batch of deltas.

        """
        self.connection = await connection.Connection.connect_current()
        self._watch()
        await self._watch_received.wait()

    async def disconnect(self):
        """Shut down the watcher task and close websockets.

        """
        self._stop_watching()
        if self.connection and self.connection.is_open:
            await self._watch_shutdown.wait()
            log.debug('Closing model connection')
            await self.connection.close()
            self.connection = None

    def all_units_idle(self):
        """Return True if all units are idle.

        """
        return all(
            unit.data['agent-status']['current'] == 'idle'
            for unit in self.units.values())

    async def reset(self, force=False):
        """Reset the model to a clean state.

        :param bool force: Force-terminate machines.

        This returns only after the model has reached a clean state. "Clean"
        means no applications or machines exist in the model.

        """
        log.debug('Resetting model')
        for app in self.applications.values():
            await app.destroy()
        for machine in self.machines.values():
            await machine.destroy(force=force)
        await self.block_until(
            lambda: len(self.machines) == 0
        )
        self.state.clear()

    async def block_until(self, *conditions, timeout=None, wait_period=0):
        """Return only after all conditions are true.

        :param conditions: zero-argument callables, polled until all of
            them return a truthy value
        :param timeout: seconds to wait before giving up (None: forever)
        :param wait_period: seconds to sleep between polls. The default of
            0 only yields to the event loop between polls, which keeps the
            loop busy; pass a positive value to poll less aggressively.
        :raises asyncio.TimeoutError: if ``timeout`` expires first

        """
        async def _block():
            while not all(c() for c in conditions):
                await asyncio.sleep(wait_period)
        await asyncio.wait_for(_block(), timeout)

    @property
    def applications(self):
        """Return a map of application-name:Application for all applications
        currently in the model.

        """
        return self.state.applications

    @property
    def machines(self):
        """Return a map of machine-id:Machine for all machines currently in
        the model.

        """
        return self.state.machines

    @property
    def units(self):
        """Return a map of unit-id:Unit for all units currently in
        the model.

        """
        return self.state.units

    def add_observer(
            self, callable_, entity_type=None, action=None, entity_id=None):
        """Register an "on-model-change" callback

        Once the model is connected, ``callable_``
        will be called each time the model changes. callable_ should
        be Awaitable and accept the following positional arguments:

            delta - An instance of :class:`juju.delta.EntityDelta`
                containing the raw delta data recv'd from the Juju
                websocket.

            old_obj - If the delta modifies an existing object in the model,
                old_obj will be a copy of that object, as it was before the
                delta was applied. Will be None if the delta creates a new
                entity in the model.

            new_obj - A copy of the new or updated object, after the delta
                is applied. If the delta removes the entity from the model,
                this object is dead (``new_obj.dead`` is True).

            model - The :class:`Model` itself.

        Events for which ``callable_`` is called can be specified by passing
        entity_type, action, and/or entity_id filter criteria, e.g.::

            add_observer(
                myfunc, entity_type='application', action='add',
                entity_id='ubuntu')

        ``entity_id`` is matched as a regular expression against the whole
        id. The model keeps only a weak reference to ``callable_``, so the
        caller must keep it alive for as long as it should be notified.

        """
        observer = _Observer(callable_, entity_type, action, entity_id)
        self.observers[observer] = callable_

    def _watch(self):
        """Start an asynchronous watch against this model.

        See :meth:`add_observer` to register an onchange callback.

        """
        async def _start_watch():
            self._watch_shutdown.clear()
            try:
                allwatcher = watcher.AllWatcher()
                self._watch_conn = await self.connection.clone()
                allwatcher.connect(self._watch_conn)
                while True:
                    results = await allwatcher.Next()
                    for delta in results.deltas:
                        delta = get_entity_delta(delta)
                        old_obj, new_obj = self.state.apply_delta(delta)
                        # XXX: Might not want to shield at this level
                        # We are shielding because when the watcher is
                        # canceled (on disconnect()), we don't want all of
                        # its children (every observer callback) to be
                        # canceled with it. So we shield them. But this means
                        # they can *never* be canceled.
                        await asyncio.shield(
                            self._notify_observers(delta, old_obj, new_obj))
                    self._watch_received.set()
            except asyncio.CancelledError:
                # Task cancellation raises asyncio.CancelledError, which
                # since Python 3.8 is no longer the same class as
                # concurrent.futures.CancelledError.
                log.debug('Closing watcher connection')
                # The clone may not exist yet if we were cancelled early.
                if self._watch_conn is not None:
                    await self._watch_conn.close()
                self._watch_shutdown.set()
                self._watch_conn = None

        log.debug('Starting watcher task')
        self._watcher_task = self.loop.create_task(_start_watch())

    def _stop_watching(self):
        """Stop the asynchronous watch against this model.

        """
        log.debug('Stopping watcher task')
        if self._watcher_task:
            self._watcher_task.cancel()

    async def _notify_observers(self, delta, old_obj, new_obj):
        """Call observing callbacks, notifying them of a change in model state

        :param delta: The raw change from the watcher
            (:class:`juju.client.overrides.Delta`)
        :param old_obj: The object in the model that this delta updates.
            May be None.
        :param new_obj: The object in the model that is created or updated
            by applying this delta.

        """
        # No (truthy) previous state: report the change as an 'add'.
        if not old_obj:
            delta.type = 'add'

        log.debug(
            'Model changed: %s %s %s',
            delta.entity, delta.type, delta.get_id())

        # Callbacks are scheduled, not awaited, so a slow observer does not
        # hold up the watcher.
        for o in self.observers:
            if o.cares_about(delta.entity, delta.type, delta.get_id()):
                asyncio.ensure_future(o(delta, old_obj, new_obj, self))

    async def _wait_for_new(self, entity_type, entity_id):
        """Wait for a new object to appear in the Model and return it.

        Waits for an object of type ``entity_type`` with id ``entity_id``.

        This coroutine blocks until the new object appears in the model. If
        the object is already live in the model it is returned immediately.

        """
        # The watcher runs concurrently, so the 'add' may already have been
        # processed; waiting for it then would block forever.
        live = self.state._live_entity_map(entity_type)
        if entity_id in live:
            return live[entity_id]

        entity_added = self._new_event()

        async def callback(delta, old, new, model):
            entity_added.set()
        self.add_observer(callback, entity_type, 'add', entity_id)
        await entity_added.wait()
        return self.state._live_entity_map(entity_type)[entity_id]

    def add_machine(
            self, spec=None, constraints=None, disks=None, series=None,
            count=1):
        """Start a new, empty machine and optionally a container, or add a
        container to a machine.

        Not implemented yet.

        :param str spec: Machine specification
            Examples::

                (None) - starts a new machine
                'lxc' - starts a new machine with one lxc container
                'lxc:4' - starts a new lxc container on machine 4
                'ssh:user@10.10.0.3' - manually provisions a machine with ssh
                'zone=us-east-1a' - starts a machine in zone us-east-1a on AWS
                'maas2.name' - acquire machine maas2.name on MAAS
        :param constraints: Machine constraints
        :type constraints: :class:`juju.Constraints`
        :param list disks: List of disk :class:`constraints <juju.Constraints>`
        :param str series: Series
        :param int count: Number of machines to deploy

        Supported container types are: lxc, lxd, kvm

        When deploying a container to an existing machine, constraints cannot
        be used.

        """
        pass
    add_machines = add_machine

    async def add_relation(self, relation1, relation2):
        """Add a relation between two applications.

        :param str relation1: '<application>[:<relation_name>]'
        :param str relation2: '<application>[:<relation_name>]'

        """
        app_facade = client.ApplicationFacade()
        app_facade.connect(self.connection)

        log.debug(
            'Adding relation %s <-> %s', relation1, relation2)

        return await app_facade.AddRelation([relation1, relation2])

    def add_space(self, name, *cidrs):
        r"""Add a new network space.

        Adds a new space with the given name and associates the given
        (optional) list of existing subnet CIDRs with it.

        :param str name: Name of the space
        :param \*cidrs: Optional list of existing subnet CIDRs

        """
        pass

    def add_ssh_key(self, key):
        """Add a public SSH key to this model.

        :param str key: The public ssh key

        """
        pass
    add_ssh_keys = add_ssh_key

    def add_subnet(self, cidr_or_id, space, *zones):
        r"""Add an existing subnet to this model.

        :param str cidr_or_id: CIDR or provider ID of the existing subnet
        :param str space: Network space with which to associate
        :param str \*zones: Zone(s) in which the subnet resides

        """
        pass

    def get_backups(self):
        """Retrieve metadata for backups in this model.

        """
        pass

    def block(self, *commands):
        r"""Add a new block to this model.

        :param str \*commands: The commands to block. Valid values are
            'all-changes', 'destroy-model', 'remove-object'

        """
        pass

    def get_blocks(self):
        """List blocks for this model.

        """
        pass

    def get_cached_images(self, arch=None, kind=None, series=None):
        """Return a list of cached OS images.

        :param str arch: Filter by image architecture
        :param str kind: Filter by image kind, e.g. 'lxd'
        :param str series: Filter by image series, e.g. 'xenial'

        """
        pass

    def create_backup(self, note=None, no_download=False):
        """Create a backup of this model.

        :param str note: A note to store with the backup
        :param bool no_download: Do not download the backup archive
        :return str: Path to downloaded archive

        """
        pass

    def create_storage_pool(self, name, provider_type, **pool_config):
        r"""Create or define a storage pool.

        :param str name: Name to give the storage pool
        :param str provider_type: Pool provider type
        :param \*\*pool_config: key/value pool configuration pairs

        """
        pass

    def debug_log(
            self, no_tail=False, exclude_module=None, include_module=None,
            include=None, level=None, limit=0, lines=10, replay=False,
            exclude=None):
        """Get log messages for this model.

        :param bool no_tail: Stop after returning existing log messages
        :param list exclude_module: Do not show log messages for these logging
            modules
        :param list include_module: Only show log messages for these logging
            modules
        :param list include: Only show log messages for these entities
        :param str level: Log level to show, valid options are 'TRACE',
            'DEBUG', 'INFO', 'WARNING', 'ERROR'
        :param int limit: Return this many of the most recent (possibly
            filtered) lines
        :param int lines: Yield this many of the most recent lines, and keep
            yielding
        :param bool replay: Yield the entire log, and keep yielding
        :param list exclude: Do not show log messages for these entities

        """
        pass

    async def deploy(
            self, entity_url, service_name=None, bind=None, budget=None,
            channel=None, config=None, constraints=None, force=False,
            num_units=1, plan=None, resources=None, series=None, storage=None,
            to=None):
        """Deploy a new service or bundle.

        :param str entity_url: Charm or bundle url
        :param str service_name: Name to give the service
        :param dict bind: <charm endpoint>:<network space> pairs
        :param dict budget: <budget name>:<limit> pairs
        :param str channel: Charm store channel from which to retrieve
            the charm or bundle, e.g. 'development'
        :param dict config: Charm configuration dictionary
        :param constraints: Service constraints; expanded as keyword
            arguments into ``client.Value``
        :type constraints: :class:`juju.Constraints`
        :param bool force: Allow charm to be deployed to a machine running
            an unsupported series
        :param int num_units: Number of units to deploy
        :param str plan: Plan under which to deploy charm
        :param dict resources: <resource name>:<file path> pairs
        :param str series: Series on which to deploy
        :param dict storage: Storage constraints; each value is expanded as
            keyword arguments into ``client.Constraints``
        :param to: Placement directives. The implementation iterates this
            value and expands each item as keyword arguments into
            ``client.Placement``. The intended directives are, e.g.::

                '23' - machine 23
                'lxc:7' - new lxc container on machine 7
                '24/lxc/3' - lxc container 3 on machine 24

            If None, a new machine is provisioned.

        :return: the new application entity when a charm is deployed;
            None when a bundle is deployed

        ``budget``, ``force`` and ``plan`` are accepted but not used yet.

        TODO::

            - entity_url must have a revision; look up latest automatically if
              not provided by caller
            - service_name is required; fill this in automatically if not
              provided by caller
            - series is required; how do we pick a default?

        """
        if constraints:
            constraints = client.Value(**constraints)

        if to:
            placement = [
                client.Placement(**p) for p in to
            ]
        else:
            placement = []

        if storage:
            storage = {
                k: client.Constraints(**v)
                for k, v in storage.items()
            }

        entity_id = await self.charmstore.entityId(entity_url)

        app_facade = client.ApplicationFacade()
        client_facade = client.ClientFacade()
        app_facade.connect(self.connection)
        client_facade.connect(self.connection)

        if 'bundle/' in entity_id:
            handler = BundleHandler(self)
            await handler.fetch_plan(entity_id)
            await handler.execute_plan()
        else:
            log.debug(
                'Deploying %s', entity_id)

            await client_facade.AddCharm(channel, entity_id)
            app = client.ApplicationDeploy(
                application=service_name,
                channel=channel,
                charm_url=entity_id,
                config=config,
                constraints=constraints,
                endpoint_bindings=bind,
                num_units=num_units,
                placement=placement,
                resources=resources,
                series=series,
                storage=storage,
            )

            await app_facade.Deploy([app])
            return await self._wait_for_new('application', service_name)

    def destroy(self):
        """Terminate all machines and resources for this model.

        """
        pass

    def get_backup(self, archive_id):
        """Download a backup archive file.

        :param str archive_id: The id of the archive to download
        :return str: Path to the archive file

        """
        pass

    def enable_ha(
            self, num_controllers=0, constraints=None, series=None, to=None):
        """Ensure sufficient controllers exist to provide redundancy.

        :param int num_controllers: Number of controllers to make available
        :param constraints: Constraints to apply to the controller machines
        :type constraints: :class:`juju.Constraints`
        :param str series: Series of the controller machines
        :param list to: Placement directives for controller machines, e.g.::

            '23' - machine 23
            'lxc:7' - new lxc container on machine 7
            '24/lxc/3' - lxc container 3 on machine 24

            If None, a new machine is provisioned.

        """
        pass

    def get_config(self):
        """Return the configuration settings for this model.

        """
        pass

    def get_constraints(self):
        """Return the machine constraints for this model.

        """
        pass

    def grant(self, username, acl='read'):
        """Grant a user access to this model.

        :param str username: Username
        :param str acl: Access control ('read' or 'write')

        """
        pass

    def import_ssh_key(self, identity):
        """Add a public SSH key from a trusted identity source to this model.

        :param str identity: User identity in the form <lp|gh>:<username>

        """
        pass
    import_ssh_keys = import_ssh_key

    def get_machines(self, machine, utc=False):
        """Return list of machines in this model.

        :param str machine: Machine id, e.g. '0'
        :param bool utc: Display time as UTC in RFC3339 format

        """
        pass

    def get_shares(self):
        """Return list of all users with access to this model.

        """
        pass

    def get_spaces(self):
        """Return list of all known spaces, including associated subnets.

        """
        pass

    def get_ssh_key(self):
        """Return known SSH keys for this model.

        """
        pass
    get_ssh_keys = get_ssh_key

    def get_storage(self, filesystem=False, volume=False):
        """Return details of storage instances.

        :param bool filesystem: Include filesystem storage
        :param bool volume: Include volume storage

        """
        pass

    def get_storage_pools(self, names=None, providers=None):
        """Return list of storage pools.

        :param list names: Only include pools with these names
        :param list providers: Only include pools for these providers

        """
        pass

    def get_subnets(self, space=None, zone=None):
        """Return list of known subnets.

        :param str space: Only include subnets in this space
        :param str zone: Only include subnets in this zone

        """
        pass

    def remove_blocks(self):
        """Remove all blocks from this model.

        """
        pass

    def remove_backup(self, backup_id):
        """Delete a backup.

        :param str backup_id: The id of the backup to remove

        """
        pass

    def remove_cached_images(self, arch=None, kind=None, series=None):
        """Remove cached OS images.

        :param str arch: Architecture of the images to remove
        :param str kind: Image kind to remove, e.g. 'lxd'
        :param str series: Image series to remove, e.g. 'xenial'

        """
        pass

    def remove_machine(self, *machine_ids):
        r"""Remove a machine from this model.

        :param str \*machine_ids: Ids of the machines to remove

        """
        pass
    remove_machines = remove_machine

    def remove_ssh_key(self, *keys):
        r"""Remove a public SSH key(s) from this model.

        :param str \*keys: Keys to remove

        """
        pass
    remove_ssh_keys = remove_ssh_key

    def restore_backup(
            self, bootstrap=False, constraints=None, archive=None,
            backup_id=None, upload_tools=False):
        """Restore a backup archive to a new controller.

        :param bool bootstrap: Bootstrap a new state machine
        :param constraints: Model constraints
        :type constraints: :class:`juju.Constraints`
        :param str archive: Path to backup archive to restore
        :param str backup_id: Id of backup to restore
        :param bool upload_tools: Upload tools if bootstrapping a new machine

        """
        pass

    def retry_provisioning(self):
        """Retry provisioning for failed machines.

        """
        pass

    def revoke(self, username, acl='read'):
        """Revoke a user's access to this model.

        :param str username: Username to revoke
        :param str acl: Access control ('read' or 'write')

        """
        pass

    def run(self, command, timeout=None):
        """Run command on all machines in this model.

        :param str command: The command to run
        :param int timeout: Time to wait before command is considered failed

        """
        pass

    def set_config(self, **config):
        r"""Set configuration keys on this model.

        :param \*\*config: Config key/values

        """
        pass

    def set_constraints(self, constraints):
        """Set machine constraints on this model.

        :param :class:`juju.Constraints` constraints: Machine constraints

        """
        pass

    def get_action_output(self, action_uuid, wait=-1):
        """Get the results of an action by ID.

        :param str action_uuid: Id of the action
        :param int wait: Time in seconds to wait for action to complete

        """
        pass

    def get_action_status(self, uuid_or_prefix=None, name=None):
        """Get the status of all actions, filtered by ID, ID prefix, or
        action name.

        :param str uuid_or_prefix: Filter by action uuid or prefix
        :param str name: Filter by action name

        """
        pass

    def get_budget(self, budget_name):
        """Get budget usage info.

        :param str budget_name: Name of budget

        """
        pass

    def get_status(self, filter_=None, utc=False):
        """Return the status of the model.

        :param str filter_: Service or unit name or wildcard ('*')
        :param bool utc: Display time as UTC in RFC3339 format

        """
        pass
    status = get_status

    def sync_tools(
            self, all_=False, destination=None, dry_run=False, public=False,
            source=None, stream=None, version=None):
        """Copy Juju tools into this model.

        :param bool all_: Copy all versions, not just the latest
        :param str destination: Path to local destination directory
        :param bool dry_run: Don't do the actual copy
        :param bool public: Tools are for a public cloud, so generate mirrors
            information
        :param str source: Path to local source directory
        :param str stream: Simplestreams stream for which to sync metadata
        :param str version: Copy a specific major.minor version

        """
        pass

    def unblock(self, *commands):
        r"""Unblock an operation that would alter this model.

        :param str \*commands: The commands to unblock. Valid values are
            'all-changes', 'destroy-model', 'remove-object'

        """
        pass

    def unset_config(self, *keys):
        r"""Unset configuration on this model.

        :param str \*keys: The keys to unset

        """
        pass

    def upgrade_gui(self):
        """Upgrade the Juju GUI for this model.

        """
        pass

    def upgrade_juju(
            self, dry_run=False, reset_previous_upgrade=False,
            upload_tools=False, version=None):
        """Upgrade Juju on all machines in a model.

        :param bool dry_run: Don't do the actual upgrade
        :param bool reset_previous_upgrade: Clear the previous (incomplete)
            upgrade status
        :param bool upload_tools: Upload local version of tools
        :param str version: Upgrade to a specific version

        """
        pass

    def upload_backup(self, archive_path):
        """Store a backup archive remotely in Juju.

        :param str archive_path: Path to local archive

        """
        pass

    @property
    def charmstore(self):
        """The charm store client created for this model."""
        return self._charmstore
1105
1106
class BundleHandler(object):
    """
    Handle bundles by using the API to translate bundle YAML into a plan of
    steps and then dispatching each of those using the API.

    Each step of the plan names one of the methods below (``addCharm``,
    ``addMachines``, ``addRelation``, ``deploy``, ``addUnit``, ``expose``,
    ``setAnnotations``); the value a step returns is remembered under the
    step's id so that later steps can refer to it with a ``$<id>``
    placeholder (see :meth:`resolve`).
    """
    def __init__(self, model):
        """
        :param model:
            The model to operate on. Its ``charmstore``, ``units`` and
            ``connection`` attributes are used.
        """
        self.model = model
        self.charmstore = model.charmstore
        # Replaced by the GetBundleChanges result once fetch_plan() has run.
        self.plan = []
        # Maps the id of every executed step to the value it returned.
        self.references = {}
        # Names of the units already present in the model, grouped by
        # application; addUnit() claims these before requesting new units.
        self._units_by_app = {}
        for unit_name, unit in model.units.items():
            app_units = self._units_by_app.setdefault(unit.application, [])
            app_units.append(unit_name)
        self.client_facade = client.ClientFacade()
        self.client_facade.connect(model.connection)
        self.app_facade = client.ApplicationFacade()
        self.app_facade.connect(model.connection)
        self.ann_facade = client.AnnotationsFacade()
        self.ann_facade.connect(model.connection)

    async def fetch_plan(self, entity_id):
        """
        Fetch ``bundle.yaml`` for the given entity from the charm store and
        ask the API for the corresponding bundle changes, which are stored
        in ``self.plan``.

        :param entity_id string:
            Charm store id of the bundle.
        """
        yaml = await self.charmstore.files(entity_id,
                                           filename='bundle.yaml',
                                           read_file=True)
        self.plan = await self.client_facade.GetBundleChanges(yaml)

    async def execute_plan(self):
        """
        Run every change of the fetched plan, in order, recording each
        step's result in ``self.references`` under the step's id.

        Does nothing if no plan has been fetched yet.
        """
        # Until fetch_plan() has run, self.plan is still the empty list set
        # in __init__, which has no ``changes`` attribute; treat that as
        # "nothing to do" instead of failing with AttributeError.
        changes = getattr(self.plan, 'changes', None) or []
        for step in changes:
            method = getattr(self, step.method)
            result = await method(*step.args)
            self.references[step.id_] = result

    def resolve(self, reference):
        """
        Translate a ``$<id>`` placeholder into the result recorded for the
        step with that id.

        :param reference string:
            Either a placeholder starting with ``$`` or a literal value.

        :return:
            The recorded result for a placeholder; otherwise the value
            unchanged (including empty and ``None`` values).

        :raises KeyError:
            If the placeholder names a step with no recorded result.
        """
        if reference and reference.startswith('$'):
            reference = self.references[reference[1:]]
        return reference

    async def addCharm(self, charm, series):
        """
        :param charm string:
            Charm holds the URL of the charm to be added.

        :param series string:
            Series holds the series of the charm to be added
            if the charm default is not sufficient.
            (Accepted because the plan supplies it; not used here.)

        :return:
            The entity id the charm store reported for the charm.
        """
        entity_id = await self.charmstore.entityId(charm)
        log.debug('Adding %s', entity_id)
        await self.client_facade.AddCharm(None, entity_id)
        return entity_id

    async def addMachines(self, series, constraints, container_type,
                          parent_id):
        """
        :param series string:
            Series holds the optional machine OS series.

        :param constraints string:
            Constraints holds the optional machine constraints.

        :param container_type string:
            ContainerType optionally holds the type of the container (for
            instance "lxc" or "kvm"). It is not specified for top level
            machines.

        :param parent_id string:
            ParentId optionally holds a placeholder pointing to another machine
            change or to a unit change. This value is only specified in the
            case this machine is a container, in which case also ContainerType
            is set.

        :return:
            The ``machine`` field of the first AddMachines result.
        """
        params = client.AddMachineParams(
            series=series,
            constraints=constraints,
            container_type=container_type,
            parent_id=self.resolve(parent_id),
        )
        results = await self.client_facade.AddMachines(params)
        log.debug('Added new machine %s', results[0].machine)
        return results[0].machine

    async def addRelation(self, endpoint1, endpoint2):
        """
        :param endpoint1 string:
        :param endpoint2 string:
            Endpoint1 and Endpoint2 hold relation endpoints in the
            "application:interface" form, where the application is always a
            placeholder pointing to an application change, and the interface is
            optional. Examples are "$deploy-42:web" or just "$deploy-42".

        :return: None. A relation that already exists is not an error.
        """
        endpoints = []
        for endpoint in (endpoint1, endpoint2):
            # only the application part (before the first ':') can be an
            # indirect reference; the rest is passed through untouched
            parts = endpoint.split(':')
            parts[0] = self.resolve(parts[0])
            endpoints.append(':'.join(parts))
        try:
            await self.app_facade.AddRelation(endpoints)
            log.debug('Added relation %s <-> %s', *endpoints)
        except JujuAPIError as e:
            if 'relation already exists' not in e.message:
                raise
            log.debug('Relation %s <-> %s already exists', *endpoints)
        return None

    async def deploy(self, charm, series, application, options, constraints,
                     storage, endpoint_bindings, resources):
        """
        :param charm string:
            Charm holds the URL of the charm to be used to deploy this
            application.

        :param series string:
            Series holds the series of the application to be deployed
            if the charm default is not sufficient.

        :param application string:
            Application holds the application name.

        :param options map[string]interface{}:
            Options holds application options. ``None`` is treated as no
            options.

        :param constraints string:
            Constraints holds the optional application constraints.

        :param storage map[string]string:
            Storage holds the optional storage constraints.

        :param endpoint_bindings map[string]string:
            EndpointBindings holds the optional endpoint bindings

        :param resources map[string]int:
            Resources identifies the revision to use for each resource
            of the application's charm.

        :return:
            The application name, unchanged.
        """
        # resolve indirect references
        charm = self.resolve(charm)
        # stringify all config values for API; a missing options map means
        # there is simply no config to send
        options = {k: str(v) for k, v in (options or {}).items()}
        # build param object
        app = client.ApplicationDeploy(
            charm_url=charm,
            series=series,
            application=application,
            config=options,
            constraints=constraints,
            storage=storage,
            endpoint_bindings=endpoint_bindings,
            resources=resources,
        )
        # do the do
        log.debug('Deploying %s', charm)
        await self.app_facade.Deploy([app])
        return application

    async def addUnit(self, application, to):
        """
        :param application string:
            Application holds the application placeholder name for which a unit
            is added.

        :param to string:
            To holds the optional location where to add the unit, as a
            placeholder pointing to another unit change or to a machine change.

        :return:
            The name of the unit: either an existing unit of the
            application claimed for this step, or the first unit reported
            by AddUnits.
        """
        application = self.resolve(application)
        placement = self.resolve(to)
        if self._units_by_app.get(application):
            # enough units for this application already exist;
            # claim one, and carry on
            # NB: this should probably honor placement, but the juju client
            # doesn't, so we're not bothering, either
            unit_name = self._units_by_app[application].pop()
            log.debug('Reusing unit %s for %s', unit_name, application)
            return unit_name
        log.debug('Adding unit of %s%s',
                  application,
                  (' to %s' % placement) if placement else '')
        result = await self.app_facade.AddUnits(
            application=application,
            placement=placement,
            num_units=1,
        )
        return result.units[0]

    async def expose(self, application):
        """
        :param application string:
            Application holds the placeholder name of the application that must
            be exposed.

        :return: None
        """
        application = self.resolve(application)
        log.debug('Exposing %s', application)
        await self.app_facade.Expose(application)
        return None

    async def setAnnotations(self, id_, entity_type, annotations):
        """
        :param id_ string:
            Id is the placeholder for the application or machine change
            corresponding to the entity to be annotated.

        :param entity_type EntityType:
            EntityType holds the type of the entity, "application" or
            "machine".

        :param annotations map[string]string:
            Annotations holds the annotations as key/value pairs.

        :return: None
        """
        entity_id = self.resolve(id_)
        log.debug('Updating annotations of %s', entity_id)
        ann = client.EntityAnnotations(
            entity=entity_id,
            annotations=annotations,
        )
        await self.ann_facade.Set([ann])
        return None
1325
1326
class CharmStore(object):
    """
    Async wrapper around theblues.charmstore.CharmStore
    """
    def __init__(self, loop):
        """
        :param loop:
            Event loop whose ``run_in_executor`` is used to run the wrapped
            (blocking) charm store calls.
        """
        self.loop = loop
        self._cs = charmstore.CharmStore()

    def __getattr__(self, name):
        """
        Wrap method calls in coroutines that use run_in_executor to make them
        async.

        Only invoked for names not found on the instance. Callable
        attributes of the wrapped charm store come back as coroutine
        functions; non-callable attributes come back as a zero-argument
        callable returning the attribute's current value. Either wrapper is
        stored on the instance, so later lookups of the same name bypass
        this method.

        :raises AttributeError:
            If the wrapped object has no such attribute, or if the wrapped
            object itself has not been set on this instance.
        """
        if name == '_cs':
            # We only get here when ``_cs`` is missing from the instance
            # (e.g. the object was created without running __init__, as
            # copy and pickle do). Looking it up via ``self._cs`` below
            # would re-enter this method forever, so fail like a normal
            # missing attribute instead.
            raise AttributeError(
                '{!r} object has no attribute {!r}'.format(
                    type(self).__name__, name))
        attr = getattr(self._cs, name)
        if not callable(attr):
            wrapper = partial(getattr, self._cs, name)
        else:
            async def coro(*args, **kwargs):
                # bind the arguments now; the executor calls it with none
                method = partial(attr, *args, **kwargs)
                return await self.loop.run_in_executor(None, method)
            wrapper = coro
        # cache on the instance so __getattr__ is not hit again for `name`
        setattr(self, name, wrapper)
        return wrapper