4 from concurrent
.futures
import CancelledError
6 from .client
import client
7 from .client
import watcher
8 from .client
import connection
9 from .delta
import get_entity_delta
10 from .delta
import get_entity_class
11 from .exceptions
import DeadEntityException
13 log
= logging
.getLogger(__name__
)
16 class ModelObserver(object):
17 async def __call__(self
, delta
, old
, new
, model
):
18 if old
is None and new
is not None:
22 handler_name
= 'on_{}_{}'.format(delta
.entity
, type_
)
23 method
= getattr(self
, handler_name
, self
.on_change
)
24 await method(delta
, old
, new
, model
)
26 async def on_change(self
, delta
, old
, new
, model
):
30 class ModelState(object):
31 """Holds the state of the model, including the delta history of all
32 entities in the model.
35 def __init__(self
, model
):
39 def _live_entity_map(self
, entity_type
):
40 """Return an id:Entity map of all the living entities of
45 entity_id
: self
.get_entity(entity_type
, entity_id
)
46 for entity_id
, history
in self
.state
.get(entity_type
, {}).items()
47 if history
[-1] is not None
51 def applications(self
):
52 """Return a map of application-name:Application for all applications
53 currently in the model.
56 return self
._live
_entity
_map
('application')
60 """Return a map of machine-id:Machine for all machines currently in
64 return self
._live
_entity
_map
('machine')
68 """Return a map of unit-id:Unit for all units currently in
72 return self
._live
_entity
_map
('unit')
74 def entity_history(self
, entity_type
, entity_id
):
75 """Return the history deque for an entity.
78 return self
.state
[entity_type
][entity_id
]
80 def entity_data(self
, entity_type
, entity_id
, history_index
):
81 """Return the data dict for an entity at a specific index of its
85 return self
.entity_history(entity_type
, entity_id
)[history_index
]
87 def apply_delta(self
, delta
):
88 """Apply delta to our state and return a copy of the
89 affected object as it was before and after the update, e.g.:
91 old_obj, new_obj = self.apply_delta(delta)
93 old_obj may be None if the delta is for the creation of a new object,
94 e.g. a new application or unit is deployed.
96 new_obj will never be None, but may be dead (new_obj.dead == True)
97 if the object was deleted as a result of the delta being applied.
102 .setdefault(delta
.entity
, {})
103 .setdefault(delta
.get_id(), collections
.deque())
106 history
.append(delta
.data
)
107 if delta
.type == 'remove':
110 entity
= self
.get_entity(delta
.entity
, delta
.get_id())
111 return entity
.previous(), entity
114 self
, entity_type
, entity_id
, history_index
=-1, connected
=True):
115 """Return an object instance representing the entity created or
119 if history_index
< 0 and history_index
!= -1:
120 history_index
+= len(self
.entity_history(entity_type
, entity_id
))
123 self
.entity_data(entity_type
, entity_id
, history_index
)
127 entity_class
= get_entity_class(entity_type
)
129 entity_id
, self
.model
, history_index
=history_index
,
133 class ModelEntity(object):
134 """An object in the Model tree"""
136 def __init__(self
, entity_id
, model
, history_index
=-1, connected
=True):
137 """Initialize a new entity
139 :param entity_id str: The unique id of the object in the model
140 :param model: The model instance in whose object tree this
142 :history_index int: The index of this object's state in the model's
143 history deque for this entity
144 :connected bool: Flag indicating whether this object gets live updates
148 self
.entity_id
= entity_id
150 self
._history
_index
= history_index
151 self
.connected
= connected
152 self
.connection
= model
.connection
154 def __getattr__(self
, name
):
155 """Fetch object attributes from the underlying data dict held in the
159 if self
.data
is None:
160 raise DeadEntityException(
161 "Entity {}:{} is dead - its attributes can no longer be "
162 "accessed. Use the .previous() method on this object to get "
163 "a copy of the object at its previous state.".format(
164 self
.entity_type
, self
.entity_id
))
165 return self
.data
[name
]
168 def entity_type(self
):
169 """A string identifying the entity type of this object, e.g.
170 'application' or 'unit', etc.
173 return self
.__class
__.__name
__.lower()
177 """Return True if this object represents the current state of the
178 entity in the underlying model.
180 This will be True except when the object represents an entity at a
181 prior state in history, e.g. if the object was obtained by calling
182 .previous() on another object.
185 return self
._history
_index
== -1
189 """Returns True if this entity no longer exists in the underlying
195 self
.model
.state
.entity_data(
196 self
.entity_type
, self
.entity_id
, -1) is None
201 """Returns True if this entity still exists in the underlying
209 """The data dictionary for this entity.
212 return self
.model
.state
.entity_data(
213 self
.entity_type
, self
.entity_id
, self
._history
_index
)
216 """Return a copy of this object as was at its previous state in
219 Returns None if this object is new (and therefore has no history).
221 The returned object is always "disconnected", i.e. does not receive
225 return self
.model
.state
.get_entity(
226 self
.entity_type
, self
.entity_id
, self
._history
_index
- 1,
230 """Return a copy of this object at its next state in
233 Returns None if this object is already the latest.
235 The returned object is "disconnected", i.e. does not receive
236 live updates, unless it is current (latest).
239 if self
._history
_index
== -1:
242 new_index
= self
._history
_index
+ 1
244 new_index
== len(self
.model
.state
.entity_history(
245 self
.entity_type
, self
.entity_id
)) - 1
247 return self
.model
.state
.get_entity(
248 self
.entity_type
, self
.entity_id
, self
._history
_index
- 1,
252 """Return a copy of this object at its current state in the model.
254 Returns self if this object is already the latest.
256 The returned object is always "connected", i.e. receives
257 live updates from the model.
260 if self
._history
_index
== -1:
263 return self
.model
.state
.get_entity(self
.entity_type
, self
.entity_id
)
267 def __init__(self
, loop
=None):
268 """Instantiate a new connected Model.
270 :param loop: an asyncio event loop
273 self
.loop
= loop
or asyncio
.get_event_loop()
274 self
.connection
= None
275 self
.observers
= set()
276 self
.state
= ModelState(self
)
277 self
._watcher
_task
= None
278 self
._watch
_shutdown
= asyncio
.Event(loop
=loop
)
279 self
._watch
_received
= asyncio
.Event(loop
=loop
)
281 async def connect_current(self
):
282 """Connect to the current Juju model.
285 self
.connection
= await connection
.Connection
.connect_current()
287 await self
._watch
_received
.wait()
289 async def disconnect(self
):
290 """Shut down the watcher task and close websockets.
293 self
._stop
_watching
()
294 if self
.connection
and self
.connection
.is_open
:
295 await self
._watch
_shutdown
.wait()
296 log
.debug('Closing model connection')
297 await self
.connection
.close()
298 self
.connection
= None
300 def all_units_idle(self
):
301 """Return True if all units are idle.
304 for unit
in self
.units
.values():
305 unit_status
= unit
.data
['agent-status']['current']
306 if unit_status
!= 'idle':
310 async def reset(self
, force
=False):
311 """Reset the model to a clean state.
313 :param bool force: Force-terminate machines.
315 This returns only after the model has reached a clean state. "Clean"
316 means no applications or machines exist in the model.
319 log
.debug('Resetting model')
320 for app
in self
.applications
.values():
322 for machine
in self
.machines
.values():
323 await machine
.destroy(force
=force
)
324 await self
.block_until(
325 lambda: len(self
.machines
) == 0
328 async def block_until(self
, *conditions
, timeout
=None):
329 """Return only after all conditions are true.
333 while not all(c() for c
in conditions
):
334 await asyncio
.sleep(.1)
335 await asyncio
.wait_for(_block(), timeout
)
338 def applications(self
):
339 """Return a map of application-name:Application for all applications
340 currently in the model.
343 return self
.state
.applications
347 """Return a map of machine-id:Machine for all machines currently in
351 return self
.state
.machines
355 """Return a map of unit-id:Unit for all units currently in
359 return self
.state
.units
361 def add_observer(self
, callable_
):
362 """Register an "on-model-change" callback
364 Once a watch is started (Model.watch() is called), ``callable_``
365 will be called each time the model changes. callable_ should
366 be Awaitable and accept the following positional arguments:
368 delta - An instance of :class:`juju.delta.EntityDelta`
369 containing the raw delta data recv'd from the Juju
372 old_obj - If the delta modifies an existing object in the model,
373 old_obj will be a copy of that object, as it was before the
374 delta was applied. Will be None if the delta creates a new
377 new_obj - A copy of the new or updated object, after the delta
378 is applied. Will be None if the delta removes an entity
381 model - The :class:`Model` itself.
384 self
.observers
.add(callable_
)
387 """Start an asynchronous watch against this model.
389 See :meth:`add_observer` to register an onchange callback.
392 async def _start_watch():
393 self
._watch
_shutdown
.clear()
395 allwatcher
= watcher
.AllWatcher()
396 self
._watch
_conn
= await self
.connection
.clone()
397 allwatcher
.connect(self
._watch
_conn
)
399 results
= await allwatcher
.Next()
400 for delta
in results
.deltas
:
401 delta
= get_entity_delta(delta
)
402 old_obj
, new_obj
= self
.state
.apply_delta(delta
)
403 # XXX: Might not want to shield at this level
404 # We are shielding because when the watcher is
405 # canceled (on disconnect()), we don't want all of
406 # its children (every observer callback) to be
407 # canceled with it. So we shield them. But this means
408 # they can *never* be canceled.
409 await asyncio
.shield(
410 self
._notify
_observers
(delta
, old_obj
, new_obj
))
411 self
._watch
_received
.set()
412 except CancelledError
:
413 log
.debug('Closing watcher connection')
414 await self
._watch
_conn
.close()
415 self
._watch
_shutdown
.set()
416 self
._watch
_conn
= None
418 log
.debug('Starting watcher task')
419 self
._watcher
_task
= self
.loop
.create_task(_start_watch())
421 def _stop_watching(self
):
422 """Stop the asynchronous watch against this model.
425 log
.debug('Stopping watcher task')
426 if self
._watcher
_task
:
427 self
._watcher
_task
.cancel()
429 async def _notify_observers(self
, delta
, old_obj
, new_obj
):
430 """Call observing callbacks, notifying them of a change in model state
432 :param delta: The raw change from the watcher
433 (:class:`juju.client.overrides.Delta`)
434 :param old_obj: The object in the model that this delta updates.
436 :param new_obj: The object in the model that is created or updated
437 by applying this delta.
441 'Model changed: %s %s %s',
442 delta
.entity
, delta
.type, delta
.get_id())
443 for o
in self
.observers
:
444 asyncio
.ensure_future(o(delta
, old_obj
, new_obj
, self
))
447 self
, spec
=None, constraints
=None, disks
=None, series
=None,
449 """Start a new, empty machine and optionally a container, or add a
450 container to a machine.
452 :param str spec: Machine specification
455 (None) - starts a new machine
456 'lxc' - starts a new machine with on lxc container
457 'lxc:4' - starts a new lxc container on machine 4
458 'ssh:user@10.10.0.3' - manually provisions a machine with ssh
459 'zone=us-east-1a' - starts a machine in zone us-east-1s on AWS
460 'maas2.name' - acquire machine maas2.name on MAAS
461 :param constraints: Machine constraints
462 :type constraints: :class:`juju.Constraints`
463 :param list disks: List of disk :class:`constraints <juju.Constraints>`
464 :param str series: Series
465 :param int count: Number of machines to deploy
467 Supported container types are: lxc, lxd, kvm
469 When deploying a container to an existing machine, constraints cannot
474 add_machines
= add_machine
476 async def add_relation(self
, relation1
, relation2
):
477 """Add a relation between two applications.
479 :param str relation1: '<application>[:<relation_name>]'
480 :param str relation2: '<application>[:<relation_name>]'
483 app_facade
= client
.ApplicationFacade()
484 app_facade
.connect(self
.connection
)
487 'Adding relation %s <-> %s', relation1
, relation2
)
489 return await app_facade
.AddRelation([relation1
, relation2
])
491 def add_space(self
, name
, *cidrs
):
492 """Add a new network space.
494 Adds a new space with the given name and associates the given
495 (optional) list of existing subnet CIDRs with it.
497 :param str name: Name of the space
498 :param \*cidrs: Optional list of existing subnet CIDRs
503 def add_ssh_key(self
, key
):
504 """Add a public SSH key to this model.
506 :param str key: The public ssh key
510 add_ssh_keys
= add_ssh_key
512 def add_subnet(self
, cidr_or_id
, space
, *zones
):
513 """Add an existing subnet to this model.
515 :param str cidr_or_id: CIDR or provider ID of the existing subnet
516 :param str space: Network space with which to associate
517 :param str \*zones: Zone(s) in which the subnet resides
522 def get_backups(self
):
523 """Retrieve metadata for backups in this model.
528 def block(self
, *commands
):
529 """Add a new block to this model.
531 :param str \*commands: The commands to block. Valid values are
532 'all-changes', 'destroy-model', 'remove-object'
537 def get_blocks(self
):
538 """List blocks for this model.
543 def get_cached_images(self
, arch
=None, kind
=None, series
=None):
544 """Return a list of cached OS images.
546 :param str arch: Filter by image architecture
547 :param str kind: Filter by image kind, e.g. 'lxd'
548 :param str series: Filter by image series, e.g. 'xenial'
553 def create_backup(self
, note
=None, no_download
=False):
554 """Create a backup of this model.
556 :param str note: A note to store with the backup
557 :param bool no_download: Do not download the backup archive
558 :return str: Path to downloaded archive
563 def create_storage_pool(self
, name
, provider_type
, **pool_config
):
564 """Create or define a storage pool.
566 :param str name: Name to give the storage pool
567 :param str provider_type: Pool provider type
568 :param \*\*pool_config: key/value pool configuration pairs
574 self
, no_tail
=False, exclude_module
=None, include_module
=None,
575 include
=None, level
=None, limit
=0, lines
=10, replay
=False,
577 """Get log messages for this model.
579 :param bool no_tail: Stop after returning existing log messages
580 :param list exclude_module: Do not show log messages for these logging
582 :param list include_module: Only show log messages for these logging
584 :param list include: Only show log messages for these entities
585 :param str level: Log level to show, valid options are 'TRACE',
586 'DEBUG', 'INFO', 'WARNING', 'ERROR,
587 :param int limit: Return this many of the most recent (possibly
588 filtered) lines are shown
589 :param int lines: Yield this many of the most recent lines, and keep
591 :param bool replay: Yield the entire log, and keep yielding
592 :param list exclude: Do not show log messages for these entities
598 self
, entity_url
, service_name
=None, bind
=None, budget
=None,
599 channel
=None, config
=None, constraints
=None, force
=False,
600 num_units
=1, plan
=None, resources
=None, series
=None, storage
=None,
602 """Deploy a new service or bundle.
604 :param str entity_url: Charm or bundle url
605 :param str service_name: Name to give the service
606 :param dict bind: <charm endpoint>:<network space> pairs
607 :param dict budget: <budget name>:<limit> pairs
608 :param str channel: Charm store channel from which to retrieve
609 the charm or bundle, e.g. 'development'
610 :param dict config: Charm configuration dictionary
611 :param constraints: Service constraints
612 :type constraints: :class:`juju.Constraints`
613 :param bool force: Allow charm to be deployed to a machine running
614 an unsupported series
615 :param int num_units: Number of units to deploy
616 :param str plan: Plan under which to deploy charm
617 :param dict resources: <resource name>:<file path> pairs
618 :param str series: Series on which to deploy
619 :param dict storage: Storage constraints TODO how do these look?
620 :param str to: Placement directive, e.g.::
623 'lxc:7' - new lxc container on machine 7
624 '24/lxc/3' - lxc container 3 or machine 24
626 If None, a new machine is provisioned.
631 - entity_url must have a revision; look up latest automatically if
632 not provided by caller
633 - service_name is required; fill this in automatically if not
635 - series is required; how do we pick a default?
639 constraints
= client
.Value(**constraints
)
643 client
.Placement(**p
) for p
in to
650 k
: client
.Constraints(**v
)
651 for k
, v
in storage
.items()
654 app_facade
= client
.ApplicationFacade()
655 client_facade
= client
.ClientFacade()
656 app_facade
.connect(self
.connection
)
657 client_facade
.connect(self
.connection
)
660 'Deploying %s', entity_url
)
662 await client_facade
.AddCharm(channel
, entity_url
)
663 app
= client
.ApplicationDeploy(
664 application
=service_name
,
666 charm_url
=entity_url
,
668 constraints
=constraints
,
669 endpoint_bindings
=bind
,
677 return await app_facade
.Deploy([app
])
680 """Terminate all machines and resources for this model.
685 def get_backup(self
, archive_id
):
686 """Download a backup archive file.
688 :param str archive_id: The id of the archive to download
689 :return str: Path to the archive file
695 self
, num_controllers
=0, constraints
=None, series
=None, to
=None):
696 """Ensure sufficient controllers exist to provide redundancy.
698 :param int num_controllers: Number of controllers to make available
699 :param constraints: Constraints to apply to the controller machines
700 :type constraints: :class:`juju.Constraints`
701 :param str series: Series of the controller machines
702 :param list to: Placement directives for controller machines, e.g.::
705 'lxc:7' - new lxc container on machine 7
706 '24/lxc/3' - lxc container 3 or machine 24
708 If None, a new machine is provisioned.
713 def get_config(self
):
714 """Return the configuration settings for this model.
719 def get_constraints(self
):
720 """Return the machine constraints for this model.
725 def grant(self
, username
, acl
='read'):
726 """Grant a user access to this model.
728 :param str username: Username
729 :param str acl: Access control ('read' or 'write')
734 def import_ssh_key(self
, identity
):
735 """Add a public SSH key from a trusted indentity source to this model.
737 :param str identity: User identity in the form <lp|gh>:<username>
741 import_ssh_keys
= import_ssh_key
743 def get_machines(self
, machine
, utc
=False):
744 """Return list of machines in this model.
746 :param str machine: Machine id, e.g. '0'
747 :param bool utc: Display time as UTC in RFC3339 format
752 def get_shares(self
):
753 """Return list of all users with access to this model.
758 def get_spaces(self
):
759 """Return list of all known spaces, including associated subnets.
764 def get_ssh_key(self
):
765 """Return known SSH keys for this model.
769 get_ssh_keys
= get_ssh_key
771 def get_storage(self
, filesystem
=False, volume
=False):
772 """Return details of storage instances.
774 :param bool filesystem: Include filesystem storage
775 :param bool volume: Include volume storage
780 def get_storage_pools(self
, names
=None, providers
=None):
781 """Return list of storage pools.
783 :param list names: Only include pools with these names
784 :param list providers: Only include pools for these providers
789 def get_subnets(self
, space
=None, zone
=None):
790 """Return list of known subnets.
792 :param str space: Only include subnets in this space
793 :param str zone: Only include subnets in this zone
798 def remove_blocks(self
):
799 """Remove all blocks from this model.
804 def remove_backup(self
, backup_id
):
807 :param str backup_id: The id of the backup to remove
812 def remove_cached_images(self
, arch
=None, kind
=None, series
=None):
813 """Remove cached OS images.
815 :param str arch: Architecture of the images to remove
816 :param str kind: Image kind to remove, e.g. 'lxd'
817 :param str series: Image series to remove, e.g. 'xenial'
822 def remove_machine(self
, *machine_ids
):
823 """Remove a machine from this model.
825 :param str \*machine_ids: Ids of the machines to remove
829 remove_machines
= remove_machine
831 def remove_ssh_key(self
, *keys
):
832 """Remove a public SSH key(s) from this model.
834 :param str \*keys: Keys to remove
838 remove_ssh_keys
= remove_ssh_key
841 self
, bootstrap
=False, constraints
=None, archive
=None,
842 backup_id
=None, upload_tools
=False):
843 """Restore a backup archive to a new controller.
845 :param bool bootstrap: Bootstrap a new state machine
846 :param constraints: Model constraints
847 :type constraints: :class:`juju.Constraints`
848 :param str archive: Path to backup archive to restore
849 :param str backup_id: Id of backup to restore
850 :param bool upload_tools: Upload tools if bootstrapping a new machine
855 def retry_provisioning(self
):
856 """Retry provisioning for failed machines.
861 def revoke(self
, username
, acl
='read'):
862 """Revoke a user's access to this model.
864 :param str username: Username to revoke
865 :param str acl: Access control ('read' or 'write')
870 def run(self
, command
, timeout
=None):
871 """Run command on all machines in this model.
873 :param str command: The command to run
874 :param int timeout: Time to wait before command is considered failed
879 def set_config(self
, **config
):
880 """Set configuration keys on this model.
882 :param \*\*config: Config key/values
887 def set_constraints(self
, constraints
):
888 """Set machine constraints on this model.
890 :param :class:`juju.Constraints` constraints: Machine constraints
895 def get_action_output(self
, action_uuid
, wait
=-1):
896 """Get the results of an action by ID.
898 :param str action_uuid: Id of the action
899 :param int wait: Time in seconds to wait for action to complete
904 def get_action_status(self
, uuid_or_prefix
=None, name
=None):
905 """Get the status of all actions, filtered by ID, ID prefix, or action name.
907 :param str uuid_or_prefix: Filter by action uuid or prefix
908 :param str name: Filter by action name
913 def get_budget(self
, budget_name
):
914 """Get budget usage info.
916 :param str budget_name: Name of budget
921 def get_status(self
, filter_
=None, utc
=False):
922 """Return the status of the model.
924 :param str filter_: Service or unit name or wildcard ('*')
925 :param bool utc: Display time as UTC in RFC3339 format
932 self
, all_
=False, destination
=None, dry_run
=False, public
=False,
933 source
=None, stream
=None, version
=None):
934 """Copy Juju tools into this model.
936 :param bool all_: Copy all versions, not just the latest
937 :param str destination: Path to local destination directory
938 :param bool dry_run: Don't do the actual copy
939 :param bool public: Tools are for a public cloud, so generate mirrors
941 :param str source: Path to local source directory
942 :param str stream: Simplestreams stream for which to sync metadata
943 :param str version: Copy a specific major.minor version
948 def unblock(self
, *commands
):
949 """Unblock an operation that would alter this model.
951 :param str \*commands: The commands to unblock. Valid values are
952 'all-changes', 'destroy-model', 'remove-object'
957 def unset_config(self
, *keys
):
958 """Unset configuration on this model.
960 :param str \*keys: The keys to unset
965 def upgrade_gui(self
):
966 """Upgrade the Juju GUI for this model.
972 self
, dry_run
=False, reset_previous_upgrade
=False,
973 upload_tools
=False, version
=None):
974 """Upgrade Juju on all machines in a model.
976 :param bool dry_run: Don't do the actual upgrade
977 :param bool reset_previous_upgrade: Clear the previous (incomplete)
979 :param bool upload_tools: Upload local version of tools
980 :param str version: Upgrade to a specific version
985 def upload_backup(self
, archive_path
):
986 """Store a backup archive remotely in Juju.
988 :param str archive_path: Path to local archive