Allow underscore to dash translation when accessing model attributes (#101)
[osm/N2VC.git] / juju / model.py
1 import asyncio
2 import collections
3 import json
4 import logging
5 import os
6 import re
7 import stat
8 import tempfile
9 import weakref
10 import zipfile
11 from concurrent.futures import CancelledError
12 from functools import partial
13 from pathlib import Path
14
15 import yaml
16 import theblues.charmstore
17 import theblues.errors
18
19 from .client import client
20 from .client import watcher
21 from .client import connection
22 from .constraints import parse as parse_constraints, normalize_key
23 from .delta import get_entity_delta
24 from .delta import get_entity_class
25 from .exceptions import DeadEntityException
26 from .errors import JujuError, JujuAPIError
27 from .placement import parse as parse_placement
28
# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__)
30
31
32 class _Observer(object):
33 """Wrapper around an observer callable.
34
35 This wrapper allows filter criteria to be associated with the
36 callable so that it's only called for changes that meet the criteria.
37
38 """
39 def __init__(self, callable_, entity_type, action, entity_id, predicate):
40 self.callable_ = callable_
41 self.entity_type = entity_type
42 self.action = action
43 self.entity_id = entity_id
44 self.predicate = predicate
45 if self.entity_id:
46 self.entity_id = str(self.entity_id)
47 if not self.entity_id.startswith('^'):
48 self.entity_id = '^' + self.entity_id
49 if not self.entity_id.endswith('$'):
50 self.entity_id += '$'
51
52 async def __call__(self, delta, old, new, model):
53 await self.callable_(delta, old, new, model)
54
55 def cares_about(self, delta):
56 """Return True if this observer "cares about" (i.e. wants to be
57 called) for a this delta.
58
59 """
60 if (self.entity_id and delta.get_id() and
61 not re.match(self.entity_id, str(delta.get_id()))):
62 return False
63
64 if self.entity_type and self.entity_type != delta.entity:
65 return False
66
67 if self.action and self.action != delta.type:
68 return False
69
70 if self.predicate and not self.predicate(delta):
71 return False
72
73 return True
74
75
class ModelObserver(object):
    """Base class for observers that dispatch on entity type and action.

    Calling an instance looks for a method named
    ``on_<delta.entity>_<delta.type>`` and awaits it; when no such method
    exists, :meth:`on_change` is awaited instead.

    """
    async def __call__(self, delta, old, new, model):
        specific_name = 'on_{}_{}'.format(delta.entity, delta.type)
        handler = getattr(self, specific_name, self.on_change)
        await handler(delta, old, new, model)

    async def on_change(self, delta, old, new, model):
        """Fallback handler used when no specific handler is defined.

        :param delta: :class:`juju.client.overrides.Delta`
        :param old: :class:`juju.model.ModelEntity`
        :param new: :class:`juju.model.ModelEntity`
        :param model: :class:`juju.model.Model`

        """
        pass
92
93
class ModelState(object):
    """Keeps the delta history of every entity seen in the model.

    ``self.state`` maps entity type to a dict of entity id to a deque of
    data dicts. A trailing ``None`` in a deque marks a removed entity.

    """
    def __init__(self, model):
        self.model = model
        self.state = {}

    def _live_entity_map(self, entity_type):
        """Build an id:Entity dict of the entities of ``entity_type``
        whose most recent history entry is not ``None``.

        """
        live = {}
        for key, past in self.state.get(entity_type, {}).items():
            if past[-1] is None:
                continue
            live[key] = self.get_entity(entity_type, key)
        return live

    @property
    def applications(self):
        """Map of application-name:Application for the applications
        currently in the model.

        """
        return self._live_entity_map('application')

    @property
    def machines(self):
        """Map of machine-id:Machine for the machines currently in the
        model.

        """
        return self._live_entity_map('machine')

    @property
    def units(self):
        """Map of unit-id:Unit for the units currently in the model.

        """
        return self._live_entity_map('unit')

    def entity_history(self, entity_type, entity_id):
        """Return the history deque of one entity.

        Raises ``KeyError`` if the type or id has never been recorded.

        """
        by_id = self.state[entity_type]
        return by_id[entity_id]

    def entity_data(self, entity_type, entity_id, history_index):
        """Return the data dict stored at ``history_index`` in the
        history of one entity.

        """
        past = self.entity_history(entity_type, entity_id)
        return past[history_index]

    def apply_delta(self, delta):
        """Record ``delta`` in the history and return the affected entity
        before and after the update, e.g.:

            old_obj, new_obj = self.apply_delta(delta)

        old_obj is None when the delta is the first one recorded for the
        entity.

        new_obj is never None, but it is dead (new_obj.dead == True) when
        the delta removed the entity.

        """
        by_id = self.state.setdefault(delta.entity, {})
        past = by_id.setdefault(delta.get_id(), collections.deque())

        past.append(delta.data)
        if delta.type == 'remove':
            # The None sentinel is what marks the entity as dead.
            past.append(None)

        current = self.get_entity(delta.entity, delta.get_id())
        return current.previous(), current

    def get_entity(
            self, entity_type, entity_id, history_index=-1, connected=True):
        """Return an entity object for the given type and id.

        With the default ``history_index`` of -1 the object follows the
        most recent state. Any other negative index is converted to the
        equivalent non-negative index into the history deque first.

        Returns None when the index falls outside the history.

        """
        if history_index < -1:
            history_index += len(self.entity_history(entity_type, entity_id))
            if history_index < 0:
                return None

        try:
            # Only probing that the index exists; the value is not used.
            self.entity_data(entity_type, entity_id, history_index)
        except IndexError:
            return None

        cls = get_entity_class(entity_type)
        return cls(
            entity_id, self.model, history_index=history_index,
            connected=connected)
201
202
class ModelEntity(object):
    """An object in the Model tree.

    An entity stores no data of its own. Every data access is resolved
    against the history kept in ``model.state``, using this object's
    entity type, entity id and history index.

    """

    def __init__(self, entity_id, model, history_index=-1, connected=True):
        """Initialize a new entity

        :param entity_id str: The unique id of the object in the model
        :param model: The model instance in whose object tree this
            entity resides
        :history_index int: The index of this object's state in the model's
            history deque for this entity
        :connected bool: Flag indicating whether this object gets live updates
            from the model.

        """
        self.entity_id = entity_id
        self.model = model
        self._history_index = history_index
        self.connected = connected
        self.connection = model.connection

    def __repr__(self):
        return '<{} entity_id="{}">'.format(type(self).__name__,
                                            self.entity_id)

    def __getattr__(self, name):
        """Fetch object attributes from the underlying data dict held in the
        model.

        The name is looked up as given first; if that fails, underscores
        are translated to dashes and the lookup is retried.

        :raises KeyError: if neither spelling is a key of the data dict
        :raises DeadEntityException: if this object points at a dead state

        """
        try:
            return self.safe_data[name]
        except KeyError:
            name = name.replace('_', '-')
            if name in self.safe_data:
                return self.safe_data[name]
            else:
                raise

    def __bool__(self):
        return bool(self.data)

    def on_change(self, callable_):
        """Add a change observer to this entity.

        """
        self.model.add_observer(
            callable_, self.entity_type, 'change', self.entity_id)

    def on_remove(self, callable_):
        """Add a remove observer to this entity.

        """
        self.model.add_observer(
            callable_, self.entity_type, 'remove', self.entity_id)

    @property
    def entity_type(self):
        """A string identifying the entity type of this object, e.g.
        'application' or 'unit', etc.

        It is the lower-cased name of the concrete class.

        """
        return self.__class__.__name__.lower()

    @property
    def current(self):
        """Return True if this object represents the current state of the
        entity in the underlying model.

        This will be True except when the object represents an entity at a
        non-latest state in history, e.g. if the object was obtained by calling
        .previous() on another object.

        """
        return self._history_index == -1

    @property
    def dead(self):
        """Returns True if this entity no longer exists in the underlying
        model.

        That is the case when either the state this object points at or
        the most recent state in the history is ``None``.

        """
        return (
            self.data is None or
            self.model.state.entity_data(
                self.entity_type, self.entity_id, -1) is None
        )

    @property
    def alive(self):
        """Returns True if this entity still exists in the underlying
        model.

        """
        return not self.dead

    @property
    def data(self):
        """The data dictionary for this entity, or ``None`` if this object
        points at a dead state.

        """
        return self.model.state.entity_data(
            self.entity_type, self.entity_id, self._history_index)

    @property
    def safe_data(self):
        """The data dictionary for this entity.

        If this `ModelEntity` points to the dead state, it will
        raise `DeadEntityException`.

        """
        if self.data is None:
            raise DeadEntityException(
                "Entity {}:{} is dead - its attributes can no longer be "
                "accessed. Use the .previous() method on this object to get "
                "a copy of the object at its previous state.".format(
                    self.entity_type, self.entity_id))
        return self.data

    def previous(self):
        """Return a copy of this object as was at its previous state in
        history.

        Returns None if this object is new (and therefore has no history).

        The returned object is always "disconnected", i.e. does not receive
        live updates.

        """
        if self._history_index == 0:
            # Index 0 is the oldest state. Without this guard the lookup
            # below would ask for index -1, which means "latest".
            return None

        return self.model.state.get_entity(
            self.entity_type, self.entity_id, self._history_index - 1,
            connected=False)

    def next(self):
        """Return a copy of this object at its next state in
        history.

        Returns None if this object is already the latest.

        The returned object is "disconnected", i.e. does not receive
        live updates, unless it is current (latest).

        """
        if self._history_index == -1:
            return None

        new_index = self._history_index + 1
        connected = (
            new_index == len(self.model.state.entity_history(
                self.entity_type, self.entity_id)) - 1
        )
        # Look up the state *after* this one; get_entity returns None
        # when new_index is past the end of the history.
        return self.model.state.get_entity(
            self.entity_type, self.entity_id, new_index,
            connected=connected)

    def latest(self):
        """Return a copy of this object at its current state in the model.

        Returns self if this object is already the latest.

        The returned object is always "connected", i.e. receives
        live updates from the model.

        """
        if self._history_index == -1:
            return self

        return self.model.state.get_entity(self.entity_type, self.entity_id)
372
373
374 class Model(object):
def __init__(self, loop=None):
    """Set up a Model that has no connection yet.

    :param loop: an asyncio event loop; when omitted (or falsy),
        ``asyncio.get_event_loop()`` is used

    """
    if not loop:
        loop = asyncio.get_event_loop()
    self.loop = loop

    self.connection = None
    self.info = None
    self.state = ModelState(self)
    # Values are held weakly: an observer entry disappears once its
    # callable is no longer referenced anywhere else.
    self.observers = weakref.WeakValueDictionary()

    # Watcher bookkeeping, driven by _watch() / _stop_watching().
    self._watcher_task = None
    # NOTE(review): the ``loop`` argument of asyncio.Event was removed
    # in Python 3.10 - confirm the supported interpreter range.
    self._watch_shutdown = asyncio.Event(loop=self.loop)
    self._watch_received = asyncio.Event(loop=self.loop)

    self._charmstore = CharmStore(self.loop)
390
async def connect(self, *args, **kw):
    """Connect to an arbitrary Juju model.

    args and kw are forwarded to Connection.connect(). This model's
    event loop is supplied as ``loop`` unless the caller passed one.

    """
    kw.setdefault('loop', self.loop)
    conn = await connection.Connection.connect(*args, **kw)
    self.connection = conn
    await self._after_connect()
401
async def connect_current(self):
    """Connect to the current Juju model, then run the post-connect
    initialization.

    """
    conn = await connection.Connection.connect_current(self.loop)
    self.connection = conn
    await self._after_connect()
409
async def connect_model(self, model_name):
    """Connect to a specific Juju model by name, then run the
    post-connect initialization.

    :param model_name: Format [controller:][user/]model

    """
    conn = await connection.Connection.connect_model(
        model_name, self.loop)
    self.connection = conn
    await self._after_connect()
419
420 async def _after_connect(self):
421 """Run initialization steps after connecting to websocket.
422
423 """
424 self._watch()
425 await self._watch_received.wait()
426 await self.get_info()
427
async def disconnect(self):
    """Cancel the watcher task and, if a connection is open, close it.

    """
    self._stop_watching()

    if not (self.connection and self.connection.is_open):
        return

    # Let the watcher finish its own shutdown before the main
    # connection is closed.
    await self._watch_shutdown.wait()
    log.debug('Closing model connection')
    await self.connection.close()
    self.connection = None
438
async def add_local_charm_dir(self, charm_dir, series):
    """Upload a local charm to the model.

    This will automatically generate an archive from
    the charm dir.

    :param charm_dir: Path to the charm directory
    :param series: Charm series
    :return: the charm url returned by :meth:`add_local_charm`

    """
    # Create the temporary file inside the ``with`` so that it is closed
    # (and removed) even when building the archive fails.
    with tempfile.NamedTemporaryFile() as fh:
        CharmArchiveGenerator(charm_dir).make_archive(fh.name)
        func = partial(
            self.add_local_charm, fh, series, os.stat(fh.name).st_size)
        # add_local_charm blocks, so run it in the default executor.
        charm_url = await self.loop.run_in_executor(None, func)

    log.debug('Uploaded local charm: %s -> %s', charm_dir, charm_url)
    return charm_url
458
def add_local_charm(self, charm_file, series, size=None):
    """Upload a local charm archive to the model.

    The archive is POSTed to ``<path_prefix>/charms?series=<series>``
    over the connection returned by ``self.connection.https_connection()``.

    :param charm_file: Path to charm zip archive
    :param series: Charm series
    :param size: Size of the archive, in bytes; sent as Content-Length
        when truthy
    :return str: 'local:...' url for deploying the charm
    :raises: :class:`JujuError` if the response status is not 200

    .. warning::

        This method will block. Consider using :meth:`add_local_charm_dir`
        instead.

    """
    conn, headers, path_prefix = self.connection.https_connection()

    headers['Content-Type'] = 'application/zip'
    if size:
        headers['Content-Length'] = size

    endpoint = "%s/charms?series=%s" % (path_prefix, series)
    conn.request("POST", endpoint, charm_file, headers)

    response = conn.getresponse()
    body = response.read().decode()
    if response.status != 200:
        raise JujuError(body)

    return json.loads(body)['charm-url']
491
def all_units_idle(self):
    """Return True if the current agent status of every unit is 'idle'.

    With no units at all the result is True.

    """
    return all(
        unit.data['agent-status']['current'] == 'idle'
        for unit in self.units.values()
    )
501
async def reset(self, force=False):
    """Reset the model to a clean state.

    Destroys every application, then every machine, and returns once
    the model no longer lists any machine.

    :param bool force: Force-terminate machines.

    """
    log.debug('Resetting model')

    for application in self.applications.values():
        await application.destroy()

    for mach in self.machines.values():
        await mach.destroy(force=force)

    def no_machines_left():
        return len(self.machines) == 0

    await self.block_until(no_machines_left)
519
async def block_until(self, *conditions, timeout=None, wait_period=0.5):
    """Return only after all conditions are true.

    Each condition is called without arguments. They are re-evaluated
    every ``wait_period`` seconds; ``timeout`` is handed to
    ``asyncio.wait_for``.

    """
    async def _poll():
        while True:
            if all(check() for check in conditions):
                return
            # NOTE(review): the ``loop`` argument was removed from
            # asyncio.sleep/wait_for in Python 3.10 - confirm the
            # supported interpreter range.
            await asyncio.sleep(wait_period, loop=self.loop)

    await asyncio.wait_for(_poll(), timeout, loop=self.loop)
528
@property
def applications(self):
    """Map of application-name:Application for the applications
    currently in the model, as reported by ``self.state``.

    """
    model_state = self.state
    return model_state.applications
536
@property
def machines(self):
    """Map of machine-id:Machine for the machines currently in the
    model, as reported by ``self.state``.

    """
    model_state = self.state
    return model_state.machines
544
@property
def units(self):
    """Map of unit-id:Unit for the units currently in the model, as
    reported by ``self.state``.

    """
    model_state = self.state
    return model_state.units
552
async def get_info(self):
    """Fetch the info for this Model from the api server and return it.

    The result is also stored on ``self.info`` so that it can be read
    again later without another api call.

    ``_after_connect`` awaits this method, so ``self.info`` is populated
    as part of connecting.

    """
    facade = client.ClientFacade()
    facade.connect(self.connection)

    info = await facade.ModelInfo()
    self.info = info
    log.debug('Got ModelInfo: %s', vars(info))

    return info
573
def add_observer(
        self, callable_, entity_type=None, action=None, entity_id=None,
        predicate=None):
    """Register an "on-model-change" callback

    ``callable_`` is awaited for each model change that passes the
    filters below. It must be Awaitable and accept these positional
    arguments:

    delta - An instance of :class:`juju.delta.EntityDelta`
        containing the raw delta data recv'd from the Juju
        websocket.

    old_obj - A copy of the affected object as it was before the delta
        was applied, or None if the delta creates a new entity.

    new_obj - A copy of the new or updated object, after the delta
        is applied.

    model - The :class:`Model` itself.

    The events for which ``callable_`` is called can be narrowed with
    the entity_type, action, and/or entity_id filter criteria, e.g.::

        add_observer(
            myfunc,
            entity_type='application', action='add', entity_id='ubuntu')

    For more complex filtering, pass a predicate function. It is called
    with the delta as its only argument, and ``callable_`` is only
    called when it returns a true value.

    """
    wrapper = _Observer(
        callable_=callable_,
        entity_type=entity_type,
        action=action,
        entity_id=entity_id,
        predicate=predicate,
    )
    # The wrapper is the key and the callable is the stored value.
    self.observers[wrapper] = callable_
613
def _watch(self):
    """Start an asynchronous watch against this model.

    Creates a task on ``self.loop`` that clones the connection, then
    repeatedly awaits the next batch of deltas, applies each delta to
    ``self.state`` and notifies the observers. ``_watch_received`` is set
    after every processed batch. On cancellation the cloned connection
    is closed and ``_watch_shutdown`` is set.

    See :meth:`add_observer` to register an onchange callback.

    """
    async def _start_watch():
        self._watch_shutdown.clear()
        try:
            allwatcher = watcher.AllWatcher()
            self._watch_conn = await self.connection.clone()
            allwatcher.connect(self._watch_conn)
            while True:
                results = await allwatcher.Next()
                for delta in results.deltas:
                    delta = get_entity_delta(delta)
                    old_obj, new_obj = self.state.apply_delta(delta)
                    # XXX: Might not want to shield at this level
                    # We are shielding because when the watcher is
                    # canceled (on disconnect()), we don't want all of
                    # its children (every observer callback) to be
                    # canceled with it. So we shield them. But this means
                    # they can *never* be canceled.
                    await asyncio.shield(
                        self._notify_observers(delta, old_obj, new_obj),
                        loop=self.loop)
                self._watch_received.set()
        except (asyncio.CancelledError, CancelledError):
            # Since Python 3.8 task cancellation raises
            # asyncio.CancelledError, which is no longer the
            # concurrent.futures class imported by this module. Catch
            # both so that the cleanup below runs on every version.
            log.debug('Closing watcher connection')
            # The clone does not exist yet if the task was cancelled
            # while it was still being established.
            watch_conn = getattr(self, '_watch_conn', None)
            if watch_conn is not None:
                await watch_conn.close()
            self._watch_shutdown.set()
            self._watch_conn = None

    log.debug('Starting watcher task')
    self._watcher_task = self.loop.create_task(_start_watch())
649
650 def _stop_watching(self):
651 """Stop the asynchronous watch against this model.
652
653 """
654 log.debug('Stopping watcher task')
655 if self._watcher_task:
656 self._watcher_task.cancel()
657
658 async def _notify_observers(self, delta, old_obj, new_obj):
659 """Call observing callbacks, notifying them of a change in model state
660
661 :param delta: The raw change from the watcher
662 (:class:`juju.client.overrides.Delta`)
663 :param old_obj: The object in the model that this delta updates.
664 May be None.
665 :param new_obj: The object in the model that is created or updated
666 by applying this delta.
667
668 """
669 if new_obj and not old_obj:
670 delta.type = 'add'
671
672 log.debug(
673 'Model changed: %s %s %s',
674 delta.entity, delta.type, delta.get_id())
675
676 for o in self.observers:
677 if o.cares_about(delta):
678 asyncio.ensure_future(o(delta, old_obj, new_obj, self),
679 loop=self.loop)
680
async def _wait(self, entity_type, entity_id, action, predicate=None):
    """
    Suspend the caller until the given action has happened to the given
    entity, then return that entity if it is still live.

    :param entity_type: The entity's type.
    :param entity_id: The entity's id.
    :param action: the type of action (e.g., 'add', 'change', or 'remove')
    :param predicate: optional callable that takes a delta and returns a
        boolean telling whether the delta is the one being waited for.
        For example, you might check whether a 'change' has a
        'completed' status. See the _Observer class for details.

    """
    # NOTE(review): the ``loop`` argument of asyncio.Queue was removed in
    # Python 3.10 - confirm the supported interpreter range.
    matched_ids = asyncio.Queue(loop=self.loop)

    async def callback(delta, old, new, model):
        await matched_ids.put(delta.get_id())

    self.add_observer(callback, entity_type, action, entity_id, predicate)
    matched_id = await matched_ids.get()

    # The entity is absent from the live map (so None is returned) when
    # the awaited action was a 'remove'.
    live = self.state._live_entity_map(entity_type)
    return live.get(matched_id)
706
707 async def _wait_for_new(self, entity_type, entity_id=None, predicate=None):
708 """Wait for a new object to appear in the Model and return it.
709
710 Waits for an object of type ``entity_type`` with id ``entity_id``.
711 If ``entity_id`` is ``None``, it will wait for the first new entity
712 of the correct type.
713
714 This coroutine blocks until the new object appears in the model.
715
716 """
717 # if the entity is already in the model, just return it
718 if entity_id in self.state._live_entity_map(entity_type):
719 return self.state._live_entity_map(entity_type)[entity_id]
720 # if we know the entity_id, we can trigger on any action that puts
721 # the enitty into the model; otherwise, we have to watch for the
722 # next "add" action on that entity_type
723 action = 'add' if entity_id is None else None
724 return await self._wait(entity_type, entity_id, action, predicate)
725
async def wait_for_action(self, action_id):
    """Wait until the action with the given id has a 'completed' or
    'failed' status.

    ``action_id`` may also be an action tag ("action-<id>").

    """
    tag_prefix = "action-"
    if action_id.startswith(tag_prefix):
        # Strip the tag prefix to get the id that the api deltas use.
        action_id = action_id[len(tag_prefix):]

    finished = ('completed', 'failed')

    def predicate(delta):
        return delta.data['status'] in finished

    return await self._wait('action', action_id, 'change', predicate)
738
async def add_machine(
        self, spec=None, constraints=None, disks=None, series=None):
    """Start a new, empty machine and optionally a container, or add a
    container to a machine.

    :param str spec: Machine specification
        Examples::

            (None) - starts a new machine
            'lxd' - starts a new machine with one lxd container
            'lxd:4' - starts a new lxd container on machine 4
            'ssh:user@10.10.0.3' - manually provisions a machine with ssh
            'zone=us-east-1a' - starts a machine in zone us-east-1s on AWS
            'maas2.name' - acquire machine maas2.name on MAAS

    :param dict constraints: Machine constraints
        Example::

            constraints={
                'mem': 256 * MB,
            }

    :param list disks: List of disk constraint dictionaries
        Example::

            disks=[{
                'pool': 'rootfs',
                'size': 10 * GB,
                'count': 1,
            }]

    :param str series: Series, e.g. 'xenial'

    :return: the new machine entity, once it has appeared in the model
    :raises ValueError: if the api reports an error for the new machine

    Supported container types are: lxd, kvm

    When deploying a container to an existing machine, constraints cannot
    be used.

    """
    params = client.AddMachineParams()
    params.jobs = ['JobHostUnits']

    if spec:
        placement = parse_placement(spec)
        if placement:
            # Only the first parsed placement is used.
            params.placement = placement[0]

    if constraints:
        params.constraints = client.Value.from_json(constraints)

    if disks:
        params.disks = [
            client.Constraints.from_json(o) for o in disks]

    if series:
        params.series = series

    # Submit the request.
    client_facade = client.ClientFacade()
    client_facade.connect(self.connection)
    results = await client_facade.AddMachines([params])
    error = results.machines[0].error
    if error:
        # Interpolate the message; passing it as a second argument would
        # leave a raw '%s' in the exception text.
        raise ValueError("Error adding machine: %s" % error.message)
    machine_id = results.machines[0].machine
    log.debug('Added new machine %s', machine_id)
    return await self._wait_for_new('machine', machine_id)
806
async def add_relation(self, relation1, relation2):
    """Add a relation between two applications.

    Returns None when the api reports that the relation already exists;
    otherwise waits for the new relation to appear in the model and
    returns it.

    :param str relation1: '<application>[:<relation_name>]'
    :param str relation2: '<application>[:<relation_name>]'

    """
    app_facade = client.ApplicationFacade()
    app_facade.connect(self.connection)

    log.debug(
        'Adding relation %s <-> %s', relation1, relation2)

    try:
        result = await app_facade.AddRelation([relation1, relation2])
    except JujuAPIError as e:
        if 'relation already exists' in e.message:
            log.debug(
                'Relation %s <-> %s already exists', relation1, relation2)
            # TODO: if relation already exists we should return the
            # Relation ModelEntity here
            return None
        raise

    def predicate(delta):
        # Match the relation delta whose application -> relation mapping
        # equals the endpoints returned by the api call above.
        endpoints = {
            ep['application-name']: ep['relation']
            for ep in delta.data['endpoints']
        }
        return endpoints == result.endpoints

    return await self._wait_for_new('relation', None, predicate)
838
def add_space(self, name, *cidrs):
    r"""Add a new network space.

    Adds a new space with the given name and associates the given
    (optional) list of existing subnet CIDRs with it.

    Not implemented yet: always raises ``NotImplementedError``.

    :param str name: Name of the space
    :param \*cidrs: Optional list of existing subnet CIDRs

    """
    raise NotImplementedError()
850
def add_ssh_key(self, key):
    """Add a public SSH key to this model.

    Not implemented yet: always raises ``NotImplementedError``.

    :param str key: The public ssh key

    """
    raise NotImplementedError


# Plural spelling of the same method.
add_ssh_keys = add_ssh_key
859
def add_subnet(self, cidr_or_id, space, *zones):
    r"""Add an existing subnet to this model.

    Not implemented yet: always raises ``NotImplementedError``.

    :param str cidr_or_id: CIDR or provider ID of the existing subnet
    :param str space: Network space with which to associate
    :param str \*zones: Zone(s) in which the subnet resides

    """
    raise NotImplementedError()
869
def get_backups(self):
    """Retrieve metadata for backups in this model.

    Not implemented yet: always raises ``NotImplementedError``.

    """
    raise NotImplementedError
875
def block(self, *commands):
    r"""Add a new block to this model.

    Not implemented yet: always raises ``NotImplementedError``.

    :param str \*commands: The commands to block. Valid values are
        'all-changes', 'destroy-model', 'remove-object'

    """
    raise NotImplementedError()
884
def get_blocks(self):
    """List blocks for this model.

    Not implemented yet: always raises ``NotImplementedError``.

    """
    raise NotImplementedError
890
def get_cached_images(self, arch=None, kind=None, series=None):
    """Return a list of cached OS images.

    Not implemented yet: always raises ``NotImplementedError``.

    :param str arch: Filter by image architecture
    :param str kind: Filter by image kind, e.g. 'lxd'
    :param str series: Filter by image series, e.g. 'xenial'

    """
    raise NotImplementedError
900
def create_backup(self, note=None, no_download=False):
    """Create a backup of this model.

    Not implemented yet: always raises ``NotImplementedError``.

    :param str note: A note to store with the backup
    :param bool no_download: Do not download the backup archive
    :return str: Path to downloaded archive

    """
    raise NotImplementedError
910
def create_storage_pool(self, name, provider_type, **pool_config):
    r"""Create or define a storage pool.

    Not implemented yet: always raises ``NotImplementedError``.

    :param str name: Name to give the storage pool
    :param str provider_type: Pool provider type
    :param \*\*pool_config: key/value pool configuration pairs

    """
    raise NotImplementedError()
920
def debug_log(
        self, no_tail=False, exclude_module=None, include_module=None,
        include=None, level=None, limit=0, lines=10, replay=False,
        exclude=None):
    """Get log messages for this model.

    Not implemented yet: always raises ``NotImplementedError``.

    :param bool no_tail: Stop after returning existing log messages
    :param list exclude_module: Do not show log messages for these logging
        modules
    :param list include_module: Only show log messages for these logging
        modules
    :param list include: Only show log messages for these entities
    :param str level: Log level to show, valid options are 'TRACE',
        'DEBUG', 'INFO', 'WARNING', 'ERROR'
    :param int limit: Return this many of the most recent (possibly
        filtered) lines
    :param int lines: Yield this many of the most recent lines, and keep
        yielding
    :param bool replay: Yield the entire log, and keep yielding
    :param list exclude: Do not show log messages for these entities

    """
    raise NotImplementedError
944
945 def _get_series(self, entity_url, entity):
946 # try to get the series from the provided charm URL
947 if entity_url.startswith('cs:'):
948 parts = entity_url[3:].split('/')
949 else:
950 parts = entity_url.split('/')
951 if parts[0].startswith('~'):
952 parts.pop(0)
953 if len(parts) > 1:
954 # series was specified in the URL
955 return parts[0]
956 # series was not supplied at all, so use the newest
957 # supported series according to the charm store
958 ss = entity['Meta']['supported-series']
959 return ss['SupportedSeries'][0]
960
async def deploy(
        self, entity_url, application_name=None, bind=None, budget=None,
        channel=None, config=None, constraints=None, force=False,
        num_units=1, plan=None, resources=None, series=None, storage=None,
        to=None):
    """Deploy a new service or bundle.

    ``entity_url`` is treated as local when it starts with 'local:' or
    names an existing directory; otherwise it is resolved through the
    charm store. A bundle is handed to a ``BundleHandler``; a charm is
    deployed through :meth:`_deploy`.

    :param str entity_url: Charm or bundle url
    :param str application_name: Name to give the service
    :param dict bind: <charm endpoint>:<network space> pairs
    :param dict budget: <budget name>:<limit> pairs
        (not referenced in this method body)
    :param str channel: Charm store channel from which to retrieve
        the charm or bundle, e.g. 'development'. Defaults to 'stable'
        for charm store charms.
    :param dict config: Charm configuration dictionary
    :param constraints: Service constraints
    :type constraints: :class:`juju.Constraints`
    :param bool force: Allow charm to be deployed to a machine running
        an unsupported series (not referenced in this method body)
    :param int num_units: Number of units to deploy
    :param str plan: Plan under which to deploy charm
        (not referenced in this method body)
    :param dict resources: <resource name>:<file path> pairs
    :param str series: Series on which to deploy
    :param dict storage: Storage constraints TODO how do these look?
        Each value is expanded as keyword arguments into
        ``client.Constraints``.
    :param to: Placement directive as a string. For example:

        '23' - place on machine 23
        'lxd:7' - place in new lxd container on machine 7
        '24/lxd/3' - place in container 3 on machine 24

        If None, a new machine is provisioned.

    :return: for a bundle, the list of applications in the model whose
        names are in the bundle handler's applications; for a charm,
        whatever :meth:`_deploy` returns
    :raises JujuError: if the series of a local charm cannot be
        determined

    TODO::

        - application_name is required; fill this in automatically if not
          provided by caller
        - series is required; how do we pick a default?

    """
    if storage:
        storage = {
            k: client.Constraints(**v)
            for k, v in storage.items()
        }

    is_local = (
        entity_url.startswith('local:') or
        os.path.isdir(entity_url)
    )
    if is_local:
        entity_id = entity_url
    else:
        # NOTE(review): ``self.charmstore`` is not defined in the visible
        # part of the class (only ``self._charmstore`` is set in
        # __init__); presumably a property defined further down - confirm.
        entity = await self.charmstore.entity(entity_url)
        entity_id = entity['Id']

    client_facade = client.ClientFacade()
    client_facade.connect(self.connection)

    # Local: a directory containing bundle.yaml. Charm store: an id that
    # contains 'bundle/'.
    is_bundle = ((is_local and
                  (Path(entity_id) / 'bundle.yaml').exists()) or
                 (not is_local and 'bundle/' in entity_id))

    if is_bundle:
        handler = BundleHandler(self)
        await handler.fetch_plan(entity_id)
        await handler.execute_plan()
        extant_apps = {app for app in self.applications}
        pending_apps = set(handler.applications) - extant_apps
        if pending_apps:
            # new apps will usually be in the model by now, but if some
            # haven't made it yet we'll need to wait on them to be added
            # NOTE(review): the ``loop`` argument of asyncio.gather was
            # removed in Python 3.10 - confirm the supported interpreter
            # range.
            await asyncio.gather(*[
                asyncio.ensure_future(
                    self._wait_for_new('application', app_name),
                    loop=self.loop)
                for app_name in pending_apps
            ], loop=self.loop)
        return [app for name, app in self.applications.items()
                if name in handler.applications]
    else:
        if not is_local:
            # Fill in the values the caller left out from the charm
            # store metadata, then register the charm with the model.
            if not application_name:
                application_name = entity['Meta']['charm-metadata']['Name']
            if not series:
                series = self._get_series(entity_url, entity)
            if not channel:
                channel = 'stable'
            await client_facade.AddCharm(channel, entity_id)
        else:
            # We have a local charm dir that needs to be uploaded
            charm_dir = os.path.abspath(
                os.path.expanduser(entity_id))
            series = series or get_charm_series(charm_dir)
            if not series:
                raise JujuError(
                    "Couldn't determine series for charm at {}. "
                    "Pass a 'series' kwarg to Model.deploy().".format(
                        charm_dir))
            # From here on entity_id is the url returned by the upload.
            entity_id = await self.add_local_charm_dir(charm_dir, series)
        return await self._deploy(
            charm_url=entity_id,
            application=application_name,
            series=series,
            config=config or {},
            constraints=constraints,
            endpoint_bindings=bind,
            resources=resources,
            storage=storage,
            channel=channel,
            num_units=num_units,
            placement=parse_placement(to),
        )
1073
async def _deploy(self, charm_url, application, series, config,
                  constraints, endpoint_bindings, resources, storage,
                  channel=None, num_units=None, placement=None):
    """Logic shared between `Model.deploy` and `BundleHandler.deploy`.

    Builds a single ``ApplicationDeploy`` request, submits it through the
    Application facade and waits for the new application.

    :param str charm_url: URL of the charm to deploy
    :param str application: Name of the application
    :param str series: Series on which to deploy
    :param dict config: Charm configuration options. Values are converted
        with ``str()``; ``None`` is treated as "no options".
    :param constraints: Constraints, run through ``parse_constraints``
    :param endpoint_bindings: Forwarded unchanged to the API
    :param resources: Forwarded unchanged to the API
    :param storage: Forwarded unchanged to the API
    :param channel: Forwarded unchanged to the API
    :param num_units: Forwarded unchanged to the API
    :param placement: Forwarded unchanged to the API
    :return: The value awaited from
        ``self._wait_for_new('application', application)``
    :raises JujuError: if any result of the Deploy call carries an error;
        the messages are joined with newlines

    """
    log.info('Deploying %s', charm_url)

    # Stringify all config values for the API and convert to YAML keyed by
    # application name. BundleHandler.deploy forwards its ``options``
    # argument untouched, so tolerate None instead of crashing on .items().
    settings = {key: str(value) for key, value in (config or {}).items()}
    config_yaml = yaml.dump({application: settings},
                            default_flow_style=False)

    app_facade = client.ApplicationFacade()
    app_facade.connect(self.connection)

    app = client.ApplicationDeploy(
        charm_url=charm_url,
        application=application,
        series=series,
        channel=channel,
        config_yaml=config_yaml,
        constraints=parse_constraints(constraints),
        endpoint_bindings=endpoint_bindings,
        num_units=num_units,
        resources=resources,
        storage=storage,
        placement=placement,
    )

    result = await app_facade.Deploy([app])
    errors = [r.error.message for r in result.results if r.error]
    if errors:
        raise JujuError('\n'.join(errors))
    return await self._wait_for_new('application', application)
1108
def destroy(self):
    """Terminate every machine and resource belonging to this model.

    Placeholder: the operation is not implemented yet.

    :raises NotImplementedError: always

    """
    raise NotImplementedError()
1114
async def destroy_unit(self, *unit_names):
    """Destroy units by name.

    :param str unit_names: Names of the units to destroy
    :return: The result of the ``DestroyUnits`` API call

    """
    app_facade = client.ApplicationFacade()
    app_facade.connect(self.connection)

    log.debug(
        'Destroying unit%s %s',
        # Singular only when exactly one unit was named.
        '' if len(unit_names) == 1 else 's',
        ' '.join(unit_names))

    return await app_facade.DestroyUnits(list(unit_names))
destroy_units = destroy_unit
1129
def get_backup(self, archive_id):
    """Fetch a backup archive file.

    Placeholder: the operation is not implemented yet.

    :param str archive_id: Id of the archive to download
    :return str: Path to the downloaded archive file (once implemented)
    :raises NotImplementedError: always

    """
    raise NotImplementedError()
1138
def enable_ha(self, num_controllers=0, constraints=None, series=None,
              to=None):
    """Make sure enough controllers exist to provide redundancy.

    Placeholder: the operation is not implemented yet.

    :param int num_controllers: How many controllers to make available
    :param constraints: Constraints for the controller machines
    :type constraints: :class:`juju.Constraints`
    :param str series: Series of the controller machines
    :param list to: Placement directives for the controller machines,
        e.g.::

            '23' - machine 23
            'lxc:7' - new lxc container on machine 7
            '24/lxc/3' - lxc container 3 on machine 24

        If None, a new machine is provisioned.
    :raises NotImplementedError: always

    """
    raise NotImplementedError()
1157
def get_config(self):
    """Return this model's configuration settings.

    Placeholder: the operation is not implemented yet.

    :raises NotImplementedError: always

    """
    raise NotImplementedError()
1163
def get_constraints(self):
    """Return this model's machine constraints.

    Placeholder: the operation is not implemented yet.

    :raises NotImplementedError: always

    """
    raise NotImplementedError()
1169
def grant(self, username, acl='read'):
    """Give a user access to this model.

    Placeholder: the operation is not implemented yet.

    :param str username: Name of the user to grant access to
    :param str acl: Access level, 'read' or 'write'
    :raises NotImplementedError: always

    """
    raise NotImplementedError()
1178
def import_ssh_key(self, identity):
    """Add a public SSH key from a trusted identity source to this model.

    Placeholder: the operation is not implemented yet.

    :param str identity: User identity, written as ``lp:USERNAME`` or
        ``gh:USERNAME``
    :raises NotImplementedError: always

    """
    raise NotImplementedError()
import_ssh_keys = import_ssh_key
1187
def get_machines(self, machine, utc=False):
    """Return the list of machines in this model.

    Placeholder: the operation is not implemented yet.

    :param str machine: Machine id, e.g. '0'
    :param bool utc: Show times as UTC in RFC3339 format
    :raises NotImplementedError: always

    """
    raise NotImplementedError()
1196
def get_shares(self):
    """Return every user that has access to this model.

    Placeholder: the operation is not implemented yet.

    :raises NotImplementedError: always

    """
    raise NotImplementedError()
1202
def get_spaces(self):
    """Return all known spaces together with their associated subnets.

    Placeholder: the operation is not implemented yet.

    :raises NotImplementedError: always

    """
    raise NotImplementedError()
1208
def get_ssh_key(self):
    """Return the SSH keys known to this model.

    Placeholder: the operation is not implemented yet.

    :raises NotImplementedError: always

    """
    raise NotImplementedError()
get_ssh_keys = get_ssh_key
1215
def get_storage(self, filesystem=False, volume=False):
    """Return details about storage instances.

    Placeholder: the operation is not implemented yet.

    :param bool filesystem: Include filesystem storage
    :param bool volume: Include volume storage
    :raises NotImplementedError: always

    """
    raise NotImplementedError()
1224
def get_storage_pools(self, names=None, providers=None):
    """Return the list of storage pools.

    Placeholder: the operation is not implemented yet.

    :param list names: Restrict the result to pools with these names
    :param list providers: Restrict the result to pools for these
        providers
    :raises NotImplementedError: always

    """
    raise NotImplementedError()
1233
def get_subnets(self, space=None, zone=None):
    """Return the list of known subnets.

    Placeholder: the operation is not implemented yet.

    :param str space: Restrict the result to subnets in this space
    :param str zone: Restrict the result to subnets in this zone
    :raises NotImplementedError: always

    """
    raise NotImplementedError()
1242
def remove_blocks(self):
    """Clear all blocks from this model.

    Placeholder: the operation is not implemented yet.

    :raises NotImplementedError: always

    """
    raise NotImplementedError()
1248
def remove_backup(self, backup_id):
    """Delete a backup.

    Placeholder: the operation is not implemented yet.

    :param str backup_id: Id of the backup to remove
    :raises NotImplementedError: always

    """
    raise NotImplementedError()
1256
def remove_cached_images(self, arch=None, kind=None, series=None):
    """Delete cached OS images.

    Placeholder: the operation is not implemented yet.

    :param str arch: Architecture of the images to remove
    :param str kind: Kind of image to remove, e.g. 'lxd'
    :param str series: Series of the images to remove, e.g. 'xenial'
    :raises NotImplementedError: always

    """
    raise NotImplementedError()
1266
def remove_machine(self, *machine_ids):
    """Remove one or more machines from this model.

    Placeholder: the operation is not implemented yet.

    :param str machine_ids: Ids of the machines to remove, passed as
        positional arguments
    :raises NotImplementedError: always

    """
    raise NotImplementedError()
remove_machines = remove_machine
1275
def remove_ssh_key(self, *keys):
    """Remove one or more public SSH keys from this model.

    Placeholder: the operation is not implemented yet.

    :param str keys: Keys to remove, passed as positional arguments
    :raises NotImplementedError: always

    """
    raise NotImplementedError()
remove_ssh_keys = remove_ssh_key
1284
def restore_backup(self, bootstrap=False, constraints=None, archive=None,
                   backup_id=None, upload_tools=False):
    """Restore a backup archive onto a new controller.

    Placeholder: the operation is not implemented yet.

    :param bool bootstrap: Bootstrap a new state machine
    :param constraints: Model constraints
    :type constraints: :class:`juju.Constraints`
    :param str archive: Path of the backup archive to restore
    :param str backup_id: Id of the backup to restore
    :param bool upload_tools: Upload tools when bootstrapping a new
        machine
    :raises NotImplementedError: always

    """
    raise NotImplementedError()
1299
def retry_provisioning(self):
    """Try again to provision machines that failed.

    Placeholder: the operation is not implemented yet.

    :raises NotImplementedError: always

    """
    raise NotImplementedError()
1305
def revoke(self, username, acl='read'):
    """Take away a user's access to this model.

    Placeholder: the operation is not implemented yet.

    :param str username: Name of the user whose access is revoked
    :param str acl: Access level, 'read' or 'write'
    :raises NotImplementedError: always

    """
    raise NotImplementedError()
1314
def run(self, command, timeout=None):
    """Execute a command on every machine in this model.

    Placeholder: the operation is not implemented yet.

    :param str command: Command to run
    :param int timeout: How long to wait before the command is considered
        failed
    :raises NotImplementedError: always

    """
    raise NotImplementedError()
1323
def set_config(self, **config):
    """Set configuration keys on this model.

    Placeholder: the operation is not implemented yet.

    :param config: Config key/values, passed as keyword arguments
    :raises NotImplementedError: always

    """
    raise NotImplementedError()
1331
def set_constraints(self, constraints):
    """Apply machine constraints to this model.

    Placeholder: the operation is not implemented yet.

    :param constraints: Machine constraints
    :type constraints: :class:`juju.Constraints`
    :raises NotImplementedError: always

    """
    raise NotImplementedError()
1339
def get_action_output(self, action_uuid, wait=-1):
    """Fetch the results of an action, looked up by id.

    Placeholder: the operation is not implemented yet.

    :param str action_uuid: Id of the action
    :param int wait: Seconds to wait for the action to complete
    :raises NotImplementedError: always

    """
    raise NotImplementedError()
1348
def get_action_status(self, uuid_or_prefix=None, name=None):
    """Report the status of actions, optionally filtered by id, id prefix
    or action name.

    Placeholder: the operation is not implemented yet.

    :param str uuid_or_prefix: Only actions with this uuid or uuid prefix
    :param str name: Only actions with this name
    :raises NotImplementedError: always

    """
    raise NotImplementedError()
1357
def get_budget(self, budget_name):
    """Fetch usage information for a budget.

    Placeholder: the operation is not implemented yet.

    :param str budget_name: Name of the budget
    :raises NotImplementedError: always

    """
    raise NotImplementedError()
1365
async def get_status(self, filters=None, utc=False):
    """Fetch the status of the model through the Client facade.

    :param filters: Optional list of applications, units, or machines
        to include, which can use wildcards ('*'). Handed to
        ``FullStatus`` as-is.
    :param bool utc: Display time as UTC in RFC3339 format. Accepted but
        not forwarded to the API call by this method.
    :return: The result of the ``FullStatus`` API call

    """
    facade = client.ClientFacade()
    facade.connect(self.connection)
    status = await facade.FullStatus(filters)
    return status
1377
def sync_tools(self, all_=False, destination=None, dry_run=False,
               public=False, source=None, stream=None, version=None):
    """Copy Juju tools into this model.

    Placeholder: the operation is not implemented yet.

    :param bool all_: Copy every version rather than only the latest
    :param str destination: Path of the local destination directory
    :param bool dry_run: Skip the actual copy
    :param bool public: Tools are for a public cloud, so generate mirrors
        information
    :param str source: Path of the local source directory
    :param str stream: Simplestreams stream for which to sync metadata
    :param str version: Copy one specific major.minor version
    :raises NotImplementedError: always

    """
    raise NotImplementedError()
1394
def unblock(self, *commands):
    """Unblock an operation that would alter this model.

    Placeholder: the operation is not implemented yet.

    :param str commands: Commands to unblock, passed as positional
        arguments. Valid values are 'all-changes', 'destroy-model',
        'remove-object'
    :raises NotImplementedError: always

    """
    raise NotImplementedError()
1403
def unset_config(self, *keys):
    """Unset configuration on this model.

    Placeholder: the operation is not implemented yet.

    :param str keys: Keys to unset, passed as positional arguments
    :raises NotImplementedError: always

    """
    raise NotImplementedError()
1411
def upgrade_gui(self):
    """Upgrade the Juju GUI used by this model.

    Placeholder: the operation is not implemented yet.

    :raises NotImplementedError: always

    """
    raise NotImplementedError()
1417
def upgrade_juju(self, dry_run=False, reset_previous_upgrade=False,
                 upload_tools=False, version=None):
    """Upgrade Juju on every machine in a model.

    Placeholder: the operation is not implemented yet.

    :param bool dry_run: Skip the actual upgrade
    :param bool reset_previous_upgrade: Clear the status of a previous
        (incomplete) upgrade
    :param bool upload_tools: Upload the local version of the tools
    :param str version: Upgrade to this specific version
    :raises NotImplementedError: always

    """
    raise NotImplementedError()
1431
def upload_backup(self, archive_path):
    """Store a backup archive remotely in Juju.

    Placeholder: the operation is not implemented yet.

    :param str archive_path: Path of the local archive
    :raises NotImplementedError: always

    """
    raise NotImplementedError()
1439
@property
def charmstore(self):
    """Read-only access to the object stored in ``self._charmstore``."""
    store = self._charmstore
    return store
1443
async def get_metrics(self, *tags):
    """Retrieve metrics.

    :param str tags: Tags of entities from which to retrieve metrics,
        passed as positional arguments.
        No tags retrieves the metrics of all units in the model.
    :return: Dictionary of unit_name:metrics, where each value is a list
        of ``vars(metric)`` dictionaries
    :raises ValueError: if the API reports that a tag is not valid
    :raises Exception: for any other error reported for an entity

    """
    log.debug("Retrieving metrics for %s",
              ', '.join(tags) if tags else "all units")

    metrics_facade = client.MetricsDebugFacade()
    metrics_facade.connect(self.connection)

    entities = [client.Entity(tag) for tag in tags]
    metrics_result = await metrics_facade.GetMetrics(entities)

    metrics = collections.defaultdict(list)

    for entity_metrics in metrics_result.results:
        error = entity_metrics.error
        if error:
            # The substring test has to run against the message text; the
            # error itself is an object carrying a ``message`` attribute.
            message = error.message
            if "is not a valid tag" in (message or ''):
                raise ValueError(message)
            raise Exception(message)

        for metric in entity_metrics.metrics:
            metrics[metric.unit].append(vars(metric))

    return metrics
1475
1476
def get_charm_series(path):
    """Inspects the charm directory at ``path`` and returns a default
    series from its metadata.yaml (the first item in the 'series' list).

    Returns None if no series can be determined, which includes a missing
    or empty metadata.yaml and a missing or empty 'series' entry.

    :param path: Path to the charm directory
    :return: The first listed series, or None

    """
    md = Path(path) / "metadata.yaml"
    if not md.exists():
        return None
    # safe_load only builds plain data, which is all charm metadata needs,
    # and the ``with`` block makes sure the file handle is closed.
    with md.open() as metadata_file:
        data = yaml.safe_load(metadata_file) or {}
    series = data.get('series')
    return series[0] if series else None
1490
1491
class BundleHandler(object):
    """
    Handle bundles by using the API to translate bundle YAML into a plan of
    steps and then dispatching each of those using the API.

    Each plan step names one of the camelCase coroutines defined on this
    class; ``execute_plan`` looks the method up by name, awaits it with the
    step's arguments and records the result so later steps can refer to it.
    """
    def __init__(self, model):
        """
        :param model: Model the bundle is applied to. Its ``charmstore``,
            ``units`` and ``connection`` attributes are read here.
        """
        self.model = model
        self.charmstore = model.charmstore
        self.plan = []
        # Result of every executed plan step, keyed by step id; ``resolve``
        # looks placeholders up in here.
        self.references = {}
        # Names of the units that already exist in the model, grouped by
        # application, so ``addUnit`` can claim them instead of adding more.
        self._units_by_app = {}
        for unit_name, unit in model.units.items():
            app_units = self._units_by_app.setdefault(unit.application, [])
            app_units.append(unit_name)
        self.client_facade = client.ClientFacade()
        self.client_facade.connect(model.connection)
        self.app_facade = client.ApplicationFacade()
        self.app_facade.connect(model.connection)
        self.ann_facade = client.AnnotationsFacade()
        self.ann_facade.connect(model.connection)

    async def _handle_local_charms(self, bundle):
        """Search for references to local charms (i.e. filesystem paths)
        in the bundle. Upload the local charms to the model, and replace
        the filesystem paths with appropriate 'local:' paths in the bundle.

        Return the modified bundle.

        :param dict bundle: Bundle dictionary
        :return: Modified bundle dictionary
        :raises JujuError: if no series can be determined for a local charm

        """
        apps, args = [], []

        default_series = bundle.get('series')
        for app_name in self.applications:
            app_dict = bundle['services'][app_name]
            charm_dir = os.path.abspath(os.path.expanduser(app_dict['charm']))
            if not os.path.isdir(charm_dir):
                # Not a directory on disk, so not a local charm.
                continue
            series = (
                app_dict.get('series') or
                default_series or
                get_charm_series(charm_dir)
            )
            if not series:
                raise JujuError(
                    "Couldn't determine series for charm at {}. "
                    "Add a 'series' key to the bundle.".format(charm_dir))

            # Keep track of what we need to update. We keep a list of apps
            # that need to be updated, and a corresponding list of args
            # needed to update those apps.
            apps.append(app_name)
            args.append((charm_dir, series))

        if apps:
            # If we have apps to update, spawn all the coroutines concurrently
            # and wait for them to finish.
            charm_urls = await asyncio.gather(*[
                self.model.add_local_charm_dir(*params)
                for params in args
            ], loop=self.model.loop)
            # Update the 'charm:' entry for each app with the new 'local:' url.
            for app_name, charm_url in zip(apps, charm_urls):
                bundle['services'][app_name]['charm'] = charm_url

        return bundle

    async def fetch_plan(self, entity_id):
        """Load the bundle YAML and ask the API for the changes it implies.

        The parsed bundle is kept in ``self.bundle`` and the API response in
        ``self.plan``.

        :param str entity_id: Path of a local directory containing
            ``bundle.yaml``; anything else (including ids starting with
            'cs:') is fetched through the charm store client
        """
        is_local = not entity_id.startswith('cs:') and os.path.isdir(entity_id)
        if is_local:
            bundle_yaml = (Path(entity_id) / "bundle.yaml").read_text()
        else:
            bundle_yaml = await self.charmstore.files(entity_id,
                                                      filename='bundle.yaml',
                                                      read_file=True)
        self.bundle = yaml.safe_load(bundle_yaml)
        self.bundle = await self._handle_local_charms(self.bundle)

        self.plan = await self.client_facade.GetBundleChanges(
            yaml.dump(self.bundle))

    async def execute_plan(self):
        """Run every change in ``self.plan`` in order.

        Each step's ``method`` attribute selects the coroutine on this class
        to call; its result is stored under the step id for ``resolve``.
        """
        for step in self.plan.changes:
            method = getattr(self, step.method)
            result = await method(*step.args)
            self.references[step.id_] = result

    @property
    def applications(self):
        """Names of the applications listed under the bundle's 'services'
        key, as a list. Requires ``self.bundle`` to have been set by
        ``fetch_plan``.
        """
        return list(self.bundle['services'].keys())

    def resolve(self, reference):
        """Replace a ``$``-prefixed placeholder with the recorded result of
        the step it names.

        :param reference: Placeholder string or literal value
        :return: The recorded step result for a placeholder; any other
            value (including None and the empty string) unchanged
        :raises KeyError: if the placeholder names an unknown step
        """
        if reference and reference.startswith('$'):
            reference = self.references[reference[1:]]
        return reference

    async def addCharm(self, charm, series):
        """
        :param charm string:
            Charm holds the URL of the charm to be added.

        :param series string:
            Series holds the series of the charm to be added
            if the charm default is not sufficient. It is accepted to
            match the plan step's arguments but not used here.

        :return: The charm URL unchanged for 'local:' charms, otherwise the
            entity id reported by the charm store.
        """
        # We don't add local charms because they've already been added
        # by self._handle_local_charms
        if charm.startswith('local:'):
            return charm

        entity_id = await self.charmstore.entityId(charm)
        log.debug('Adding %s', entity_id)
        await self.client_facade.AddCharm(None, entity_id)
        return entity_id

    async def addMachines(self, params=None):
        """
        :param params dict:
            Dictionary specifying the machine to add. All keys are optional.
            Keys include:

            series: string specifying the machine OS series.

            constraints: string holding machine constraints, if any. We'll
                parse this into the json friendly dict that the juju api
                expects.

            container_type: string holding the type of the container (for
                instance "lxd" or "kvm"). It is not specified for top level
                machines.

            parent_id: string holding a placeholder pointing to another
                machine change or to a unit change. This value is only
                specified in the case this machine is a container, in
                which case also ContainerType is set.

        :return: The ``machine`` value of the first AddMachines result.
        :raises ValueError: if the API reports an error for the machine.

        """
        params = params or {}

        # Normalize keys
        params = {normalize_key(k): v for k, v in params.items()}

        # Fix up values, as necessary.
        if 'parent_id' in params:
            params['parent_id'] = self.resolve(params['parent_id'])

        params['constraints'] = parse_constraints(
            params.get('constraints'))
        params['jobs'] = params.get('jobs', ['JobHostUnits'])

        if params.get('container_type') == 'lxc':
            log.warning('Juju 2.0 does not support lxc containers. '
                        'Converting containers to lxd.')
            params['container_type'] = 'lxd'

        # Submit the request.
        params = client.AddMachineParams(**params)
        results = await self.client_facade.AddMachines([params])
        error = results.machines[0].error
        if error:
            # Interpolate here: exceptions do not apply %-formatting to
            # their arguments the way logging calls do.
            raise ValueError("Error adding machine: %s" % error.message)
        machine = results.machines[0].machine
        log.debug('Added new machine %s', machine)
        return machine

    async def addRelation(self, endpoint1, endpoint2):
        """
        :param endpoint1 string:
        :param endpoint2 string:
            Endpoint1 and Endpoint2 hold relation endpoints in the
            "application:interface" form, where the application is always a
            placeholder pointing to an application change, and the interface is
            optional. Examples are "$deploy-42:web" or just "$deploy-42".

        :return: The result of ``self.model.add_relation``.
        """
        endpoints = []
        # resolve indirect references
        for endpoint in (endpoint1, endpoint2):
            parts = endpoint.split(':')
            parts[0] = self.resolve(parts[0])
            endpoints.append(':'.join(parts))

        log.info('Relating %s <-> %s', *endpoints)
        return await self.model.add_relation(*endpoints)

    async def deploy(self, charm, series, application, options, constraints,
                     storage, endpoint_bindings, resources):
        """
        :param charm string:
            Charm holds the URL of the charm to be used to deploy this
            application.

        :param series string:
            Series holds the series of the application to be deployed
            if the charm default is not sufficient.

        :param application string:
            Application holds the application name.

        :param options map[string]interface{}:
            Options holds application options.

        :param constraints string:
            Constraints holds the optional application constraints.

        :param storage map[string]string:
            Storage holds the optional storage constraints.

        :param endpoint_bindings map[string]string:
            EndpointBindings holds the optional endpoint bindings

        :param resources map[string]int:
            Resources identifies the revision to use for each resource
            of the application's charm.

        :return: The application name, so later steps can resolve it.
        """
        # resolve indirect references
        charm = self.resolve(charm)
        await self.model._deploy(
            charm_url=charm,
            application=application,
            series=series,
            config=options,
            constraints=constraints,
            endpoint_bindings=endpoint_bindings,
            resources=resources,
            storage=storage,
        )
        return application

    async def addUnit(self, application, to):
        """
        :param application string:
            Application holds the application placeholder name for which a unit
            is added.

        :param to string:
            To holds the optional location where to add the unit, as a
            placeholder pointing to another unit change or to a machine change.

        :return: An existing unit claimed for the application, or the
            result of the application's ``add_unit`` call.
        """
        application = self.resolve(application)
        placement = self.resolve(to)
        if self._units_by_app.get(application):
            # enough units for this application already exist;
            # claim one, and carry on
            # NB: this should probably honor placement, but the juju client
            # doesn't, so we're not bothering, either
            unit_name = self._units_by_app[application].pop()
            log.debug('Reusing unit %s for %s', unit_name, application)
            return self.model.units[unit_name]

        log.debug('Adding new unit for %s%s', application,
                  ' to %s' % placement if placement else '')
        return await self.model.applications[application].add_unit(
            count=1,
            to=placement,
        )

    async def expose(self, application):
        """
        :param application string:
            Application holds the placeholder name of the application that must
            be exposed.

        :return: The result of the application's ``expose`` call.
        """
        application = self.resolve(application)
        log.info('Exposing %s', application)
        return await self.model.applications[application].expose()

    async def setAnnotations(self, id_, entity_type, annotations):
        """
        :param id_ string:
            Id is the placeholder for the application or machine change
            corresponding to the entity to be annotated.

        :param entity_type EntityType:
            EntityType holds the type of the entity, "application" or
            "machine".

        :param annotations map[string]string:
            Annotations holds the annotations as key/value pairs.

        :return: The result of the entity's ``set_annotations`` call.
        """
        entity_id = self.resolve(id_)
        try:
            entity = self.model.state.get_entity(entity_type, entity_id)
        except KeyError:
            # Not in the local model state yet; wait for it to show up.
            entity = await self.model._wait_for_new(entity_type, entity_id)
        return await entity.set_annotations(annotations)
1779
1780
class CharmStore(object):
    """
    Async wrapper around theblues.charmstore.CharmStore
    """
    def __init__(self, loop):
        """
        :param loop: Event loop whose ``run_in_executor`` is used to run
            the wrapped client's blocking calls
        """
        self.loop = loop
        self._cs = theblues.charmstore.CharmStore(timeout=5)

    def __getattr__(self, name):
        """
        Expose the wrapped client's attributes.

        Callable attributes come back as coroutine functions that run the
        call via ``run_in_executor``, retrying on ``ServerError`` up to
        three attempts with a one second pause in between. Any other
        attribute comes back as a zero-argument callable returning its
        current value. Either way the wrapper is stored on this instance,
        so ``__getattr__`` runs only once per name.
        """
        attr = getattr(self._cs, name)

        if callable(attr):
            async def coro(*args, **kwargs):
                call = partial(attr, *args, **kwargs)
                failures = 0
                while True:
                    try:
                        return await self.loop.run_in_executor(None, call)
                    except theblues.errors.ServerError:
                        failures += 1
                        if failures == 3:
                            # Third failure in a row: give up.
                            raise
                        await asyncio.sleep(1, loop=self.loop)
            wrapper = coro
        else:
            wrapper = partial(getattr, self._cs, name)

        setattr(self, name, wrapper)
        return wrapper
1811
1812
1813 class CharmArchiveGenerator(object):
def __init__(self, path):
    """Store ``path`` with ``~`` expanded and made absolute.

    :param path: Path of the directory to archive
    """
    expanded = os.path.expanduser(path)
    self.path = os.path.abspath(expanded)
1816
def make_archive(self, path):
    """Create archive of directory and write to ``path``.

    Walks ``self.path`` and adds every directory and file for which
    ``self._ignore`` does not return true. Files are validated with
    ``self._check_type``; symlinks are additionally validated with
    ``self._check_link`` and stored as symlink entries rather than being
    followed.

    :param path: Path to archive
    :return: ``path``, unchanged
    :raises ValueError: propagated from the checks above; the zip file
        is closed before the exception leaves this method

    Ignored (see ``_ignore``)::

        * build/* - This is used for packing the charm itself and any
                    similar tasks.
        * .*      - Hidden files are all ignored for now. This will most
                    likely be changed into a specific ignore list
                    (.bzr, etc)

    """
    # The context manager closes the archive even when a check raises.
    with zipfile.ZipFile(path, 'w', zipfile.ZIP_DEFLATED) as zf:
        for dirpath, _, filenames in os.walk(self.path):
            # Path of this directory relative to the charm root
            # ('' for the root itself).
            relative_path = dirpath[len(self.path) + 1:]
            if relative_path and not self._ignore(relative_path):
                zf.write(dirpath, relative_path)
            for name in filenames:
                archive_name = os.path.join(relative_path, name)
                if self._ignore(archive_name):
                    continue
                real_path = os.path.join(dirpath, name)
                self._check_type(real_path)
                if os.path.islink(real_path):
                    self._check_link(real_path)
                    self._write_symlink(
                        zf, os.readlink(real_path), archive_name)
                else:
                    zf.write(real_path, archive_name)
    return path
1849
def _check_type(self, path):
    """Check that ``path`` is a directory or a regular file.

    ``os.stat`` is used, so a symlink is judged by what it points to.

    :param path: Filesystem path to inspect
    :return: ``path`` when its type is acceptable
    :raises ValueError: for any other kind of file
    """
    mode = os.stat(path).st_mode
    if stat.S_ISDIR(mode) or stat.S_ISREG(mode):
        return path
    raise ValueError("Invalid Charm at %s: %s" % (
        path, "Invalid file type for a charm"))
1858
def _check_link(self, path):
    """Check that the symlink at ``path`` stays inside the charm directory.

    :param path: Path of the symlink to validate
    :raises ValueError: if the link target is absolute, or if it resolves
        (after collapsing ``..`` components) to a location outside
        ``self.path``
    """
    link_target = os.readlink(path)
    if link_target.startswith("/"):
        raise ValueError(
            "Invalid Charm at %s: %s" % (
                path, "Absolute links are invalid"))

    root = os.path.abspath(self.path)
    # Collapse '..' components before comparing; otherwise a target such
    # as '../../etc/passwd' still textually starts with the charm root.
    resolved = os.path.abspath(
        os.path.join(os.path.dirname(path), link_target))
    # Compare against the root plus a separator so a sibling directory
    # that merely shares the prefix (e.g. 'charm-old') is not accepted.
    prefix = root if root.endswith(os.sep) else root + os.sep
    if resolved != root and not resolved.startswith(prefix):
        raise ValueError(
            "Invalid charm at %s %s" % (
                path, "Only internal symlinks are allowed"))
1871
def _write_symlink(self, zf, link_target, link_path):
    """Store a symlink in the archive, with zip metadata marking it as one.

    :param zf: Open ``zipfile.ZipFile`` to write into
    :param link_target: Target of the link; becomes the entry's content
    :param link_path: Name of the entry inside the archive
    """
    entry = zipfile.ZipInfo()
    entry.filename = link_path
    # 3 is the zip "creator system" code for Unix, which is what gives the
    # upper 16 bits of external_attr their file-mode meaning.
    entry.create_system = 3
    # Symlink file type (S_IFLNK, 0o120000) with mode 0755, shifted into
    # the upper 16 bits: 2716663808.
    entry.external_attr = (0o120000 | 0o755) << 16
    zf.writestr(entry, link_target)
1881
1882 def _ignore(self, path):
1883 if path == "build" or path.startswith("build/"):
1884 return True
1885 if path.startswith('.'):
1886 return True