Make observers async; make model.reset() blocking
[osm/N2VC.git] / juju / model.py
1 import asyncio
2 import logging
3 from concurrent.futures import CancelledError
4
5 from .client import client
6 from .client import watcher
7 from .client import connection
8 from .delta import get_entity_delta
9
10 log = logging.getLogger(__name__)
11
12
class ModelObserver(object):
    """Base class for awaitable model-change callbacks.

    Instances are callable with ``(delta, old, new, model)``. Each call is
    routed to a method named ``on_<entity>_<type>`` when the instance has
    one, and to :meth:`on_change` otherwise.

    """

    async def __call__(self, delta, old, new, model):
        """Route one change notification to the matching handler.

        :param delta: change object; ``entity``, ``type`` and ``get_id()``
            are read from it
        :param old: the object before the change, or None
        :param new: the object after the change, or None
        :param model: passed through to the handler unchanged

        """
        # An entity seen for the first time is reported as 'add',
        # whatever type the delta itself carries.
        first_sighting = old is None and new is not None
        event = 'add' if first_sighting else delta.type
        handler = getattr(
            self, 'on_{}_{}'.format(delta.entity, event), self.on_change)
        log.debug(
            'Model changed: %s %s %s',
            delta.entity, delta.type, delta.get_id())
        await handler(delta, old, new, model)

    async def on_change(self, delta, old, new, model):
        """Fallback handler used when no specific ``on_*`` method exists.

        Does nothing by default.

        """
        pass
28
29
class ModelEntity(object):
    """An object in the Model tree.

    Every key of the ``data`` dict is readable as an attribute of the
    entity (``entity.name`` is ``entity.data['name']``).

    """

    class MissingField(AttributeError, KeyError):
        """Raised when an attribute is not a key of the entity's data.

        It is an ``AttributeError`` so that ``hasattr()``,
        ``getattr(obj, name, default)`` and the ``copy`` module behave
        normally, and a ``KeyError`` so that callers that caught the
        ``KeyError`` raised by earlier versions keep working.

        """

    def __init__(self, data, model):
        """Initialize a new entity

        :param data: dict of data from a watcher delta
        :param model: The model instance in whose object tree this
            entity resides

        """
        self.data = data
        self.model = model
        self.connection = model.connection

    def __getattr__(self, name):
        """Return ``self.data[name]`` for names not found by normal lookup.

        :param str name: attribute name, used as a key into ``data``
        :raises AttributeError: if ``data`` has not been set on this
            instance yet, or (as :class:`MissingField`, which is also a
            ``KeyError``) if ``name`` is not a key of ``data``

        """
        # Go through __dict__ rather than ``self.data``: on an instance
        # without ``data`` (e.g. one being rebuilt by copy/pickle),
        # ``self.data`` would re-enter __getattr__ and recurse forever.
        try:
            data = self.__dict__['data']
        except KeyError:
            raise AttributeError(name) from None
        try:
            return data[name]
        except KeyError:
            raise self.MissingField(name) from None
47
48
class Model(object):
    """Client-side view of a Juju model.

    After :meth:`connect_current`, a background watcher task applies every
    delta received from the controller to ``self.state`` (a dict of
    ``entity-type -> {entity-id -> entity object}``) and notifies the
    registered observers.

    Methods whose body is only ``pass`` are not implemented yet and
    return None.

    """

    def __init__(self, loop=None):
        """Instantiate a new, not yet connected Model.

        :param loop: an asyncio event loop; defaults to
            ``asyncio.get_event_loop()``

        """
        self.loop = loop or asyncio.get_event_loop()
        self.connection = None
        self.observers = set()
        self.state = dict()
        self._watcher_task = None
        # Connection used by the watcher task; None while no watch runs.
        self._watch_conn = None
        # Set once the watcher task has closed its connection.
        self._watch_shutdown = self._new_event()
        # Set each time a batch of deltas has been applied.
        self._watch_received = self._new_event()

    def _new_event(self):
        """Return a new :class:`asyncio.Event` usable on ``self.loop``."""
        try:
            return asyncio.Event(loop=self.loop)
        except TypeError:
            # Python >= 3.10 removed the ``loop`` argument; events there
            # bind to the running loop on first use.
            return asyncio.Event()

    async def connect_current(self):
        """Connect to the current Juju model.

        Starts the watcher task and returns once the first batch of
        deltas has been applied to ``self.state``.

        """
        self.connection = await connection.Connection.connect_current()
        self._watch()
        await self._watch_received.wait()

    async def disconnect(self):
        """Shut down the watcher task and close websockets.

        """
        self._stop_watching()
        if self.connection and self.connection.is_open:
            # Only wait for the watcher if one was started; otherwise
            # nothing would ever set the shutdown event.
            if self._watcher_task is not None:
                await self._watch_shutdown.wait()
            log.debug('Closing model connection')
            await self.connection.close()
            self.connection = None

    def all_units_idle(self):
        """Return True if all units are idle.

        A unit is idle when ``unit.data['agent-status']['current']`` is
        ``'idle'``. Returns True when the model has no units.

        """
        return all(
            unit.data['agent-status']['current'] == 'idle'
            for unit in self.units.values())

    async def reset(self, force=False):
        """Reset the model to a clean state.

        :param bool force: Force-terminate machines.

        Destroys every application, then every machine, and returns only
        after no machines remain in the model.

        """
        # Iterate over snapshots: the watcher task can add or remove
        # entries while we are suspended in ``await``, and mutating a
        # dict during iteration raises RuntimeError.
        for app in list(self.applications.values()):
            await app.destroy()
        for machine in list(self.machines.values()):
            await machine.destroy(force=force)
        await self.block_until(
            lambda: len(self.machines) == 0
        )

    async def block_until(self, *conditions, timeout=None):
        """Return only after all conditions are true.

        :param conditions: zero-argument callables, polled every 0.1s
        :param timeout: seconds to wait, or None to wait indefinitely
        :raises asyncio.TimeoutError: if ``timeout`` elapses first

        """
        async def _block():
            while not all(c() for c in conditions):
                await asyncio.sleep(.1)
        await asyncio.wait_for(_block(), timeout)

    @property
    def applications(self):
        """Return a map of application-name:Application for all applications
        currently in the model.

        """
        return self.state.get('application', {})

    @property
    def machines(self):
        """Return a map of machine-id:Machine for all machines currently in
        the model.

        """
        return self.state.get('machine', {})

    @property
    def units(self):
        """Return a map of unit-id:Unit for all units currently in
        the model.

        """
        return self.state.get('unit', {})

    def add_observer(self, callable_):
        """Register an "on-model-change" callback

        Once the watch is running (it is started by
        :meth:`connect_current`), ``callable_`` will be called each time
        the model changes. Calling ``callable_`` must return an awaitable,
        and it must accept the following positional arguments:

            delta - An instance of :class:`juju.delta.EntityDelta`
                containing the raw delta data recv'd from the Juju
                websocket.

            old_obj - If the delta modifies an existing object in the model,
                old_obj will be that object as it was before the
                delta was applied. Will be None if the delta creates a new
                entity in the model.

            new_obj - The new or updated object, after the delta
                is applied. Will be None if the delta removes an entity
                from the model.

            model - The :class:`Model` itself.

        """
        self.observers.add(callable_)

    def _watch(self):
        """Start an asynchronous watch against this model.

        See :meth:`add_observer` to register an onchange callback.

        """
        async def _start_watch():
            self._watch_shutdown.clear()
            try:
                allwatcher = watcher.AllWatcher()
                self._watch_conn = await self.connection.clone()
                allwatcher.connect(self._watch_conn)
                while True:
                    results = await allwatcher.Next()
                    for delta in results.deltas:
                        delta = get_entity_delta(delta)
                        old_obj, new_obj = self._apply_delta(delta)
                        # XXX: Might not want to shield at this level
                        # We are shielding because when the watcher is
                        # canceled (on disconnect()), we don't want all of
                        # its children (every observer callback) to be
                        # canceled with it. So we shield them. But this means
                        # they can *never* be canceled.
                        await asyncio.shield(
                            self._notify_observers(delta, old_obj, new_obj))
                    self._watch_received.set()
            except (asyncio.CancelledError, CancelledError):
                # Task cancellation raises asyncio.CancelledError, which
                # since Python 3.8 is no longer the same class as
                # concurrent.futures.CancelledError; catch both so the
                # cleanup runs on every supported version.
                log.debug('Closing watcher connection')
                # The connection is still None if we were cancelled
                # before clone() completed.
                if self._watch_conn is not None:
                    await self._watch_conn.close()
                    self._watch_conn = None
                self._watch_shutdown.set()

        log.debug('Starting watcher task')
        self._watcher_task = self.loop.create_task(_start_watch())

    def _stop_watching(self):
        """Stop the asynchronous watch against this model.

        """
        log.debug('Stopping watcher task')
        if self._watcher_task:
            self._watcher_task.cancel()

    def _apply_delta(self, delta):
        """Apply delta to our model state and return the affected object
        as it was before and after the update, e.g.:

            old_obj, new_obj = self._apply_delta(delta)

        old_obj may be None if the delta is for the creation of a new object,
        e.g. a new application or unit is deployed.

        new_obj is None if an object was deleted as a result of the delta
        being applied. A 'remove' delta for an entity that is not in the
        state changes nothing and returns ``(None, None)``.

        """
        entity_id = delta.get_id()
        known = self.state.get(delta.entity, {})
        old_obj = known.get(entity_id)

        if delta.type == 'remove':
            # Never create an entity from a removal, even when we had not
            # seen the entity before.
            known.pop(entity_id, None)
            return old_obj, None

        new_obj = self._create_model_entity(delta)
        self.state.setdefault(delta.entity, {})[entity_id] = new_obj

        return old_obj, new_obj

    def _create_model_entity(self, delta):
        """Return an object instance representing the entity created or
        updated by ``delta``

        """
        entity_class = delta.get_entity_class()
        return entity_class(delta.data, self)

    async def _notify_observers(self, delta, old_obj, new_obj):
        """Call observing callbacks, notifying them of a change in model state

        Each observer is scheduled as its own task; this coroutine does not
        wait for the observers to finish.

        :param delta: The raw change from the watcher
            (:class:`juju.client.overrides.Delta`)
        :param old_obj: The object in the model that this delta updates.
            May be None.
        :param new_obj: The object in the model that is created or updated
            by applying this delta.

        """
        for o in self.observers:
            asyncio.ensure_future(o(delta, old_obj, new_obj, self))

    def add_machine(
            self, spec=None, constraints=None, disks=None, series=None,
            count=1):
        """Start a new, empty machine and optionally a container, or add a
        container to a machine.

        :param str spec: Machine specification
            Examples::

                (None) - starts a new machine
                'lxc' - starts a new machine with one lxc container
                'lxc:4' - starts a new lxc container on machine 4
                'ssh:user@10.10.0.3' - manually provisions a machine with ssh
                'zone=us-east-1a' - starts a machine in zone us-east-1a on AWS
                'maas2.name' - acquire machine maas2.name on MAAS
        :param constraints: Machine constraints
        :type constraints: :class:`juju.Constraints`
        :param list disks: List of disk :class:`constraints <juju.Constraints>`
        :param str series: Series
        :param int count: Number of machines to deploy

        Supported container types are: lxc, lxd, kvm

        When deploying a container to an existing machine, constraints cannot
        be used.

        """
        pass
    add_machines = add_machine

    async def add_relation(self, relation1, relation2):
        """Add a relation between two applications.

        :param str relation1: '<application>[:<relation_name>]'
        :param str relation2: '<application>[:<relation_name>]'
        :return: the result of the ``AddRelation`` facade call

        """
        app_facade = client.ApplicationFacade()
        app_facade.connect(self.connection)

        log.debug(
            'Adding relation %s <-> %s', relation1, relation2)

        return await app_facade.AddRelation([relation1, relation2])

    def add_space(self, name, *cidrs):
        r"""Add a new network space.

        Adds a new space with the given name and associates the given
        (optional) list of existing subnet CIDRs with it.

        :param str name: Name of the space
        :param \*cidrs: Optional list of existing subnet CIDRs

        """
        pass

    def add_ssh_key(self, key):
        """Add a public SSH key to this model.

        :param str key: The public ssh key

        """
        pass
    add_ssh_keys = add_ssh_key

    def add_subnet(self, cidr_or_id, space, *zones):
        r"""Add an existing subnet to this model.

        :param str cidr_or_id: CIDR or provider ID of the existing subnet
        :param str space: Network space with which to associate
        :param str \*zones: Zone(s) in which the subnet resides

        """
        pass

    def get_backups(self):
        """Retrieve metadata for backups in this model.

        """
        pass

    def block(self, *commands):
        r"""Add a new block to this model.

        :param str \*commands: The commands to block. Valid values are
            'all-changes', 'destroy-model', 'remove-object'

        """
        pass

    def get_blocks(self):
        """List blocks for this model.

        """
        pass

    def get_cached_images(self, arch=None, kind=None, series=None):
        """Return a list of cached OS images.

        :param str arch: Filter by image architecture
        :param str kind: Filter by image kind, e.g. 'lxd'
        :param str series: Filter by image series, e.g. 'xenial'

        """
        pass

    def create_backup(self, note=None, no_download=False):
        """Create a backup of this model.

        :param str note: A note to store with the backup
        :param bool no_download: Do not download the backup archive
        :return str: Path to downloaded archive

        """
        pass

    def create_storage_pool(self, name, provider_type, **pool_config):
        r"""Create or define a storage pool.

        :param str name: Name to give the storage pool
        :param str provider_type: Pool provider type
        :param \*\*pool_config: key/value pool configuration pairs

        """
        pass

    def debug_log(
            self, no_tail=False, exclude_module=None, include_module=None,
            include=None, level=None, limit=0, lines=10, replay=False,
            exclude=None):
        """Get log messages for this model.

        :param bool no_tail: Stop after returning existing log messages
        :param list exclude_module: Do not show log messages for these logging
            modules
        :param list include_module: Only show log messages for these logging
            modules
        :param list include: Only show log messages for these entities
        :param str level: Log level to show, valid options are 'TRACE',
            'DEBUG', 'INFO', 'WARNING', 'ERROR'
        :param int limit: Return this many of the most recent (possibly
            filtered) lines
        :param int lines: Yield this many of the most recent lines, and keep
            yielding
        :param bool replay: Yield the entire log, and keep yielding
        :param list exclude: Do not show log messages for these entities

        """
        pass

    async def deploy(
            self, entity_url, service_name=None, bind=None, budget=None,
            channel=None, config=None, constraints=None, force=False,
            num_units=1, plan=None, resources=None, series=None, storage=None,
            to=None):
        """Deploy a new service or bundle.

        :param str entity_url: Charm or bundle url
        :param str service_name: Name to give the service
        :param dict bind: <charm endpoint>:<network space> pairs
        :param dict budget: <budget name>:<limit> pairs
        :param str channel: Charm store channel from which to retrieve
            the charm or bundle, e.g. 'development'
        :param dict config: Charm configuration dictionary
        :param constraints: Service constraints
        :type constraints: :class:`juju.Constraints`
        :param bool force: Allow charm to be deployed to a machine running
            an unsupported series
        :param int num_units: Number of units to deploy
        :param str plan: Plan under which to deploy charm
        :param dict resources: <resource name>:<file path> pairs
        :param str series: Series on which to deploy
        :param dict storage: Storage constraints TODO how do these look?
        :param str to: Placement directive, e.g.::

            '23' - machine 23
            'lxc:7' - new lxc container on machine 7
            '24/lxc/3' - lxc container 3 or machine 24

            If None, a new machine is provisioned.

            NOTE(review): the implementation iterates ``to`` and expands
            each item as keyword arguments to ``client.Placement``, so it
            currently needs an iterable of mappings, not the string forms
            shown above -- confirm the intended type.

        :return: the result of the ``Deploy`` facade call

        ``budget``, ``force`` and ``plan`` are accepted but not used yet.

        TODO::

            - entity_url must have a revision; look up latest automatically if
              not provided by caller
            - service_name is required; fill this in automatically if not
              provided by caller
            - series is required; how do we pick a default?

        """
        if constraints:
            constraints = client.Value(**constraints)

        if to:
            placement = [
                client.Placement(**p) for p in to
            ]
        else:
            placement = []

        if storage:
            storage = {
                k: client.Constraints(**v)
                for k, v in storage.items()
            }

        app_facade = client.ApplicationFacade()
        client_facade = client.ClientFacade()
        app_facade.connect(self.connection)
        client_facade.connect(self.connection)

        log.debug(
            'Deploying %s', entity_url)

        await client_facade.AddCharm(channel, entity_url)
        app = client.ApplicationDeploy(
            application=service_name,
            channel=channel,
            charm_url=entity_url,
            config=config,
            constraints=constraints,
            endpoint_bindings=bind,
            num_units=num_units,
            placement=placement,
            resources=resources,
            series=series,
            storage=storage,
        )

        return await app_facade.Deploy([app])

    def destroy(self):
        """Terminate all machines and resources for this model.

        """
        pass

    def get_backup(self, archive_id):
        """Download a backup archive file.

        :param str archive_id: The id of the archive to download
        :return str: Path to the archive file

        """
        pass

    def enable_ha(
            self, num_controllers=0, constraints=None, series=None, to=None):
        """Ensure sufficient controllers exist to provide redundancy.

        :param int num_controllers: Number of controllers to make available
        :param constraints: Constraints to apply to the controller machines
        :type constraints: :class:`juju.Constraints`
        :param str series: Series of the controller machines
        :param list to: Placement directives for controller machines, e.g.::

            '23' - machine 23
            'lxc:7' - new lxc container on machine 7
            '24/lxc/3' - lxc container 3 or machine 24

            If None, a new machine is provisioned.

        """
        pass

    def get_config(self):
        """Return the configuration settings for this model.

        """
        pass

    def get_constraints(self):
        """Return the machine constraints for this model.

        """
        pass

    def grant(self, username, acl='read'):
        """Grant a user access to this model.

        :param str username: Username
        :param str acl: Access control ('read' or 'write')

        """
        pass

    def import_ssh_key(self, identity):
        """Add a public SSH key from a trusted identity source to this model.

        :param str identity: User identity in the form <lp|gh>:<username>

        """
        pass
    import_ssh_keys = import_ssh_key

    def get_machines(self, machine, utc=False):
        """Return list of machines in this model.

        :param str machine: Machine id, e.g. '0'
        :param bool utc: Display time as UTC in RFC3339 format

        """
        pass

    def get_shares(self):
        """Return list of all users with access to this model.

        """
        pass

    def get_spaces(self):
        """Return list of all known spaces, including associated subnets.

        """
        pass

    def get_ssh_key(self):
        """Return known SSH keys for this model.

        """
        pass
    get_ssh_keys = get_ssh_key

    def get_storage(self, filesystem=False, volume=False):
        """Return details of storage instances.

        :param bool filesystem: Include filesystem storage
        :param bool volume: Include volume storage

        """
        pass

    def get_storage_pools(self, names=None, providers=None):
        """Return list of storage pools.

        :param list names: Only include pools with these names
        :param list providers: Only include pools for these providers

        """
        pass

    def get_subnets(self, space=None, zone=None):
        """Return list of known subnets.

        :param str space: Only include subnets in this space
        :param str zone: Only include subnets in this zone

        """
        pass

    def remove_blocks(self):
        """Remove all blocks from this model.

        """
        pass

    def remove_backup(self, backup_id):
        """Delete a backup.

        :param str backup_id: The id of the backup to remove

        """
        pass

    def remove_cached_images(self, arch=None, kind=None, series=None):
        """Remove cached OS images.

        :param str arch: Architecture of the images to remove
        :param str kind: Image kind to remove, e.g. 'lxd'
        :param str series: Image series to remove, e.g. 'xenial'

        """
        pass

    def remove_machine(self, *machine_ids):
        r"""Remove a machine from this model.

        :param str \*machine_ids: Ids of the machines to remove

        """
        pass
    remove_machines = remove_machine

    def remove_ssh_key(self, *keys):
        r"""Remove a public SSH key(s) from this model.

        :param str \*keys: Keys to remove

        """
        pass
    remove_ssh_keys = remove_ssh_key

    def restore_backup(
            self, bootstrap=False, constraints=None, archive=None,
            backup_id=None, upload_tools=False):
        """Restore a backup archive to a new controller.

        :param bool bootstrap: Bootstrap a new state machine
        :param constraints: Model constraints
        :type constraints: :class:`juju.Constraints`
        :param str archive: Path to backup archive to restore
        :param str backup_id: Id of backup to restore
        :param bool upload_tools: Upload tools if bootstrapping a new machine

        """
        pass

    def retry_provisioning(self):
        """Retry provisioning for failed machines.

        """
        pass

    def revoke(self, username, acl='read'):
        """Revoke a user's access to this model.

        :param str username: Username to revoke
        :param str acl: Access control ('read' or 'write')

        """
        pass

    def run(self, command, timeout=None):
        """Run command on all machines in this model.

        :param str command: The command to run
        :param int timeout: Time to wait before command is considered failed

        """
        pass

    def set_config(self, **config):
        r"""Set configuration keys on this model.

        :param \*\*config: Config key/values

        """
        pass

    def set_constraints(self, constraints):
        """Set machine constraints on this model.

        :param :class:`juju.Constraints` constraints: Machine constraints

        """
        pass

    def get_action_output(self, action_uuid, wait=-1):
        """Get the results of an action by ID.

        :param str action_uuid: Id of the action
        :param int wait: Time in seconds to wait for action to complete

        """
        pass

    def get_action_status(self, uuid_or_prefix=None, name=None):
        """Get the status of all actions, filtered by ID, ID prefix, or
        action name.

        :param str uuid_or_prefix: Filter by action uuid or prefix
        :param str name: Filter by action name

        """
        pass

    def get_budget(self, budget_name):
        """Get budget usage info.

        :param str budget_name: Name of budget

        """
        pass

    def get_status(self, filter_=None, utc=False):
        """Return the status of the model.

        :param str filter_: Service or unit name or wildcard ('*')
        :param bool utc: Display time as UTC in RFC3339 format

        """
        pass
    status = get_status

    def sync_tools(
            self, all_=False, destination=None, dry_run=False, public=False,
            source=None, stream=None, version=None):
        """Copy Juju tools into this model.

        :param bool all_: Copy all versions, not just the latest
        :param str destination: Path to local destination directory
        :param bool dry_run: Don't do the actual copy
        :param bool public: Tools are for a public cloud, so generate mirrors
            information
        :param str source: Path to local source directory
        :param str stream: Simplestreams stream for which to sync metadata
        :param str version: Copy a specific major.minor version

        """
        pass

    def unblock(self, *commands):
        r"""Unblock an operation that would alter this model.

        :param str \*commands: The commands to unblock. Valid values are
            'all-changes', 'destroy-model', 'remove-object'

        """
        pass

    def unset_config(self, *keys):
        r"""Unset configuration on this model.

        :param str \*keys: The keys to unset

        """
        pass

    def upgrade_gui(self):
        """Upgrade the Juju GUI for this model.

        """
        pass

    def upgrade_juju(
            self, dry_run=False, reset_previous_upgrade=False,
            upload_tools=False, version=None):
        """Upgrade Juju on all machines in a model.

        :param bool dry_run: Don't do the actual upgrade
        :param bool reset_previous_upgrade: Clear the previous (incomplete)
            upgrade status
        :param bool upload_tools: Upload local version of tools
        :param str version: Upgrade to a specific version

        """
        pass

    def upload_backup(self, archive_path):
        """Store a backup archive remotely in Juju.

        :param str archive_path: Path to local archive

        """
        pass