Return app ModelEntities from bundle deploy
[osm/N2VC.git] / juju / model.py
1 import asyncio
2 import collections
3 import logging
4 import re
5 import weakref
6 from concurrent.futures import CancelledError
7 from functools import partial
8
9 import yaml
10 from theblues import charmstore
11
12 from .client import client
13 from .client import watcher
14 from .client import connection
15 from .delta import get_entity_delta
16 from .delta import get_entity_class
17 from .exceptions import DeadEntityException
18 from .errors import JujuAPIError
19
20 log = logging.getLogger(__name__)
21
22
23 class _Observer(object):
24 """Wrapper around an observer callable.
25
26 This wrapper allows filter criteria to be associated with the
27 callable so that it's only called for changes that meet the criteria.
28
29 """
30 def __init__(self, callable_, entity_type, action, entity_id, predicate):
31 self.callable_ = callable_
32 self.entity_type = entity_type
33 self.action = action
34 self.entity_id = entity_id
35 self.predicate = predicate
36 if self.entity_id:
37 self.entity_id = str(self.entity_id)
38 if not self.entity_id.startswith('^'):
39 self.entity_id = '^' + self.entity_id
40 if not self.entity_id.endswith('$'):
41 self.entity_id += '$'
42
43 async def __call__(self, delta, old, new, model):
44 await self.callable_(delta, old, new, model)
45
46 def cares_about(self, delta):
47 """Return True if this observer "cares about" (i.e. wants to be
48 called) for a this delta.
49
50 """
51 if (self.entity_id and delta.get_id() and
52 not re.match(self.entity_id, str(delta.get_id()))):
53 return False
54
55 if self.entity_type and self.entity_type != delta.entity:
56 return False
57
58 if self.action and self.action != delta.type:
59 return False
60
61 if self.predicate and not self.predicate(delta):
62 return False
63
64 return True
65
66
67 class ModelObserver(object):
68 async def __call__(self, delta, old, new, model):
69 handler_name = 'on_{}_{}'.format(delta.entity, delta.type)
70 method = getattr(self, handler_name, self.on_change)
71 await method(delta, old, new, model)
72
73 async def on_change(self, delta, old, new, model):
74 pass
75
76
77 class ModelState(object):
78 """Holds the state of the model, including the delta history of all
79 entities in the model.
80
81 """
82 def __init__(self, model):
83 self.model = model
84 self.state = dict()
85
86 def _live_entity_map(self, entity_type):
87 """Return an id:Entity map of all the living entities of
88 type ``entity_type``.
89
90 """
91 return {
92 entity_id: self.get_entity(entity_type, entity_id)
93 for entity_id, history in self.state.get(entity_type, {}).items()
94 if history[-1] is not None
95 }
96
97 @property
98 def applications(self):
99 """Return a map of application-name:Application for all applications
100 currently in the model.
101
102 """
103 return self._live_entity_map('application')
104
105 @property
106 def machines(self):
107 """Return a map of machine-id:Machine for all machines currently in
108 the model.
109
110 """
111 return self._live_entity_map('machine')
112
113 @property
114 def units(self):
115 """Return a map of unit-id:Unit for all units currently in
116 the model.
117
118 """
119 return self._live_entity_map('unit')
120
121 def entity_history(self, entity_type, entity_id):
122 """Return the history deque for an entity.
123
124 """
125 return self.state[entity_type][entity_id]
126
127 def entity_data(self, entity_type, entity_id, history_index):
128 """Return the data dict for an entity at a specific index of its
129 history.
130
131 """
132 return self.entity_history(entity_type, entity_id)[history_index]
133
134 def apply_delta(self, delta):
135 """Apply delta to our state and return a copy of the
136 affected object as it was before and after the update, e.g.:
137
138 old_obj, new_obj = self.apply_delta(delta)
139
140 old_obj may be None if the delta is for the creation of a new object,
141 e.g. a new application or unit is deployed.
142
143 new_obj will never be None, but may be dead (new_obj.dead == True)
144 if the object was deleted as a result of the delta being applied.
145
146 """
147 history = (
148 self.state
149 .setdefault(delta.entity, {})
150 .setdefault(delta.get_id(), collections.deque())
151 )
152
153 history.append(delta.data)
154 if delta.type == 'remove':
155 history.append(None)
156
157 entity = self.get_entity(delta.entity, delta.get_id())
158 return entity.previous(), entity
159
160 def get_entity(
161 self, entity_type, entity_id, history_index=-1, connected=True):
162 """Return an object instance representing the entity created or
163 updated by ``delta``
164
165 """
166 """
167 log.debug(
168 'Getting %s:%s at index %s',
169 entity_type, entity_id, history_index)
170 """
171
172 if history_index < 0 and history_index != -1:
173 history_index += len(self.entity_history(entity_type, entity_id))
174 if history_index < 0:
175 return None
176
177 try:
178 self.entity_data(entity_type, entity_id, history_index)
179 except IndexError:
180 return None
181
182 entity_class = get_entity_class(entity_type)
183 return entity_class(
184 entity_id, self.model, history_index=history_index,
185 connected=connected)
186
187
188 class ModelEntity(object):
189 """An object in the Model tree"""
190
191 def __init__(self, entity_id, model, history_index=-1, connected=True):
192 """Initialize a new entity
193
194 :param entity_id str: The unique id of the object in the model
195 :param model: The model instance in whose object tree this
196 entity resides
197 :history_index int: The index of this object's state in the model's
198 history deque for this entity
199 :connected bool: Flag indicating whether this object gets live updates
200 from the model.
201
202 """
203 self.entity_id = entity_id
204 self.model = model
205 self._history_index = history_index
206 self.connected = connected
207 self.connection = model.connection
208
209 def __getattr__(self, name):
210 """Fetch object attributes from the underlying data dict held in the
211 model.
212
213 """
214 if self.data is None:
215 raise DeadEntityException(
216 "Entity {}:{} is dead - its attributes can no longer be "
217 "accessed. Use the .previous() method on this object to get "
218 "a copy of the object at its previous state.".format(
219 self.entity_type, self.entity_id))
220 return self.data[name]
221
222 def __bool__(self):
223 return bool(self.data)
224
225 def on_change(self, callable_):
226 """Add a change observer to this entity.
227
228 """
229 self.model.add_observer(
230 callable_, self.entity_type, 'change', self.entity_id)
231
232 def on_remove(self, callable_):
233 """Add a remove observer to this entity.
234
235 """
236 self.model.add_observer(
237 callable_, self.entity_type, 'remove', self.entity_id)
238
239 @property
240 def entity_type(self):
241 """A string identifying the entity type of this object, e.g.
242 'application' or 'unit', etc.
243
244 """
245 return self.__class__.__name__.lower()
246
247 @property
248 def current(self):
249 """Return True if this object represents the current state of the
250 entity in the underlying model.
251
252 This will be True except when the object represents an entity at a
253 non-latest state in history, e.g. if the object was obtained by calling
254 .previous() on another object.
255
256 """
257 return self._history_index == -1
258
259 @property
260 def dead(self):
261 """Returns True if this entity no longer exists in the underlying
262 model.
263
264 """
265 return (
266 self.data is None or
267 self.model.state.entity_data(
268 self.entity_type, self.entity_id, -1) is None
269 )
270
271 @property
272 def alive(self):
273 """Returns True if this entity still exists in the underlying
274 model.
275
276 """
277 return not self.dead
278
279 @property
280 def data(self):
281 """The data dictionary for this entity.
282
283 """
284 return self.model.state.entity_data(
285 self.entity_type, self.entity_id, self._history_index)
286
287 def previous(self):
288 """Return a copy of this object as was at its previous state in
289 history.
290
291 Returns None if this object is new (and therefore has no history).
292
293 The returned object is always "disconnected", i.e. does not receive
294 live updates.
295
296 """
297 return self.model.state.get_entity(
298 self.entity_type, self.entity_id, self._history_index - 1,
299 connected=False)
300
301 def next(self):
302 """Return a copy of this object at its next state in
303 history.
304
305 Returns None if this object is already the latest.
306
307 The returned object is "disconnected", i.e. does not receive
308 live updates, unless it is current (latest).
309
310 """
311 if self._history_index == -1:
312 return None
313
314 new_index = self._history_index + 1
315 connected = (
316 new_index == len(self.model.state.entity_history(
317 self.entity_type, self.entity_id)) - 1
318 )
319 return self.model.state.get_entity(
320 self.entity_type, self.entity_id, self._history_index - 1,
321 connected=connected)
322
323 def latest(self):
324 """Return a copy of this object at its current state in the model.
325
326 Returns self if this object is already the latest.
327
328 The returned object is always "connected", i.e. receives
329 live updates from the model.
330
331 """
332 if self._history_index == -1:
333 return self
334
335 return self.model.state.get_entity(self.entity_type, self.entity_id)
336
337
338 class Model(object):
339 def __init__(self, loop=None):
340 """Instantiate a new connected Model.
341
342 :param loop: an asyncio event loop
343
344 """
345 self.loop = loop or asyncio.get_event_loop()
346 self.connection = None
347 self.observers = weakref.WeakValueDictionary()
348 self.state = ModelState(self)
349 self._watcher_task = None
350 self._watch_shutdown = asyncio.Event(loop=loop)
351 self._watch_received = asyncio.Event(loop=loop)
352 self._charmstore = CharmStore(self.loop)
353
354 async def connect_current(self):
355 """Connect to the current Juju model.
356
357 """
358 self.connection = await connection.Connection.connect_current()
359 self._watch()
360 await self._watch_received.wait()
361
362 async def disconnect(self):
363 """Shut down the watcher task and close websockets.
364
365 """
366 self._stop_watching()
367 if self.connection and self.connection.is_open:
368 await self._watch_shutdown.wait()
369 log.debug('Closing model connection')
370 await self.connection.close()
371 self.connection = None
372
373 def all_units_idle(self):
374 """Return True if all units are idle.
375
376 """
377 for unit in self.units.values():
378 unit_status = unit.data['agent-status']['current']
379 if unit_status != 'idle':
380 return False
381 return True
382
383 async def reset(self, force=False):
384 """Reset the model to a clean state.
385
386 :param bool force: Force-terminate machines.
387
388 This returns only after the model has reached a clean state. "Clean"
389 means no applications or machines exist in the model.
390
391 """
392 log.debug('Resetting model')
393 for app in self.applications.values():
394 await app.destroy()
395 for machine in self.machines.values():
396 await machine.destroy(force=force)
397 await self.block_until(
398 lambda: len(self.machines) == 0
399 )
400
401 async def block_until(self, *conditions, timeout=None):
402 """Return only after all conditions are true.
403
404 """
405 async def _block():
406 while not all(c() for c in conditions):
407 await asyncio.sleep(0)
408 await asyncio.wait_for(_block(), timeout)
409
410 @property
411 def applications(self):
412 """Return a map of application-name:Application for all applications
413 currently in the model.
414
415 """
416 return self.state.applications
417
418 @property
419 def machines(self):
420 """Return a map of machine-id:Machine for all machines currently in
421 the model.
422
423 """
424 return self.state.machines
425
426 @property
427 def units(self):
428 """Return a map of unit-id:Unit for all units currently in
429 the model.
430
431 """
432 return self.state.units
433
434 def add_observer(
435 self, callable_, entity_type=None, action=None, entity_id=None,
436 predicate=None):
437 """Register an "on-model-change" callback
438
439 Once the model is connected, ``callable_``
440 will be called each time the model changes. callable_ should
441 be Awaitable and accept the following positional arguments:
442
443 delta - An instance of :class:`juju.delta.EntityDelta`
444 containing the raw delta data recv'd from the Juju
445 websocket.
446
447 old_obj - If the delta modifies an existing object in the model,
448 old_obj will be a copy of that object, as it was before the
449 delta was applied. Will be None if the delta creates a new
450 entity in the model.
451
452 new_obj - A copy of the new or updated object, after the delta
453 is applied. Will be None if the delta removes an entity
454 from the model.
455
456 model - The :class:`Model` itself.
457
458 Events for which ``callable_`` is called can be specified by passing
459 entity_type, action, and/or id_ filter criteria, e.g.:
460
461 add_observer(
462 myfunc, entity_type='application', action='add', id_='ubuntu')
463
464 For more complex filtering conditions, pass a predicate function. It
465 will be called with a delta as its only argument. If the predicate
466 function returns True, the callable_ will be called.
467
468 """
469 observer = _Observer(
470 callable_, entity_type, action, entity_id, predicate)
471 self.observers[observer] = callable_
472
473 def _watch(self):
474 """Start an asynchronous watch against this model.
475
476 See :meth:`add_observer` to register an onchange callback.
477
478 """
479 async def _start_watch():
480 self._watch_shutdown.clear()
481 try:
482 allwatcher = watcher.AllWatcher()
483 self._watch_conn = await self.connection.clone()
484 allwatcher.connect(self._watch_conn)
485 while True:
486 results = await allwatcher.Next()
487 for delta in results.deltas:
488 delta = get_entity_delta(delta)
489 old_obj, new_obj = self.state.apply_delta(delta)
490 # XXX: Might not want to shield at this level
491 # We are shielding because when the watcher is
492 # canceled (on disconnect()), we don't want all of
493 # its children (every observer callback) to be
494 # canceled with it. So we shield them. But this means
495 # they can *never* be canceled.
496 await asyncio.shield(
497 self._notify_observers(delta, old_obj, new_obj))
498 self._watch_received.set()
499 except CancelledError:
500 log.debug('Closing watcher connection')
501 await self._watch_conn.close()
502 self._watch_shutdown.set()
503 self._watch_conn = None
504
505 log.debug('Starting watcher task')
506 self._watcher_task = self.loop.create_task(_start_watch())
507
508 def _stop_watching(self):
509 """Stop the asynchronous watch against this model.
510
511 """
512 log.debug('Stopping watcher task')
513 if self._watcher_task:
514 self._watcher_task.cancel()
515
516 async def _notify_observers(self, delta, old_obj, new_obj):
517 """Call observing callbacks, notifying them of a change in model state
518
519 :param delta: The raw change from the watcher
520 (:class:`juju.client.overrides.Delta`)
521 :param old_obj: The object in the model that this delta updates.
522 May be None.
523 :param new_obj: The object in the model that is created or updated
524 by applying this delta.
525
526 """
527 if new_obj and not old_obj:
528 delta.type = 'add'
529
530 log.debug(
531 'Model changed: %s %s %s',
532 delta.entity, delta.type, delta.get_id())
533
534 for o in self.observers:
535 if o.cares_about(delta):
536 asyncio.ensure_future(o(delta, old_obj, new_obj, self))
537
538 async def _wait_for_new(self, entity_type, entity_id, predicate=None):
539 """Wait for a new object to appear in the Model and return it.
540
541 Waits for an object of type ``entity_type`` with id ``entity_id``.
542
543 This coroutine blocks until the new object appears in the model.
544
545 """
546 entity_added = asyncio.Queue(loop=self.loop)
547
548 async def callback(delta, old, new, model):
549 await entity_added.put(delta.get_id())
550
551 self.add_observer(callback, entity_type, 'add', entity_id, predicate)
552 entity_id = await entity_added.get()
553 return self.state._live_entity_map(entity_type)[entity_id]
554
555 def add_machine(
556 self, spec=None, constraints=None, disks=None, series=None,
557 count=1):
558 """Start a new, empty machine and optionally a container, or add a
559 container to a machine.
560
561 :param str spec: Machine specification
562 Examples::
563
564 (None) - starts a new machine
565 'lxc' - starts a new machine with on lxc container
566 'lxc:4' - starts a new lxc container on machine 4
567 'ssh:user@10.10.0.3' - manually provisions a machine with ssh
568 'zone=us-east-1a' - starts a machine in zone us-east-1s on AWS
569 'maas2.name' - acquire machine maas2.name on MAAS
570 :param constraints: Machine constraints
571 :type constraints: :class:`juju.Constraints`
572 :param list disks: List of disk :class:`constraints <juju.Constraints>`
573 :param str series: Series
574 :param int count: Number of machines to deploy
575
576 Supported container types are: lxc, lxd, kvm
577
578 When deploying a container to an existing machine, constraints cannot
579 be used.
580
581 """
582 pass
583 add_machines = add_machine
584
585 async def add_relation(self, relation1, relation2):
586 """Add a relation between two applications.
587
588 :param str relation1: '<application>[:<relation_name>]'
589 :param str relation2: '<application>[:<relation_name>]'
590
591 """
592 app_facade = client.ApplicationFacade()
593 app_facade.connect(self.connection)
594
595 log.debug(
596 'Adding relation %s <-> %s', relation1, relation2)
597
598 try:
599 result = await app_facade.AddRelation([relation1, relation2])
600 except JujuAPIError as e:
601 if 'relation already exists' not in e.message:
602 raise
603 log.debug(
604 'Relation %s <-> %s already exists', relation1, relation2)
605 # TODO: if relation already exists we should return the
606 # Relation ModelEntity here
607 return None
608
609 def predicate(delta):
610 endpoints = {}
611 for endpoint in delta.data['endpoints']:
612 endpoints[endpoint['application-name']] = endpoint['relation']
613 return endpoints == result.endpoints
614
615 return await self._wait_for_new('relation', None, predicate)
616
617 def add_space(self, name, *cidrs):
618 """Add a new network space.
619
620 Adds a new space with the given name and associates the given
621 (optional) list of existing subnet CIDRs with it.
622
623 :param str name: Name of the space
624 :param \*cidrs: Optional list of existing subnet CIDRs
625
626 """
627 pass
628
629 def add_ssh_key(self, key):
630 """Add a public SSH key to this model.
631
632 :param str key: The public ssh key
633
634 """
635 pass
636 add_ssh_keys = add_ssh_key
637
638 def add_subnet(self, cidr_or_id, space, *zones):
639 """Add an existing subnet to this model.
640
641 :param str cidr_or_id: CIDR or provider ID of the existing subnet
642 :param str space: Network space with which to associate
643 :param str \*zones: Zone(s) in which the subnet resides
644
645 """
646 pass
647
648 def get_backups(self):
649 """Retrieve metadata for backups in this model.
650
651 """
652 pass
653
654 def block(self, *commands):
655 """Add a new block to this model.
656
657 :param str \*commands: The commands to block. Valid values are
658 'all-changes', 'destroy-model', 'remove-object'
659
660 """
661 pass
662
663 def get_blocks(self):
664 """List blocks for this model.
665
666 """
667 pass
668
669 def get_cached_images(self, arch=None, kind=None, series=None):
670 """Return a list of cached OS images.
671
672 :param str arch: Filter by image architecture
673 :param str kind: Filter by image kind, e.g. 'lxd'
674 :param str series: Filter by image series, e.g. 'xenial'
675
676 """
677 pass
678
679 def create_backup(self, note=None, no_download=False):
680 """Create a backup of this model.
681
682 :param str note: A note to store with the backup
683 :param bool no_download: Do not download the backup archive
684 :return str: Path to downloaded archive
685
686 """
687 pass
688
689 def create_storage_pool(self, name, provider_type, **pool_config):
690 """Create or define a storage pool.
691
692 :param str name: Name to give the storage pool
693 :param str provider_type: Pool provider type
694 :param \*\*pool_config: key/value pool configuration pairs
695
696 """
697 pass
698
699 def debug_log(
700 self, no_tail=False, exclude_module=None, include_module=None,
701 include=None, level=None, limit=0, lines=10, replay=False,
702 exclude=None):
703 """Get log messages for this model.
704
705 :param bool no_tail: Stop after returning existing log messages
706 :param list exclude_module: Do not show log messages for these logging
707 modules
708 :param list include_module: Only show log messages for these logging
709 modules
710 :param list include: Only show log messages for these entities
711 :param str level: Log level to show, valid options are 'TRACE',
712 'DEBUG', 'INFO', 'WARNING', 'ERROR,
713 :param int limit: Return this many of the most recent (possibly
714 filtered) lines are shown
715 :param int lines: Yield this many of the most recent lines, and keep
716 yielding
717 :param bool replay: Yield the entire log, and keep yielding
718 :param list exclude: Do not show log messages for these entities
719
720 """
721 pass
722
723 async def deploy(
724 self, entity_url, service_name=None, bind=None, budget=None,
725 channel=None, config=None, constraints=None, force=False,
726 num_units=1, plan=None, resources=None, series=None, storage=None,
727 to=None):
728 """Deploy a new service or bundle.
729
730 :param str entity_url: Charm or bundle url
731 :param str service_name: Name to give the service
732 :param dict bind: <charm endpoint>:<network space> pairs
733 :param dict budget: <budget name>:<limit> pairs
734 :param str channel: Charm store channel from which to retrieve
735 the charm or bundle, e.g. 'development'
736 :param dict config: Charm configuration dictionary
737 :param constraints: Service constraints
738 :type constraints: :class:`juju.Constraints`
739 :param bool force: Allow charm to be deployed to a machine running
740 an unsupported series
741 :param int num_units: Number of units to deploy
742 :param str plan: Plan under which to deploy charm
743 :param dict resources: <resource name>:<file path> pairs
744 :param str series: Series on which to deploy
745 :param dict storage: Storage constraints TODO how do these look?
746 :param str to: Placement directive, e.g.::
747
748 '23' - machine 23
749 'lxc:7' - new lxc container on machine 7
750 '24/lxc/3' - lxc container 3 or machine 24
751
752 If None, a new machine is provisioned.
753
754
755 TODO::
756
757 - service_name is required; fill this in automatically if not
758 provided by caller
759 - series is required; how do we pick a default?
760
761 """
762 if constraints:
763 constraints = client.Value(**constraints)
764
765 if to:
766 placement = [
767 client.Placement(**p) for p in to
768 ]
769 else:
770 placement = []
771
772 if storage:
773 storage = {
774 k: client.Constraints(**v)
775 for k, v in storage.items()
776 }
777
778 entity_id = await self.charmstore.entityId(entity_url)
779
780 app_facade = client.ApplicationFacade()
781 client_facade = client.ClientFacade()
782 app_facade.connect(self.connection)
783 client_facade.connect(self.connection)
784
785 if 'bundle/' in entity_id:
786 handler = BundleHandler(self)
787 await handler.fetch_plan(entity_id)
788 await handler.execute_plan()
789 extant_apps = {app for app in self.applications}
790 pending_apps = set(handler.applications) - extant_apps
791 if pending_apps:
792 # new apps will usually be in the model by now, but if some
793 # haven't made it yet we'll need to wait on them to be added
794 await asyncio.wait([self._wait_for_new('application', app_name)
795 for app_name in pending_apps])
796 return [app for name, app in self.applications.items()
797 if name in handler.applications]
798 else:
799 log.debug(
800 'Deploying %s', entity_id)
801
802 await client_facade.AddCharm(channel, entity_id)
803 app = client.ApplicationDeploy(
804 application=service_name,
805 channel=channel,
806 charm_url=entity_id,
807 config=config,
808 constraints=constraints,
809 endpoint_bindings=bind,
810 num_units=num_units,
811 placement=placement,
812 resources=resources,
813 series=series,
814 storage=storage,
815 )
816
817 await app_facade.Deploy([app])
818 return [await self._wait_for_new('application', service_name)]
819
820 def destroy(self):
821 """Terminate all machines and resources for this model.
822
823 """
824 pass
825
826 def get_backup(self, archive_id):
827 """Download a backup archive file.
828
829 :param str archive_id: The id of the archive to download
830 :return str: Path to the archive file
831
832 """
833 pass
834
835 def enable_ha(
836 self, num_controllers=0, constraints=None, series=None, to=None):
837 """Ensure sufficient controllers exist to provide redundancy.
838
839 :param int num_controllers: Number of controllers to make available
840 :param constraints: Constraints to apply to the controller machines
841 :type constraints: :class:`juju.Constraints`
842 :param str series: Series of the controller machines
843 :param list to: Placement directives for controller machines, e.g.::
844
845 '23' - machine 23
846 'lxc:7' - new lxc container on machine 7
847 '24/lxc/3' - lxc container 3 or machine 24
848
849 If None, a new machine is provisioned.
850
851 """
852 pass
853
854 def get_config(self):
855 """Return the configuration settings for this model.
856
857 """
858 pass
859
860 def get_constraints(self):
861 """Return the machine constraints for this model.
862
863 """
864 pass
865
866 def grant(self, username, acl='read'):
867 """Grant a user access to this model.
868
869 :param str username: Username
870 :param str acl: Access control ('read' or 'write')
871
872 """
873 pass
874
875 def import_ssh_key(self, identity):
876 """Add a public SSH key from a trusted indentity source to this model.
877
878 :param str identity: User identity in the form <lp|gh>:<username>
879
880 """
881 pass
882 import_ssh_keys = import_ssh_key
883
884 def get_machines(self, machine, utc=False):
885 """Return list of machines in this model.
886
887 :param str machine: Machine id, e.g. '0'
888 :param bool utc: Display time as UTC in RFC3339 format
889
890 """
891 pass
892
893 def get_shares(self):
894 """Return list of all users with access to this model.
895
896 """
897 pass
898
899 def get_spaces(self):
900 """Return list of all known spaces, including associated subnets.
901
902 """
903 pass
904
905 def get_ssh_key(self):
906 """Return known SSH keys for this model.
907
908 """
909 pass
910 get_ssh_keys = get_ssh_key
911
912 def get_storage(self, filesystem=False, volume=False):
913 """Return details of storage instances.
914
915 :param bool filesystem: Include filesystem storage
916 :param bool volume: Include volume storage
917
918 """
919 pass
920
921 def get_storage_pools(self, names=None, providers=None):
922 """Return list of storage pools.
923
924 :param list names: Only include pools with these names
925 :param list providers: Only include pools for these providers
926
927 """
928 pass
929
930 def get_subnets(self, space=None, zone=None):
931 """Return list of known subnets.
932
933 :param str space: Only include subnets in this space
934 :param str zone: Only include subnets in this zone
935
936 """
937 pass
938
939 def remove_blocks(self):
940 """Remove all blocks from this model.
941
942 """
943 pass
944
945 def remove_backup(self, backup_id):
946 """Delete a backup.
947
948 :param str backup_id: The id of the backup to remove
949
950 """
951 pass
952
953 def remove_cached_images(self, arch=None, kind=None, series=None):
954 """Remove cached OS images.
955
956 :param str arch: Architecture of the images to remove
957 :param str kind: Image kind to remove, e.g. 'lxd'
958 :param str series: Image series to remove, e.g. 'xenial'
959
960 """
961 pass
962
963 def remove_machine(self, *machine_ids):
964 """Remove a machine from this model.
965
966 :param str \*machine_ids: Ids of the machines to remove
967
968 """
969 pass
970 remove_machines = remove_machine
971
972 def remove_ssh_key(self, *keys):
973 """Remove a public SSH key(s) from this model.
974
975 :param str \*keys: Keys to remove
976
977 """
978 pass
979 remove_ssh_keys = remove_ssh_key
980
981 def restore_backup(
982 self, bootstrap=False, constraints=None, archive=None,
983 backup_id=None, upload_tools=False):
984 """Restore a backup archive to a new controller.
985
986 :param bool bootstrap: Bootstrap a new state machine
987 :param constraints: Model constraints
988 :type constraints: :class:`juju.Constraints`
989 :param str archive: Path to backup archive to restore
990 :param str backup_id: Id of backup to restore
991 :param bool upload_tools: Upload tools if bootstrapping a new machine
992
993 """
994 pass
995
996 def retry_provisioning(self):
997 """Retry provisioning for failed machines.
998
999 """
1000 pass
1001
1002 def revoke(self, username, acl='read'):
1003 """Revoke a user's access to this model.
1004
1005 :param str username: Username to revoke
1006 :param str acl: Access control ('read' or 'write')
1007
1008 """
1009 pass
1010
1011 def run(self, command, timeout=None):
1012 """Run command on all machines in this model.
1013
1014 :param str command: The command to run
1015 :param int timeout: Time to wait before command is considered failed
1016
1017 """
1018 pass
1019
1020 def set_config(self, **config):
1021 """Set configuration keys on this model.
1022
1023 :param \*\*config: Config key/values
1024
1025 """
1026 pass
1027
1028 def set_constraints(self, constraints):
1029 """Set machine constraints on this model.
1030
1031 :param :class:`juju.Constraints` constraints: Machine constraints
1032
1033 """
1034 pass
1035
1036 def get_action_output(self, action_uuid, wait=-1):
1037 """Get the results of an action by ID.
1038
1039 :param str action_uuid: Id of the action
1040 :param int wait: Time in seconds to wait for action to complete
1041
1042 """
1043 pass
1044
1045 def get_action_status(self, uuid_or_prefix=None, name=None):
1046 """Get the status of all actions, filtered by ID, ID prefix, or action name.
1047
1048 :param str uuid_or_prefix: Filter by action uuid or prefix
1049 :param str name: Filter by action name
1050
1051 """
1052 pass
1053
1054 def get_budget(self, budget_name):
1055 """Get budget usage info.
1056
1057 :param str budget_name: Name of budget
1058
1059 """
1060 pass
1061
1062 def get_status(self, filter_=None, utc=False):
1063 """Return the status of the model.
1064
1065 :param str filter_: Service or unit name or wildcard ('*')
1066 :param bool utc: Display time as UTC in RFC3339 format
1067
1068 """
1069 pass
1070 status = get_status
1071
1072 def sync_tools(
1073 self, all_=False, destination=None, dry_run=False, public=False,
1074 source=None, stream=None, version=None):
1075 """Copy Juju tools into this model.
1076
1077 :param bool all_: Copy all versions, not just the latest
1078 :param str destination: Path to local destination directory
1079 :param bool dry_run: Don't do the actual copy
1080 :param bool public: Tools are for a public cloud, so generate mirrors
1081 information
1082 :param str source: Path to local source directory
1083 :param str stream: Simplestreams stream for which to sync metadata
1084 :param str version: Copy a specific major.minor version
1085
1086 """
1087 pass
1088
1089 def unblock(self, *commands):
1090 """Unblock an operation that would alter this model.
1091
1092 :param str \*commands: The commands to unblock. Valid values are
1093 'all-changes', 'destroy-model', 'remove-object'
1094
1095 """
1096 pass
1097
1098 def unset_config(self, *keys):
1099 """Unset configuration on this model.
1100
1101 :param str \*keys: The keys to unset
1102
1103 """
1104 pass
1105
1106 def upgrade_gui(self):
1107 """Upgrade the Juju GUI for this model.
1108
1109 """
1110 pass
1111
1112 def upgrade_juju(
1113 self, dry_run=False, reset_previous_upgrade=False,
1114 upload_tools=False, version=None):
1115 """Upgrade Juju on all machines in a model.
1116
1117 :param bool dry_run: Don't do the actual upgrade
1118 :param bool reset_previous_upgrade: Clear the previous (incomplete)
1119 upgrade status
1120 :param bool upload_tools: Upload local version of tools
1121 :param str version: Upgrade to a specific version
1122
1123 """
1124 pass
1125
1126 def upload_backup(self, archive_path):
1127 """Store a backup archive remotely in Juju.
1128
1129 :param str archive_path: Path to local archive
1130
1131 """
1132 pass
1133
1134 @property
1135 def charmstore(self):
1136 return self._charmstore
1137
1138
1139 class BundleHandler(object):
1140 """
1141 Handle bundles by using the API to translate bundle YAML into a plan of
1142 steps and then dispatching each of those using the API.
1143 """
1144 def __init__(self, model):
1145 self.model = model
1146 self.charmstore = model.charmstore
1147 self.plan = []
1148 self.references = {}
1149 self._units_by_app = {}
1150 for unit_name, unit in model.units.items():
1151 app_units = self._units_by_app.setdefault(unit.application, [])
1152 app_units.append(unit_name)
1153 self.client_facade = client.ClientFacade()
1154 self.client_facade.connect(model.connection)
1155 self.app_facade = client.ApplicationFacade()
1156 self.app_facade.connect(model.connection)
1157 self.ann_facade = client.AnnotationsFacade()
1158 self.ann_facade.connect(model.connection)
1159
1160 async def fetch_plan(self, entity_id):
1161 bundle_yaml = await self.charmstore.files(entity_id,
1162 filename='bundle.yaml',
1163 read_file=True)
1164 self.bundle = yaml.safe_load(bundle_yaml)
1165 self.plan = await self.client_facade.GetBundleChanges(bundle_yaml)
1166
1167 async def execute_plan(self):
1168 for step in self.plan.changes:
1169 method = getattr(self, step.method)
1170 result = await method(*step.args)
1171 self.references[step.id_] = result
1172
1173 @property
1174 def applications(self):
1175 return list(self.bundle['services'].keys())
1176
1177 def resolve(self, reference):
1178 if reference and reference.startswith('$'):
1179 reference = self.references[reference[1:]]
1180 return reference
1181
1182 async def addCharm(self, charm, series):
1183 """
1184 :param charm string:
1185 Charm holds the URL of the charm to be added.
1186
1187 :param series string:
1188 Series holds the series of the charm to be added
1189 if the charm default is not sufficient.
1190 """
1191 entity_id = await self.charmstore.entityId(charm)
1192 log.debug('Adding %s', entity_id)
1193 await self.client_facade.AddCharm(None, entity_id)
1194 return entity_id
1195
1196 async def addMachines(self, series, constraints, container_type,
1197 parent_id):
1198 """
1199 :param series string:
1200 Series holds the optional machine OS series.
1201
1202 :param constraints string:
1203 Constraints holds the optional machine constraints.
1204
1205 :param Container_type string:
1206 ContainerType optionally holds the type of the container (for
1207 instance ""lxc" or kvm"). It is not specified for top level
1208 machines.
1209
1210 :param parent_id string:
1211 ParentId optionally holds a placeholder pointing to another machine
1212 change or to a unit change. This value is only specified in the
1213 case this machine is a container, in which case also ContainerType
1214 is set.
1215 """
1216 params = client.AddMachineParams(
1217 series=series,
1218 constraints=constraints,
1219 container_type=container_type,
1220 parent_id=self.resolve(parent_id),
1221 )
1222 results = await self.client_facade.AddMachines(params)
1223 log.debug('Added new machine %s', results[0].machine)
1224 return results[0].machine
1225
1226 async def addRelation(self, endpoint1, endpoint2):
1227 """
1228 :param endpoint1 string:
1229 :param endpoint2 string:
1230 Endpoint1 and Endpoint2 hold relation endpoints in the
1231 "application:interface" form, where the application is always a
1232 placeholder pointing to an application change, and the interface is
1233 optional. Examples are "$deploy-42:web" or just "$deploy-42".
1234 """
1235 endpoints = [endpoint1, endpoint2]
1236 # resolve indirect references
1237 for i in range(len(endpoints)):
1238 parts = endpoints[i].split(':')
1239 parts[0] = self.resolve(parts[0])
1240 endpoints[i] = ':'.join(parts)
1241
1242 log.info('Relating %s <-> %s', *endpoints)
1243 return await self.model.add_relation(*endpoints)
1244
1245 async def deploy(self, charm, series, application, options, constraints,
1246 storage, endpoint_bindings, resources):
1247 """
1248 :param charm string:
1249 Charm holds the URL of the charm to be used to deploy this
1250 application.
1251
1252 :param series string:
1253 Series holds the series of the application to be deployed
1254 if the charm default is not sufficient.
1255
1256 :param application string:
1257 Application holds the application name.
1258
1259 :param options map[string]interface{}:
1260 Options holds application options.
1261
1262 :param constraints string:
1263 Constraints holds the optional application constraints.
1264
1265 :param storage map[string]string:
1266 Storage holds the optional storage constraints.
1267
1268 :param endpoint_bindings map[string]string:
1269 EndpointBindings holds the optional endpoint bindings
1270
1271 :param resources map[string]int:
1272 Resources identifies the revision to use for each resource
1273 of the application's charm.
1274 """
1275 # resolve indirect references
1276 charm = self.resolve(charm)
1277 # stringify all config values for API
1278 options = {k: str(v) for k, v in options.items()}
1279 # build param object
1280 app = client.ApplicationDeploy(
1281 charm_url=charm,
1282 series=series,
1283 application=application,
1284 config=options,
1285 constraints=constraints,
1286 storage=storage,
1287 endpoint_bindings=endpoint_bindings,
1288 resources=resources,
1289 )
1290 # do the do
1291 log.info('Deploying %s', charm)
1292 await self.app_facade.Deploy([app])
1293 return application
1294
1295 async def addUnit(self, application, to):
1296 """
1297 :param application string:
1298 Application holds the application placeholder name for which a unit
1299 is added.
1300
1301 :param to string:
1302 To holds the optional location where to add the unit, as a
1303 placeholder pointing to another unit change or to a machine change.
1304 """
1305 application = self.resolve(application)
1306 placement = self.resolve(to)
1307 if self._units_by_app.get(application):
1308 # enough units for this application already exist;
1309 # claim one, and carry on
1310 # NB: this should probably honor placement, but the juju client
1311 # doesn't, so we're not bothering, either
1312 unit_name = self._units_by_app[application].pop()
1313 log.debug('Reusing unit %s for %s', unit_name, application)
1314 return self.model.units[unit_name]
1315
1316 log.debug('Adding new unit for %s%s', application,
1317 ' to %s' % placement if placement else '')
1318 return await self.model.applications[application].add_unit(
1319 count=1,
1320 to=placement,
1321 )
1322
1323 async def expose(self, application):
1324 """
1325 :param application string:
1326 Application holds the placeholder name of the application that must
1327 be exposed.
1328 """
1329 application = self.resolve(application)
1330 log.info('Exposing %s', application)
1331 return await self.model.applications[application].expose()
1332
1333 async def setAnnotations(self, id_, entity_type, annotations):
1334 """
1335 :param id_ string:
1336 Id is the placeholder for the application or machine change
1337 corresponding to the entity to be annotated.
1338
1339 :param entity_type EntityType:
1340 EntityType holds the type of the entity, "application" or
1341 "machine".
1342
1343 :param annotations map[string]string:
1344 Annotations holds the annotations as key/value pairs.
1345 """
1346 entity_id = self.resolve(id_)
1347 try:
1348 entity = self.model.state.get_entity(entity_type, entity_id)
1349 except KeyError:
1350 entity = await self.model._wait_for_new(entity_type, entity_id)
1351 return await entity.set_annotations(annotations)
1352
1353
1354 class CharmStore(object):
1355 """
1356 Async wrapper around theblues.charmstore.CharmStore
1357 """
1358 def __init__(self, loop):
1359 self.loop = loop
1360 self._cs = charmstore.CharmStore()
1361
1362 def __getattr__(self, name):
1363 """
1364 Wrap method calls in coroutines that use run_in_executor to make them
1365 async.
1366 """
1367 attr = getattr(self._cs, name)
1368 if not callable(attr):
1369 wrapper = partial(getattr, self._cs, name)
1370 setattr(self, name, wrapper)
1371 else:
1372 async def coro(*args, **kwargs):
1373 method = partial(attr, *args, **kwargs)
1374 return await self.loop.run_in_executor(None, method)
1375 setattr(self, name, coro)
1376 wrapper = coro
1377 return wrapper