fe07f8099a571d4bdb6b4401bec8b6a54dd62fbd
[osm/N2VC.git] / juju / model.py
1 import asyncio
2 import collections
3 import logging
4 from concurrent.futures import CancelledError
5 from functools import partial
6
7 from theblues import charmstore
8
9 from .client import client
10 from .client import watcher
11 from .client import connection
12 from .delta import get_entity_delta
13 from .delta import get_entity_class
14 from .exceptions import DeadEntityException
15 from .errors import JujuAPIError
16
17 log = logging.getLogger(__name__)
18
19
class ModelObserver(object):
    """Base class for observers that dispatch model changes to handlers.

    An instance is awaited with ``(delta, old, new, model)``. The call is
    forwarded to a method named ``on_<entity>_<type>`` when the instance
    defines one, and to :meth:`on_change` otherwise.

    """

    async def __call__(self, delta, old, new, model):
        """Route one model change to the most specific handler available.

        :param delta: Change object exposing ``entity`` and ``type``
        :param old: Entity state before the change, or None
        :param new: Entity state after the change
        :param model: The model that produced the change

        """
        # An entity with no previous state is reported as an 'add',
        # whatever type the delta itself carries.
        is_new_entity = old is None and new is not None
        change_type = 'add' if is_new_entity else delta.type

        handler = getattr(
            self,
            'on_{}_{}'.format(delta.entity, change_type),
            self.on_change,
        )
        await handler(delta, old, new, model)

    async def on_change(self, delta, old, new, model):
        """Fallback handler; does nothing. Subclasses may override."""
        return None
32
33
class ModelState(object):
    """Holds the state of the model, including the delta history of all
    entities in the model.

    ``self.state`` maps entity type -> entity id -> history deque. Each
    deque item is the data carried by one applied delta; a trailing
    ``None`` is appended when an entity is removed.

    """
    def __init__(self, model):
        """
        :param model: The model whose state is tracked; it is handed to
            every entity object created by :meth:`get_entity`.

        """
        self.model = model
        self.state = dict()

    def _live_entity_map(self, entity_type):
        """Return an id:Entity map of all the living entities of
        type ``entity_type``.

        An entity is living when the last item of its history is not None.

        """
        return {
            entity_id: self.get_entity(entity_type, entity_id)
            for entity_id, history in self.state.get(entity_type, {}).items()
            if history[-1] is not None
        }

    @property
    def applications(self):
        """Return a map of application-name:Application for all applications
        currently in the model.

        """
        return self._live_entity_map('application')

    @property
    def machines(self):
        """Return a map of machine-id:Machine for all machines currently in
        the model.

        """
        return self._live_entity_map('machine')

    @property
    def units(self):
        """Return a map of unit-id:Unit for all units currently in
        the model.

        """
        return self._live_entity_map('unit')

    def entity_history(self, entity_type, entity_id):
        """Return the history deque for an entity.

        Raises KeyError if the entity type or id has never been seen.

        """
        return self.state[entity_type][entity_id]

    def entity_data(self, entity_type, entity_id, history_index):
        """Return the data dict for an entity at a specific index of its
        history.

        Raises IndexError if ``history_index`` is out of range.

        """
        return self.entity_history(entity_type, entity_id)[history_index]

    def apply_delta(self, delta):
        """Apply delta to our state and return a copy of the
        affected object as it was before and after the update, e.g.:

            old_obj, new_obj = self.apply_delta(delta)

        old_obj may be None if the delta is for the creation of a new object,
        e.g. a new application or unit is deployed.

        new_obj will never be None, but may be dead (new_obj.dead == True)
        if the object was deleted as a result of the delta being applied.

        """
        history = (
            self.state
            .setdefault(delta.entity, {})
            .setdefault(delta.get_id(), collections.deque())
        )

        history.append(delta.data)
        if delta.type == 'remove':
            # A trailing None marks the entity as dead
            history.append(None)

        entity = self.get_entity(delta.entity, delta.get_id())
        return entity.previous(), entity

    def get_entity(
            self, entity_type, entity_id, history_index=-1, connected=True):
        """Return an object instance representing the entity identified by
        ``entity_type`` and ``entity_id`` at ``history_index``.

        :param str entity_type: Entity type, e.g. 'application' or 'unit'
        :param str entity_id: Id of the entity within its type
        :param int history_index: Index into the entity's history deque.
            ``-1`` means "latest" and is kept as-is; other negative values
            are converted to the equivalent absolute index.
        :param bool connected: Passed through to the entity constructor

        Returns None if there is no history item at ``history_index``.

        """
        if history_index < 0 and history_index != -1:
            history_index += len(self.entity_history(entity_type, entity_id))
            if history_index < 0:
                # The requested state predates the first recorded delta.
                # Without this check a result of -1 would be read as
                # "latest", so previous() on a brand-new entity would
                # return the current state instead of None.
                return None

        try:
            self.entity_data(entity_type, entity_id, history_index)
        except IndexError:
            return None

        entity_class = get_entity_class(entity_type)
        return entity_class(
            entity_id, self.model, history_index=history_index,
            connected=connected)
135
136
class ModelEntity(object):
    """An object in the Model tree"""

    def __init__(self, entity_id, model, history_index=-1, connected=True):
        """Initialize a new entity

        :param entity_id str: The unique id of the object in the model
        :param model: The model instance in whose object tree this
            entity resides
        :history_index int: The index of this object's state in the model's
            history deque for this entity
        :connected bool: Flag indicating whether this object gets live updates
            from the model.

        """
        self.entity_id = entity_id
        self.model = model
        self._history_index = history_index
        self.connected = connected
        self.connection = model.connection

    def __getattr__(self, name):
        """Fetch object attributes from the underlying data dict held in the
        model.

        Only called for names not found through normal attribute lookup.
        Raises DeadEntityException when this object's data is None. A name
        missing from the data is reported by the subscript on the data
        object itself (KeyError for a dict).

        """
        if self.data is None:
            raise DeadEntityException(
                "Entity {}:{} is dead - its attributes can no longer be "
                "accessed. Use the .previous() method on this object to get "
                "a copy of the object at its previous state.".format(
                    self.entity_type, self.entity_id))
        return self.data[name]

    @property
    def entity_type(self):
        """A string identifying the entity type of this object, e.g.
        'application' or 'unit', etc.

        Derived from the lower-cased class name.

        """
        return self.__class__.__name__.lower()

    @property
    def current(self):
        """Return True if this object represents the current state of the
        entity in the underlying model.

        This will be True except when the object represents an entity at a
        prior state in history, e.g. if the object was obtained by calling
        .previous() on another object.

        """
        return self._history_index == -1

    @property
    def dead(self):
        """Returns True if this entity no longer exists in the underlying
        model.

        True when either this object's own data or the latest history item
        is None.

        """
        return (
            self.data is None or
            self.model.state.entity_data(
                self.entity_type, self.entity_id, -1) is None
        )

    @property
    def alive(self):
        """Returns True if this entity still exists in the underlying
        model.

        """
        return not self.dead

    @property
    def data(self):
        """The data dictionary for this entity.

        Read from the model state on every access, at this object's
        history index.

        """
        return self.model.state.entity_data(
            self.entity_type, self.entity_id, self._history_index)

    def previous(self):
        """Return a copy of this object as was at its previous state in
        history.

        Returns None if this object is new (and therefore has no history).

        The returned object is always "disconnected", i.e. does not receive
        live updates.

        """
        return self.model.state.get_entity(
            self.entity_type, self.entity_id, self._history_index - 1,
            connected=False)

    def next(self):
        """Return a copy of this object at its next state in
        history.

        Returns None if this object is already the latest.

        The returned object is "disconnected", i.e. does not receive
        live updates, unless it is current (latest).

        """
        if self._history_index == -1:
            return None

        new_index = self._history_index + 1
        last_index = len(self.model.state.entity_history(
            self.entity_type, self.entity_id)) - 1
        # -1 is the relative spelling of "latest"; it is reached when this
        # object was built directly with a history index of -2.
        connected = new_index in (-1, last_index)
        # Step forward to new_index. The earlier code passed
        # ``self._history_index - 1`` here, which stepped backwards.
        return self.model.state.get_entity(
            self.entity_type, self.entity_id, new_index,
            connected=connected)

    def latest(self):
        """Return a copy of this object at its current state in the model.

        Returns self if this object is already the latest.

        The returned object is always "connected", i.e. receives
        live updates from the model.

        """
        if self._history_index == -1:
            return self

        return self.model.state.get_entity(self.entity_type, self.entity_id)
268
269
class Model(object):
    """Client-side view of a Juju model.

    Holds the API connection, a :class:`ModelState` that is kept up to date
    by a background watcher task, and the set of observers notified on each
    change. Many operations are declared here as documented placeholders
    whose body is ``pass``.

    """

    def __init__(self, loop=None):
        """Instantiate a new connected Model.

        :param loop: an asyncio event loop

        """
        self.loop = loop or asyncio.get_event_loop()
        self.connection = None
        self.observers = set()
        self.state = ModelState(self)
        self._watcher_task = None
        # Connection cloned for the watcher; set while the watch is running
        self._watch_conn = None
        # NOTE(review): the ``loop`` argument of asyncio.Event was removed in
        # Python 3.10 - confirm the supported interpreter range.
        self._watch_shutdown = asyncio.Event(loop=loop)
        self._watch_received = asyncio.Event(loop=loop)
        self._charmstore = CharmStore(self.loop)

    async def connect_current(self):
        """Connect to the current Juju model.

        Starts the watcher and returns once the first batch of deltas has
        been applied.

        """
        self.connection = await connection.Connection.connect_current()
        self._watch()
        await self._watch_received.wait()

    async def disconnect(self):
        """Shut down the watcher task and close websockets.

        """
        self._stop_watching()
        if self.connection and self.connection.is_open:
            # Wait for the watcher to close its own connection first
            await self._watch_shutdown.wait()
            log.debug('Closing model connection')
            await self.connection.close()
            self.connection = None

    def all_units_idle(self):
        """Return True if all units are idle.

        A unit is idle when ``data['agent-status']['current']`` is 'idle'.

        """
        for unit in self.units.values():
            unit_status = unit.data['agent-status']['current']
            if unit_status != 'idle':
                return False
        return True

    async def reset(self, force=False):
        """Reset the model to a clean state.

        :param bool force: Force-terminate machines.

        Destroys every application and machine, then returns only once no
        machines remain in the model.

        """
        log.debug('Resetting model')
        for app in self.applications.values():
            await app.destroy()
        for machine in self.machines.values():
            await machine.destroy(force=force)
        await self.block_until(
            lambda: len(self.machines) == 0
        )

    async def block_until(self, *conditions, timeout=None):
        """Return only after all conditions are true.

        :param conditions: Zero-argument callables, polled every 0.1s
        :param timeout: Seconds to wait before asyncio.TimeoutError is
            raised; None waits indefinitely

        """
        async def _block():
            while not all(c() for c in conditions):
                await asyncio.sleep(.1)
        await asyncio.wait_for(_block(), timeout)

    @property
    def applications(self):
        """Return a map of application-name:Application for all applications
        currently in the model.

        """
        return self.state.applications

    @property
    def machines(self):
        """Return a map of machine-id:Machine for all machines currently in
        the model.

        """
        return self.state.machines

    @property
    def units(self):
        """Return a map of unit-id:Unit for all units currently in
        the model.

        """
        return self.state.units

    def add_observer(self, callable_):
        """Register an "on-model-change" callback

        Once a watch is started (Model.watch() is called), ``callable_``
        will be called each time the model changes. callable_ should
        be Awaitable and accept the following positional arguments:

            delta - An instance of :class:`juju.delta.EntityDelta`
                containing the raw delta data recv'd from the Juju
                websocket.

            old_obj - If the delta modifies an existing object in the model,
                old_obj will be a copy of that object, as it was before the
                delta was applied. Will be None if the delta creates a new
                entity in the model.

            new_obj - A copy of the new or updated object, after the delta
                is applied. Will be None if the delta removes an entity
                from the model.

            model - The :class:`Model` itself.

        """
        self.observers.add(callable_)

    def _watch(self):
        """Start an asynchronous watch against this model.

        See :meth:`add_observer` to register an onchange callback.

        """
        async def _start_watch():
            self._watch_shutdown.clear()
            try:
                allwatcher = watcher.AllWatcher()
                self._watch_conn = await self.connection.clone()
                allwatcher.connect(self._watch_conn)
                while True:
                    results = await allwatcher.Next()
                    for delta in results.deltas:
                        delta = get_entity_delta(delta)
                        old_obj, new_obj = self.state.apply_delta(delta)
                        # XXX: Might not want to shield at this level
                        # We are shielding because when the watcher is
                        # canceled (on disconnect()), we don't want all of
                        # its children (every observer callback) to be
                        # canceled with it. So we shield them. But this means
                        # they can *never* be canceled.
                        await asyncio.shield(
                            self._notify_observers(delta, old_obj, new_obj))
                    self._watch_received.set()
            except asyncio.CancelledError:
                # Task.cancel() raises asyncio.CancelledError. Since Python
                # 3.8 that is a different class from
                # concurrent.futures.CancelledError, which was caught here
                # before and therefore never matched on newer interpreters.
                log.debug('Closing watcher connection')
                watch_conn, self._watch_conn = self._watch_conn, None
                # Cancellation may land before the clone has completed
                if watch_conn is not None:
                    await watch_conn.close()
                self._watch_shutdown.set()

        log.debug('Starting watcher task')
        self._watcher_task = self.loop.create_task(_start_watch())

    def _stop_watching(self):
        """Stop the asynchronous watch against this model.

        """
        log.debug('Stopping watcher task')
        if self._watcher_task:
            self._watcher_task.cancel()

    async def _notify_observers(self, delta, old_obj, new_obj):
        """Call observing callbacks, notifying them of a change in model state

        :param delta: The raw change from the watcher
            (:class:`juju.client.overrides.Delta`)
        :param old_obj: The object in the model that this delta updates.
            May be None.
        :param new_obj: The object in the model that is created or updated
            by applying this delta.

        Each observer is scheduled with ``asyncio.ensure_future`` and is not
        awaited here.

        """
        log.debug(
            'Model changed: %s %s %s',
            delta.entity, delta.type, delta.get_id())
        for o in self.observers:
            asyncio.ensure_future(o(delta, old_obj, new_obj, self))

    def add_machine(
            self, spec=None, constraints=None, disks=None, series=None,
            count=1):
        """Start a new, empty machine and optionally a container, or add a
        container to a machine.

        :param str spec: Machine specification
            Examples::

                (None) - starts a new machine
                'lxc' - starts a new machine with one lxc container
                'lxc:4' - starts a new lxc container on machine 4
                'ssh:user@10.10.0.3' - manually provisions a machine with ssh
                'zone=us-east-1a' - starts a machine in zone us-east-1a on AWS
                'maas2.name' - acquire machine maas2.name on MAAS
        :param constraints: Machine constraints
        :type constraints: :class:`juju.Constraints`
        :param list disks: List of disk :class:`constraints <juju.Constraints>`
        :param str series: Series
        :param int count: Number of machines to deploy

        Supported container types are: lxc, lxd, kvm

        When deploying a container to an existing machine, constraints cannot
        be used.

        """
        pass
    add_machines = add_machine

    async def add_relation(self, relation1, relation2):
        """Add a relation between two applications.

        :param str relation1: '<application>[:<relation_name>]'
        :param str relation2: '<application>[:<relation_name>]'

        Returns the result of the ApplicationFacade ``AddRelation`` call.

        """
        app_facade = client.ApplicationFacade()
        app_facade.connect(self.connection)

        log.debug(
            'Adding relation %s <-> %s', relation1, relation2)

        return await app_facade.AddRelation([relation1, relation2])

    def add_space(self, name, *cidrs):
        r"""Add a new network space.

        Adds a new space with the given name and associates the given
        (optional) list of existing subnet CIDRs with it.

        :param str name: Name of the space
        :param \*cidrs: Optional list of existing subnet CIDRs

        """
        pass

    def add_ssh_key(self, key):
        """Add a public SSH key to this model.

        :param str key: The public ssh key

        """
        pass
    add_ssh_keys = add_ssh_key

    def add_subnet(self, cidr_or_id, space, *zones):
        r"""Add an existing subnet to this model.

        :param str cidr_or_id: CIDR or provider ID of the existing subnet
        :param str space: Network space with which to associate
        :param str \*zones: Zone(s) in which the subnet resides

        """
        pass

    def get_backups(self):
        """Retrieve metadata for backups in this model.

        """
        pass

    def block(self, *commands):
        r"""Add a new block to this model.

        :param str \*commands: The commands to block. Valid values are
            'all-changes', 'destroy-model', 'remove-object'

        """
        pass

    def get_blocks(self):
        """List blocks for this model.

        """
        pass

    def get_cached_images(self, arch=None, kind=None, series=None):
        """Return a list of cached OS images.

        :param str arch: Filter by image architecture
        :param str kind: Filter by image kind, e.g. 'lxd'
        :param str series: Filter by image series, e.g. 'xenial'

        """
        pass

    def create_backup(self, note=None, no_download=False):
        """Create a backup of this model.

        :param str note: A note to store with the backup
        :param bool no_download: Do not download the backup archive
        :return str: Path to downloaded archive

        """
        pass

    def create_storage_pool(self, name, provider_type, **pool_config):
        r"""Create or define a storage pool.

        :param str name: Name to give the storage pool
        :param str provider_type: Pool provider type
        :param \*\*pool_config: key/value pool configuration pairs

        """
        pass

    def debug_log(
            self, no_tail=False, exclude_module=None, include_module=None,
            include=None, level=None, limit=0, lines=10, replay=False,
            exclude=None):
        """Get log messages for this model.

        :param bool no_tail: Stop after returning existing log messages
        :param list exclude_module: Do not show log messages for these logging
            modules
        :param list include_module: Only show log messages for these logging
            modules
        :param list include: Only show log messages for these entities
        :param str level: Log level to show, valid options are 'TRACE',
            'DEBUG', 'INFO', 'WARNING', 'ERROR'
        :param int limit: Return this many of the most recent (possibly
            filtered) lines
        :param int lines: Yield this many of the most recent lines, and keep
            yielding
        :param bool replay: Yield the entire log, and keep yielding
        :param list exclude: Do not show log messages for these entities

        """
        pass

    async def deploy(
            self, entity_url, service_name=None, bind=None, budget=None,
            channel=None, config=None, constraints=None, force=False,
            num_units=1, plan=None, resources=None, series=None, storage=None,
            to=None):
        """Deploy a new service or bundle.

        :param str entity_url: Charm or bundle url
        :param str service_name: Name to give the service
        :param dict bind: <charm endpoint>:<network space> pairs
        :param dict budget: <budget name>:<limit> pairs
        :param str channel: Charm store channel from which to retrieve
            the charm or bundle, e.g. 'development'
        :param dict config: Charm configuration dictionary
        :param constraints: Service constraints; when truthy it is expanded
            as keyword arguments to ``client.Value``
        :type constraints: :class:`juju.Constraints`
        :param bool force: Allow charm to be deployed to a machine running
            an unsupported series
        :param int num_units: Number of units to deploy
        :param str plan: Plan under which to deploy charm
        :param dict resources: <resource name>:<file path> pairs
        :param str series: Series on which to deploy
        :param dict storage: Storage constraints; each value is expanded as
            keyword arguments to ``client.Constraints``
        :param to: Placement directives; each item is expanded as keyword
            arguments to ``client.Placement``. If None, no placement is
            sent.

        NOTE(review): ``to`` was previously documented as a string such as
        '23', 'lxc:7' or '24/lxc/3', but each item is expanded with ``**``
        and so must be a mapping - confirm the intended format. ``budget``,
        ``force`` and ``plan`` are accepted but not used by this method.

        For a bundle (an entity id containing 'bundle/') the bundle plan is
        fetched and executed and nothing is returned; for a charm the result
        of the ApplicationFacade ``Deploy`` call is returned.

        TODO::

            - entity_url must have a revision; look up latest automatically if
              not provided by caller
            - service_name is required; fill this in automatically if not
              provided by caller
            - series is required; how do we pick a default?

        """
        if constraints:
            constraints = client.Value(**constraints)

        if to:
            placement = [
                client.Placement(**p) for p in to
            ]
        else:
            placement = []

        if storage:
            storage = {
                k: client.Constraints(**v)
                for k, v in storage.items()
            }

        entity_id = await self.charmstore.entityId(entity_url)

        app_facade = client.ApplicationFacade()
        client_facade = client.ClientFacade()
        app_facade.connect(self.connection)
        client_facade.connect(self.connection)

        if 'bundle/' in entity_id:
            handler = BundleHandler(self)
            await handler.fetch_plan(entity_id)
            await handler.execute_plan()
        else:
            log.debug(
                'Deploying %s', entity_id)

            await client_facade.AddCharm(channel, entity_id)
            app = client.ApplicationDeploy(
                application=service_name,
                channel=channel,
                charm_url=entity_id,
                config=config,
                constraints=constraints,
                endpoint_bindings=bind,
                num_units=num_units,
                placement=placement,
                resources=resources,
                series=series,
                storage=storage,
            )

            return await app_facade.Deploy([app])

    def destroy(self):
        """Terminate all machines and resources for this model.

        """
        pass

    def get_backup(self, archive_id):
        """Download a backup archive file.

        :param str archive_id: The id of the archive to download
        :return str: Path to the archive file

        """
        pass

    def enable_ha(
            self, num_controllers=0, constraints=None, series=None, to=None):
        """Ensure sufficient controllers exist to provide redundancy.

        :param int num_controllers: Number of controllers to make available
        :param constraints: Constraints to apply to the controller machines
        :type constraints: :class:`juju.Constraints`
        :param str series: Series of the controller machines
        :param list to: Placement directives for controller machines, e.g.::

            '23' - machine 23
            'lxc:7' - new lxc container on machine 7
            '24/lxc/3' - lxc container 3 or machine 24

            If None, a new machine is provisioned.

        """
        pass

    def get_config(self):
        """Return the configuration settings for this model.

        """
        pass

    def get_constraints(self):
        """Return the machine constraints for this model.

        """
        pass

    def grant(self, username, acl='read'):
        """Grant a user access to this model.

        :param str username: Username
        :param str acl: Access control ('read' or 'write')

        """
        pass

    def import_ssh_key(self, identity):
        """Add a public SSH key from a trusted identity source to this model.

        :param str identity: User identity in the form <lp|gh>:<username>

        """
        pass
    import_ssh_keys = import_ssh_key

    def get_machines(self, machine, utc=False):
        """Return list of machines in this model.

        :param str machine: Machine id, e.g. '0'
        :param bool utc: Display time as UTC in RFC3339 format

        """
        pass

    def get_shares(self):
        """Return list of all users with access to this model.

        """
        pass

    def get_spaces(self):
        """Return list of all known spaces, including associated subnets.

        """
        pass

    def get_ssh_key(self):
        """Return known SSH keys for this model.

        """
        pass
    get_ssh_keys = get_ssh_key

    def get_storage(self, filesystem=False, volume=False):
        """Return details of storage instances.

        :param bool filesystem: Include filesystem storage
        :param bool volume: Include volume storage

        """
        pass

    def get_storage_pools(self, names=None, providers=None):
        """Return list of storage pools.

        :param list names: Only include pools with these names
        :param list providers: Only include pools for these providers

        """
        pass

    def get_subnets(self, space=None, zone=None):
        """Return list of known subnets.

        :param str space: Only include subnets in this space
        :param str zone: Only include subnets in this zone

        """
        pass

    def remove_blocks(self):
        """Remove all blocks from this model.

        """
        pass

    def remove_backup(self, backup_id):
        """Delete a backup.

        :param str backup_id: The id of the backup to remove

        """
        pass

    def remove_cached_images(self, arch=None, kind=None, series=None):
        """Remove cached OS images.

        :param str arch: Architecture of the images to remove
        :param str kind: Image kind to remove, e.g. 'lxd'
        :param str series: Image series to remove, e.g. 'xenial'

        """
        pass

    def remove_machine(self, *machine_ids):
        r"""Remove a machine from this model.

        :param str \*machine_ids: Ids of the machines to remove

        """
        pass
    remove_machines = remove_machine

    def remove_ssh_key(self, *keys):
        r"""Remove a public SSH key(s) from this model.

        :param str \*keys: Keys to remove

        """
        pass
    remove_ssh_keys = remove_ssh_key

    def restore_backup(
            self, bootstrap=False, constraints=None, archive=None,
            backup_id=None, upload_tools=False):
        """Restore a backup archive to a new controller.

        :param bool bootstrap: Bootstrap a new state machine
        :param constraints: Model constraints
        :type constraints: :class:`juju.Constraints`
        :param str archive: Path to backup archive to restore
        :param str backup_id: Id of backup to restore
        :param bool upload_tools: Upload tools if bootstrapping a new machine

        """
        pass

    def retry_provisioning(self):
        """Retry provisioning for failed machines.

        """
        pass

    def revoke(self, username, acl='read'):
        """Revoke a user's access to this model.

        :param str username: Username to revoke
        :param str acl: Access control ('read' or 'write')

        """
        pass

    def run(self, command, timeout=None):
        """Run command on all machines in this model.

        :param str command: The command to run
        :param int timeout: Time to wait before command is considered failed

        """
        pass

    def set_config(self, **config):
        r"""Set configuration keys on this model.

        :param \*\*config: Config key/values

        """
        pass

    def set_constraints(self, constraints):
        """Set machine constraints on this model.

        :param :class:`juju.Constraints` constraints: Machine constraints

        """
        pass

    def get_action_output(self, action_uuid, wait=-1):
        """Get the results of an action by ID.

        :param str action_uuid: Id of the action
        :param int wait: Time in seconds to wait for action to complete

        """
        pass

    def get_action_status(self, uuid_or_prefix=None, name=None):
        """Get the status of all actions, filtered by ID, ID prefix, or action name.

        :param str uuid_or_prefix: Filter by action uuid or prefix
        :param str name: Filter by action name

        """
        pass

    def get_budget(self, budget_name):
        """Get budget usage info.

        :param str budget_name: Name of budget

        """
        pass

    def get_status(self, filter_=None, utc=False):
        """Return the status of the model.

        :param str filter_: Service or unit name or wildcard ('*')
        :param bool utc: Display time as UTC in RFC3339 format

        """
        pass
    status = get_status

    def sync_tools(
            self, all_=False, destination=None, dry_run=False, public=False,
            source=None, stream=None, version=None):
        """Copy Juju tools into this model.

        :param bool all_: Copy all versions, not just the latest
        :param str destination: Path to local destination directory
        :param bool dry_run: Don't do the actual copy
        :param bool public: Tools are for a public cloud, so generate mirrors
            information
        :param str source: Path to local source directory
        :param str stream: Simplestreams stream for which to sync metadata
        :param str version: Copy a specific major.minor version

        """
        pass

    def unblock(self, *commands):
        r"""Unblock an operation that would alter this model.

        :param str \*commands: The commands to unblock. Valid values are
            'all-changes', 'destroy-model', 'remove-object'

        """
        pass

    def unset_config(self, *keys):
        r"""Unset configuration on this model.

        :param str \*keys: The keys to unset

        """
        pass

    def upgrade_gui(self):
        """Upgrade the Juju GUI for this model.

        """
        pass

    def upgrade_juju(
            self, dry_run=False, reset_previous_upgrade=False,
            upload_tools=False, version=None):
        """Upgrade Juju on all machines in a model.

        :param bool dry_run: Don't do the actual upgrade
        :param bool reset_previous_upgrade: Clear the previous (incomplete)
            upgrade status
        :param bool upload_tools: Upload local version of tools
        :param str version: Upgrade to a specific version

        """
        pass

    def upload_backup(self, archive_path):
        """Store a backup archive remotely in Juju.

        :param str archive_path: Path to local archive

        """
        pass

    @property
    def charmstore(self):
        """The CharmStore helper created for this model in ``__init__``."""
        return self._charmstore
1008
1009
1010 class BundleHandler(object):
1011 """
1012 Handle bundles by using the API to translate bundle YAML into a plan of
1013 steps and then dispatching each of those using the API.
1014 """
def __init__(self, model):
    """Prepare a handler bound to ``model``.

    Records the model and its charm store, starts with an empty plan and
    reference table, indexes the model's current unit names by the
    application each unit belongs to, and connects the Client, Application
    and Annotations facades to the model's connection.

    :param model: Object providing ``charmstore``, ``units`` and
        ``connection``

    """
    self.model = model
    self.charmstore = model.charmstore
    self.plan = []
    self.references = {}

    # application name -> names of the units it currently has
    self._units_by_app = {}
    for unit_name, unit in model.units.items():
        self._units_by_app.setdefault(
            unit.application, []).append(unit_name)

    client_facade = client.ClientFacade()
    self.client_facade = client_facade
    client_facade.connect(model.connection)

    app_facade = client.ApplicationFacade()
    self.app_facade = app_facade
    app_facade.connect(model.connection)

    ann_facade = client.AnnotationsFacade()
    self.ann_facade = ann_facade
    ann_facade.connect(model.connection)
1028 self.ann_facade = client.AnnotationsFacade()
1029 self.ann_facade.connect(model.connection)
1030
async def fetch_plan(self, entity_id):
    """Fetch the bundle's ``bundle.yaml`` and store the resulting plan.

    The file contents are read from the charm store and passed to the
    ClientFacade ``GetBundleChanges`` call; its result becomes
    ``self.plan``.

    :param entity_id: Charm store id of the bundle

    """
    bundle_yaml = await self.charmstore.files(
        entity_id, filename='bundle.yaml', read_file=True)
    changes = await self.client_facade.GetBundleChanges(bundle_yaml)
    self.plan = changes
1036
async def execute_plan(self):
    """Run every change of the fetched plan, in order.

    Each change names a method of this handler. That method is awaited
    with the change's arguments and its result is stored in
    ``self.references`` under the change's id.

    """
    for change in self.plan.changes:
        handler = getattr(self, change.method)
        self.references[change.id_] = await handler(*change.args)
1042
def resolve(self, reference):
    """Translate a ``$``-prefixed placeholder into its recorded result.

    A value starting with '$' is looked up in ``self.references`` by the
    text after the '$'. Any other value, including None and the empty
    string, is returned unchanged.

    """
    is_placeholder = bool(reference) and reference.startswith('$')
    if not is_placeholder:
        return reference
    return self.references[reference[1:]]
1047
async def addCharm(self, charm, series):
    """Add a charm to the model and return its resolved entity id.

    :param charm string:
        Charm holds the URL of the charm to be added.

    :param series string:
        Series holds the series of the charm to be added
        if the charm default is not sufficient. It is not used by this
        method.
    """
    resolved_id = await self.charmstore.entityId(charm)
    log.debug('Adding %s', resolved_id)
    # No channel is given for charms added from a bundle
    await self.client_facade.AddCharm(None, resolved_id)
    return resolved_id
1061
async def addMachines(self, series, constraints, container_type,
                      parent_id):
    """Add a machine (or container) and return the new machine's id.

    :param series string:
        Series holds the optional machine OS series.

    :param constraints string:
        Constraints holds the optional machine constraints.

    :param container_type string:
        ContainerType optionally holds the type of the container (for
        instance "lxc" or "kvm"). It is not specified for top level
        machines.

    :param parent_id string:
        ParentId optionally holds a placeholder pointing to another machine
        change or to a unit change. This value is only specified in the
        case this machine is a container, in which case also ContainerType
        is set. Placeholders are resolved before the request is sent.
    """
    machine_params = client.AddMachineParams(
        series=series,
        constraints=constraints,
        container_type=container_type,
        parent_id=self.resolve(parent_id),
    )
    results = await self.client_facade.AddMachines(machine_params)
    machine = results[0].machine
    log.debug('Added new machine %s', machine)
    return machine
1091
async def addRelation(self, endpoint1, endpoint2):
    """Relate two endpoints, tolerating a relation that already exists.

    :param endpoint1 string:
    :param endpoint2 string:
        Endpoint1 and Endpoint2 hold relation endpoints in the
        "application:interface" form, where the application is always a
        placeholder pointing to an application change, and the interface is
        optional. Examples are "$deploy-42:web" or just "$deploy-42".

    Always returns None. A JujuAPIError is re-raised unless its message
    contains 'relation already exists'.
    """
    endpoints = []
    for endpoint in (endpoint1, endpoint2):
        # Only the application part, before the first ':', can be a
        # placeholder
        app_ref, *rest = endpoint.split(':')
        endpoints.append(':'.join([self.resolve(app_ref)] + rest))

    try:
        await self.app_facade.AddRelation(endpoints)
        log.debug('Added relation %s <-> %s', *endpoints)
    except JujuAPIError as e:
        if 'relation already exists' not in e.message:
            raise
        log.debug('Relation %s <-> %s already exists', *endpoints)
    return None
1115
async def deploy(self, charm, series, application, options, constraints,
                 storage, endpoint_bindings, resources):
    """
    Deploy a single application through the application facade.

    :param charm string:
        Charm holds the URL of the charm to be used to deploy this
        application. It is passed through ``self.resolve`` first.

    :param series string:
        Series holds the series of the application to be deployed
        if the charm default is not sufficient.

    :param application string:
        Application holds the application name.

    :param options map[string]interface{}:
        Options holds application options. Every value is converted with
        ``str`` before being sent.

    :param constraints string:
        Constraints holds the optional application constraints.

    :param storage map[string]string:
        Storage holds the optional storage constraints.

    :param endpoint_bindings map[string]string:
        EndpointBindings holds the optional endpoint bindings

    :param resources map[string]int:
        Resources identifies the revision to use for each resource
        of the application's charm.

    Returns ``application`` unchanged.
    """
    # the charm may be an indirect reference to an earlier change
    charm_url = self.resolve(charm)
    # the API wants every config value as a string
    config = {key: str(value) for key, value in options.items()}
    deploy_args = client.ApplicationDeploy(
        charm_url=charm_url,
        series=series,
        application=application,
        config=config,
        constraints=constraints,
        storage=storage,
        endpoint_bindings=endpoint_bindings,
        resources=resources,
    )
    log.debug('Deploying %s', charm_url)
    await self.app_facade.Deploy([deploy_args])
    return application
1165
async def addUnit(self, application, to):
    """
    Provide one unit of an application, reusing a recorded unit if any.

    :param application string:
        Application holds the application placeholder name for which a unit
        is added.

    :param to string:
        To holds the optional location where to add the unit, as a
        placeholder pointing to another unit change or to a machine change.

    Returns the name popped from ``self._units_by_app`` when that mapping
    has a non-empty entry for the application; otherwise the first unit
    reported by the facade's AddUnits call.
    """
    app_name = self.resolve(application)
    target = self.resolve(to)

    spare_units = self._units_by_app.get(app_name)
    if spare_units:
        # enough units for this application already exist;
        # claim one, and carry on
        # NB: this should probably honor placement, but the juju client
        # doesn't, so we're not bothering, either
        claimed = spare_units.pop()
        log.debug('Reusing unit %s for %s', claimed, app_name)
        return claimed

    if target:
        destination = ' to %s' % target
    else:
        destination = ''
    log.debug('Adding unit of %s%s', app_name, destination)
    response = await self.app_facade.AddUnits(
        application=app_name,
        placement=target,
        num_units=1,
    )
    return response.units[0]
1195
async def expose(self, application):
    """
    Expose an application through the application facade.

    :param application string:
        Application holds the placeholder name of the application that must
        be exposed. It is passed through ``self.resolve`` first.

    Always returns None.
    """
    app_name = self.resolve(application)
    log.debug('Exposing %s', app_name)
    await self.app_facade.Expose(app_name)
    return None
1206
async def setAnnotations(self, id_, entity_type, annotations):
    """
    Set annotations on one entity through the annotations facade.

    :param id_ string:
        Id is the placeholder for the application or machine change
        corresponding to the entity to be annotated. It is passed through
        ``self.resolve`` first.

    :param entity_type EntityType:
        EntityType holds the type of the entity, "application" or
        "machine". It is not used by this method.

    :param annotations map[string]string:
        Annotations holds the annotations as key/value pairs.

    Always returns None.
    """
    target = self.resolve(id_)
    log.debug('Updating annotations of %s', target)
    entity_annotations = client.EntityAnnotations(
        entity=target,
        annotations=annotations,
    )
    await self.ann_facade.Set([entity_annotations])
    return None
1228
1229
class CharmStore(object):
    """
    Async wrapper around theblues.charmstore.CharmStore
    """
    def __init__(self, loop):
        """
        :param loop:
            Event loop whose ``run_in_executor`` is used to run the wrapped
            calls.
        """
        self.loop = loop
        self._cs = charmstore.CharmStore()

    def __getattr__(self, name):
        """
        Wrap method calls in coroutines that use run_in_executor to make them
        async.

        Callable attributes of the wrapped object are returned as coroutine
        functions; non-callable attributes are returned as a zero-argument
        callable that fetches the current value. The wrapper is stored on
        the instance, so this method runs only once per attribute name.

        Raises AttributeError if the wrapped object has no such attribute,
        or if the wrapped object itself has not been set yet.
        """
        if name == '_cs':
            # __getattr__ only runs when normal lookup failed, so getting
            # here means ``_cs`` is not set (instance made without
            # __init__, e.g. via __new__ or copy). Delegating would read
            # self._cs again and recurse until RecursionError.
            raise AttributeError(name)
        attr = getattr(self._cs, name)
        if not callable(attr):
            wrapper = partial(getattr, self._cs, name)
            setattr(self, name, wrapper)
        else:
            async def coro(*args, **kwargs):
                # bind the arguments now; run_in_executor takes a callable
                # plus positional args only, so kwargs need the partial
                method = partial(attr, *args, **kwargs)
                return await self.loop.run_in_executor(None, method)
            setattr(self, name, coro)
            wrapper = coro
        return wrapper