Wire up more Application apis
[osm/N2VC.git] / juju / model.py
1 import asyncio
2 import collections
3 import logging
4 import re
5 import weakref
6 from concurrent.futures import CancelledError
7 from functools import partial
8
9 import yaml
10 from theblues import charmstore
11
12 from .client import client
13 from .client import watcher
14 from .client import connection
15 from .delta import get_entity_delta
16 from .delta import get_entity_class
17 from .exceptions import DeadEntityException
18 from .errors import JujuAPIError
19
20 log = logging.getLogger(__name__)
21
22
23 class _Observer(object):
24 """Wrapper around an observer callable.
25
26 This wrapper allows filter criteria to be associated with the
27 callable so that it's only called for changes that meet the criteria.
28
29 """
30 def __init__(self, callable_, entity_type, action, entity_id, predicate):
31 self.callable_ = callable_
32 self.entity_type = entity_type
33 self.action = action
34 self.entity_id = entity_id
35 self.predicate = predicate
36 if self.entity_id:
37 self.entity_id = str(self.entity_id)
38 if not self.entity_id.startswith('^'):
39 self.entity_id = '^' + self.entity_id
40 if not self.entity_id.endswith('$'):
41 self.entity_id += '$'
42
43 async def __call__(self, delta, old, new, model):
44 await self.callable_(delta, old, new, model)
45
46 def cares_about(self, delta):
47 """Return True if this observer "cares about" (i.e. wants to be
48 called) for a this delta.
49
50 """
51 if (self.entity_id and delta.get_id() and
52 not re.match(self.entity_id, str(delta.get_id()))):
53 return False
54
55 if self.entity_type and self.entity_type != delta.entity:
56 return False
57
58 if self.action and self.action != delta.type:
59 return False
60
61 if self.predicate and not self.predicate(delta):
62 return False
63
64 return True
65
66
class ModelObserver(object):
    """Base class for observers that dispatch on entity type and action.

    When called, the observer looks for a method named
    ``on_<entity>_<type>`` (built from ``delta.entity`` and ``delta.type``)
    and awaits it; when no such attribute exists, :meth:`on_change` is
    awaited instead.

    """
    async def __call__(self, delta, old, new, model):
        """Route the change to the most specific handler available."""
        specific_name = 'on_{}_{}'.format(delta.entity, delta.type)
        handler = getattr(self, specific_name, self.on_change)
        await handler(delta, old, new, model)

    async def on_change(self, delta, old, new, model):
        """Fallback handler; does nothing unless overridden."""
        pass
76
class ModelState(object):
    """Holds the state of the model, including the delta history of all
    entities in the model.

    ``self.state`` maps entity type -> entity id -> deque of data dicts,
    oldest first. A trailing ``None`` marks an entity that was removed.

    """
    def __init__(self, model):
        self.model = model
        self.state = dict()

    def _live_entity_map(self, entity_type):
        """Return an id:Entity map of all the living entities of
        type ``entity_type``.

        An entity is living when the last item of its history is not None.

        """
        living = {}
        histories = self.state.get(entity_type, {})
        for entity_id, history in histories.items():
            if history[-1] is None:
                continue
            living[entity_id] = self.get_entity(entity_type, entity_id)
        return living

    @property
    def applications(self):
        """Map of application-name:Application for every application
        currently in the model.

        """
        return self._live_entity_map('application')

    @property
    def machines(self):
        """Map of machine-id:Machine for every machine currently in the
        model.

        """
        return self._live_entity_map('machine')

    @property
    def units(self):
        """Map of unit-id:Unit for every unit currently in the model.

        """
        return self._live_entity_map('unit')

    def entity_history(self, entity_type, entity_id):
        """Return the history deque for an entity.

        Raises KeyError if the type or id has never been seen.

        """
        histories_for_type = self.state[entity_type]
        return histories_for_type[entity_id]

    def entity_data(self, entity_type, entity_id, history_index):
        """Return the data dict for an entity at a specific index of its
        history.

        """
        history = self.entity_history(entity_type, entity_id)
        return history[history_index]

    def apply_delta(self, delta):
        """Record ``delta`` in the history and return the affected entity
        as it was before and after the update, e.g.:

            old_obj, new_obj = self.apply_delta(delta)

        old_obj may be None if the delta is for the creation of a new object,
        e.g. a new application or unit is deployed.

        new_obj will never be None, but may be dead (new_obj.dead == True)
        if the object was deleted as a result of the delta being applied.

        """
        histories_for_type = self.state.setdefault(delta.entity, {})
        history = histories_for_type.setdefault(
            delta.get_id(), collections.deque())

        history.append(delta.data)
        if delta.type == 'remove':
            # A trailing None is the tombstone that marks the entity dead.
            history.append(None)

        current = self.get_entity(delta.entity, delta.get_id())
        return current.previous(), current

    def get_entity(
            self, entity_type, entity_id, history_index=-1, connected=True):
        """Return an object representing the entity at ``history_index``.

        Negative indexes other than -1 are converted to absolute positions
        first. Returns None when the requested position lies before the
        start or past the end of the entity's history.

        """
        if not (history_index >= 0 or history_index == -1):
            history_index += len(self.entity_history(entity_type, entity_id))
            if history_index < 0:
                return None

        # Probe the history only to validate the index.
        try:
            self.entity_data(entity_type, entity_id, history_index)
        except IndexError:
            return None

        entity_class = get_entity_class(entity_type)
        return entity_class(
            entity_id, self.model, history_index=history_index,
            connected=connected)
187
class ModelEntity(object):
    """An object in the Model tree"""

    def __init__(self, entity_id, model, history_index=-1, connected=True):
        """Initialize a new entity

        :param entity_id str: The unique id of the object in the model
        :param model: The model instance in whose object tree this
            entity resides
        :history_index int: The index of this object's state in the model's
            history deque for this entity
        :connected bool: Flag indicating whether this object gets live updates
            from the model.

        """
        self.entity_id = entity_id
        self.model = model
        self._history_index = history_index
        self.connected = connected
        self.connection = model.connection

    def __getattr__(self, name):
        """Fetch object attributes from the underlying data dict held in the
        model.

        Only invoked for names not found through normal attribute lookup.

        :raises DeadEntityException: if this object's data is None.
        :raises KeyError: if ``name`` is not a key of the data dict.

        """
        if self.data is None:
            raise DeadEntityException(
                "Entity {}:{} is dead - its attributes can no longer be "
                "accessed. Use the .previous() method on this object to get "
                "a copy of the object at its previous state.".format(
                    self.entity_type, self.entity_id))
        return self.data[name]

    def __bool__(self):
        """An entity is truthy only while it has a non-empty data dict."""
        return bool(self.data)

    def on_change(self, callable_):
        """Add a change observer to this entity.

        """
        self.model.add_observer(
            callable_, self.entity_type, 'change', self.entity_id)

    def on_remove(self, callable_):
        """Add a remove observer to this entity.

        """
        self.model.add_observer(
            callable_, self.entity_type, 'remove', self.entity_id)

    @property
    def entity_type(self):
        """A string identifying the entity type of this object, e.g.
        'application' or 'unit', etc.

        Derived from the lower-cased class name.

        """
        return self.__class__.__name__.lower()

    @property
    def current(self):
        """Return True if this object represents the current state of the
        entity in the underlying model.

        This will be True except when the object represents an entity at a
        non-latest state in history, e.g. if the object was obtained by calling
        .previous() on another object.

        """
        return self._history_index == -1

    @property
    def dead(self):
        """Returns True if this entity no longer exists in the underlying
        model.

        True when either this object's own data or the latest history entry
        is None.

        """
        return (
            self.data is None or
            self.model.state.entity_data(
                self.entity_type, self.entity_id, -1) is None
        )

    @property
    def alive(self):
        """Returns True if this entity still exists in the underlying
        model.

        """
        return not self.dead

    @property
    def data(self):
        """The data dictionary for this entity.

        Read from the model state on every access, at this object's
        history index.

        """
        return self.model.state.entity_data(
            self.entity_type, self.entity_id, self._history_index)

    def previous(self):
        """Return a copy of this object as was at its previous state in
        history.

        Returns None if this object is new (and therefore has no history).

        The returned object is always "disconnected", i.e. does not receive
        live updates.

        """
        return self.model.state.get_entity(
            self.entity_type, self.entity_id, self._history_index - 1,
            connected=False)

    def next(self):
        """Return a copy of this object at its next state in
        history.

        Returns None if this object is already the latest.

        The returned object is "disconnected", i.e. does not receive
        live updates, unless it is current (latest).

        """
        if self._history_index == -1:
            return None

        new_index = self._history_index + 1
        history = self.model.state.entity_history(
            self.entity_type, self.entity_id)
        # Only the object at the tail of the history tracks live updates.
        connected = new_index == len(history) - 1
        # Step forward to new_index; the previous code passed
        # ``self._history_index - 1`` here and so stepped backward instead.
        return self.model.state.get_entity(
            self.entity_type, self.entity_id, new_index,
            connected=connected)

    def latest(self):
        """Return a copy of this object at its current state in the model.

        Returns self if this object is already the latest.

        The returned object is always "connected", i.e. receives
        live updates from the model.

        """
        if self._history_index == -1:
            return self

        return self.model.state.get_entity(self.entity_type, self.entity_id)
337
338 class Model(object):
339 def __init__(self, loop=None):
340 """Instantiate a new connected Model.
341
342 :param loop: an asyncio event loop
343
344 """
345 self.loop = loop or asyncio.get_event_loop()
346 self.connection = None
347 self.observers = weakref.WeakValueDictionary()
348 self.state = ModelState(self)
349 self._watcher_task = None
350 self._watch_shutdown = asyncio.Event(loop=loop)
351 self._watch_received = asyncio.Event(loop=loop)
352 self._charmstore = CharmStore(self.loop)
353
354 async def connect_current(self):
355 """Connect to the current Juju model.
356
357 """
358 self.connection = await connection.Connection.connect_current()
359 self._watch()
360 await self._watch_received.wait()
361
362 async def disconnect(self):
363 """Shut down the watcher task and close websockets.
364
365 """
366 self._stop_watching()
367 if self.connection and self.connection.is_open:
368 await self._watch_shutdown.wait()
369 log.debug('Closing model connection')
370 await self.connection.close()
371 self.connection = None
372
373 def all_units_idle(self):
374 """Return True if all units are idle.
375
376 """
377 for unit in self.units.values():
378 unit_status = unit.data['agent-status']['current']
379 if unit_status != 'idle':
380 return False
381 return True
382
383 async def reset(self, force=False):
384 """Reset the model to a clean state.
385
386 :param bool force: Force-terminate machines.
387
388 This returns only after the model has reached a clean state. "Clean"
389 means no applications or machines exist in the model.
390
391 """
392 log.debug('Resetting model')
393 for app in self.applications.values():
394 await app.destroy()
395 for machine in self.machines.values():
396 await machine.destroy(force=force)
397 await self.block_until(
398 lambda: len(self.machines) == 0
399 )
400
401 async def block_until(self, *conditions, timeout=None):
402 """Return only after all conditions are true.
403
404 """
405 async def _block():
406 while not all(c() for c in conditions):
407 await asyncio.sleep(0)
408 await asyncio.wait_for(_block(), timeout)
409
410 @property
411 def applications(self):
412 """Return a map of application-name:Application for all applications
413 currently in the model.
414
415 """
416 return self.state.applications
417
418 @property
419 def machines(self):
420 """Return a map of machine-id:Machine for all machines currently in
421 the model.
422
423 """
424 return self.state.machines
425
426 @property
427 def units(self):
428 """Return a map of unit-id:Unit for all units currently in
429 the model.
430
431 """
432 return self.state.units
433
434 def add_observer(
435 self, callable_, entity_type=None, action=None, entity_id=None,
436 predicate=None):
437 """Register an "on-model-change" callback
438
439 Once the model is connected, ``callable_``
440 will be called each time the model changes. callable_ should
441 be Awaitable and accept the following positional arguments:
442
443 delta - An instance of :class:`juju.delta.EntityDelta`
444 containing the raw delta data recv'd from the Juju
445 websocket.
446
447 old_obj - If the delta modifies an existing object in the model,
448 old_obj will be a copy of that object, as it was before the
449 delta was applied. Will be None if the delta creates a new
450 entity in the model.
451
452 new_obj - A copy of the new or updated object, after the delta
453 is applied. Will be None if the delta removes an entity
454 from the model.
455
456 model - The :class:`Model` itself.
457
458 Events for which ``callable_`` is called can be specified by passing
459 entity_type, action, and/or id_ filter criteria, e.g.:
460
461 add_observer(
462 myfunc, entity_type='application', action='add', id_='ubuntu')
463
464 For more complex filtering conditions, pass a predicate function. It
465 will be called with a delta as its only argument. If the predicate
466 function returns True, the callable_ will be called.
467
468 """
469 observer = _Observer(
470 callable_, entity_type, action, entity_id, predicate)
471 self.observers[observer] = callable_
472
473 def _watch(self):
474 """Start an asynchronous watch against this model.
475
476 See :meth:`add_observer` to register an onchange callback.
477
478 """
479 async def _start_watch():
480 self._watch_shutdown.clear()
481 try:
482 allwatcher = watcher.AllWatcher()
483 self._watch_conn = await self.connection.clone()
484 allwatcher.connect(self._watch_conn)
485 while True:
486 results = await allwatcher.Next()
487 for delta in results.deltas:
488 delta = get_entity_delta(delta)
489 old_obj, new_obj = self.state.apply_delta(delta)
490 # XXX: Might not want to shield at this level
491 # We are shielding because when the watcher is
492 # canceled (on disconnect()), we don't want all of
493 # its children (every observer callback) to be
494 # canceled with it. So we shield them. But this means
495 # they can *never* be canceled.
496 await asyncio.shield(
497 self._notify_observers(delta, old_obj, new_obj))
498 self._watch_received.set()
499 except CancelledError:
500 log.debug('Closing watcher connection')
501 await self._watch_conn.close()
502 self._watch_shutdown.set()
503 self._watch_conn = None
504
505 log.debug('Starting watcher task')
506 self._watcher_task = self.loop.create_task(_start_watch())
507
508 def _stop_watching(self):
509 """Stop the asynchronous watch against this model.
510
511 """
512 log.debug('Stopping watcher task')
513 if self._watcher_task:
514 self._watcher_task.cancel()
515
516 async def _notify_observers(self, delta, old_obj, new_obj):
517 """Call observing callbacks, notifying them of a change in model state
518
519 :param delta: The raw change from the watcher
520 (:class:`juju.client.overrides.Delta`)
521 :param old_obj: The object in the model that this delta updates.
522 May be None.
523 :param new_obj: The object in the model that is created or updated
524 by applying this delta.
525
526 """
527 if new_obj and not old_obj:
528 delta.type = 'add'
529
530 log.debug(
531 'Model changed: %s %s %s',
532 delta.entity, delta.type, delta.get_id())
533
534 for o in self.observers:
535 if o.cares_about(delta):
536 asyncio.ensure_future(o(delta, old_obj, new_obj, self))
537
538 async def _wait_for_new(self, entity_type, entity_id, predicate=None):
539 """Wait for a new object to appear in the Model and return it.
540
541 Waits for an object of type ``entity_type`` with id ``entity_id``.
542
543 This coroutine blocks until the new object appears in the model.
544
545 """
546 entity_added = asyncio.Queue(loop=self.loop)
547
548 async def callback(delta, old, new, model):
549 await entity_added.put(delta.get_id())
550
551 self.add_observer(callback, entity_type, 'add', entity_id, predicate)
552 entity_id = await entity_added.get()
553 return self.state._live_entity_map(entity_type)[entity_id]
554
555 def add_machine(
556 self, spec=None, constraints=None, disks=None, series=None,
557 count=1):
558 """Start a new, empty machine and optionally a container, or add a
559 container to a machine.
560
561 :param str spec: Machine specification
562 Examples::
563
564 (None) - starts a new machine
565 'lxc' - starts a new machine with on lxc container
566 'lxc:4' - starts a new lxc container on machine 4
567 'ssh:user@10.10.0.3' - manually provisions a machine with ssh
568 'zone=us-east-1a' - starts a machine in zone us-east-1s on AWS
569 'maas2.name' - acquire machine maas2.name on MAAS
570 :param constraints: Machine constraints
571 :type constraints: :class:`juju.Constraints`
572 :param list disks: List of disk :class:`constraints <juju.Constraints>`
573 :param str series: Series
574 :param int count: Number of machines to deploy
575
576 Supported container types are: lxc, lxd, kvm
577
578 When deploying a container to an existing machine, constraints cannot
579 be used.
580
581 """
582 pass
583 add_machines = add_machine
584
585 async def add_relation(self, relation1, relation2):
586 """Add a relation between two applications.
587
588 :param str relation1: '<application>[:<relation_name>]'
589 :param str relation2: '<application>[:<relation_name>]'
590
591 """
592 app_facade = client.ApplicationFacade()
593 app_facade.connect(self.connection)
594
595 log.debug(
596 'Adding relation %s <-> %s', relation1, relation2)
597
598 try:
599 result = await app_facade.AddRelation([relation1, relation2])
600 except JujuAPIError as e:
601 if 'relation already exists' not in e.message:
602 raise
603 log.debug(
604 'Relation %s <-> %s already exists', relation1, relation2)
605 # TODO: if relation already exists we should return the
606 # Relation ModelEntity here
607 return None
608
609 def predicate(delta):
610 endpoints = {}
611 for endpoint in delta.data['endpoints']:
612 endpoints[endpoint['application-name']] = endpoint['relation']
613 return endpoints == result.endpoints
614
615 return await self._wait_for_new('relation', None, predicate)
616
617 def add_space(self, name, *cidrs):
618 """Add a new network space.
619
620 Adds a new space with the given name and associates the given
621 (optional) list of existing subnet CIDRs with it.
622
623 :param str name: Name of the space
624 :param \*cidrs: Optional list of existing subnet CIDRs
625
626 """
627 pass
628
629 def add_ssh_key(self, key):
630 """Add a public SSH key to this model.
631
632 :param str key: The public ssh key
633
634 """
635 pass
636 add_ssh_keys = add_ssh_key
637
638 def add_subnet(self, cidr_or_id, space, *zones):
639 """Add an existing subnet to this model.
640
641 :param str cidr_or_id: CIDR or provider ID of the existing subnet
642 :param str space: Network space with which to associate
643 :param str \*zones: Zone(s) in which the subnet resides
644
645 """
646 pass
647
648 def get_backups(self):
649 """Retrieve metadata for backups in this model.
650
651 """
652 pass
653
654 def block(self, *commands):
655 """Add a new block to this model.
656
657 :param str \*commands: The commands to block. Valid values are
658 'all-changes', 'destroy-model', 'remove-object'
659
660 """
661 pass
662
663 def get_blocks(self):
664 """List blocks for this model.
665
666 """
667 pass
668
669 def get_cached_images(self, arch=None, kind=None, series=None):
670 """Return a list of cached OS images.
671
672 :param str arch: Filter by image architecture
673 :param str kind: Filter by image kind, e.g. 'lxd'
674 :param str series: Filter by image series, e.g. 'xenial'
675
676 """
677 pass
678
679 def create_backup(self, note=None, no_download=False):
680 """Create a backup of this model.
681
682 :param str note: A note to store with the backup
683 :param bool no_download: Do not download the backup archive
684 :return str: Path to downloaded archive
685
686 """
687 pass
688
689 def create_storage_pool(self, name, provider_type, **pool_config):
690 """Create or define a storage pool.
691
692 :param str name: Name to give the storage pool
693 :param str provider_type: Pool provider type
694 :param \*\*pool_config: key/value pool configuration pairs
695
696 """
697 pass
698
699 def debug_log(
700 self, no_tail=False, exclude_module=None, include_module=None,
701 include=None, level=None, limit=0, lines=10, replay=False,
702 exclude=None):
703 """Get log messages for this model.
704
705 :param bool no_tail: Stop after returning existing log messages
706 :param list exclude_module: Do not show log messages for these logging
707 modules
708 :param list include_module: Only show log messages for these logging
709 modules
710 :param list include: Only show log messages for these entities
711 :param str level: Log level to show, valid options are 'TRACE',
712 'DEBUG', 'INFO', 'WARNING', 'ERROR,
713 :param int limit: Return this many of the most recent (possibly
714 filtered) lines are shown
715 :param int lines: Yield this many of the most recent lines, and keep
716 yielding
717 :param bool replay: Yield the entire log, and keep yielding
718 :param list exclude: Do not show log messages for these entities
719
720 """
721 pass
722
723 async def deploy(
724 self, entity_url, service_name=None, bind=None, budget=None,
725 channel=None, config=None, constraints=None, force=False,
726 num_units=1, plan=None, resources=None, series=None, storage=None,
727 to=None):
728 """Deploy a new service or bundle.
729
730 :param str entity_url: Charm or bundle url
731 :param str service_name: Name to give the service
732 :param dict bind: <charm endpoint>:<network space> pairs
733 :param dict budget: <budget name>:<limit> pairs
734 :param str channel: Charm store channel from which to retrieve
735 the charm or bundle, e.g. 'development'
736 :param dict config: Charm configuration dictionary
737 :param constraints: Service constraints
738 :type constraints: :class:`juju.Constraints`
739 :param bool force: Allow charm to be deployed to a machine running
740 an unsupported series
741 :param int num_units: Number of units to deploy
742 :param str plan: Plan under which to deploy charm
743 :param dict resources: <resource name>:<file path> pairs
744 :param str series: Series on which to deploy
745 :param dict storage: Storage constraints TODO how do these look?
746 :param str to: Placement directive, e.g.::
747
748 '23' - machine 23
749 'lxc:7' - new lxc container on machine 7
750 '24/lxc/3' - lxc container 3 or machine 24
751
752 If None, a new machine is provisioned.
753
754
755 TODO::
756
757 - service_name is required; fill this in automatically if not
758 provided by caller
759 - series is required; how do we pick a default?
760
761 """
762 if constraints:
763 constraints = client.Value(**constraints)
764
765 if to:
766 placement = [
767 client.Placement(**p) for p in to
768 ]
769 else:
770 placement = []
771
772 if storage:
773 storage = {
774 k: client.Constraints(**v)
775 for k, v in storage.items()
776 }
777
778 entity_id = await self.charmstore.entityId(entity_url)
779
780 app_facade = client.ApplicationFacade()
781 client_facade = client.ClientFacade()
782 app_facade.connect(self.connection)
783 client_facade.connect(self.connection)
784
785 if 'bundle/' in entity_id:
786 handler = BundleHandler(self)
787 await handler.fetch_plan(entity_id)
788 await handler.execute_plan()
789 extant_apps = {app for app in self.applications}
790 pending_apps = set(handler.applications) - extant_apps
791 if pending_apps:
792 # new apps will usually be in the model by now, but if some
793 # haven't made it yet we'll need to wait on them to be added
794 await asyncio.wait([self._wait_for_new('application', app_name)
795 for app_name in pending_apps])
796 return [app for name, app in self.applications.items()
797 if name in handler.applications]
798 else:
799 log.debug(
800 'Deploying %s', entity_id)
801
802 await client_facade.AddCharm(channel, entity_id)
803 app = client.ApplicationDeploy(
804 application=service_name,
805 channel=channel,
806 charm_url=entity_id,
807 config=config,
808 constraints=constraints,
809 endpoint_bindings=bind,
810 num_units=num_units,
811 placement=placement,
812 resources=resources,
813 series=series,
814 storage=storage,
815 )
816
817 await app_facade.Deploy([app])
818 return [await self._wait_for_new('application', service_name)]
819
820 def destroy(self):
821 """Terminate all machines and resources for this model.
822
823 """
824 pass
825
826 async def destroy_unit(self, *unit_names):
827 """Destroy units by name.
828
829 """
830 app_facade = client.ApplicationFacade()
831 app_facade.connect(self.connection)
832
833 log.debug(
834 'Destroying unit%s %s',
835 's' if len(unit_names) == 1 else '',
836 ' '.join(unit_names))
837
838 return await app_facade.Destroy(self.name)
839 destroy_units = destroy_unit
840
841 def get_backup(self, archive_id):
842 """Download a backup archive file.
843
844 :param str archive_id: The id of the archive to download
845 :return str: Path to the archive file
846
847 """
848 pass
849
850 def enable_ha(
851 self, num_controllers=0, constraints=None, series=None, to=None):
852 """Ensure sufficient controllers exist to provide redundancy.
853
854 :param int num_controllers: Number of controllers to make available
855 :param constraints: Constraints to apply to the controller machines
856 :type constraints: :class:`juju.Constraints`
857 :param str series: Series of the controller machines
858 :param list to: Placement directives for controller machines, e.g.::
859
860 '23' - machine 23
861 'lxc:7' - new lxc container on machine 7
862 '24/lxc/3' - lxc container 3 or machine 24
863
864 If None, a new machine is provisioned.
865
866 """
867 pass
868
869 def get_config(self):
870 """Return the configuration settings for this model.
871
872 """
873 pass
874
875 def get_constraints(self):
876 """Return the machine constraints for this model.
877
878 """
879 pass
880
881 def grant(self, username, acl='read'):
882 """Grant a user access to this model.
883
884 :param str username: Username
885 :param str acl: Access control ('read' or 'write')
886
887 """
888 pass
889
890 def import_ssh_key(self, identity):
891 """Add a public SSH key from a trusted indentity source to this model.
892
893 :param str identity: User identity in the form <lp|gh>:<username>
894
895 """
896 pass
897 import_ssh_keys = import_ssh_key
898
899 def get_machines(self, machine, utc=False):
900 """Return list of machines in this model.
901
902 :param str machine: Machine id, e.g. '0'
903 :param bool utc: Display time as UTC in RFC3339 format
904
905 """
906 pass
907
908 def get_shares(self):
909 """Return list of all users with access to this model.
910
911 """
912 pass
913
914 def get_spaces(self):
915 """Return list of all known spaces, including associated subnets.
916
917 """
918 pass
919
920 def get_ssh_key(self):
921 """Return known SSH keys for this model.
922
923 """
924 pass
925 get_ssh_keys = get_ssh_key
926
927 def get_storage(self, filesystem=False, volume=False):
928 """Return details of storage instances.
929
930 :param bool filesystem: Include filesystem storage
931 :param bool volume: Include volume storage
932
933 """
934 pass
935
936 def get_storage_pools(self, names=None, providers=None):
937 """Return list of storage pools.
938
939 :param list names: Only include pools with these names
940 :param list providers: Only include pools for these providers
941
942 """
943 pass
944
945 def get_subnets(self, space=None, zone=None):
946 """Return list of known subnets.
947
948 :param str space: Only include subnets in this space
949 :param str zone: Only include subnets in this zone
950
951 """
952 pass
953
954 def remove_blocks(self):
955 """Remove all blocks from this model.
956
957 """
958 pass
959
960 def remove_backup(self, backup_id):
961 """Delete a backup.
962
963 :param str backup_id: The id of the backup to remove
964
965 """
966 pass
967
968 def remove_cached_images(self, arch=None, kind=None, series=None):
969 """Remove cached OS images.
970
971 :param str arch: Architecture of the images to remove
972 :param str kind: Image kind to remove, e.g. 'lxd'
973 :param str series: Image series to remove, e.g. 'xenial'
974
975 """
976 pass
977
978 def remove_machine(self, *machine_ids):
979 """Remove a machine from this model.
980
981 :param str \*machine_ids: Ids of the machines to remove
982
983 """
984 pass
985 remove_machines = remove_machine
986
987 def remove_ssh_key(self, *keys):
988 """Remove a public SSH key(s) from this model.
989
990 :param str \*keys: Keys to remove
991
992 """
993 pass
994 remove_ssh_keys = remove_ssh_key
995
996 def restore_backup(
997 self, bootstrap=False, constraints=None, archive=None,
998 backup_id=None, upload_tools=False):
999 """Restore a backup archive to a new controller.
1000
1001 :param bool bootstrap: Bootstrap a new state machine
1002 :param constraints: Model constraints
1003 :type constraints: :class:`juju.Constraints`
1004 :param str archive: Path to backup archive to restore
1005 :param str backup_id: Id of backup to restore
1006 :param bool upload_tools: Upload tools if bootstrapping a new machine
1007
1008 """
1009 pass
1010
1011 def retry_provisioning(self):
1012 """Retry provisioning for failed machines.
1013
1014 """
1015 pass
1016
1017 def revoke(self, username, acl='read'):
1018 """Revoke a user's access to this model.
1019
1020 :param str username: Username to revoke
1021 :param str acl: Access control ('read' or 'write')
1022
1023 """
1024 pass
1025
1026 def run(self, command, timeout=None):
1027 """Run command on all machines in this model.
1028
1029 :param str command: The command to run
1030 :param int timeout: Time to wait before command is considered failed
1031
1032 """
1033 pass
1034
1035 def set_config(self, **config):
1036 """Set configuration keys on this model.
1037
1038 :param \*\*config: Config key/values
1039
1040 """
1041 pass
1042
1043 def set_constraints(self, constraints):
1044 """Set machine constraints on this model.
1045
1046 :param :class:`juju.Constraints` constraints: Machine constraints
1047
1048 """
1049 pass
1050
1051 def get_action_output(self, action_uuid, wait=-1):
1052 """Get the results of an action by ID.
1053
1054 :param str action_uuid: Id of the action
1055 :param int wait: Time in seconds to wait for action to complete
1056
1057 """
1058 pass
1059
1060 def get_action_status(self, uuid_or_prefix=None, name=None):
1061 """Get the status of all actions, filtered by ID, ID prefix, or action name.
1062
1063 :param str uuid_or_prefix: Filter by action uuid or prefix
1064 :param str name: Filter by action name
1065
1066 """
1067 pass
1068
1069 def get_budget(self, budget_name):
1070 """Get budget usage info.
1071
1072 :param str budget_name: Name of budget
1073
1074 """
1075 pass
1076
1077 def get_status(self, filter_=None, utc=False):
1078 """Return the status of the model.
1079
1080 :param str filter_: Service or unit name or wildcard ('*')
1081 :param bool utc: Display time as UTC in RFC3339 format
1082
1083 """
1084 pass
1085 status = get_status
1086
1087 def sync_tools(
1088 self, all_=False, destination=None, dry_run=False, public=False,
1089 source=None, stream=None, version=None):
1090 """Copy Juju tools into this model.
1091
1092 :param bool all_: Copy all versions, not just the latest
1093 :param str destination: Path to local destination directory
1094 :param bool dry_run: Don't do the actual copy
1095 :param bool public: Tools are for a public cloud, so generate mirrors
1096 information
1097 :param str source: Path to local source directory
1098 :param str stream: Simplestreams stream for which to sync metadata
1099 :param str version: Copy a specific major.minor version
1100
1101 """
1102 pass
1103
def unblock(self, *commands):
    r"""Unblock an operation that would alter this model.

    Not yet implemented: the call currently does nothing and returns
    ``None``.

    :param str \*commands: The commands to unblock. Valid values are
        'all-changes', 'destroy-model', 'remove-object'

    """
    # The docstring is a raw string so the reST-escaped asterisk above is
    # not parsed as an (invalid) Python escape sequence.
    pass
1112
def unset_config(self, *keys):
    r"""Unset configuration on this model.

    Not yet implemented: the call currently does nothing and returns
    ``None``.

    :param str \*keys: The keys to unset

    """
    # The docstring is a raw string so the reST-escaped asterisk above is
    # not parsed as an (invalid) Python escape sequence.
    pass
1120
def upgrade_gui(self):
    """Upgrade the Juju GUI for this model.

    Placeholder: no upgrade is performed yet; the method does no work and
    returns ``None``.

    """
    return None
1126
def upgrade_juju(self, dry_run=False, reset_previous_upgrade=False,
                 upload_tools=False, version=None):
    """Upgrade Juju on all machines in a model.

    Placeholder: no upgrade is performed yet; the method does no work and
    returns ``None``.

    :param bool dry_run: Don't do the actual upgrade
    :param bool reset_previous_upgrade: Clear the previous (incomplete)
        upgrade status
    :param bool upload_tools: Upload local version of tools
    :param str version: Upgrade to a specific version

    """
    return None
1140
def upload_backup(self, archive_path):
    """Store a backup archive remotely in Juju.

    Placeholder: nothing is uploaded yet; the method does no work and
    returns ``None``.

    :param str archive_path: Path to local archive

    """
    return None
1148
@property
def charmstore(self):
    """Read-only accessor for the object held in ``self._charmstore``."""
    return self._charmstore
1152
1153
class BundleHandler(object):
    """
    Handle bundles by using the API to translate bundle YAML into a plan of
    steps and then dispatching each of those using the API.

    Call :meth:`fetch_plan` to load a bundle and obtain its change plan,
    then :meth:`execute_plan` to run it.  Each plan step names one of the
    camelCase coroutine methods of this class (``addCharm``, ``deploy``,
    ...); the value a step returns is stored under the step's id so that
    later steps can refer to it with a ``$<id>`` placeholder.
    """
    def __init__(self, model):
        """
        :param model: object providing ``charmstore``, ``units`` and
            ``connection`` (plus the model operations used by the step
            methods below).
        """
        self.model = model
        self.charmstore = model.charmstore
        # Parsed bundle YAML; filled in by fetch_plan().  Starts empty so
        # that ``applications`` is usable before a bundle has been fetched.
        self.bundle = {}
        # Change plan; replaced by the GetBundleChanges result in
        # fetch_plan().
        self.plan = []
        # Results of executed steps, keyed by step id (see resolve()).
        self.references = {}
        # Names of units that already exist in the model, grouped by
        # application, so addUnit() can reuse them instead of adding more.
        self._units_by_app = {}
        for unit_name, unit in model.units.items():
            app_units = self._units_by_app.setdefault(unit.application, [])
            app_units.append(unit_name)
        self.client_facade = client.ClientFacade()
        self.client_facade.connect(model.connection)
        self.app_facade = client.ApplicationFacade()
        self.app_facade.connect(model.connection)
        self.ann_facade = client.AnnotationsFacade()
        self.ann_facade.connect(model.connection)

    async def fetch_plan(self, entity_id):
        """
        Read ``bundle.yaml`` for ``entity_id`` through the charm store
        wrapper, keep the parsed YAML in ``self.bundle`` and store the
        result of ``GetBundleChanges`` in ``self.plan``.

        :param entity_id string:
            Charm store id of the bundle.
        """
        bundle_yaml = await self.charmstore.files(entity_id,
                                                  filename='bundle.yaml',
                                                  read_file=True)
        self.bundle = yaml.safe_load(bundle_yaml)
        self.plan = await self.client_facade.GetBundleChanges(bundle_yaml)

    async def execute_plan(self):
        """
        Run every step of the fetched plan in order, recording each step's
        result in ``self.references`` under the step id.

        Does nothing when no plan has been fetched yet.
        """
        # Before fetch_plan() runs, self.plan is the empty list set in
        # __init__, which has no ``changes`` attribute; treat that (and an
        # empty change list) as "nothing to do" instead of raising.
        changes = getattr(self.plan, 'changes', None) or []
        for step in changes:
            method = getattr(self, step.method)
            result = await method(*step.args)
            self.references[step.id_] = result

    @property
    def applications(self):
        """
        List of application names declared in the fetched bundle (empty if
        no bundle has been fetched or it declares none).
        """
        bundle = self.bundle or {}
        # Bundles written for Juju 2.x name this section 'applications';
        # 'services' is the older spelling of the same section.
        apps = bundle.get('applications', bundle.get('services'))
        return list(apps or {})

    def resolve(self, reference):
        """
        Translate a ``$<step id>`` placeholder into the result recorded for
        that step; any other value (including ``None`` and the empty
        string) is returned unchanged.

        :raises KeyError: if the placeholder names a step with no recorded
            result.
        """
        if reference and reference.startswith('$'):
            reference = self.references[reference[1:]]
        return reference

    def _resolve_endpoint(self, endpoint):
        """
        Resolve the application part of an "application[:interface]"
        endpoint, leaving the interface part untouched.
        """
        app, sep, interface = endpoint.partition(':')
        return self.resolve(app) + sep + interface

    async def addCharm(self, charm, series):
        """
        :param charm string:
            Charm holds the URL of the charm to be added.

        :param series string:
            Series holds the series of the charm to be added
            if the charm default is not sufficient.

        Returns the entity id reported by the charm store for ``charm``.
        """
        # NOTE: ``series`` is part of the plan step's argument list but is
        # not used by this implementation.
        entity_id = await self.charmstore.entityId(charm)
        log.debug('Adding %s', entity_id)
        await self.client_facade.AddCharm(None, entity_id)
        return entity_id

    async def addMachines(self, series, constraints, container_type,
                          parent_id):
        """
        :param series string:
            Series holds the optional machine OS series.

        :param constraints string:
            Constraints holds the optional machine constraints.

        :param container_type string:
            ContainerType optionally holds the type of the container (for
            instance "lxc" or "kvm"). It is not specified for top level
            machines.

        :param parent_id string:
            ParentId optionally holds a placeholder pointing to another machine
            change or to a unit change. This value is only specified in the
            case this machine is a container, in which case also ContainerType
            is set.

        Returns the ``machine`` field of the first AddMachines result.
        """
        params = client.AddMachineParams(
            series=series,
            constraints=constraints,
            container_type=container_type,
            parent_id=self.resolve(parent_id),
        )
        results = await self.client_facade.AddMachines(params)
        log.debug('Added new machine %s', results[0].machine)
        return results[0].machine

    async def addRelation(self, endpoint1, endpoint2):
        """
        :param endpoint1 string:
        :param endpoint2 string:
            Endpoint1 and Endpoint2 hold relation endpoints in the
            "application:interface" form, where the application is always a
            placeholder pointing to an application change, and the interface is
            optional. Examples are "$deploy-42:web" or just "$deploy-42".
        """
        # resolve indirect references
        endpoints = [self._resolve_endpoint(endpoint)
                     for endpoint in (endpoint1, endpoint2)]

        log.info('Relating %s <-> %s', *endpoints)
        return await self.model.add_relation(*endpoints)

    async def deploy(self, charm, series, application, options, constraints,
                     storage, endpoint_bindings, resources):
        """
        :param charm string:
            Charm holds the URL of the charm to be used to deploy this
            application.

        :param series string:
            Series holds the series of the application to be deployed
            if the charm default is not sufficient.

        :param application string:
            Application holds the application name.

        :param options map[string]interface{}:
            Options holds application options.

        :param constraints string:
            Constraints holds the optional application constraints.

        :param storage map[string]string:
            Storage holds the optional storage constraints.

        :param endpoint_bindings map[string]string:
            EndpointBindings holds the optional endpoint bindings

        :param resources map[string]int:
            Resources identifies the revision to use for each resource
            of the application's charm.

        Returns the application name that was passed in.
        """
        # resolve indirect references
        charm = self.resolve(charm)
        # stringify all config values for API; a missing (None) options map
        # is treated as "no options" rather than crashing on .items()
        options = {k: str(v) for k, v in (options or {}).items()}
        # build param object
        app = client.ApplicationDeploy(
            charm_url=charm,
            series=series,
            application=application,
            config=options,
            constraints=constraints,
            storage=storage,
            endpoint_bindings=endpoint_bindings,
            resources=resources,
        )
        # do the do
        log.info('Deploying %s', charm)
        await self.app_facade.Deploy([app])
        return application

    async def addUnit(self, application, to):
        """
        :param application string:
            Application holds the application placeholder name for which a unit
            is added.

        :param to string:
            To holds the optional location where to add the unit, as a
            placeholder pointing to another unit change or to a machine change.

        Returns an existing unit from ``model.units`` when one can be
        reused, otherwise the result of the application's ``add_unit``.
        """
        application = self.resolve(application)
        placement = self.resolve(to)
        if self._units_by_app.get(application):
            # enough units for this application already exist;
            # claim one, and carry on
            # NB: this should probably honor placement, but the juju client
            # doesn't, so we're not bothering, either
            unit_name = self._units_by_app[application].pop()
            log.debug('Reusing unit %s for %s', unit_name, application)
            return self.model.units[unit_name]

        log.debug('Adding new unit for %s%s', application,
                  (' to %s' % placement) if placement else '')
        return await self.model.applications[application].add_unit(
            count=1,
            to=placement,
        )

    async def expose(self, application):
        """
        :param application string:
            Application holds the placeholder name of the application that must
            be exposed.
        """
        application = self.resolve(application)
        log.info('Exposing %s', application)
        return await self.model.applications[application].expose()

    async def setAnnotations(self, id_, entity_type, annotations):
        """
        :param id_ string:
            Id is the placeholder for the application or machine change
            corresponding to the entity to be annotated.

        :param entity_type EntityType:
            EntityType holds the type of the entity, "application" or
            "machine".

        :param annotations map[string]string:
            Annotations holds the annotations as key/value pairs.
        """
        entity_id = self.resolve(id_)
        try:
            entity = self.model.state.get_entity(entity_type, entity_id)
        except KeyError:
            # Not in the model state yet: wait for the entity to show up.
            entity = await self.model._wait_for_new(entity_type, entity_id)
        return await entity.set_annotations(annotations)
1367
1368
class CharmStore(object):
    """
    Async wrapper around theblues.charmstore.CharmStore

    Attribute access is forwarded to the wrapped object: callables come
    back as coroutine functions that run the call in the loop's executor,
    and non-callable attributes come back as zero-argument callables that
    read the current value from the wrapped object.
    """
    def __init__(self, loop):
        """
        :param loop: event loop whose ``run_in_executor`` is used to run
            the wrapped calls.
        """
        self.loop = loop
        self._cs = charmstore.CharmStore()

    def __getattr__(self, name):
        """
        Wrap method calls in coroutines that use run_in_executor to make them
        async.

        The wrapper is cached on the instance, so this hook runs at most
        once per attribute name.

        :raises AttributeError: if the wrapped object has no such
            attribute, or if this instance has not been initialised.
        """
        # __getattr__ is only consulted when normal lookup fails.  If that
        # happens for one of our own attributes, __init__ has not run on
        # this instance; reading self._cs below would then re-enter this
        # method and recurse until RecursionError.
        if name in ('loop', '_cs'):
            raise AttributeError(name)

        attr = getattr(self._cs, name)
        if callable(attr):
            async def wrapper(*args, **kwargs):
                # run_in_executor accepts positional arguments only, so
                # bind everything up front with partial.
                call = partial(attr, *args, **kwargs)
                return await self.loop.run_in_executor(None, call)
        else:
            wrapper = partial(getattr, self._cs, name)
        setattr(self, name, wrapper)
        return wrapper