26611fc36da76e5b48284f5bb341292989cb057d
[osm/N2VC.git] / juju / model.py
1 import asyncio
2 import collections
3 import logging
4 import re
5 import weakref
6 from concurrent.futures import CancelledError
7 from functools import partial
8
9 import yaml
10 from theblues import charmstore
11
12 from .client import client
13 from .client import watcher
14 from .client import connection
15 from .delta import get_entity_delta
16 from .delta import get_entity_class
17 from .exceptions import DeadEntityException
18 from .errors import JujuAPIError
19
20 log = logging.getLogger(__name__)
21
22
23 class _Observer(object):
24 """Wrapper around an observer callable.
25
26 This wrapper allows filter criteria to be associated with the
27 callable so that it's only called for changes that meet the criteria.
28
29 """
30 def __init__(self, callable_, entity_type, action, entity_id, predicate):
31 self.callable_ = callable_
32 self.entity_type = entity_type
33 self.action = action
34 self.entity_id = entity_id
35 self.predicate = predicate
36 if self.entity_id:
37 self.entity_id = str(self.entity_id)
38 if not self.entity_id.startswith('^'):
39 self.entity_id = '^' + self.entity_id
40 if not self.entity_id.endswith('$'):
41 self.entity_id += '$'
42
43 async def __call__(self, delta, old, new, model):
44 await self.callable_(delta, old, new, model)
45
46 def cares_about(self, delta):
47 """Return True if this observer "cares about" (i.e. wants to be
48 called) for a this delta.
49
50 """
51 if (self.entity_id and delta.get_id() and
52 not re.match(self.entity_id, str(delta.get_id()))):
53 return False
54
55 if self.entity_type and self.entity_type != delta.entity:
56 return False
57
58 if self.action and self.action != delta.type:
59 return False
60
61 if self.predicate and not self.predicate(delta):
62 return False
63
64 return True
65
66
class ModelObserver(object):
    """Base class for observers that dispatch on entity type and action.

    Calling an instance looks for a coroutine method named
    ``on_<entity>_<type>`` (built from the delta) and awaits it; when no such
    method exists, :meth:`on_change` is awaited instead.

    """

    async def __call__(self, delta, old, new, model):
        specific_name = 'on_{}_{}'.format(delta.entity, delta.type)
        fallback = self.on_change
        handler = getattr(self, specific_name, fallback)
        await handler(delta, old, new, model)

    async def on_change(self, delta, old, new, model):
        """Catch-all handler; does nothing unless overridden."""
        pass
75
76
class ModelState(object):
    """Per-entity delta history for everything in a model.

    ``self.state`` maps ``entity_type -> entity_id -> history``, where a
    history is a deque of data dicts in arrival order. A trailing ``None``
    marks an entity that has been removed.

    """

    def __init__(self, model):
        self.model = model
        self.state = dict()

    def _live_entity_map(self, entity_type):
        """Build an ``id -> entity`` dict of the entities of ``entity_type``
        whose newest history record is not ``None``.

        """
        living = {}
        for entity_id, history in self.state.get(entity_type, {}).items():
            if history[-1] is None:
                continue
            living[entity_id] = self.get_entity(entity_type, entity_id)
        return living

    @property
    def applications(self):
        """Map of application-name to Application for live applications."""
        return self._live_entity_map('application')

    @property
    def machines(self):
        """Map of machine-id to Machine for live machines."""
        return self._live_entity_map('machine')

    @property
    def units(self):
        """Map of unit-id to Unit for live units."""
        return self._live_entity_map('unit')

    def entity_history(self, entity_type, entity_id):
        """Look up the history deque of one entity.

        Raises ``KeyError`` when the type or id has never been seen.

        """
        histories_for_type = self.state[entity_type]
        return histories_for_type[entity_id]

    def entity_data(self, entity_type, entity_id, history_index):
        """Look up the data dict stored at ``history_index`` of an entity's
        history.

        """
        history = self.entity_history(entity_type, entity_id)
        return history[history_index]

    def apply_delta(self, delta):
        """Record ``delta`` in the history and return ``(old_obj, new_obj)``.

        ``old_obj`` is the entity as it was before this delta (``None`` when
        there is no earlier record). ``new_obj`` is the entity after the
        delta; it is never ``None`` but is dead when the delta was a removal,
        because a ``None`` record is appended after the delta's data.

        """
        entity_id = delta.get_id()
        histories_for_type = self.state.setdefault(delta.entity, {})
        history = histories_for_type.setdefault(entity_id, collections.deque())

        history.append(delta.data)
        if delta.type == 'remove':
            history.append(None)

        updated = self.get_entity(delta.entity, entity_id)
        return updated.previous(), updated

    def get_entity(
            self, entity_type, entity_id, history_index=-1, connected=True):
        """Instantiate the entity class for ``entity_type`` at one point of
        the entity's history.

        ``-1`` means "latest" and is passed through unchanged; any other
        negative index is converted to an absolute one first. Returns
        ``None`` when the requested index lies outside the history.

        """
        is_relative = history_index < 0 and history_index != -1
        if is_relative:
            history_index += len(self.entity_history(entity_type, entity_id))
            if history_index < 0:
                return None

        # Probe the history purely to detect an out-of-range index.
        try:
            self.entity_data(entity_type, entity_id, history_index)
        except IndexError:
            return None

        entity_class = get_entity_class(entity_type)
        return entity_class(
            entity_id, self.model,
            history_index=history_index, connected=connected)
186
187
class ModelEntity(object):
    """An object in the Model tree.

    An entity is a lightweight view onto one record of the history that the
    model's state keeps for ``(entity_type, entity_id)``; the data itself is
    looked up in the model on every access.

    """

    def __init__(self, entity_id, model, history_index=-1, connected=True):
        """Initialize a new entity

        :param entity_id str: The unique id of the object in the model
        :param model: The model instance in whose object tree this
            entity resides
        :history_index int: The index of this object's state in the model's
            history deque for this entity
        :connected bool: Flag indicating whether this object gets live updates
            from the model.

        """
        self.entity_id = entity_id
        self.model = model
        self._history_index = history_index
        self.connected = connected
        self.connection = model.connection

    def __getattr__(self, name):
        """Fetch object attributes from the underlying data dict held in the
        model.

        Only called for names not found by normal attribute lookup.

        :raises DeadEntityException: if this entity's data record is None
        :raises KeyError: if ``name`` is not a key of the data dict

        """
        data = self.data
        if data is None:
            raise DeadEntityException(
                "Entity {}:{} is dead - its attributes can no longer be "
                "accessed. Use the .previous() method on this object to get "
                "a copy of the object at its previous state.".format(
                    self.entity_type, self.entity_id))
        return data[name]

    def __bool__(self):
        """An entity is truthy when its data record is non-empty."""
        return bool(self.data)

    def on_change(self, callable_):
        """Add a change observer to this entity.

        """
        self.model.add_observer(
            callable_, self.entity_type, 'change', self.entity_id)

    def on_remove(self, callable_):
        """Add a remove observer to this entity.

        """
        self.model.add_observer(
            callable_, self.entity_type, 'remove', self.entity_id)

    @property
    def entity_type(self):
        """A string identifying the entity type of this object, e.g.
        'application' or 'unit', etc.

        Derived from the lower-cased class name.

        """
        return self.__class__.__name__.lower()

    @property
    def current(self):
        """Return True if this object represents the current state of the
        entity in the underlying model.

        This will be True except when the object represents an entity at a
        non-latest state in history, e.g. if the object was obtained by
        calling .previous() on another object.

        """
        return self._history_index == -1

    @property
    def dead(self):
        """Returns True if this entity no longer exists in the underlying
        model.

        That is the case when either this object's own record or the latest
        record in the entity's history is None.

        """
        if self.data is None:
            return True
        latest = self.model.state.entity_data(
            self.entity_type, self.entity_id, -1)
        return latest is None

    @property
    def alive(self):
        """Returns True if this entity still exists in the underlying
        model.

        """
        return not self.dead

    @property
    def data(self):
        """The data dictionary for this entity.

        """
        return self.model.state.entity_data(
            self.entity_type, self.entity_id, self._history_index)

    def previous(self):
        """Return a copy of this object as was at its previous state in
        history.

        Returns None if this object is new (and therefore has no history).

        The returned object is always "disconnected", i.e. does not receive
        live updates.

        """
        return self.model.state.get_entity(
            self.entity_type, self.entity_id, self._history_index - 1,
            connected=False)

    def next(self):
        """Return a copy of this object at its next state in
        history.

        Returns None if this object is already the latest.

        The returned object is "disconnected", i.e. does not receive
        live updates, unless it is current (latest).

        """
        if self._history_index == -1:
            return None

        new_index = self._history_index + 1
        history = self.model.state.entity_history(
            self.entity_type, self.entity_id)
        # Only the newest record of the history is a "live" view.
        connected = new_index == len(history) - 1
        # Step forward to new_index; stepping to ``_history_index - 1`` here
        # would hand back the *previous* state instead of the next one.
        return self.model.state.get_entity(
            self.entity_type, self.entity_id, new_index,
            connected=connected)

    def latest(self):
        """Return a copy of this object at its current state in the model.

        Returns self if this object is already the latest.

        The returned object is always "connected", i.e. receives
        live updates from the model.

        """
        if self._history_index == -1:
            return self

        return self.model.state.get_entity(self.entity_type, self.entity_id)
336
337
338 class Model(object):
339 def __init__(self, loop=None):
340 """Instantiate a new connected Model.
341
342 :param loop: an asyncio event loop
343
344 """
345 self.loop = loop or asyncio.get_event_loop()
346 self.connection = None
347 self.observers = weakref.WeakValueDictionary()
348 self.state = ModelState(self)
349 self._watcher_task = None
350 self._watch_shutdown = asyncio.Event(loop=loop)
351 self._watch_received = asyncio.Event(loop=loop)
352 self._charmstore = CharmStore(self.loop)
353
354 async def connect_current(self):
355 """Connect to the current Juju model.
356
357 """
358 self.connection = await connection.Connection.connect_current()
359 self._watch()
360 await self._watch_received.wait()
361
362 async def disconnect(self):
363 """Shut down the watcher task and close websockets.
364
365 """
366 self._stop_watching()
367 if self.connection and self.connection.is_open:
368 await self._watch_shutdown.wait()
369 log.debug('Closing model connection')
370 await self.connection.close()
371 self.connection = None
372
373 def all_units_idle(self):
374 """Return True if all units are idle.
375
376 """
377 for unit in self.units.values():
378 unit_status = unit.data['agent-status']['current']
379 if unit_status != 'idle':
380 return False
381 return True
382
383 async def reset(self, force=False):
384 """Reset the model to a clean state.
385
386 :param bool force: Force-terminate machines.
387
388 This returns only after the model has reached a clean state. "Clean"
389 means no applications or machines exist in the model.
390
391 """
392 log.debug('Resetting model')
393 for app in self.applications.values():
394 await app.destroy()
395 for machine in self.machines.values():
396 await machine.destroy(force=force)
397 await self.block_until(
398 lambda: len(self.machines) == 0
399 )
400
401 async def block_until(self, *conditions, timeout=None):
402 """Return only after all conditions are true.
403
404 """
405 async def _block():
406 while not all(c() for c in conditions):
407 await asyncio.sleep(0)
408 await asyncio.wait_for(_block(), timeout)
409
410 @property
411 def applications(self):
412 """Return a map of application-name:Application for all applications
413 currently in the model.
414
415 """
416 return self.state.applications
417
418 @property
419 def machines(self):
420 """Return a map of machine-id:Machine for all machines currently in
421 the model.
422
423 """
424 return self.state.machines
425
426 @property
427 def units(self):
428 """Return a map of unit-id:Unit for all units currently in
429 the model.
430
431 """
432 return self.state.units
433
434 def add_observer(
435 self, callable_, entity_type=None, action=None, entity_id=None,
436 predicate=None):
437 """Register an "on-model-change" callback
438
439 Once the model is connected, ``callable_``
440 will be called each time the model changes. callable_ should
441 be Awaitable and accept the following positional arguments:
442
443 delta - An instance of :class:`juju.delta.EntityDelta`
444 containing the raw delta data recv'd from the Juju
445 websocket.
446
447 old_obj - If the delta modifies an existing object in the model,
448 old_obj will be a copy of that object, as it was before the
449 delta was applied. Will be None if the delta creates a new
450 entity in the model.
451
452 new_obj - A copy of the new or updated object, after the delta
453 is applied. Will be None if the delta removes an entity
454 from the model.
455
456 model - The :class:`Model` itself.
457
458 Events for which ``callable_`` is called can be specified by passing
459 entity_type, action, and/or id_ filter criteria, e.g.:
460
461 add_observer(
462 myfunc, entity_type='application', action='add', id_='ubuntu')
463
464 For more complex filtering conditions, pass a predicate function. It
465 will be called with a delta as its only argument. If the predicate
466 function returns True, the callable_ will be called.
467
468 """
469 observer = _Observer(
470 callable_, entity_type, action, entity_id, predicate)
471 self.observers[observer] = callable_
472
473 def _watch(self):
474 """Start an asynchronous watch against this model.
475
476 See :meth:`add_observer` to register an onchange callback.
477
478 """
479 async def _start_watch():
480 self._watch_shutdown.clear()
481 try:
482 allwatcher = watcher.AllWatcher()
483 self._watch_conn = await self.connection.clone()
484 allwatcher.connect(self._watch_conn)
485 while True:
486 results = await allwatcher.Next()
487 for delta in results.deltas:
488 delta = get_entity_delta(delta)
489 old_obj, new_obj = self.state.apply_delta(delta)
490 # XXX: Might not want to shield at this level
491 # We are shielding because when the watcher is
492 # canceled (on disconnect()), we don't want all of
493 # its children (every observer callback) to be
494 # canceled with it. So we shield them. But this means
495 # they can *never* be canceled.
496 await asyncio.shield(
497 self._notify_observers(delta, old_obj, new_obj))
498 self._watch_received.set()
499 except CancelledError:
500 log.debug('Closing watcher connection')
501 await self._watch_conn.close()
502 self._watch_shutdown.set()
503 self._watch_conn = None
504
505 log.debug('Starting watcher task')
506 self._watcher_task = self.loop.create_task(_start_watch())
507
508 def _stop_watching(self):
509 """Stop the asynchronous watch against this model.
510
511 """
512 log.debug('Stopping watcher task')
513 if self._watcher_task:
514 self._watcher_task.cancel()
515
516 async def _notify_observers(self, delta, old_obj, new_obj):
517 """Call observing callbacks, notifying them of a change in model state
518
519 :param delta: The raw change from the watcher
520 (:class:`juju.client.overrides.Delta`)
521 :param old_obj: The object in the model that this delta updates.
522 May be None.
523 :param new_obj: The object in the model that is created or updated
524 by applying this delta.
525
526 """
527 if new_obj and not old_obj:
528 delta.type = 'add'
529
530 log.debug(
531 'Model changed: %s %s %s',
532 delta.entity, delta.type, delta.get_id())
533
534 for o in self.observers:
535 if o.cares_about(delta):
536 asyncio.ensure_future(o(delta, old_obj, new_obj, self))
537
538 async def _wait_for_new(self, entity_type, entity_id, predicate=None):
539 """Wait for a new object to appear in the Model and return it.
540
541 Waits for an object of type ``entity_type`` with id ``entity_id``.
542
543 This coroutine blocks until the new object appears in the model.
544
545 """
546 entity_added = asyncio.Queue(loop=self.loop)
547
548 async def callback(delta, old, new, model):
549 await entity_added.put(delta.get_id())
550
551 self.add_observer(callback, entity_type, 'add', entity_id, predicate)
552 entity_id = await entity_added.get()
553 return self.state._live_entity_map(entity_type)[entity_id]
554
555 def add_machine(
556 self, spec=None, constraints=None, disks=None, series=None,
557 count=1):
558 """Start a new, empty machine and optionally a container, or add a
559 container to a machine.
560
561 :param str spec: Machine specification
562 Examples::
563
564 (None) - starts a new machine
565 'lxc' - starts a new machine with on lxc container
566 'lxc:4' - starts a new lxc container on machine 4
567 'ssh:user@10.10.0.3' - manually provisions a machine with ssh
568 'zone=us-east-1a' - starts a machine in zone us-east-1s on AWS
569 'maas2.name' - acquire machine maas2.name on MAAS
570 :param constraints: Machine constraints
571 :type constraints: :class:`juju.Constraints`
572 :param list disks: List of disk :class:`constraints <juju.Constraints>`
573 :param str series: Series
574 :param int count: Number of machines to deploy
575
576 Supported container types are: lxc, lxd, kvm
577
578 When deploying a container to an existing machine, constraints cannot
579 be used.
580
581 """
582 pass
583 add_machines = add_machine
584
585 async def add_relation(self, relation1, relation2):
586 """Add a relation between two applications.
587
588 :param str relation1: '<application>[:<relation_name>]'
589 :param str relation2: '<application>[:<relation_name>]'
590
591 """
592 app_facade = client.ApplicationFacade()
593 app_facade.connect(self.connection)
594
595 log.debug(
596 'Adding relation %s <-> %s', relation1, relation2)
597
598 try:
599 result = await app_facade.AddRelation([relation1, relation2])
600 except JujuAPIError as e:
601 if 'relation already exists' not in e.message:
602 raise
603 log.debug(
604 'Relation %s <-> %s already exists', relation1, relation2)
605 # TODO: if relation already exists we should return the
606 # Relation ModelEntity here
607 return None
608
609 def predicate(delta):
610 endpoints = {}
611 for endpoint in delta.data['endpoints']:
612 endpoints[endpoint['application-name']] = endpoint['relation']
613 return endpoints == result.endpoints
614
615 return await self._wait_for_new('relation', None, predicate)
616
617 def add_space(self, name, *cidrs):
618 """Add a new network space.
619
620 Adds a new space with the given name and associates the given
621 (optional) list of existing subnet CIDRs with it.
622
623 :param str name: Name of the space
624 :param \*cidrs: Optional list of existing subnet CIDRs
625
626 """
627 pass
628
629 def add_ssh_key(self, key):
630 """Add a public SSH key to this model.
631
632 :param str key: The public ssh key
633
634 """
635 pass
636 add_ssh_keys = add_ssh_key
637
638 def add_subnet(self, cidr_or_id, space, *zones):
639 """Add an existing subnet to this model.
640
641 :param str cidr_or_id: CIDR or provider ID of the existing subnet
642 :param str space: Network space with which to associate
643 :param str \*zones: Zone(s) in which the subnet resides
644
645 """
646 pass
647
648 def get_backups(self):
649 """Retrieve metadata for backups in this model.
650
651 """
652 pass
653
654 def block(self, *commands):
655 """Add a new block to this model.
656
657 :param str \*commands: The commands to block. Valid values are
658 'all-changes', 'destroy-model', 'remove-object'
659
660 """
661 pass
662
663 def get_blocks(self):
664 """List blocks for this model.
665
666 """
667 pass
668
669 def get_cached_images(self, arch=None, kind=None, series=None):
670 """Return a list of cached OS images.
671
672 :param str arch: Filter by image architecture
673 :param str kind: Filter by image kind, e.g. 'lxd'
674 :param str series: Filter by image series, e.g. 'xenial'
675
676 """
677 pass
678
679 def create_backup(self, note=None, no_download=False):
680 """Create a backup of this model.
681
682 :param str note: A note to store with the backup
683 :param bool no_download: Do not download the backup archive
684 :return str: Path to downloaded archive
685
686 """
687 pass
688
689 def create_storage_pool(self, name, provider_type, **pool_config):
690 """Create or define a storage pool.
691
692 :param str name: Name to give the storage pool
693 :param str provider_type: Pool provider type
694 :param \*\*pool_config: key/value pool configuration pairs
695
696 """
697 pass
698
699 def debug_log(
700 self, no_tail=False, exclude_module=None, include_module=None,
701 include=None, level=None, limit=0, lines=10, replay=False,
702 exclude=None):
703 """Get log messages for this model.
704
705 :param bool no_tail: Stop after returning existing log messages
706 :param list exclude_module: Do not show log messages for these logging
707 modules
708 :param list include_module: Only show log messages for these logging
709 modules
710 :param list include: Only show log messages for these entities
711 :param str level: Log level to show, valid options are 'TRACE',
712 'DEBUG', 'INFO', 'WARNING', 'ERROR,
713 :param int limit: Return this many of the most recent (possibly
714 filtered) lines are shown
715 :param int lines: Yield this many of the most recent lines, and keep
716 yielding
717 :param bool replay: Yield the entire log, and keep yielding
718 :param list exclude: Do not show log messages for these entities
719
720 """
721 pass
722
723 async def deploy(
724 self, entity_url, service_name=None, bind=None, budget=None,
725 channel=None, config=None, constraints=None, force=False,
726 num_units=1, plan=None, resources=None, series=None, storage=None,
727 to=None):
728 """Deploy a new service or bundle.
729
730 :param str entity_url: Charm or bundle url
731 :param str service_name: Name to give the service
732 :param dict bind: <charm endpoint>:<network space> pairs
733 :param dict budget: <budget name>:<limit> pairs
734 :param str channel: Charm store channel from which to retrieve
735 the charm or bundle, e.g. 'development'
736 :param dict config: Charm configuration dictionary
737 :param constraints: Service constraints
738 :type constraints: :class:`juju.Constraints`
739 :param bool force: Allow charm to be deployed to a machine running
740 an unsupported series
741 :param int num_units: Number of units to deploy
742 :param str plan: Plan under which to deploy charm
743 :param dict resources: <resource name>:<file path> pairs
744 :param str series: Series on which to deploy
745 :param dict storage: Storage constraints TODO how do these look?
746 :param str to: Placement directive, e.g.::
747
748 '23' - machine 23
749 'lxc:7' - new lxc container on machine 7
750 '24/lxc/3' - lxc container 3 or machine 24
751
752 If None, a new machine is provisioned.
753
754
755 TODO::
756
757 - service_name is required; fill this in automatically if not
758 provided by caller
759 - series is required; how do we pick a default?
760
761 """
762 if constraints:
763 constraints = client.Value(**constraints)
764
765 if to:
766 placement = [
767 client.Placement(**p) for p in to
768 ]
769 else:
770 placement = []
771
772 if storage:
773 storage = {
774 k: client.Constraints(**v)
775 for k, v in storage.items()
776 }
777
778 entity_id = await self.charmstore.entityId(entity_url)
779
780 app_facade = client.ApplicationFacade()
781 client_facade = client.ClientFacade()
782 app_facade.connect(self.connection)
783 client_facade.connect(self.connection)
784
785 if 'bundle/' in entity_id:
786 handler = BundleHandler(self)
787 await handler.fetch_plan(entity_id)
788 await handler.execute_plan()
789 extant_apps = {app for app in self.applications}
790 pending_apps = set(handler.applications) - extant_apps
791 if pending_apps:
792 # new apps will usually be in the model by now, but if some
793 # haven't made it yet we'll need to wait on them to be added
794 await asyncio.gather(*[
795 asyncio.ensure_future(
796 self.model._wait_for_new('application', app_name))
797 for app_name in pending_apps
798 ])
799 return [app for name, app in self.applications.items()
800 if name in handler.applications]
801 else:
802 log.debug(
803 'Deploying %s', entity_id)
804
805 await client_facade.AddCharm(channel, entity_id)
806 app = client.ApplicationDeploy(
807 application=service_name,
808 channel=channel,
809 charm_url=entity_id,
810 config=config,
811 constraints=constraints,
812 endpoint_bindings=bind,
813 num_units=num_units,
814 placement=placement,
815 resources=resources,
816 series=series,
817 storage=storage,
818 )
819
820 await app_facade.Deploy([app])
821 return await self._wait_for_new('application', service_name)
822
823 def destroy(self):
824 """Terminate all machines and resources for this model.
825
826 """
827 pass
828
829 async def destroy_unit(self, *unit_names):
830 """Destroy units by name.
831
832 """
833 app_facade = client.ApplicationFacade()
834 app_facade.connect(self.connection)
835
836 log.debug(
837 'Destroying unit%s %s',
838 's' if len(unit_names) == 1 else '',
839 ' '.join(unit_names))
840
841 return await app_facade.Destroy(self.name)
842 destroy_units = destroy_unit
843
844 def get_backup(self, archive_id):
845 """Download a backup archive file.
846
847 :param str archive_id: The id of the archive to download
848 :return str: Path to the archive file
849
850 """
851 pass
852
853 def enable_ha(
854 self, num_controllers=0, constraints=None, series=None, to=None):
855 """Ensure sufficient controllers exist to provide redundancy.
856
857 :param int num_controllers: Number of controllers to make available
858 :param constraints: Constraints to apply to the controller machines
859 :type constraints: :class:`juju.Constraints`
860 :param str series: Series of the controller machines
861 :param list to: Placement directives for controller machines, e.g.::
862
863 '23' - machine 23
864 'lxc:7' - new lxc container on machine 7
865 '24/lxc/3' - lxc container 3 or machine 24
866
867 If None, a new machine is provisioned.
868
869 """
870 pass
871
872 def get_config(self):
873 """Return the configuration settings for this model.
874
875 """
876 pass
877
878 def get_constraints(self):
879 """Return the machine constraints for this model.
880
881 """
882 pass
883
884 def grant(self, username, acl='read'):
885 """Grant a user access to this model.
886
887 :param str username: Username
888 :param str acl: Access control ('read' or 'write')
889
890 """
891 pass
892
893 def import_ssh_key(self, identity):
894 """Add a public SSH key from a trusted indentity source to this model.
895
896 :param str identity: User identity in the form <lp|gh>:<username>
897
898 """
899 pass
900 import_ssh_keys = import_ssh_key
901
902 def get_machines(self, machine, utc=False):
903 """Return list of machines in this model.
904
905 :param str machine: Machine id, e.g. '0'
906 :param bool utc: Display time as UTC in RFC3339 format
907
908 """
909 pass
910
911 def get_shares(self):
912 """Return list of all users with access to this model.
913
914 """
915 pass
916
917 def get_spaces(self):
918 """Return list of all known spaces, including associated subnets.
919
920 """
921 pass
922
923 def get_ssh_key(self):
924 """Return known SSH keys for this model.
925
926 """
927 pass
928 get_ssh_keys = get_ssh_key
929
930 def get_storage(self, filesystem=False, volume=False):
931 """Return details of storage instances.
932
933 :param bool filesystem: Include filesystem storage
934 :param bool volume: Include volume storage
935
936 """
937 pass
938
939 def get_storage_pools(self, names=None, providers=None):
940 """Return list of storage pools.
941
942 :param list names: Only include pools with these names
943 :param list providers: Only include pools for these providers
944
945 """
946 pass
947
948 def get_subnets(self, space=None, zone=None):
949 """Return list of known subnets.
950
951 :param str space: Only include subnets in this space
952 :param str zone: Only include subnets in this zone
953
954 """
955 pass
956
957 def remove_blocks(self):
958 """Remove all blocks from this model.
959
960 """
961 pass
962
963 def remove_backup(self, backup_id):
964 """Delete a backup.
965
966 :param str backup_id: The id of the backup to remove
967
968 """
969 pass
970
971 def remove_cached_images(self, arch=None, kind=None, series=None):
972 """Remove cached OS images.
973
974 :param str arch: Architecture of the images to remove
975 :param str kind: Image kind to remove, e.g. 'lxd'
976 :param str series: Image series to remove, e.g. 'xenial'
977
978 """
979 pass
980
981 def remove_machine(self, *machine_ids):
982 """Remove a machine from this model.
983
984 :param str \*machine_ids: Ids of the machines to remove
985
986 """
987 pass
988 remove_machines = remove_machine
989
990 def remove_ssh_key(self, *keys):
991 """Remove a public SSH key(s) from this model.
992
993 :param str \*keys: Keys to remove
994
995 """
996 pass
997 remove_ssh_keys = remove_ssh_key
998
999 def restore_backup(
1000 self, bootstrap=False, constraints=None, archive=None,
1001 backup_id=None, upload_tools=False):
1002 """Restore a backup archive to a new controller.
1003
1004 :param bool bootstrap: Bootstrap a new state machine
1005 :param constraints: Model constraints
1006 :type constraints: :class:`juju.Constraints`
1007 :param str archive: Path to backup archive to restore
1008 :param str backup_id: Id of backup to restore
1009 :param bool upload_tools: Upload tools if bootstrapping a new machine
1010
1011 """
1012 pass
1013
1014 def retry_provisioning(self):
1015 """Retry provisioning for failed machines.
1016
1017 """
1018 pass
1019
1020 def revoke(self, username, acl='read'):
1021 """Revoke a user's access to this model.
1022
1023 :param str username: Username to revoke
1024 :param str acl: Access control ('read' or 'write')
1025
1026 """
1027 pass
1028
1029 def run(self, command, timeout=None):
1030 """Run command on all machines in this model.
1031
1032 :param str command: The command to run
1033 :param int timeout: Time to wait before command is considered failed
1034
1035 """
1036 pass
1037
1038 def set_config(self, **config):
1039 """Set configuration keys on this model.
1040
1041 :param \*\*config: Config key/values
1042
1043 """
1044 pass
1045
1046 def set_constraints(self, constraints):
1047 """Set machine constraints on this model.
1048
1049 :param :class:`juju.Constraints` constraints: Machine constraints
1050
1051 """
1052 pass
1053
1054 def get_action_output(self, action_uuid, wait=-1):
1055 """Get the results of an action by ID.
1056
1057 :param str action_uuid: Id of the action
1058 :param int wait: Time in seconds to wait for action to complete
1059
1060 """
1061 pass
1062
1063 def get_action_status(self, uuid_or_prefix=None, name=None):
1064 """Get the status of all actions, filtered by ID, ID prefix, or action name.
1065
1066 :param str uuid_or_prefix: Filter by action uuid or prefix
1067 :param str name: Filter by action name
1068
1069 """
1070 pass
1071
1072 def get_budget(self, budget_name):
1073 """Get budget usage info.
1074
1075 :param str budget_name: Name of budget
1076
1077 """
1078 pass
1079
1080 def get_status(self, filter_=None, utc=False):
1081 """Return the status of the model.
1082
1083 :param str filter_: Service or unit name or wildcard ('*')
1084 :param bool utc: Display time as UTC in RFC3339 format
1085
1086 """
1087 pass
1088 status = get_status
1089
def sync_tools(
        self, all_=False, destination=None, dry_run=False, public=False,
        source=None, stream=None, version=None):
    """Copy Juju tools into this model.

    Placeholder: there is no implementation yet, so every argument is
    ignored and the call yields ``None``.

    :param bool all_: Copy all versions, not just the latest
    :param str destination: Path to local destination directory
    :param bool dry_run: Don't do the actual copy
    :param bool public: Tools are for a public cloud, so generate mirrors
        information
    :param str source: Path to local source directory
    :param str stream: Simplestreams stream for which to sync metadata
    :param str version: Copy a specific major.minor version

    """
    return None
1106
def unblock(self, *commands):
    r"""Unblock an operation that would alter this model.

    Not implemented yet: the body is empty, so the arguments are ignored
    and ``None`` is returned.

    :param str \*commands: The commands to unblock. Valid values are
        'all-changes', 'destroy-model', 'remove-object'

    """
    # Raw docstring: ``\*`` is not a valid escape in a normal string literal.
    pass
1115
def unset_config(self, *keys):
    r"""Unset configuration on this model.

    Not implemented yet: the body is empty, so the arguments are ignored
    and ``None`` is returned.

    :param str \*keys: The keys to unset

    """
    # Raw docstring: ``\*`` is not a valid escape in a normal string literal.
    pass
1123
def upgrade_gui(self):
    """Upgrade the Juju GUI used by this model.

    Placeholder: there is no implementation yet, so the call does nothing
    and yields ``None``.

    """
    return None
1129
def upgrade_juju(
        self, dry_run=False, reset_previous_upgrade=False,
        upload_tools=False, version=None):
    """Upgrade Juju on every machine in a model.

    Placeholder: there is no implementation yet, so every argument is
    ignored and the call yields ``None``.

    :param bool dry_run: Don't do the actual upgrade
    :param bool reset_previous_upgrade: Clear the previous (incomplete)
        upgrade status
    :param bool upload_tools: Upload local version of tools
    :param str version: Upgrade to a specific version

    """
    return None
1143
def upload_backup(self, archive_path):
    """Store a local backup archive remotely in Juju.

    Placeholder: there is no implementation yet, so the argument is ignored
    and the call yields ``None``.

    :param str archive_path: Path to local archive

    """
    return None
1151
@property
def charmstore(self):
    """Read-only access to the object stored in ``self._charmstore``.

    NOTE(review): presumably the async ``CharmStore`` wrapper defined in
    this module -- confirm where ``_charmstore`` is assigned.
    """
    store = self._charmstore
    return store
1155
1156
class BundleHandler(object):
    """
    Handle bundles by using the API to translate bundle YAML into a plan of
    steps and then dispatching each of those using the API.

    Every step of the plan names one of the methods of this class
    (``addCharm``, ``deploy``, ``addUnit``, ...).  The value a step returns
    is remembered under the step's id so that later steps can refer to it
    through a ``$<id>`` placeholder (see :meth:`resolve`).
    """
    def __init__(self, model):
        """
        :param model: The model the bundle is applied to.  Its
            ``charmstore``, ``units`` and ``connection`` attributes are
            read here.
        """
        self.model = model
        self.charmstore = model.charmstore
        # Parsed bundle YAML; replaced by fetch_plan().
        self.bundle = {}
        self.plan = []
        # Results of executed plan steps, keyed by step id.
        self.references = {}
        # Names of the units that already exist in the model, grouped by
        # application, so addUnit() can claim them instead of adding more.
        self._units_by_app = {}
        for unit_name, unit in model.units.items():
            app_units = self._units_by_app.setdefault(unit.application, [])
            app_units.append(unit_name)
        self.client_facade = client.ClientFacade()
        self.client_facade.connect(model.connection)
        self.app_facade = client.ApplicationFacade()
        self.app_facade.connect(model.connection)
        self.ann_facade = client.AnnotationsFacade()
        self.ann_facade.connect(model.connection)

    async def fetch_plan(self, entity_id):
        """Download ``bundle.yaml`` for a bundle and ask the API for the
        list of changes needed to deploy it.

        Populates ``self.bundle`` (parsed YAML) and ``self.plan``.

        :param entity_id: Charm store id of the bundle
        """
        bundle_yaml = await self.charmstore.files(entity_id,
                                                  filename='bundle.yaml',
                                                  read_file=True)
        self.bundle = yaml.safe_load(bundle_yaml)
        self.plan = await self.client_facade.GetBundleChanges(bundle_yaml)

    async def execute_plan(self):
        """Run every change of the fetched plan, in order.

        Each change is dispatched to the method of this class named by
        ``step.method``; the result is stored in ``self.references`` under
        ``step.id_`` for use by later steps.
        """
        for step in self.plan.changes:
            method = getattr(self, step.method)
            result = await method(*step.args)
            self.references[step.id_] = result

    @property
    def applications(self):
        """Names of the applications defined by the fetched bundle.

        The ``applications`` key is used when the bundle has one; otherwise
        the legacy ``services`` key is read.

        :raises KeyError: if the bundle has neither key
        """
        bundle = self.bundle
        key = 'applications' if 'applications' in bundle else 'services'
        return list(bundle[key].keys())

    def resolve(self, reference):
        """Replace a ``$<id>`` placeholder with the result of that step.

        Anything that is not a string starting with ``$`` (including
        ``None`` and empty strings) is returned unchanged.

        :raises KeyError: if the placeholder names a step without a
            recorded result
        """
        if isinstance(reference, str) and reference.startswith('$'):
            return self.references[reference[1:]]
        return reference

    async def addCharm(self, charm, series):
        """
        :param charm string:
            Charm holds the URL of the charm to be added.

        :param series string:
            Series holds the series of the charm to be added
            if the charm default is not sufficient.

        :return: The entity id the charm store reported for ``charm``.
        """
        entity_id = await self.charmstore.entityId(charm)
        log.debug('Adding %s', entity_id)
        await self.client_facade.AddCharm(None, entity_id)
        return entity_id

    async def addMachines(self, series, constraints, container_type,
                          parent_id):
        """
        :param series string:
            Series holds the optional machine OS series.

        :param constraints string:
            Constraints holds the optional machine constraints.

        :param container_type string:
            ContainerType optionally holds the type of the container (for
            instance "lxc" or "kvm"). It is not specified for top level
            machines.

        :param parent_id string:
            ParentId optionally holds a placeholder pointing to another machine
            change or to a unit change. This value is only specified in the
            case this machine is a container, in which case also ContainerType
            is set.

        :return: The ``machine`` attribute of the first result.
        """
        params = client.AddMachineParams(
            series=series,
            constraints=constraints,
            container_type=container_type,
            parent_id=self.resolve(parent_id),
        )
        results = await self.client_facade.AddMachines(params)
        log.debug('Added new machine %s', results[0].machine)
        return results[0].machine

    async def addRelation(self, endpoint1, endpoint2):
        """
        :param endpoint1 string:
        :param endpoint2 string:
            Endpoint1 and Endpoint2 hold relation endpoints in the
            "application:interface" form, where the application is always a
            placeholder pointing to an application change, and the interface is
            optional. Examples are "$deploy-42:web" or just "$deploy-42".
        """
        endpoints = []
        for endpoint in (endpoint1, endpoint2):
            # only the application part (before the first ':') can be an
            # indirect reference
            application, *rest = endpoint.split(':')
            endpoints.append(':'.join([self.resolve(application)] + rest))

        log.info('Relating %s <-> %s', *endpoints)
        return await self.model.add_relation(*endpoints)

    async def deploy(self, charm, series, application, options, constraints,
                     storage, endpoint_bindings, resources):
        """
        :param charm string:
            Charm holds the URL of the charm to be used to deploy this
            application.

        :param series string:
            Series holds the series of the application to be deployed
            if the charm default is not sufficient.

        :param application string:
            Application holds the application name.

        :param options map[string]interface{}:
            Options holds application options.

        :param constraints string:
            Constraints holds the optional application constraints.

        :param storage map[string]string:
            Storage holds the optional storage constraints.

        :param endpoint_bindings map[string]string:
            EndpointBindings holds the optional endpoint bindings

        :param resources map[string]int:
            Resources identifies the revision to use for each resource
            of the application's charm.

        :return: The application name, unchanged.
        """
        # resolve indirect references
        charm = self.resolve(charm)
        # stringify all config values for API; a missing options map is
        # treated as "no options"
        options = {k: str(v) for k, v in (options or {}).items()}
        # build param object
        app = client.ApplicationDeploy(
            charm_url=charm,
            series=series,
            application=application,
            config=options,
            constraints=constraints,
            storage=storage,
            endpoint_bindings=endpoint_bindings,
            resources=resources,
        )
        # do the do
        log.info('Deploying %s', charm)
        await self.app_facade.Deploy([app])
        return application

    async def addUnit(self, application, to):
        """
        :param application string:
            Application holds the application placeholder name for which a unit
            is added.

        :param to string:
            To holds the optional location where to add the unit, as a
            placeholder pointing to another unit change or to a machine change.

        :return: An already existing unit of the application when one is
            still unclaimed, otherwise whatever ``add_unit`` returns.
        """
        application = self.resolve(application)
        placement = self.resolve(to)
        if self._units_by_app.get(application):
            # enough units for this application already exist;
            # claim one, and carry on
            # NB: this should probably honor placement, but the juju client
            # doesn't, so we're not bothering, either
            unit_name = self._units_by_app[application].pop()
            log.debug('Reusing unit %s for %s', unit_name, application)
            return self.model.units[unit_name]

        log.debug('Adding new unit for %s%s', application,
                  ' to %s' % placement if placement else '')
        return await self.model.applications[application].add_unit(
            count=1,
            to=placement,
        )

    async def expose(self, application):
        """
        :param application string:
            Application holds the placeholder name of the application that must
            be exposed.
        """
        application = self.resolve(application)
        log.info('Exposing %s', application)
        return await self.model.applications[application].expose()

    async def setAnnotations(self, id_, entity_type, annotations):
        """
        :param id_ string:
            Id is the placeholder for the application or machine change
            corresponding to the entity to be annotated.

        :param entity_type EntityType:
            EntityType holds the type of the entity, "application" or
            "machine".

        :param annotations map[string]string:
            Annotations holds the annotations as key/value pairs.
        """
        entity_id = self.resolve(id_)
        try:
            entity = self.model.state.get_entity(entity_type, entity_id)
        except KeyError:
            # the model state does not have the entity yet; wait for it
            entity = await self.model._wait_for_new(entity_type, entity_id)
        return await entity.set_annotations(annotations)
1370
1371
class CharmStore(object):
    """
    Async wrapper around theblues.charmstore.CharmStore
    """
    def __init__(self, loop):
        """
        :param loop: Event loop whose ``run_in_executor`` is used to run the
            wrapped (blocking) charm store calls
        """
        self.loop = loop
        self._cs = charmstore.CharmStore()

    def __getattr__(self, name):
        """
        Wrap method calls in coroutines that use run_in_executor to make them
        async.

        Callable attributes of the wrapped client are returned as coroutine
        functions.  Non-callable attributes are returned as a zero-argument
        callable that reads the attribute from the wrapped client when
        called.  Either way the wrapper is cached on this instance, so
        ``__getattr__`` runs only once per name.

        :raises AttributeError: if the wrapped client has no such attribute,
            or if the wrapped client itself has not been set yet
        """
        if name == '_cs':
            # __getattr__ only runs when normal lookup failed, so reaching
            # this point means _cs was never assigned (e.g. the instance was
            # made via __new__, copy or unpickling).  Looking up self._cs
            # below would then recurse without end.
            raise AttributeError(name)

        attr = getattr(self._cs, name)
        if callable(attr):
            async def coro(*args, **kwargs):
                method = partial(attr, *args, **kwargs)
                return await self.loop.run_in_executor(None, method)
            wrapper = coro
        else:
            wrapper = partial(getattr, self._cs, name)
        setattr(self, name, wrapper)
        return wrapper