Add relate example
[osm/N2VC.git] / juju / model.py
1 import asyncio
2 import logging
3 from concurrent.futures import CancelledError
4
5 from .client import client
6 from .client import watcher
7 from .client import connection
8 from .delta import get_entity_delta
9
10 log = logging.getLogger(__name__)
11
12
class ModelObserver(object):
    """Callable observer that routes model deltas to named handler methods.

    A delta is routed to the method named ``on_<entity>_<type>`` (for
    example ``on_unit_add``) when this object defines one, and to
    :meth:`on_change` otherwise.

    """

    def __call__(self, delta, old, new, model):
        """Dispatch one model change to the matching handler.

        :param delta: The delta describing the change; its ``entity``,
            ``type`` and ``get_id()`` are read here
        :param old: The affected object before the change, or None
        :param new: The affected object after the change, or None
        :param model: The model the change belongs to

        """
        # An entity that did not exist before but exists now is always
        # reported as an 'add', whatever type the delta itself carries.
        first_sighting = old is None and new is not None
        type_ = 'add' if first_sighting else delta.type
        handler = getattr(
            self, 'on_{}_{}'.format(delta.entity, type_), self.on_change)
        log.debug(
            'Model changed: %s %s %s',
            delta.entity, delta.type, delta.get_id())
        handler(delta, old, new, model)

    def on_change(self, delta, old, new, model):
        """Fallback handler used when no specific one exists; does nothing."""
        return None
28
29
class ModelEntity(object):
    """An object in the Model tree"""

    def __init__(self, data, model):
        """Initialize a new entity

        :param data: dict of data from a watcher delta
        :param model: The model instance in whose object tree this
            entity resides

        """
        self.data = data
        self.model = model
        self.connection = model.connection

    def __getattr__(self, name):
        """Expose the keys of ``self.data`` as read-only attributes.

        Only called when normal attribute lookup fails.

        :param str name: Attribute (data key) being looked up
        :return: ``self.data[name]``
        :raises AttributeError: if ``name`` is not a key of the data dict,
            or if ``data`` has not been set on the instance yet

        """
        try:
            # Go through __dict__ so that a missing ``data`` attribute
            # (e.g. during copy/unpickle) cannot re-enter __getattr__ and
            # recurse forever.
            return self.__dict__['data'][name]
        except KeyError:
            # The attribute protocol requires AttributeError; a KeyError
            # would break hasattr() and getattr(obj, name, default).
            raise AttributeError(
                '{!r} object has no attribute {!r}'.format(
                    type(self).__name__, name)) from None
47
48
49 class Model(object):
def __init__(self, loop=None):
    """Instantiate a new Model (not yet connected).

    :param loop: an asyncio event loop; when omitted, the result of
        ``asyncio.get_event_loop()`` is used

    """
    self.loop = loop or asyncio.get_event_loop()

    def _new_event():
        # Bind events to the loop this model actually uses. Python 3.10
        # removed the ``loop`` argument (events bind to the running loop
        # on first use), so fall back to the argument-less form there.
        try:
            return asyncio.Event(loop=self.loop)
        except TypeError:
            return asyncio.Event()

    self.connection = None
    self.observers = set()
    self.state = dict()
    self._watcher_task = None
    # Connection used by the watcher task; defined here so that code
    # reading it never hits a missing attribute before a watch starts.
    self._watch_conn = None
    self._watch_shutdown = _new_event()
    self._watch_received = _new_event()
63
async def connect_current(self):
    """Connect via ``Connection.connect_current()`` and start watching.

    Returns only after the watcher has signalled ``_watch_received``.

    """
    conn = await connection.Connection.connect_current()
    self.connection = conn
    self._watch()
    await self._watch_received.wait()
68
async def disconnect(self):
    """Stop the watcher task and close the model connection.

    The connection is only closed (and cleared) when one exists and is
    still open; the watcher's shutdown event is awaited first.

    """
    self._stop_watching()
    conn = self.connection
    if not (conn and conn.is_open):
        return
    await self._watch_shutdown.wait()
    log.debug('Closing model connection')
    await asyncio.wait_for(self.connection.close(), None)
    self.connection = None
76
def all_units_idle(self):
    """Tell whether every unit's agent status is currently ``'idle'``.

    :return bool: True when all units are idle (also when there are no
        units), False as soon as one unit is not

    """
    return all(
        unit.data['agent-status']['current'] == 'idle'
        for unit in self.units.values()
    )
86
async def reset(self, force=False):
    """Destroy every application, then every machine, known to this model.

    :param bool force: Passed through to each machine's ``destroy()``

    """
    # Iterate over snapshots: each ``await`` yields to the event loop, where
    # the watcher may apply 'remove' deltas and shrink the underlying dicts,
    # which would otherwise raise "dictionary changed size during iteration".
    for app in list(self.applications.values()):
        await app.destroy()
    for machine in list(self.machines.values()):
        await machine.destroy(force=force)
92
async def block_until(self, func):
    """Wait until ``func()`` returns a truthy value.

    ``func`` is polled, with a 0.1 second sleep between unsuccessful
    attempts; no timeout is applied.

    :param func: Zero-argument callable evaluated on every poll

    """
    async def _poll():
        while True:
            if func():
                return
            await asyncio.sleep(.1)

    await asyncio.wait_for(_poll(), None)
98
@property
def applications(self):
    """The ``'application'`` entry of the model state, or an empty dict."""
    try:
        return self.state['application']
    except KeyError:
        return {}
102
@property
def machines(self):
    """The ``'machine'`` entry of the model state, or an empty dict."""
    try:
        return self.state['machine']
    except KeyError:
        return {}
106
@property
def units(self):
    """The ``'unit'`` entry of the model state, or an empty dict."""
    try:
        return self.state['unit']
    except KeyError:
        return {}
110
def add_observer(self, callable_):
    """Register a callback to be invoked on every model change.

    Observers are kept in a set, so registering the same callable twice
    has no additional effect. Each observer is called with four
    positional arguments:

    delta - An instance of :class:`juju.delta.EntityDelta` holding the
        raw delta data received from the Juju websocket.

    old_obj - The affected object as it was before the delta was
        applied, or None if the delta creates a new entity.

    new_obj - The new or updated object after the delta was applied,
        or None if the delta removes an entity.

    model - The :class:`Model` itself.

    :param callable_: The callback to register

    """
    observers = self.observers
    observers.add(callable_)
135
def _watch(self):
    """Start an asynchronous watch against this model.

    Schedules a task on ``self.loop`` that clones the model connection,
    attaches an ``AllWatcher`` to it and then, for every batch of deltas,
    applies each delta to the model state and notifies the observers.
    ``_watch_received`` is set after each processed batch.

    See :meth:`add_observer` to register an onchange callback.

    """
    async def _start_watch():
        self._watch_shutdown.clear()
        # Make sure the attribute exists even if we are cancelled (or fail)
        # before the connection has been cloned.
        self._watch_conn = None
        try:
            allwatcher = watcher.AllWatcher()
            self._watch_conn = await self.connection.clone()
            allwatcher.connect(self._watch_conn)
            while True:
                results = await allwatcher.Next()
                for delta in results.deltas:
                    delta = get_entity_delta(delta)
                    old_obj, new_obj = self._apply_delta(delta)
                    self._notify_observers(delta, old_obj, new_obj)
                self._watch_received.set()
        except asyncio.CancelledError:
            # Task cancellation raises asyncio.CancelledError; since
            # Python 3.8 this is no longer the same class as
            # concurrent.futures.CancelledError. Cancellation is the normal
            # way to stop the watcher, so it is swallowed here.
            log.debug('Closing watcher connection')
        finally:
            # Always release the connection and signal shutdown, otherwise
            # disconnect() would wait on _watch_shutdown forever after an
            # unexpected watcher failure.
            try:
                if self._watch_conn is not None:
                    await asyncio.wait_for(self._watch_conn.close(), None)
            finally:
                self._watch_conn = None
                self._watch_shutdown.set()

    log.debug('Starting watcher task')
    self._watcher_task = self.loop.create_task(_start_watch())
163
def _stop_watching(self):
    """Cancel the watcher task, if one has been created."""
    log.debug('Stopping watcher task')
    task = self._watcher_task
    if task:
        task.cancel()
171
def _apply_delta(self, delta):
    """Apply delta to our model state and return the affected object as
    it was before and after the update, e.g.:

        old_obj, new_obj = self._apply_delta(delta)

    old_obj is None if the entity was not previously known, e.g. a new
    application or unit is deployed.

    new_obj is None if the delta removes an entity. A 'remove' delta for
    an entity that is not in the state is a no-op and yields
    ``(None, None)``; it never creates the entity.

    :param delta: Object exposing ``entity``, ``type`` and ``get_id()``
    :return: ``(old_obj, new_obj)`` tuple

    """
    entity_id = delta.get_id()

    if delta.type == 'remove':
        # Handle removal first so that removing an unknown entity cannot
        # fall through and (re)create it in the state.
        old_obj = self.state.get(delta.entity, {}).pop(entity_id, None)
        return old_obj, None

    old_obj = self.state.get(delta.entity, {}).get(entity_id)
    new_obj = self._create_model_entity(delta)
    self.state.setdefault(delta.entity, {})[entity_id] = new_obj
    return old_obj, new_obj
198
def _create_model_entity(self, delta):
    """Build the entity object described by ``delta``.

    The class comes from ``delta.get_entity_class()`` and is instantiated
    with the delta's data and this model.

    """
    return delta.get_entity_class()(delta.data, self)
206
def _notify_observers(self, delta, old_obj, new_obj):
    """Invoke every registered observer for one applied delta.

    :param delta: The raw change from the watcher
        (:class:`juju.client.overrides.Delta`)
    :param old_obj: The object in the model that this delta updates.
        May be None.
    :param new_obj: The object in the model that is created or updated
        by applying this delta.

    Each observer receives ``(delta, old_obj, new_obj, self)``.

    """
    payload = (delta, old_obj, new_obj, self)
    for observer in self.observers:
        observer(*payload)
220
def add_machine(
        self, spec=None, constraints=None, disks=None, series=None,
        count=1):
    """Start new machines and/or containers (currently a no-op placeholder).

    :param str spec: Machine specification, for example::

        (None) - start a new machine
        'lxc' - start a new machine with one lxc container
        'lxc:4' - start a new lxc container on machine 4
        'ssh:user@10.10.0.3' - manually provision a machine over ssh
        'zone=us-east-1a' - start a machine in zone us-east-1a on AWS
        'maas2.name' - acquire machine maas2.name on MAAS
    :param constraints: Machine constraints
    :type constraints: :class:`juju.Constraints`
    :param list disks: List of disk :class:`constraints <juju.Constraints>`
    :param str series: Series
    :param int count: Number of machines to deploy

    Supported container types are: lxc, lxd, kvm

    Constraints cannot be used when deploying a container to an existing
    machine.

    """
    return None  # body intentionally empty for now
add_machines = add_machine
250
async def add_relation(self, relation1, relation2):
    """Relate two applications through the application facade.

    :param str relation1: '<application>[:<relation_name>]'
    :param str relation2: '<application>[:<relation_name>]'
    :return: The awaited result of the facade's ``AddRelation`` call

    """
    facade = client.ApplicationFacade()
    facade.connect(self.connection)

    log.debug(
        'Adding relation %s <-> %s', relation1, relation2)

    endpoints = [relation1, relation2]
    return await facade.AddRelation(endpoints)
265
def add_space(self, name, *cidrs):
    r"""Add a new network space (currently a no-op placeholder).

    The space is meant to be created under ``name`` and associated with
    the given, optional, existing subnet CIDRs.

    :param str name: Name of the space
    :param \*cidrs: Optional list of existing subnet CIDRs

    """
    return None  # body intentionally empty for now
277
def add_ssh_key(self, key):
    """Add a public SSH key to this model (currently a no-op placeholder).

    :param str key: The public ssh key

    """
    return None  # body intentionally empty for now
add_ssh_keys = add_ssh_key
286
def add_subnet(self, cidr_or_id, space, *zones):
    r"""Add an existing subnet to this model (currently a no-op placeholder).

    :param str cidr_or_id: CIDR or provider ID of the existing subnet
    :param str space: Network space with which to associate
    :param str \*zones: Zone(s) in which the subnet resides

    """
    return None  # body intentionally empty for now
296
def get_backups(self):
    """Retrieve backup metadata for this model (currently a no-op
    placeholder).

    """
    return None  # body intentionally empty for now
302
def block(self, *commands):
    r"""Add a new block to this model (currently a no-op placeholder).

    :param str \*commands: The commands to block. Valid values are
        'all-changes', 'destroy-model', 'remove-object'

    """
    return None  # body intentionally empty for now
311
def get_blocks(self):
    """List the blocks set on this model (currently a no-op placeholder).

    """
    return None  # body intentionally empty for now
317
def get_cached_images(self, arch=None, kind=None, series=None):
    """Return a list of cached OS images (currently a no-op placeholder).

    :param str arch: Filter by image architecture
    :param str kind: Filter by image kind, e.g. 'lxd'
    :param str series: Filter by image series, e.g. 'xenial'

    """
    return None  # body intentionally empty for now
327
def create_backup(self, note=None, no_download=False):
    """Create a backup of this model (currently a no-op placeholder).

    :param str note: A note to store with the backup
    :param bool no_download: Do not download the backup archive
    :return str: Intended to be the path to the downloaded archive; the
        current body returns None

    """
    return None  # body intentionally empty for now
337
def create_storage_pool(self, name, provider_type, **pool_config):
    r"""Create or define a storage pool (currently a no-op placeholder).

    :param str name: Name to give the storage pool
    :param str provider_type: Pool provider type
    :param \*\*pool_config: key/value pool configuration pairs

    """
    return None  # body intentionally empty for now
347
def debug_log(
        self, no_tail=False, exclude_module=None, include_module=None,
        include=None, level=None, limit=0, lines=10, replay=False,
        exclude=None):
    """Get log messages for this model (currently a no-op placeholder).

    :param bool no_tail: Stop after returning existing log messages
    :param list exclude_module: Do not show log messages for these logging
        modules
    :param list include_module: Only show log messages for these logging
        modules
    :param list include: Only show log messages for these entities
    :param str level: Log level to show, valid options are 'TRACE',
        'DEBUG', 'INFO', 'WARNING', 'ERROR'
    :param int limit: Return this many of the most recent (possibly
        filtered) lines
    :param int lines: Yield this many of the most recent lines, and keep
        yielding
    :param bool replay: Yield the entire log, and keep yielding
    :param list exclude: Do not show log messages for these entities

    """
    return None  # body intentionally empty for now
371
async def deploy(
        self, entity_url, service_name=None, bind=None, budget=None,
        channel=None, config=None, constraints=None, force=False,
        num_units=1, plan=None, resources=None, series=None, storage=None,
        to=None):
    """Deploy a new service or bundle.

    The charm is first added through the client facade, then an
    application deployment is submitted through the application facade.

    :param str entity_url: Charm or bundle url
    :param str service_name: Name to give the service
    :param dict bind: <charm endpoint>:<network space> pairs
    :param dict budget: <budget name>:<limit> pairs (accepted but not
        used by the current body)
    :param str channel: Charm store channel from which to retrieve
        the charm or bundle, e.g. 'development'
    :param dict config: Charm configuration dictionary
    :param constraints: Service constraints; when truthy it is expanded
        as keyword arguments into ``client.Value``
    :type constraints: :class:`juju.Constraints`
    :param bool force: Allow charm to be deployed to a machine running
        an unsupported series (accepted but not used by the current body)
    :param int num_units: Number of units to deploy
    :param str plan: Plan under which to deploy charm (accepted but not
        used by the current body)
    :param dict resources: <resource name>:<file path> pairs
    :param str series: Series on which to deploy
    :param dict storage: Storage constraints; each value is expanded as
        keyword arguments into ``client.Constraints``
    :param to: Placement directives. The body iterates over ``to`` and
        expands every item as keyword arguments into ``client.Placement``,
        so an iterable of mappings is expected. NOTE(review): earlier
        documentation described string directives such as '23', 'lxc:7'
        or '24/lxc/3' -- confirm the intended input format.

        If None (or empty), no placement is sent.
    :return: The awaited result of the application facade's ``Deploy``

    TODO::

        - entity_url must have a revision; look up latest automatically if
          not provided by caller
        - service_name is required; fill this in automatically if not
          provided by caller
        - series is required; how do we pick a default?

    """
    if constraints:
        constraints = client.Value(**constraints)

    placement = (
        [client.Placement(**directive) for directive in to] if to else [])

    if storage:
        storage = {
            name: client.Constraints(**spec)
            for name, spec in storage.items()
        }

    app_facade = client.ApplicationFacade()
    client_facade = client.ClientFacade()
    app_facade.connect(self.connection)
    client_facade.connect(self.connection)

    log.debug(
        'Deploying %s', entity_url)

    await client_facade.AddCharm(channel, entity_url)

    deployment = client.ApplicationDeploy(
        application=service_name,
        channel=channel,
        charm_url=entity_url,
        config=config,
        constraints=constraints,
        endpoint_bindings=bind,
        num_units=num_units,
        placement=placement,
        resources=resources,
        series=series,
        storage=storage,
    )
    return await app_facade.Deploy([deployment])
453
def destroy(self):
    """Terminate all machines and resources for this model (currently a
    no-op placeholder).

    """
    return None  # body intentionally empty for now
459
def get_backup(self, archive_id):
    """Download a backup archive file (currently a no-op placeholder).

    :param str archive_id: The id of the archive to download
    :return str: Intended to be the path to the archive file; the current
        body returns None

    """
    return None  # body intentionally empty for now
468
def enable_ha(
        self, num_controllers=0, constraints=None, series=None, to=None):
    """Ensure sufficient controllers exist to provide redundancy
    (currently a no-op placeholder).

    :param int num_controllers: Number of controllers to make available
    :param constraints: Constraints to apply to the controller machines
    :type constraints: :class:`juju.Constraints`
    :param str series: Series of the controller machines
    :param list to: Placement directives for controller machines, e.g.::

        '23' - machine 23
        'lxc:7' - new lxc container on machine 7
        '24/lxc/3' - lxc container 3 on machine 24

        If None, a new machine is provisioned.

    """
    return None  # body intentionally empty for now
487
def get_config(self):
    """Return the configuration settings for this model (currently a
    no-op placeholder).

    """
    return None  # body intentionally empty for now
493
def get_constraints(self):
    """Return the machine constraints for this model (currently a no-op
    placeholder).

    """
    return None  # body intentionally empty for now
499
def grant(self, username, acl='read'):
    """Grant a user access to this model (currently a no-op placeholder).

    :param str username: Username
    :param str acl: Access control ('read' or 'write')

    """
    return None  # body intentionally empty for now
508
def import_ssh_key(self, identity):
    """Add a public SSH key from a trusted identity source to this model
    (currently a no-op placeholder).

    :param str identity: User identity in the form <lp|gh>:<username>

    """
    return None  # body intentionally empty for now
import_ssh_keys = import_ssh_key
517
def get_machines(self, machine, utc=False):
    """Return list of machines in this model (currently a no-op
    placeholder).

    :param str machine: Machine id, e.g. '0'
    :param bool utc: Display time as UTC in RFC3339 format

    """
    return None  # body intentionally empty for now
526
def get_shares(self):
    """Return list of all users with access to this model (currently a
    no-op placeholder).

    """
    return None  # body intentionally empty for now
532
def get_spaces(self):
    """Return list of all known spaces, including associated subnets
    (currently a no-op placeholder).

    """
    return None  # body intentionally empty for now
538
def get_ssh_key(self):
    """Return known SSH keys for this model (currently a no-op
    placeholder).

    """
    return None  # body intentionally empty for now
get_ssh_keys = get_ssh_key
545
def get_storage(self, filesystem=False, volume=False):
    """Return details of storage instances (currently a no-op
    placeholder).

    :param bool filesystem: Include filesystem storage
    :param bool volume: Include volume storage

    """
    return None  # body intentionally empty for now
554
def get_storage_pools(self, names=None, providers=None):
    """Return list of storage pools (currently a no-op placeholder).

    :param list names: Only include pools with these names
    :param list providers: Only include pools for these providers

    """
    return None  # body intentionally empty for now
563
def get_subnets(self, space=None, zone=None):
    """Return list of known subnets (currently a no-op placeholder).

    :param str space: Only include subnets in this space
    :param str zone: Only include subnets in this zone

    """
    return None  # body intentionally empty for now
572
def remove_blocks(self):
    """Remove all blocks from this model (currently a no-op placeholder).

    """
    return None  # body intentionally empty for now
578
def remove_backup(self, backup_id):
    """Delete a backup (currently a no-op placeholder).

    :param str backup_id: The id of the backup to remove

    """
    return None  # body intentionally empty for now
586
def remove_cached_images(self, arch=None, kind=None, series=None):
    """Remove cached OS images (currently a no-op placeholder).

    :param str arch: Architecture of the images to remove
    :param str kind: Image kind to remove, e.g. 'lxd'
    :param str series: Image series to remove, e.g. 'xenial'

    """
    return None  # body intentionally empty for now
596
def remove_machine(self, *machine_ids):
    r"""Remove machines from this model (currently a no-op placeholder).

    :param str \*machine_ids: Ids of the machines to remove

    """
    return None  # body intentionally empty for now
remove_machines = remove_machine
605
def remove_ssh_key(self, *keys):
    r"""Remove public SSH key(s) from this model (currently a no-op
    placeholder).

    :param str \*keys: Keys to remove

    """
    return None  # body intentionally empty for now
remove_ssh_keys = remove_ssh_key
614
def restore_backup(
        self, bootstrap=False, constraints=None, archive=None,
        backup_id=None, upload_tools=False):
    """Restore a backup archive to a new controller (currently a no-op
    placeholder).

    :param bool bootstrap: Bootstrap a new state machine
    :param constraints: Model constraints
    :type constraints: :class:`juju.Constraints`
    :param str archive: Path to backup archive to restore
    :param str backup_id: Id of backup to restore
    :param bool upload_tools: Upload tools if bootstrapping a new machine

    """
    return None  # body intentionally empty for now
629
def retry_provisioning(self):
    """Retry provisioning for failed machines (currently a no-op
    placeholder).

    """
    return None  # body intentionally empty for now
635
def revoke(self, username, acl='read'):
    """Revoke a user's access to this model (currently a no-op
    placeholder).

    :param str username: Username to revoke
    :param str acl: Access control ('read' or 'write')

    """
    return None  # body intentionally empty for now
644
def run(self, command, timeout=None):
    """Run command on all machines in this model (currently a no-op
    placeholder).

    :param str command: The command to run
    :param int timeout: Time to wait before command is considered failed

    """
    return None  # body intentionally empty for now
653
def set_config(self, **config):
    r"""Set configuration keys on this model (currently a no-op
    placeholder).

    :param \*\*config: Config key/values

    """
    return None  # body intentionally empty for now
661
def set_constraints(self, constraints):
    """Set machine constraints on this model (currently a no-op
    placeholder).

    :param constraints: Machine constraints
    :type constraints: :class:`juju.Constraints`

    """
    return None  # body intentionally empty for now
669
def get_action_output(self, action_uuid, wait=-1):
    """Get the results of an action by ID (currently a no-op
    placeholder).

    :param str action_uuid: Id of the action
    :param int wait: Time in seconds to wait for action to complete

    """
    return None  # body intentionally empty for now
678
def get_action_status(self, uuid_or_prefix=None, name=None):
    """Get the status of all actions, filtered by ID, ID prefix, or
    action name (currently a no-op placeholder).

    :param str uuid_or_prefix: Filter by action uuid or prefix
    :param str name: Filter by action name

    """
    return None  # body intentionally empty for now
687
def get_budget(self, budget_name):
    """Get budget usage info (currently a no-op placeholder).

    :param str budget_name: Name of budget

    """
    return None  # body intentionally empty for now
695
def get_status(self, filter_=None, utc=False):
    """Return the status of the model (currently a no-op placeholder).

    :param str filter_: Service or unit name or wildcard ('*')
    :param bool utc: Display time as UTC in RFC3339 format

    """
    return None  # body intentionally empty for now
status = get_status
705
def sync_tools(
        self, all_=False, destination=None, dry_run=False, public=False,
        source=None, stream=None, version=None):
    """Copy Juju tools into this model (currently a no-op placeholder).

    :param bool all_: Copy all versions, not just the latest
    :param str destination: Path to local destination directory
    :param bool dry_run: Don't do the actual copy
    :param bool public: Tools are for a public cloud, so generate mirrors
        information
    :param str source: Path to local source directory
    :param str stream: Simplestreams stream for which to sync metadata
    :param str version: Copy a specific major.minor version

    """
    return None  # body intentionally empty for now
722
def unblock(self, *commands):
    r"""Unblock an operation that would alter this model (currently a
    no-op placeholder).

    :param str \*commands: The commands to unblock. Valid values are
        'all-changes', 'destroy-model', 'remove-object'

    """
    return None  # body intentionally empty for now
731
def unset_config(self, *keys):
    r"""Unset configuration on this model (currently a no-op
    placeholder).

    :param str \*keys: The keys to unset

    """
    return None  # body intentionally empty for now
739
def upgrade_gui(self):
    """Upgrade the Juju GUI for this model (currently a no-op
    placeholder).

    """
    return None  # body intentionally empty for now
745
def upgrade_juju(
        self, dry_run=False, reset_previous_upgrade=False,
        upload_tools=False, version=None):
    """Upgrade Juju on all machines in a model (currently a no-op
    placeholder).

    :param bool dry_run: Don't do the actual upgrade
    :param bool reset_previous_upgrade: Clear the previous (incomplete)
        upgrade status
    :param bool upload_tools: Upload local version of tools
    :param str version: Upgrade to a specific version

    """
    return None  # body intentionally empty for now
759
def upload_backup(self, archive_path):
    """Store a backup archive remotely in Juju (currently a no-op
    placeholder).

    :param str archive_path: Path to local archive

    """
    return None  # body intentionally empty for now