04f343797c0455910fe2ecea118061c4c654060a
3 from concurrent
.futures
import CancelledError
5 from .client
import client
6 from .client
import watcher
7 from .client
import connection
8 from .delta
import get_entity_delta
10 log
= logging
.getLogger(__name__
)
13 class ModelObserver(object):
14 def __call__(self
, delta
, old
, new
, model
):
15 if old
is None and new
is not None:
19 handler_name
= 'on_{}_{}'.format(delta
.entity
, type_
)
20 method
= getattr(self
, handler_name
, self
.on_change
)
22 'Model changed: %s %s %s',
23 delta
.entity
, delta
.type, delta
.get_id())
24 method(delta
, old
, new
, model
)
26 def on_change(self
, delta
, old
, new
, model
):
30 class ModelEntity(object):
31 """An object in the Model tree"""
33 def __init__(self
, data
, model
):
34 """Initialize a new entity
36 :param data: dict of data from a watcher delta
37 :param model: The model instance in whose object tree this
43 self
.connection
= model
.connection
45 def __getattr__(self
, name
):
46 return self
.data
[name
]
50 def __init__(self
, loop
=None):
51 """Instantiate a new connected Model.
53 :param loop: an asyncio event loop
56 self
.loop
= loop
or asyncio
.get_event_loop()
57 self
.connection
= None
58 self
.observers
= set()
60 self
._watcher
_task
= None
61 self
._watch
_shutdown
= asyncio
.Event(loop
=loop
)
62 self
._watch
_received
= asyncio
.Event(loop
=loop
)
64 async def connect_current(self
):
65 self
.connection
= await connection
.Connection
.connect_current()
67 await self
._watch
_received
.wait()
69 async def disconnect(self
):
71 if self
.connection
and self
.connection
.is_open
:
72 await self
._watch
_shutdown
.wait()
73 log
.debug('Closing model connection')
74 await asyncio
.wait_for(self
.connection
.close(), None)
75 self
.connection
= None
77 def all_units_idle(self
):
78 """Return True if all units are idle.
81 for unit
in self
.units
.values():
82 unit_status
= unit
.data
['agent-status']['current']
83 if unit_status
!= 'idle':
87 async def reset(self
, force
=False):
88 for app
in self
.applications
.values():
90 for machine
in self
.machines
.values():
91 await machine
.destroy(force
=force
)
93 async def block_until(self
, func
):
96 await asyncio
.sleep(.1)
97 await asyncio
.wait_for(_block(), None)
def applications(self):
    """Return the applications currently recorded in the model state.

    :return: The value stored under the ``'application'`` key of
        ``self.state``, or a new empty dict when that key is absent (the
        empty dict is not stored back into the state).

    """
    model_state = self.state
    return model_state.get('application', {})
105 return self
.state
.get('machine', {})
109 return self
.state
.get('unit', {})
def add_observer(self, callable_):
    """Register an "on-model-change" callback.

    After a watch has been started, ``callable_`` is invoked for every
    change applied to the model, with these positional arguments:

        delta - An instance of :class:`juju.delta.EntityDelta` holding
            the raw delta data received from Juju.

        old_obj - The object as it was before the delta was applied, or
            None if the delta creates a new object.

        new_obj - The new or updated object after the delta is applied,
            or None if the delta removes an entity.

        model - The :class:`Model` itself.

    Observers are kept in a set, so registering the same callable more
    than once has no additional effect.

    :param callable_: The callback to register

    """
    registered = self.observers
    registered.add(callable_)
137 """Start an asynchronous watch against this model.
139 See :meth:`add_observer` to register an onchange callback.
142 async def _start_watch():
143 self
._watch
_shutdown
.clear()
145 allwatcher
= watcher
.AllWatcher()
146 self
._watch
_conn
= await self
.connection
.clone()
147 allwatcher
.connect(self
._watch
_conn
)
149 results
= await allwatcher
.Next()
150 for delta
in results
.deltas
:
151 delta
= get_entity_delta(delta
)
152 old_obj
, new_obj
= self
._apply
_delta
(delta
)
153 self
._notify
_observers
(delta
, old_obj
, new_obj
)
154 self
._watch
_received
.set()
155 except CancelledError
:
156 log
.debug('Closing watcher connection')
157 await asyncio
.wait_for(self
._watch
_conn
.close(), None)
158 self
._watch
_shutdown
.set()
159 self
._watch
_conn
= None
161 log
.debug('Starting watcher task')
162 self
._watcher
_task
= self
.loop
.create_task(_start_watch())
def _stop_watching(self):
    """Stop the asynchronous watch against this model.

    Logs the request at debug level, then cancels ``self._watcher_task``
    when one is set. With no task recorded the call only logs.

    """
    log.debug('Stopping watcher task')
    task = self._watcher_task
    if not task:
        # No watcher task recorded: nothing to cancel.
        return
    task.cancel()
def _apply_delta(self, delta):
    """Apply delta to our model state and return the affected object as
    it was before and after the update, e.g.:

        old_obj, new_obj = self._apply_delta(delta)

    old_obj may be None if the delta is for the creation of a new object,
    e.g. a new application or unit is deployed.

    new_obj may be None if no object was created or updated, or if an
    object was deleted as a result of the delta being applied.

    The returned values are the stored objects themselves, not copies.
    Every non-remove delta stores a freshly created entity, so ``old_obj``
    is the object that was replaced.

    :param delta: Entity delta exposing ``entity``, ``type`` and
        ``get_id()``
    :return: ``(old_obj, new_obj)`` tuple

    """
    entity_type = delta.entity
    # Look the id up once; it keys every access below.
    entity_id = delta.get_id()

    old_obj = None
    if entity_type in self.state and entity_id in self.state[entity_type]:
        old_obj = self.state[entity_type][entity_id]

    if delta.type == 'remove':
        # A removal must never create state. Previously a 'remove' for an
        # entity we had not seen fell through to the create/update path
        # and inserted a new entity for the object being removed.
        if entity_type in self.state:
            self.state[entity_type].pop(entity_id, None)
        return old_obj, None

    new_obj = self._create_model_entity(delta)
    self.state.setdefault(entity_type, {})[entity_id] = new_obj

    return old_obj, new_obj
def _create_model_entity(self, delta):
    """Build the model object representing the entity described by a delta.

    :param delta: Entity delta providing ``get_entity_class()`` and
        ``data``
    :return: An instance of the class returned by
        ``delta.get_entity_class()``, constructed from ``delta.data`` and
        this model

    """
    return delta.get_entity_class()(delta.data, self)
def _notify_observers(self, delta, old_obj, new_obj):
    """Call observing callbacks, notifying them of a change in model state

    Each registered observer is called as
    ``observer(delta, old_obj, new_obj, self)``.

    :param delta: The raw change from the watcher
        (:class:`juju.client.overrides.Delta`)
    :param old_obj: The object in the model that this delta updates.
    :param new_obj: The object in the model that is created or updated
        by applying this delta.

    """
    # Iterate over a snapshot. An observer may register or unregister
    # observers while it is being notified, and mutating a set during
    # iteration raises RuntimeError. Observers added during a round are
    # first called on the next round.
    for observer in tuple(self.observers):
        observer(delta, old_obj, new_obj, self)
222 self
, spec
=None, constraints
=None, disks
=None, series
=None,
224 """Start a new, empty machine and optionally a container, or add a
225 container to a machine.
227 :param str spec: Machine specification
230 (None) - starts a new machine
231 'lxc' - starts a new machine with one lxc container
232 'lxc:4' - starts a new lxc container on machine 4
233 'ssh:user@10.10.0.3' - manually provisions a machine with ssh
234 'zone=us-east-1a' - starts a machine in zone us-east-1a on AWS
235 'maas2.name' - acquire machine maas2.name on MAAS
236 :param constraints: Machine constraints
237 :type constraints: :class:`juju.Constraints`
238 :param list disks: List of disk :class:`constraints <juju.Constraints>`
239 :param str series: Series
240 :param int count: Number of machines to deploy
242 Supported container types are: lxc, lxd, kvm
244 When deploying a container to an existing machine, constraints cannot
249 add_machines
= add_machine
251 async def add_relation(self
, relation1
, relation2
):
252 """Add a relation between two applications.
254 :param str relation1: '<application>[:<relation_name>]'
255 :param str relation2: '<application>[:<relation_name>]'
258 app_facade
= client
.ApplicationFacade()
259 app_facade
.connect(self
.connection
)
262 'Adding relation %s <-> %s', relation1
, relation2
)
264 return await app_facade
.AddRelation([relation1
, relation2
])
266 def add_space(self
, name
, *cidrs
):
267 """Add a new network space.
269 Adds a new space with the given name and associates the given
270 (optional) list of existing subnet CIDRs with it.
272 :param str name: Name of the space
273 :param \*cidrs: Optional list of existing subnet CIDRs
278 def add_ssh_key(self
, key
):
279 """Add a public SSH key to this model.
281 :param str key: The public ssh key
285 add_ssh_keys
= add_ssh_key
287 def add_subnet(self
, cidr_or_id
, space
, *zones
):
288 """Add an existing subnet to this model.
290 :param str cidr_or_id: CIDR or provider ID of the existing subnet
291 :param str space: Network space with which to associate
292 :param str \*zones: Zone(s) in which the subnet resides
297 def get_backups(self
):
298 """Retrieve metadata for backups in this model.
303 def block(self
, *commands
):
304 """Add a new block to this model.
306 :param str \*commands: The commands to block. Valid values are
307 'all-changes', 'destroy-model', 'remove-object'
312 def get_blocks(self
):
313 """List blocks for this model.
318 def get_cached_images(self
, arch
=None, kind
=None, series
=None):
319 """Return a list of cached OS images.
321 :param str arch: Filter by image architecture
322 :param str kind: Filter by image kind, e.g. 'lxd'
323 :param str series: Filter by image series, e.g. 'xenial'
328 def create_backup(self
, note
=None, no_download
=False):
329 """Create a backup of this model.
331 :param str note: A note to store with the backup
332 :param bool no_download: Do not download the backup archive
333 :return str: Path to downloaded archive
338 def create_storage_pool(self
, name
, provider_type
, **pool_config
):
339 """Create or define a storage pool.
341 :param str name: Name to give the storage pool
342 :param str provider_type: Pool provider type
343 :param \*\*pool_config: key/value pool configuration pairs
349 self
, no_tail
=False, exclude_module
=None, include_module
=None,
350 include
=None, level
=None, limit
=0, lines
=10, replay
=False,
352 """Get log messages for this model.
354 :param bool no_tail: Stop after returning existing log messages
355 :param list exclude_module: Do not show log messages for these logging
357 :param list include_module: Only show log messages for these logging
359 :param list include: Only show log messages for these entities
360 :param str level: Log level to show, valid options are 'TRACE',
361 'DEBUG', 'INFO', 'WARNING', 'ERROR'
362 :param int limit: Return this many of the most recent (possibly
363 filtered) lines
364 :param int lines: Yield this many of the most recent lines, and keep
366 :param bool replay: Yield the entire log, and keep yielding
367 :param list exclude: Do not show log messages for these entities
373 self
, entity_url
, service_name
=None, bind
=None, budget
=None,
374 channel
=None, config
=None, constraints
=None, force
=False,
375 num_units
=1, plan
=None, resources
=None, series
=None, storage
=None,
377 """Deploy a new service or bundle.
379 :param str entity_url: Charm or bundle url
380 :param str service_name: Name to give the service
381 :param dict bind: <charm endpoint>:<network space> pairs
382 :param dict budget: <budget name>:<limit> pairs
383 :param str channel: Charm store channel from which to retrieve
384 the charm or bundle, e.g. 'development'
385 :param dict config: Charm configuration dictionary
386 :param constraints: Service constraints
387 :type constraints: :class:`juju.Constraints`
388 :param bool force: Allow charm to be deployed to a machine running
389 an unsupported series
390 :param int num_units: Number of units to deploy
391 :param str plan: Plan under which to deploy charm
392 :param dict resources: <resource name>:<file path> pairs
393 :param str series: Series on which to deploy
394 :param dict storage: Storage constraints TODO how do these look?
395 :param str to: Placement directive, e.g.::
398 'lxc:7' - new lxc container on machine 7
399 '24/lxc/3' - lxc container 3 on machine 24
401 If None, a new machine is provisioned.
406 - entity_url must have a revision; look up latest automatically if
407 not provided by caller
408 - service_name is required; fill this in automatically if not
410 - series is required; how do we pick a default?
414 constraints
= client
.Value(**constraints
)
418 client
.Placement(**p
) for p
in to
425 k
: client
.Constraints(**v
)
426 for k
, v
in storage
.items()
429 app_facade
= client
.ApplicationFacade()
430 client_facade
= client
.ClientFacade()
431 app_facade
.connect(self
.connection
)
432 client_facade
.connect(self
.connection
)
435 'Deploying %s', entity_url
)
437 await client_facade
.AddCharm(channel
, entity_url
)
438 app
= client
.ApplicationDeploy(
439 application
=service_name
,
441 charm_url
=entity_url
,
443 constraints
=constraints
,
444 endpoint_bindings
=bind
,
452 return await app_facade
.Deploy([app
])
455 """Terminate all machines and resources for this model.
460 def get_backup(self
, archive_id
):
461 """Download a backup archive file.
463 :param str archive_id: The id of the archive to download
464 :return str: Path to the archive file
470 self
, num_controllers
=0, constraints
=None, series
=None, to
=None):
471 """Ensure sufficient controllers exist to provide redundancy.
473 :param int num_controllers: Number of controllers to make available
474 :param constraints: Constraints to apply to the controller machines
475 :type constraints: :class:`juju.Constraints`
476 :param str series: Series of the controller machines
477 :param list to: Placement directives for controller machines, e.g.::
480 'lxc:7' - new lxc container on machine 7
481 '24/lxc/3' - lxc container 3 on machine 24
483 If None, a new machine is provisioned.
488 def get_config(self
):
489 """Return the configuration settings for this model.
494 def get_constraints(self
):
495 """Return the machine constraints for this model.
500 def grant(self
, username
, acl
='read'):
501 """Grant a user access to this model.
503 :param str username: Username
504 :param str acl: Access control ('read' or 'write')
509 def import_ssh_key(self
, identity
):
510 """Add a public SSH key from a trusted identity source to this model.
512 :param str identity: User identity in the form <lp|gh>:<username>
516 import_ssh_keys
= import_ssh_key
518 def get_machines(self
, machine
, utc
=False):
519 """Return list of machines in this model.
521 :param str machine: Machine id, e.g. '0'
522 :param bool utc: Display time as UTC in RFC3339 format
527 def get_shares(self
):
528 """Return list of all users with access to this model.
533 def get_spaces(self
):
534 """Return list of all known spaces, including associated subnets.
539 def get_ssh_key(self
):
540 """Return known SSH keys for this model.
544 get_ssh_keys
= get_ssh_key
546 def get_storage(self
, filesystem
=False, volume
=False):
547 """Return details of storage instances.
549 :param bool filesystem: Include filesystem storage
550 :param bool volume: Include volume storage
555 def get_storage_pools(self
, names
=None, providers
=None):
556 """Return list of storage pools.
558 :param list names: Only include pools with these names
559 :param list providers: Only include pools for these providers
564 def get_subnets(self
, space
=None, zone
=None):
565 """Return list of known subnets.
567 :param str space: Only include subnets in this space
568 :param str zone: Only include subnets in this zone
573 def remove_blocks(self
):
574 """Remove all blocks from this model.
579 def remove_backup(self
, backup_id
):
582 :param str backup_id: The id of the backup to remove
587 def remove_cached_images(self
, arch
=None, kind
=None, series
=None):
588 """Remove cached OS images.
590 :param str arch: Architecture of the images to remove
591 :param str kind: Image kind to remove, e.g. 'lxd'
592 :param str series: Image series to remove, e.g. 'xenial'
597 def remove_machine(self
, *machine_ids
):
598 """Remove a machine from this model.
600 :param str \*machine_ids: Ids of the machines to remove
604 remove_machines
= remove_machine
606 def remove_ssh_key(self
, *keys
):
607 """Remove a public SSH key(s) from this model.
609 :param str \*keys: Keys to remove
613 remove_ssh_keys
= remove_ssh_key
616 self
, bootstrap
=False, constraints
=None, archive
=None,
617 backup_id
=None, upload_tools
=False):
618 """Restore a backup archive to a new controller.
620 :param bool bootstrap: Bootstrap a new state machine
621 :param constraints: Model constraints
622 :type constraints: :class:`juju.Constraints`
623 :param str archive: Path to backup archive to restore
624 :param str backup_id: Id of backup to restore
625 :param bool upload_tools: Upload tools if bootstrapping a new machine
630 def retry_provisioning(self
):
631 """Retry provisioning for failed machines.
636 def revoke(self
, username
, acl
='read'):
637 """Revoke a user's access to this model.
639 :param str username: Username to revoke
640 :param str acl: Access control ('read' or 'write')
645 def run(self
, command
, timeout
=None):
646 """Run command on all machines in this model.
648 :param str command: The command to run
649 :param int timeout: Time to wait before command is considered failed
654 def set_config(self
, **config
):
655 """Set configuration keys on this model.
657 :param \*\*config: Config key/values
662 def set_constraints(self
, constraints
):
663 """Set machine constraints on this model.
665 :param :class:`juju.Constraints` constraints: Machine constraints
670 def get_action_output(self
, action_uuid
, wait
=-1):
671 """Get the results of an action by ID.
673 :param str action_uuid: Id of the action
674 :param int wait: Time in seconds to wait for action to complete
679 def get_action_status(self
, uuid_or_prefix
=None, name
=None):
680 """Get the status of all actions, filtered by ID, ID prefix, or action name.
682 :param str uuid_or_prefix: Filter by action uuid or prefix
683 :param str name: Filter by action name
688 def get_budget(self
, budget_name
):
689 """Get budget usage info.
691 :param str budget_name: Name of budget
696 def get_status(self
, filter_
=None, utc
=False):
697 """Return the status of the model.
699 :param str filter_: Service or unit name or wildcard ('*')
700 :param bool utc: Display time as UTC in RFC3339 format
707 self
, all_
=False, destination
=None, dry_run
=False, public
=False,
708 source
=None, stream
=None, version
=None):
709 """Copy Juju tools into this model.
711 :param bool all_: Copy all versions, not just the latest
712 :param str destination: Path to local destination directory
713 :param bool dry_run: Don't do the actual copy
714 :param bool public: Tools are for a public cloud, so generate mirrors
716 :param str source: Path to local source directory
717 :param str stream: Simplestreams stream for which to sync metadata
718 :param str version: Copy a specific major.minor version
723 def unblock(self
, *commands
):
724 """Unblock an operation that would alter this model.
726 :param str \*commands: The commands to unblock. Valid values are
727 'all-changes', 'destroy-model', 'remove-object'
732 def unset_config(self
, *keys
):
733 """Unset configuration on this model.
735 :param str \*keys: The keys to unset
740 def upgrade_gui(self
):
741 """Upgrade the Juju GUI for this model.
747 self
, dry_run
=False, reset_previous_upgrade
=False,
748 upload_tools
=False, version
=None):
749 """Upgrade Juju on all machines in a model.
751 :param bool dry_run: Don't do the actual upgrade
752 :param bool reset_previous_upgrade: Clear the previous (incomplete)
754 :param bool upload_tools: Upload local version of tools
755 :param str version: Upgrade to a specific version
760 def upload_backup(self
, archive_path
):
761 """Store a backup archive remotely in Juju.
763 :param str archive_path: Path to local archive