f162c7e8565592b66c28d7ae4d0ea18b66b59496
[osm/N2VC.git] / juju / model.py
1 import asyncio
2 import base64
3 import collections
4 import hashlib
5 import json
6 import logging
7 import os
8 import re
9 import stat
10 import tempfile
11 import weakref
12 import zipfile
13 from concurrent.futures import CancelledError
14 from functools import partial
15 from pathlib import Path
16
17 import yaml
18 import theblues.charmstore
19 import theblues.errors
20
21 from . import tag
22 from .client import client
23 from .client import connection
24 from .constraints import parse as parse_constraints, normalize_key
25 from .delta import get_entity_delta
26 from .delta import get_entity_class
27 from .exceptions import DeadEntityException
28 from .errors import JujuError, JujuAPIError
29 from .placement import parse as parse_placement
30
31 log = logging.getLogger(__name__)
32
33
34 class _Observer(object):
35 """Wrapper around an observer callable.
36
37 This wrapper allows filter criteria to be associated with the
38 callable so that it's only called for changes that meet the criteria.
39
40 """
41 def __init__(self, callable_, entity_type, action, entity_id, predicate):
42 self.callable_ = callable_
43 self.entity_type = entity_type
44 self.action = action
45 self.entity_id = entity_id
46 self.predicate = predicate
47 if self.entity_id:
48 self.entity_id = str(self.entity_id)
49 if not self.entity_id.startswith('^'):
50 self.entity_id = '^' + self.entity_id
51 if not self.entity_id.endswith('$'):
52 self.entity_id += '$'
53
54 async def __call__(self, delta, old, new, model):
55 await self.callable_(delta, old, new, model)
56
57 def cares_about(self, delta):
58 """Return True if this observer "cares about" (i.e. wants to be
59 called) for a this delta.
60
61 """
62 if (self.entity_id and delta.get_id() and
63 not re.match(self.entity_id, str(delta.get_id()))):
64 return False
65
66 if self.entity_type and self.entity_type != delta.entity:
67 return False
68
69 if self.action and self.action != delta.type:
70 return False
71
72 if self.predicate and not self.predicate(delta):
73 return False
74
75 return True
76
77
78 class ModelObserver(object):
79 async def __call__(self, delta, old, new, model):
80 handler_name = 'on_{}_{}'.format(delta.entity, delta.type)
81 method = getattr(self, handler_name, self.on_change)
82 await method(delta, old, new, model)
83
84 async def on_change(self, delta, old, new, model):
85 """Generic model-change handler.
86
87 :param delta: :class:`juju.client.overrides.Delta`
88 :param old: :class:`juju.model.ModelEntity`
89 :param new: :class:`juju.model.ModelEntity`
90 :param model: :class:`juju.model.Model`
91
92 """
93 pass
94
95
96 class ModelState(object):
97 """Holds the state of the model, including the delta history of all
98 entities in the model.
99
100 """
101 def __init__(self, model):
102 self.model = model
103 self.state = dict()
104
105 def _live_entity_map(self, entity_type):
106 """Return an id:Entity map of all the living entities of
107 type ``entity_type``.
108
109 """
110 return {
111 entity_id: self.get_entity(entity_type, entity_id)
112 for entity_id, history in self.state.get(entity_type, {}).items()
113 if history[-1] is not None
114 }
115
116 @property
117 def applications(self):
118 """Return a map of application-name:Application for all applications
119 currently in the model.
120
121 """
122 return self._live_entity_map('application')
123
124 @property
125 def machines(self):
126 """Return a map of machine-id:Machine for all machines currently in
127 the model.
128
129 """
130 return self._live_entity_map('machine')
131
132 @property
133 def units(self):
134 """Return a map of unit-id:Unit for all units currently in
135 the model.
136
137 """
138 return self._live_entity_map('unit')
139
140 def entity_history(self, entity_type, entity_id):
141 """Return the history deque for an entity.
142
143 """
144 return self.state[entity_type][entity_id]
145
146 def entity_data(self, entity_type, entity_id, history_index):
147 """Return the data dict for an entity at a specific index of its
148 history.
149
150 """
151 return self.entity_history(entity_type, entity_id)[history_index]
152
153 def apply_delta(self, delta):
154 """Apply delta to our state and return a copy of the
155 affected object as it was before and after the update, e.g.:
156
157 old_obj, new_obj = self.apply_delta(delta)
158
159 old_obj may be None if the delta is for the creation of a new object,
160 e.g. a new application or unit is deployed.
161
162 new_obj will never be None, but may be dead (new_obj.dead == True)
163 if the object was deleted as a result of the delta being applied.
164
165 """
166 history = (
167 self.state
168 .setdefault(delta.entity, {})
169 .setdefault(delta.get_id(), collections.deque())
170 )
171
172 history.append(delta.data)
173 if delta.type == 'remove':
174 history.append(None)
175
176 entity = self.get_entity(delta.entity, delta.get_id())
177 return entity.previous(), entity
178
179 def get_entity(
180 self, entity_type, entity_id, history_index=-1, connected=True):
181 """Return an object instance for the given entity_type and id.
182
183 By default the object state matches the most recent state from
184 Juju. To get an instance of the object in an older state, pass
185 history_index, an index into the history deque for the entity.
186
187 """
188
189 if history_index < 0 and history_index != -1:
190 history_index += len(self.entity_history(entity_type, entity_id))
191 if history_index < 0:
192 return None
193
194 try:
195 self.entity_data(entity_type, entity_id, history_index)
196 except IndexError:
197 return None
198
199 entity_class = get_entity_class(entity_type)
200 return entity_class(
201 entity_id, self.model, history_index=history_index,
202 connected=connected)
203
204
205 class ModelEntity(object):
206 """An object in the Model tree"""
207
208 def __init__(self, entity_id, model, history_index=-1, connected=True):
209 """Initialize a new entity
210
211 :param entity_id str: The unique id of the object in the model
212 :param model: The model instance in whose object tree this
213 entity resides
214 :history_index int: The index of this object's state in the model's
215 history deque for this entity
216 :connected bool: Flag indicating whether this object gets live updates
217 from the model.
218
219 """
220 self.entity_id = entity_id
221 self.model = model
222 self._history_index = history_index
223 self.connected = connected
224 self.connection = model.connection
225
226 def __repr__(self):
227 return '<{} entity_id="{}">'.format(type(self).__name__,
228 self.entity_id)
229
230 def __getattr__(self, name):
231 """Fetch object attributes from the underlying data dict held in the
232 model.
233
234 """
235 try:
236 return self.safe_data[name]
237 except KeyError:
238 name = name.replace('_', '-')
239 if name in self.safe_data:
240 return self.safe_data[name]
241 else:
242 raise
243
244 def __bool__(self):
245 return bool(self.data)
246
247 def on_change(self, callable_):
248 """Add a change observer to this entity.
249
250 """
251 self.model.add_observer(
252 callable_, self.entity_type, 'change', self.entity_id)
253
254 def on_remove(self, callable_):
255 """Add a remove observer to this entity.
256
257 """
258 self.model.add_observer(
259 callable_, self.entity_type, 'remove', self.entity_id)
260
261 @property
262 def entity_type(self):
263 """A string identifying the entity type of this object, e.g.
264 'application' or 'unit', etc.
265
266 """
267 return self.__class__.__name__.lower()
268
269 @property
270 def current(self):
271 """Return True if this object represents the current state of the
272 entity in the underlying model.
273
274 This will be True except when the object represents an entity at a
275 non-latest state in history, e.g. if the object was obtained by calling
276 .previous() on another object.
277
278 """
279 return self._history_index == -1
280
281 @property
282 def dead(self):
283 """Returns True if this entity no longer exists in the underlying
284 model.
285
286 """
287 return (
288 self.data is None or
289 self.model.state.entity_data(
290 self.entity_type, self.entity_id, -1) is None
291 )
292
293 @property
294 def alive(self):
295 """Returns True if this entity still exists in the underlying
296 model.
297
298 """
299 return not self.dead
300
301 @property
302 def data(self):
303 """The data dictionary for this entity.
304
305 """
306 return self.model.state.entity_data(
307 self.entity_type, self.entity_id, self._history_index)
308
309 @property
310 def safe_data(self):
311 """The data dictionary for this entity.
312
313 If this `ModelEntity` points to the dead state, it will
314 raise `DeadEntityException`.
315
316 """
317 if self.data is None:
318 raise DeadEntityException(
319 "Entity {}:{} is dead - its attributes can no longer be "
320 "accessed. Use the .previous() method on this object to get "
321 "a copy of the object at its previous state.".format(
322 self.entity_type, self.entity_id))
323 return self.data
324
325 def previous(self):
326 """Return a copy of this object as was at its previous state in
327 history.
328
329 Returns None if this object is new (and therefore has no history).
330
331 The returned object is always "disconnected", i.e. does not receive
332 live updates.
333
334 """
335 return self.model.state.get_entity(
336 self.entity_type, self.entity_id, self._history_index - 1,
337 connected=False)
338
339 def next(self):
340 """Return a copy of this object at its next state in
341 history.
342
343 Returns None if this object is already the latest.
344
345 The returned object is "disconnected", i.e. does not receive
346 live updates, unless it is current (latest).
347
348 """
349 if self._history_index == -1:
350 return None
351
352 new_index = self._history_index + 1
353 connected = (
354 new_index == len(self.model.state.entity_history(
355 self.entity_type, self.entity_id)) - 1
356 )
357 return self.model.state.get_entity(
358 self.entity_type, self.entity_id, self._history_index - 1,
359 connected=connected)
360
361 def latest(self):
362 """Return a copy of this object at its current state in the model.
363
364 Returns self if this object is already the latest.
365
366 The returned object is always "connected", i.e. receives
367 live updates from the model.
368
369 """
370 if self._history_index == -1:
371 return self
372
373 return self.model.state.get_entity(self.entity_type, self.entity_id)
374
375
376 class Model(object):
377 def __init__(self, loop=None):
378 """Instantiate a new connected Model.
379
380 :param loop: an asyncio event loop
381
382 """
383 self.loop = loop or asyncio.get_event_loop()
384 self.connection = None
385 self.observers = weakref.WeakValueDictionary()
386 self.state = ModelState(self)
387 self.info = None
388 self._watcher_task = None
389 self._watch_shutdown = asyncio.Event(loop=self.loop)
390 self._watch_received = asyncio.Event(loop=self.loop)
391 self._charmstore = CharmStore(self.loop)
392
393 async def connect(self, *args, **kw):
394 """Connect to an arbitrary Juju model.
395
396 args and kw are passed through to Connection.connect()
397
398 """
399 if 'loop' not in kw:
400 kw['loop'] = self.loop
401 self.connection = await connection.Connection.connect(*args, **kw)
402 await self._after_connect()
403
404 async def connect_current(self):
405 """Connect to the current Juju model.
406
407 """
408 self.connection = await connection.Connection.connect_current(
409 self.loop)
410 await self._after_connect()
411
412 async def connect_model(self, model_name):
413 """Connect to a specific Juju model by name.
414
415 :param model_name: Format [controller:][user/]model
416
417 """
418 self.connection = await connection.Connection.connect_model(model_name,
419 self.loop)
420 await self._after_connect()
421
422 async def _after_connect(self):
423 """Run initialization steps after connecting to websocket.
424
425 """
426 self._watch()
427 await self._watch_received.wait()
428 await self.get_info()
429
430 async def disconnect(self):
431 """Shut down the watcher task and close websockets.
432
433 """
434 self._stop_watching()
435 if self.connection and self.connection.is_open:
436 await self._watch_shutdown.wait()
437 log.debug('Closing model connection')
438 await self.connection.close()
439 self.connection = None
440
441 async def add_local_charm_dir(self, charm_dir, series):
442 """Upload a local charm to the model.
443
444 This will automatically generate an archive from
445 the charm dir.
446
447 :param charm_dir: Path to the charm directory
448 :param series: Charm series
449
450 """
451 fh = tempfile.NamedTemporaryFile()
452 CharmArchiveGenerator(charm_dir).make_archive(fh.name)
453 with fh:
454 func = partial(
455 self.add_local_charm, fh, series, os.stat(fh.name).st_size)
456 charm_url = await self.loop.run_in_executor(None, func)
457
458 log.debug('Uploaded local charm: %s -> %s', charm_dir, charm_url)
459 return charm_url
460
461 def add_local_charm(self, charm_file, series, size=None):
462 """Upload a local charm archive to the model.
463
464 Returns the 'local:...' url that should be used to deploy the charm.
465
466 :param charm_file: Path to charm zip archive
467 :param series: Charm series
468 :param size: Size of the archive, in bytes
469 :return str: 'local:...' url for deploying the charm
470 :raises: :class:`JujuError` if the upload fails
471
472 Uses an https endpoint at the same host:port as the wss.
473 Supports large file uploads.
474
475 .. warning::
476
477 This method will block. Consider using :meth:`add_local_charm_dir`
478 instead.
479
480 """
481 conn, headers, path_prefix = self.connection.https_connection()
482 path = "%s/charms?series=%s" % (path_prefix, series)
483 headers['Content-Type'] = 'application/zip'
484 if size:
485 headers['Content-Length'] = size
486 conn.request("POST", path, charm_file, headers)
487 response = conn.getresponse()
488 result = response.read().decode()
489 if not response.status == 200:
490 raise JujuError(result)
491 result = json.loads(result)
492 return result['charm-url']
493
494 def all_units_idle(self):
495 """Return True if all units are idle.
496
497 """
498 for unit in self.units.values():
499 unit_status = unit.data['agent-status']['current']
500 if unit_status != 'idle':
501 return False
502 return True
503
504 async def reset(self, force=False):
505 """Reset the model to a clean state.
506
507 :param bool force: Force-terminate machines.
508
509 This returns only after the model has reached a clean state. "Clean"
510 means no applications or machines exist in the model.
511
512 """
513 log.debug('Resetting model')
514 for app in self.applications.values():
515 await app.destroy()
516 for machine in self.machines.values():
517 await machine.destroy(force=force)
518 await self.block_until(
519 lambda: len(self.machines) == 0
520 )
521
522 async def block_until(self, *conditions, timeout=None, wait_period=0.5):
523 """Return only after all conditions are true.
524
525 """
526 async def _block():
527 while not all(c() for c in conditions):
528 await asyncio.sleep(wait_period, loop=self.loop)
529 await asyncio.wait_for(_block(), timeout, loop=self.loop)
530
531 @property
532 def applications(self):
533 """Return a map of application-name:Application for all applications
534 currently in the model.
535
536 """
537 return self.state.applications
538
539 @property
540 def machines(self):
541 """Return a map of machine-id:Machine for all machines currently in
542 the model.
543
544 """
545 return self.state.machines
546
547 @property
548 def units(self):
549 """Return a map of unit-id:Unit for all units currently in
550 the model.
551
552 """
553 return self.state.units
554
555 async def get_info(self):
556 """Return a client.ModelInfo object for this Model.
557
558 Retrieves latest info for this Model from the api server. The
559 return value is cached on the Model.info attribute so that the
560 valued may be accessed again without another api call, if
561 desired.
562
563 This method is called automatically when the Model is connected,
564 resulting in Model.info being initialized without requiring an
565 explicit call to this method.
566
567 """
568 facade = client.ClientFacade.from_connection(self.connection)
569
570 self.info = await facade.ModelInfo()
571 log.debug('Got ModelInfo: %s', vars(self.info))
572
573 return self.info
574
575 def add_observer(
576 self, callable_, entity_type=None, action=None, entity_id=None,
577 predicate=None):
578 """Register an "on-model-change" callback
579
580 Once the model is connected, ``callable_``
581 will be called each time the model changes. ``callable_`` should
582 be Awaitable and accept the following positional arguments:
583
584 delta - An instance of :class:`juju.delta.EntityDelta`
585 containing the raw delta data recv'd from the Juju
586 websocket.
587
588 old_obj - If the delta modifies an existing object in the model,
589 old_obj will be a copy of that object, as it was before the
590 delta was applied. Will be None if the delta creates a new
591 entity in the model.
592
593 new_obj - A copy of the new or updated object, after the delta
594 is applied. Will be None if the delta removes an entity
595 from the model.
596
597 model - The :class:`Model` itself.
598
599 Events for which ``callable_`` is called can be specified by passing
600 entity_type, action, and/or entitiy_id filter criteria, e.g.::
601
602 add_observer(
603 myfunc,
604 entity_type='application', action='add', entity_id='ubuntu')
605
606 For more complex filtering conditions, pass a predicate function. It
607 will be called with a delta as its only argument. If the predicate
608 function returns True, the ``callable_`` will be called.
609
610 """
611 observer = _Observer(
612 callable_, entity_type, action, entity_id, predicate)
613 self.observers[observer] = callable_
614
615 def _watch(self):
616 """Start an asynchronous watch against this model.
617
618 See :meth:`add_observer` to register an onchange callback.
619
620 """
621 async def _start_watch():
622 self._watch_shutdown.clear()
623 try:
624 self._watch_conn = await self.connection.clone()
625 allwatcher = client.AllWatcherFacade.from_connection(
626 self._watch_conn)
627 while True:
628 results = await allwatcher.Next()
629 for delta in results.deltas:
630 delta = get_entity_delta(delta)
631 old_obj, new_obj = self.state.apply_delta(delta)
632 # XXX: Might not want to shield at this level
633 # We are shielding because when the watcher is
634 # canceled (on disconnect()), we don't want all of
635 # its children (every observer callback) to be
636 # canceled with it. So we shield them. But this means
637 # they can *never* be canceled.
638 await asyncio.shield(
639 self._notify_observers(delta, old_obj, new_obj),
640 loop=self.loop)
641 self._watch_received.set()
642 except CancelledError:
643 log.debug('Closing watcher connection')
644 await self._watch_conn.close()
645 self._watch_shutdown.set()
646 self._watch_conn = None
647 except Exception as e:
648 log.exception('Error in watcher')
649 raise
650
651 log.debug('Starting watcher task')
652 self._watcher_task = self.loop.create_task(_start_watch())
653
654 def _stop_watching(self):
655 """Stop the asynchronous watch against this model.
656
657 """
658 log.debug('Stopping watcher task')
659 if self._watcher_task:
660 self._watcher_task.cancel()
661
662 async def _notify_observers(self, delta, old_obj, new_obj):
663 """Call observing callbacks, notifying them of a change in model state
664
665 :param delta: The raw change from the watcher
666 (:class:`juju.client.overrides.Delta`)
667 :param old_obj: The object in the model that this delta updates.
668 May be None.
669 :param new_obj: The object in the model that is created or updated
670 by applying this delta.
671
672 """
673 if new_obj and not old_obj:
674 delta.type = 'add'
675
676 log.debug(
677 'Model changed: %s %s %s',
678 delta.entity, delta.type, delta.get_id())
679
680 for o in self.observers:
681 if o.cares_about(delta):
682 asyncio.ensure_future(o(delta, old_obj, new_obj, self),
683 loop=self.loop)
684
685 async def _wait(self, entity_type, entity_id, action, predicate=None):
686 """
687 Block the calling routine until a given action has happened to the
688 given entity
689
690 :param entity_type: The entity's type.
691 :param entity_id: The entity's id.
692 :param action: the type of action (e.g., 'add', 'change', or 'remove')
693 :param predicate: optional callable that must take as an
694 argument a delta, and must return a boolean, indicating
695 whether the delta contains the specific action we're looking
696 for. For example, you might check to see whether a 'change'
697 has a 'completed' status. See the _Observer class for details.
698
699 """
700 q = asyncio.Queue(loop=self.loop)
701
702 async def callback(delta, old, new, model):
703 await q.put(delta.get_id())
704
705 self.add_observer(callback, entity_type, action, entity_id, predicate)
706 entity_id = await q.get()
707 # object might not be in the entity_map if we were waiting for a
708 # 'remove' action
709 return self.state._live_entity_map(entity_type).get(entity_id)
710
711 async def _wait_for_new(self, entity_type, entity_id=None, predicate=None):
712 """Wait for a new object to appear in the Model and return it.
713
714 Waits for an object of type ``entity_type`` with id ``entity_id``.
715 If ``entity_id`` is ``None``, it will wait for the first new entity
716 of the correct type.
717
718 This coroutine blocks until the new object appears in the model.
719
720 """
721 # if the entity is already in the model, just return it
722 if entity_id in self.state._live_entity_map(entity_type):
723 return self.state._live_entity_map(entity_type)[entity_id]
724 # if we know the entity_id, we can trigger on any action that puts
725 # the enitty into the model; otherwise, we have to watch for the
726 # next "add" action on that entity_type
727 action = 'add' if entity_id is None else None
728 return await self._wait(entity_type, entity_id, action, predicate)
729
730 async def wait_for_action(self, action_id):
731 """Given an action, wait for it to complete."""
732
733 if action_id.startswith("action-"):
734 # if we've been passed action.tag, transform it into the
735 # id that the api deltas will use.
736 action_id = action_id[7:]
737
738 def predicate(delta):
739 return delta.data['status'] in ('completed', 'failed')
740
741 return await self._wait('action', action_id, 'change', predicate)
742
743 async def add_machine(
744 self, spec=None, constraints=None, disks=None, series=None):
745 """Start a new, empty machine and optionally a container, or add a
746 container to a machine.
747
748 :param str spec: Machine specification
749 Examples::
750
751 (None) - starts a new machine
752 'lxd' - starts a new machine with one lxd container
753 'lxd:4' - starts a new lxd container on machine 4
754 'ssh:user@10.10.0.3' - manually provisions a machine with ssh
755 'zone=us-east-1a' - starts a machine in zone us-east-1s on AWS
756 'maas2.name' - acquire machine maas2.name on MAAS
757
758 :param dict constraints: Machine constraints
759 Example::
760
761 constraints={
762 'mem': 256 * MB,
763 }
764
765 :param list disks: List of disk constraint dictionaries
766 Example::
767
768 disks=[{
769 'pool': 'rootfs',
770 'size': 10 * GB,
771 'count': 1,
772 }]
773
774 :param str series: Series, e.g. 'xenial'
775
776 Supported container types are: lxd, kvm
777
778 When deploying a container to an existing machine, constraints cannot
779 be used.
780
781 """
782 params = client.AddMachineParams()
783 params.jobs = ['JobHostUnits']
784
785 if spec:
786 placement = parse_placement(spec)
787 if placement:
788 params.placement = placement[0]
789
790 if constraints:
791 params.constraints = client.Value.from_json(constraints)
792
793 if disks:
794 params.disks = [
795 client.Constraints.from_json(o) for o in disks]
796
797 if series:
798 params.series = series
799
800 # Submit the request.
801 client_facade = client.ClientFacade.from_connection(self.connection)
802 results = await client_facade.AddMachines([params])
803 error = results.machines[0].error
804 if error:
805 raise ValueError("Error adding machine: %s" % error.message)
806 machine_id = results.machines[0].machine
807 log.debug('Added new machine %s', machine_id)
808 return await self._wait_for_new('machine', machine_id)
809
810 async def add_relation(self, relation1, relation2):
811 """Add a relation between two applications.
812
813 :param str relation1: '<application>[:<relation_name>]'
814 :param str relation2: '<application>[:<relation_name>]'
815
816 """
817 app_facade = client.ApplicationFacade.from_connection(self.connection)
818
819 log.debug(
820 'Adding relation %s <-> %s', relation1, relation2)
821
822 try:
823 result = await app_facade.AddRelation([relation1, relation2])
824 except JujuAPIError as e:
825 if 'relation already exists' not in e.message:
826 raise
827 log.debug(
828 'Relation %s <-> %s already exists', relation1, relation2)
829 # TODO: if relation already exists we should return the
830 # Relation ModelEntity here
831 return None
832
833 def predicate(delta):
834 endpoints = {}
835 for endpoint in delta.data['endpoints']:
836 endpoints[endpoint['application-name']] = endpoint['relation']
837 return endpoints == result.endpoints
838
839 return await self._wait_for_new('relation', None, predicate)
840
841 def add_space(self, name, *cidrs):
842 """Add a new network space.
843
844 Adds a new space with the given name and associates the given
845 (optional) list of existing subnet CIDRs with it.
846
847 :param str name: Name of the space
848 :param \*cidrs: Optional list of existing subnet CIDRs
849
850 """
851 raise NotImplementedError()
852
853 async def add_ssh_key(self, user, key):
854 """Add a public SSH key to this model.
855
856 :param str user: The username of the user
857 :param str key: The public ssh key
858
859 """
860 key_facade = client.KeyManagerFacade.from_connection(self.connection)
861 return await key_facade.AddKeys([key], user)
862 add_ssh_keys = add_ssh_key
863
864 def add_subnet(self, cidr_or_id, space, *zones):
865 """Add an existing subnet to this model.
866
867 :param str cidr_or_id: CIDR or provider ID of the existing subnet
868 :param str space: Network space with which to associate
869 :param str \*zones: Zone(s) in which the subnet resides
870
871 """
872 raise NotImplementedError()
873
874 def get_backups(self):
875 """Retrieve metadata for backups in this model.
876
877 """
878 raise NotImplementedError()
879
880 def block(self, *commands):
881 """Add a new block to this model.
882
883 :param str \*commands: The commands to block. Valid values are
884 'all-changes', 'destroy-model', 'remove-object'
885
886 """
887 raise NotImplementedError()
888
889 def get_blocks(self):
890 """List blocks for this model.
891
892 """
893 raise NotImplementedError()
894
895 def get_cached_images(self, arch=None, kind=None, series=None):
896 """Return a list of cached OS images.
897
898 :param str arch: Filter by image architecture
899 :param str kind: Filter by image kind, e.g. 'lxd'
900 :param str series: Filter by image series, e.g. 'xenial'
901
902 """
903 raise NotImplementedError()
904
905 def create_backup(self, note=None, no_download=False):
906 """Create a backup of this model.
907
908 :param str note: A note to store with the backup
909 :param bool no_download: Do not download the backup archive
910 :return str: Path to downloaded archive
911
912 """
913 raise NotImplementedError()
914
915 def create_storage_pool(self, name, provider_type, **pool_config):
916 """Create or define a storage pool.
917
918 :param str name: Name to give the storage pool
919 :param str provider_type: Pool provider type
920 :param \*\*pool_config: key/value pool configuration pairs
921
922 """
923 raise NotImplementedError()
924
925 def debug_log(
926 self, no_tail=False, exclude_module=None, include_module=None,
927 include=None, level=None, limit=0, lines=10, replay=False,
928 exclude=None):
929 """Get log messages for this model.
930
931 :param bool no_tail: Stop after returning existing log messages
932 :param list exclude_module: Do not show log messages for these logging
933 modules
934 :param list include_module: Only show log messages for these logging
935 modules
936 :param list include: Only show log messages for these entities
937 :param str level: Log level to show, valid options are 'TRACE',
938 'DEBUG', 'INFO', 'WARNING', 'ERROR,
939 :param int limit: Return this many of the most recent (possibly
940 filtered) lines are shown
941 :param int lines: Yield this many of the most recent lines, and keep
942 yielding
943 :param bool replay: Yield the entire log, and keep yielding
944 :param list exclude: Do not show log messages for these entities
945
946 """
947 raise NotImplementedError()
948
949 def _get_series(self, entity_url, entity):
950 # try to get the series from the provided charm URL
951 if entity_url.startswith('cs:'):
952 parts = entity_url[3:].split('/')
953 else:
954 parts = entity_url.split('/')
955 if parts[0].startswith('~'):
956 parts.pop(0)
957 if len(parts) > 1:
958 # series was specified in the URL
959 return parts[0]
960 # series was not supplied at all, so use the newest
961 # supported series according to the charm store
962 ss = entity['Meta']['supported-series']
963 return ss['SupportedSeries'][0]
964
965 async def deploy(
966 self, entity_url, application_name=None, bind=None, budget=None,
967 channel=None, config=None, constraints=None, force=False,
968 num_units=1, plan=None, resources=None, series=None, storage=None,
969 to=None):
970 """Deploy a new service or bundle.
971
972 :param str entity_url: Charm or bundle url
973 :param str application_name: Name to give the service
974 :param dict bind: <charm endpoint>:<network space> pairs
975 :param dict budget: <budget name>:<limit> pairs
976 :param str channel: Charm store channel from which to retrieve
977 the charm or bundle, e.g. 'development'
978 :param dict config: Charm configuration dictionary
979 :param constraints: Service constraints
980 :type constraints: :class:`juju.Constraints`
981 :param bool force: Allow charm to be deployed to a machine running
982 an unsupported series
983 :param int num_units: Number of units to deploy
984 :param str plan: Plan under which to deploy charm
985 :param dict resources: <resource name>:<file path> pairs
986 :param str series: Series on which to deploy
987 :param dict storage: Storage constraints TODO how do these look?
988 :param to: Placement directive as a string. For example:
989
990 '23' - place on machine 23
991 'lxd:7' - place in new lxd container on machine 7
992 '24/lxd/3' - place in container 3 on machine 24
993
994 If None, a new machine is provisioned.
995
996
997 TODO::
998
999 - support local resources
1000
1001 """
1002 if storage:
1003 storage = {
1004 k: client.Constraints(**v)
1005 for k, v in storage.items()
1006 }
1007
1008 is_local = (
1009 entity_url.startswith('local:') or
1010 os.path.isdir(entity_url)
1011 )
1012 if is_local:
1013 entity_id = entity_url
1014 else:
1015 entity = await self.charmstore.entity(entity_url)
1016 entity_id = entity['Id']
1017
1018 client_facade = client.ClientFacade.from_connection(self.connection)
1019
1020 is_bundle = ((is_local and
1021 (Path(entity_id) / 'bundle.yaml').exists()) or
1022 (not is_local and 'bundle/' in entity_id))
1023
1024 if is_bundle:
1025 handler = BundleHandler(self)
1026 await handler.fetch_plan(entity_id)
1027 await handler.execute_plan()
1028 extant_apps = {app for app in self.applications}
1029 pending_apps = set(handler.applications) - extant_apps
1030 if pending_apps:
1031 # new apps will usually be in the model by now, but if some
1032 # haven't made it yet we'll need to wait on them to be added
1033 await asyncio.gather(*[
1034 asyncio.ensure_future(
1035 self._wait_for_new('application', app_name),
1036 loop=self.loop)
1037 for app_name in pending_apps
1038 ], loop=self.loop)
1039 return [app for name, app in self.applications.items()
1040 if name in handler.applications]
1041 else:
1042 if not is_local:
1043 if not application_name:
1044 application_name = entity['Meta']['charm-metadata']['Name']
1045 if not series:
1046 series = self._get_series(entity_url, entity)
1047 if not channel:
1048 channel = 'stable'
1049 await client_facade.AddCharm(channel, entity_id)
1050 # XXX: we're dropping local resources here, but we don't
1051 # actually support them yet anyway
1052 resources = await self._add_store_resources(application_name,
1053 entity_id,
1054 entity)
1055 else:
1056 # We have a local charm dir that needs to be uploaded
1057 charm_dir = os.path.abspath(
1058 os.path.expanduser(entity_id))
1059 series = series or get_charm_series(charm_dir)
1060 if not series:
1061 raise JujuError(
1062 "Couldn't determine series for charm at {}. "
1063 "Pass a 'series' kwarg to Model.deploy().".format(
1064 charm_dir))
1065 entity_id = await self.add_local_charm_dir(charm_dir, series)
1066 return await self._deploy(
1067 charm_url=entity_id,
1068 application=application_name,
1069 series=series,
1070 config=config or {},
1071 constraints=constraints,
1072 endpoint_bindings=bind,
1073 resources=resources,
1074 storage=storage,
1075 channel=channel,
1076 num_units=num_units,
1077 placement=parse_placement(to)
1078 )
1079
1080 async def _add_store_resources(self, application, entity_url, entity=None):
1081 if not entity:
1082 # avoid extra charm store call if one was already made
1083 entity = await self.charmstore.entity(entity_url)
1084 resources = [
1085 {
1086 'description': resource['Description'],
1087 'fingerprint': resource['Fingerprint'],
1088 'name': resource['Name'],
1089 'path': resource['Path'],
1090 'revision': resource['Revision'],
1091 'size': resource['Size'],
1092 'type_': resource['Type'],
1093 'origin': 'store',
1094 } for resource in entity['Meta']['resources']
1095 ]
1096
1097 if not resources:
1098 return None
1099
1100 resources_facade = client.ResourcesFacade.from_connection(
1101 self.connection)
1102 response = await resources_facade.AddPendingResources(
1103 tag.application(application),
1104 entity_url,
1105 [client.CharmResource(**resource) for resource in resources])
1106 resource_map = {resource['name']: pid
1107 for resource, pid
1108 in zip(resources, response.pending_ids)}
1109 return resource_map
1110
1111 async def _deploy(self, charm_url, application, series, config,
1112 constraints, endpoint_bindings, resources, storage,
1113 channel=None, num_units=None, placement=None):
1114 """Logic shared between `Model.deploy` and `BundleHandler.deploy`.
1115 """
1116 log.info('Deploying %s', charm_url)
1117
1118 # stringify all config values for API, and convert to YAML
1119 config = {k: str(v) for k, v in config.items()}
1120 config = yaml.dump({application: config},
1121 default_flow_style=False)
1122
1123 app_facade = client.ApplicationFacade.from_connection(
1124 self.connection)
1125
1126 app = client.ApplicationDeploy(
1127 charm_url=charm_url,
1128 application=application,
1129 series=series,
1130 channel=channel,
1131 config_yaml=config,
1132 constraints=parse_constraints(constraints),
1133 endpoint_bindings=endpoint_bindings,
1134 num_units=num_units,
1135 resources=resources,
1136 storage=storage,
1137 placement=placement
1138 )
1139
1140 result = await app_facade.Deploy([app])
1141 errors = [r.error.message for r in result.results if r.error]
1142 if errors:
1143 raise JujuError('\n'.join(errors))
1144 return await self._wait_for_new('application', application)
1145
1146 async def destroy(self):
1147 """Terminate all machines and resources for this model.
1148 Is already implemented in controller.py.
1149 """
1150 raise NotImplementedError()
1151
1152 async def destroy_unit(self, *unit_names):
1153 """Destroy units by name.
1154
1155 """
1156 app_facade = client.ApplicationFacade.from_connection(self.connection)
1157
1158 log.debug(
1159 'Destroying unit%s %s',
1160 's' if len(unit_names) == 1 else '',
1161 ' '.join(unit_names))
1162
1163 return await app_facade.DestroyUnits(list(unit_names))
1164 destroy_units = destroy_unit
1165
1166 def get_backup(self, archive_id):
1167 """Download a backup archive file.
1168
1169 :param str archive_id: The id of the archive to download
1170 :return str: Path to the archive file
1171
1172 """
1173 raise NotImplementedError()
1174
1175 def enable_ha(
1176 self, num_controllers=0, constraints=None, series=None, to=None):
1177 """Ensure sufficient controllers exist to provide redundancy.
1178
1179 :param int num_controllers: Number of controllers to make available
1180 :param constraints: Constraints to apply to the controller machines
1181 :type constraints: :class:`juju.Constraints`
1182 :param str series: Series of the controller machines
1183 :param list to: Placement directives for controller machines, e.g.::
1184
1185 '23' - machine 23
1186 'lxc:7' - new lxc container on machine 7
1187 '24/lxc/3' - lxc container 3 or machine 24
1188
1189 If None, a new machine is provisioned.
1190
1191 """
1192 raise NotImplementedError()
1193
1194 def get_config(self):
1195 """Return the configuration settings for this model.
1196
1197 """
1198 raise NotImplementedError()
1199
1200 def get_constraints(self):
1201 """Return the machine constraints for this model.
1202
1203 """
1204 raise NotImplementedError()
1205
1206 async def grant(self, username, acl='read'):
1207 """Grant a user access to this model.
1208
1209 :param str username: Username
1210 :param str acl: Access control ('read' or 'write')
1211
1212 """
1213 controller_conn = await self.connection.controller()
1214 model_facade = client.ModelManagerFacade.from_connection(
1215 controller_conn)
1216 user = tag.user(username)
1217 model = tag.model(self.info.uuid)
1218 changes = client.ModifyModelAccess(acl, 'grant', model, user)
1219 await self.revoke(username)
1220 return await model_facade.ModifyModelAccess([changes])
1221
1222 def import_ssh_key(self, identity):
1223 """Add a public SSH key from a trusted indentity source to this model.
1224
1225 :param str identity: User identity in the form <lp|gh>:<username>
1226
1227 """
1228 raise NotImplementedError()
1229 import_ssh_keys = import_ssh_key
1230
1231 async def get_machines(self):
1232 """Return list of machines in this model.
1233
1234 """
1235 return list(self.state.machines.keys())
1236
1237 def get_shares(self):
1238 """Return list of all users with access to this model.
1239
1240 """
1241 raise NotImplementedError()
1242
1243 def get_spaces(self):
1244 """Return list of all known spaces, including associated subnets.
1245
1246 """
1247 raise NotImplementedError()
1248
1249 async def get_ssh_key(self, raw_ssh=False):
1250 """Return known SSH keys for this model.
1251 :param bool raw_ssh: if True, returns the raw ssh key,
1252 else it's fingerprint
1253
1254 """
1255 key_facade = client.KeyManagerFacade.from_connection(self.connection)
1256 entity = {'tag': tag.model(self.info.uuid)}
1257 entities = client.Entities([entity])
1258 return await key_facade.ListKeys(entities, raw_ssh)
1259 get_ssh_keys = get_ssh_key
1260
1261 def get_storage(self, filesystem=False, volume=False):
1262 """Return details of storage instances.
1263
1264 :param bool filesystem: Include filesystem storage
1265 :param bool volume: Include volume storage
1266
1267 """
1268 raise NotImplementedError()
1269
1270 def get_storage_pools(self, names=None, providers=None):
1271 """Return list of storage pools.
1272
1273 :param list names: Only include pools with these names
1274 :param list providers: Only include pools for these providers
1275
1276 """
1277 raise NotImplementedError()
1278
1279 def get_subnets(self, space=None, zone=None):
1280 """Return list of known subnets.
1281
1282 :param str space: Only include subnets in this space
1283 :param str zone: Only include subnets in this zone
1284
1285 """
1286 raise NotImplementedError()
1287
1288 def remove_blocks(self):
1289 """Remove all blocks from this model.
1290
1291 """
1292 raise NotImplementedError()
1293
1294 def remove_backup(self, backup_id):
1295 """Delete a backup.
1296
1297 :param str backup_id: The id of the backup to remove
1298
1299 """
1300 raise NotImplementedError()
1301
1302 def remove_cached_images(self, arch=None, kind=None, series=None):
1303 """Remove cached OS images.
1304
1305 :param str arch: Architecture of the images to remove
1306 :param str kind: Image kind to remove, e.g. 'lxd'
1307 :param str series: Image series to remove, e.g. 'xenial'
1308
1309 """
1310 raise NotImplementedError()
1311
1312 def remove_machine(self, *machine_ids):
1313 """Remove a machine from this model.
1314
1315 :param str \*machine_ids: Ids of the machines to remove
1316
1317 """
1318 raise NotImplementedError()
1319 remove_machines = remove_machine
1320
1321 async def remove_ssh_key(self, user, key):
1322 """Remove a public SSH key(s) from this model.
1323
1324 :param str key: Full ssh key
1325 :param str user: Juju user to which the key is registered
1326
1327 """
1328 key_facade = client.KeyManagerFacade.from_connection(self.connection)
1329 key = base64.b64decode(bytes(key.strip().split()[1].encode('ascii')))
1330 key = hashlib.md5(key).hexdigest()
1331 key = ':'.join(a+b for a, b in zip(key[::2], key[1::2]))
1332 await key_facade.DeleteKeys([key], user)
1333 remove_ssh_keys = remove_ssh_key
1334
1335 def restore_backup(
1336 self, bootstrap=False, constraints=None, archive=None,
1337 backup_id=None, upload_tools=False):
1338 """Restore a backup archive to a new controller.
1339
1340 :param bool bootstrap: Bootstrap a new state machine
1341 :param constraints: Model constraints
1342 :type constraints: :class:`juju.Constraints`
1343 :param str archive: Path to backup archive to restore
1344 :param str backup_id: Id of backup to restore
1345 :param bool upload_tools: Upload tools if bootstrapping a new machine
1346
1347 """
1348 raise NotImplementedError()
1349
1350 def retry_provisioning(self):
1351 """Retry provisioning for failed machines.
1352
1353 """
1354 raise NotImplementedError()
1355
1356 async def revoke(self, username):
1357 """Revoke a user's access to this model.
1358
1359 :param str username: Username to revoke
1360
1361 """
1362 controller_conn = await self.connection.controller()
1363 model_facade = client.ModelManagerFacade.from_connection(
1364 controller_conn)
1365 user = tag.user(username)
1366 model = tag.model(self.info.uuid)
1367 changes = client.ModifyModelAccess('read', 'revoke', model, user)
1368 return await model_facade.ModifyModelAccess([changes])
1369
1370 def run(self, command, timeout=None):
1371 """Run command on all machines in this model.
1372
1373 :param str command: The command to run
1374 :param int timeout: Time to wait before command is considered failed
1375
1376 """
1377 raise NotImplementedError()
1378
1379 def set_config(self, **config):
1380 """Set configuration keys on this model.
1381
1382 :param \*\*config: Config key/values
1383
1384 """
1385 raise NotImplementedError()
1386
1387 def set_constraints(self, constraints):
1388 """Set machine constraints on this model.
1389
1390 :param :class:`juju.Constraints` constraints: Machine constraints
1391
1392 """
1393 raise NotImplementedError()
1394
1395 def get_action_output(self, action_uuid, wait=-1):
1396 """Get the results of an action by ID.
1397
1398 :param str action_uuid: Id of the action
1399 :param int wait: Time in seconds to wait for action to complete
1400
1401 """
1402 raise NotImplementedError()
1403
1404 def get_action_status(self, uuid_or_prefix=None, name=None):
1405 """Get the status of all actions, filtered by ID, ID prefix, or action name.
1406
1407 :param str uuid_or_prefix: Filter by action uuid or prefix
1408 :param str name: Filter by action name
1409
1410 """
1411 raise NotImplementedError()
1412
1413 def get_budget(self, budget_name):
1414 """Get budget usage info.
1415
1416 :param str budget_name: Name of budget
1417
1418 """
1419 raise NotImplementedError()
1420
1421 async def get_status(self, filters=None, utc=False):
1422 """Return the status of the model.
1423
1424 :param str filters: Optional list of applications, units, or machines
1425 to include, which can use wildcards ('*').
1426 :param bool utc: Display time as UTC in RFC3339 format
1427
1428 """
1429 client_facade = client.ClientFacade.from_connection(self.connection)
1430 return await client_facade.FullStatus(filters)
1431
1432 def sync_tools(
1433 self, all_=False, destination=None, dry_run=False, public=False,
1434 source=None, stream=None, version=None):
1435 """Copy Juju tools into this model.
1436
1437 :param bool all_: Copy all versions, not just the latest
1438 :param str destination: Path to local destination directory
1439 :param bool dry_run: Don't do the actual copy
1440 :param bool public: Tools are for a public cloud, so generate mirrors
1441 information
1442 :param str source: Path to local source directory
1443 :param str stream: Simplestreams stream for which to sync metadata
1444 :param str version: Copy a specific major.minor version
1445
1446 """
1447 raise NotImplementedError()
1448
1449 def unblock(self, *commands):
1450 """Unblock an operation that would alter this model.
1451
1452 :param str \*commands: The commands to unblock. Valid values are
1453 'all-changes', 'destroy-model', 'remove-object'
1454
1455 """
1456 raise NotImplementedError()
1457
1458 def unset_config(self, *keys):
1459 """Unset configuration on this model.
1460
1461 :param str \*keys: The keys to unset
1462
1463 """
1464 raise NotImplementedError()
1465
1466 def upgrade_gui(self):
1467 """Upgrade the Juju GUI for this model.
1468
1469 """
1470 raise NotImplementedError()
1471
1472 def upgrade_juju(
1473 self, dry_run=False, reset_previous_upgrade=False,
1474 upload_tools=False, version=None):
1475 """Upgrade Juju on all machines in a model.
1476
1477 :param bool dry_run: Don't do the actual upgrade
1478 :param bool reset_previous_upgrade: Clear the previous (incomplete)
1479 upgrade status
1480 :param bool upload_tools: Upload local version of tools
1481 :param str version: Upgrade to a specific version
1482
1483 """
1484 raise NotImplementedError()
1485
1486 def upload_backup(self, archive_path):
1487 """Store a backup archive remotely in Juju.
1488
1489 :param str archive_path: Path to local archive
1490
1491 """
1492 raise NotImplementedError()
1493
1494 @property
1495 def charmstore(self):
1496 return self._charmstore
1497
1498 async def get_metrics(self, *tags):
1499 """Retrieve metrics.
1500
1501 :param str \*tags: Tags of entities from which to retrieve metrics.
1502 No tags retrieves the metrics of all units in the model.
1503 :return: Dictionary of unit_name:metrics
1504
1505 """
1506 log.debug("Retrieving metrics for %s",
1507 ', '.join(tags) if tags else "all units")
1508
1509 metrics_facade = client.MetricsDebugFacade.from_connection(
1510 self.connection)
1511
1512 entities = [client.Entity(tag) for tag in tags]
1513 metrics_result = await metrics_facade.GetMetrics(entities)
1514
1515 metrics = collections.defaultdict(list)
1516
1517 for entity_metrics in metrics_result.results:
1518 error = entity_metrics.error
1519 if error:
1520 if "is not a valid tag" in error:
1521 raise ValueError(error.message)
1522 else:
1523 raise Exception(error.message)
1524
1525 for metric in entity_metrics.metrics:
1526 metrics[metric.unit].append(vars(metric))
1527
1528 return metrics
1529
1530
1531 def get_charm_series(path):
1532 """Inspects the charm directory at ``path`` and returns a default
1533 series from its metadata.yaml (the first item in the 'series' list).
1534
1535 Returns None if no series can be determined.
1536
1537 """
1538 md = Path(path) / "metadata.yaml"
1539 if not md.exists():
1540 return None
1541 data = yaml.load(md.open())
1542 series = data.get('series')
1543 return series[0] if series else None
1544
1545
1546 class BundleHandler(object):
1547 """
1548 Handle bundles by using the API to translate bundle YAML into a plan of
1549 steps and then dispatching each of those using the API.
1550 """
1551 def __init__(self, model):
1552 self.model = model
1553 self.charmstore = model.charmstore
1554 self.plan = []
1555 self.references = {}
1556 self._units_by_app = {}
1557 for unit_name, unit in model.units.items():
1558 app_units = self._units_by_app.setdefault(unit.application, [])
1559 app_units.append(unit_name)
1560 self.client_facade = client.ClientFacade.from_connection(
1561 model.connection)
1562 self.app_facade = client.ApplicationFacade.from_connection(
1563 model.connection)
1564 self.ann_facade = client.AnnotationsFacade.from_connection(
1565 model.connection)
1566
1567 async def _handle_local_charms(self, bundle):
1568 """Search for references to local charms (i.e. filesystem paths)
1569 in the bundle. Upload the local charms to the model, and replace
1570 the filesystem paths with appropriate 'local:' paths in the bundle.
1571
1572 Return the modified bundle.
1573
1574 :param dict bundle: Bundle dictionary
1575 :return: Modified bundle dictionary
1576
1577 """
1578 apps, args = [], []
1579
1580 default_series = bundle.get('series')
1581 for app_name in self.applications:
1582 app_dict = bundle['services'][app_name]
1583 charm_dir = os.path.abspath(os.path.expanduser(app_dict['charm']))
1584 if not os.path.isdir(charm_dir):
1585 continue
1586 series = (
1587 app_dict.get('series') or
1588 default_series or
1589 get_charm_series(charm_dir)
1590 )
1591 if not series:
1592 raise JujuError(
1593 "Couldn't determine series for charm at {}. "
1594 "Add a 'series' key to the bundle.".format(charm_dir))
1595
1596 # Keep track of what we need to update. We keep a list of apps
1597 # that need to be updated, and a corresponding list of args
1598 # needed to update those apps.
1599 apps.append(app_name)
1600 args.append((charm_dir, series))
1601
1602 if apps:
1603 # If we have apps to update, spawn all the coroutines concurrently
1604 # and wait for them to finish.
1605 charm_urls = await asyncio.gather(*[
1606 self.model.add_local_charm_dir(*params)
1607 for params in args
1608 ], loop=self.model.loop)
1609 # Update the 'charm:' entry for each app with the new 'local:' url.
1610 for app_name, charm_url in zip(apps, charm_urls):
1611 bundle['services'][app_name]['charm'] = charm_url
1612
1613 return bundle
1614
1615 async def fetch_plan(self, entity_id):
1616 is_local = not entity_id.startswith('cs:') and os.path.isdir(entity_id)
1617 if is_local:
1618 bundle_yaml = (Path(entity_id) / "bundle.yaml").read_text()
1619 else:
1620 bundle_yaml = await self.charmstore.files(entity_id,
1621 filename='bundle.yaml',
1622 read_file=True)
1623 self.bundle = yaml.safe_load(bundle_yaml)
1624 self.bundle = await self._handle_local_charms(self.bundle)
1625
1626 self.plan = await self.client_facade.GetBundleChanges(
1627 yaml.dump(self.bundle))
1628
1629 async def execute_plan(self):
1630 for step in self.plan.changes:
1631 method = getattr(self, step.method)
1632 result = await method(*step.args)
1633 self.references[step.id_] = result
1634
1635 @property
1636 def applications(self):
1637 return list(self.bundle['services'].keys())
1638
1639 def resolve(self, reference):
1640 if reference and reference.startswith('$'):
1641 reference = self.references[reference[1:]]
1642 return reference
1643
1644 async def addCharm(self, charm, series):
1645 """
1646 :param charm string:
1647 Charm holds the URL of the charm to be added.
1648
1649 :param series string:
1650 Series holds the series of the charm to be added
1651 if the charm default is not sufficient.
1652 """
1653 # We don't add local charms because they've already been added
1654 # by self._handle_local_charms
1655 if charm.startswith('local:'):
1656 return charm
1657
1658 entity_id = await self.charmstore.entityId(charm)
1659 log.debug('Adding %s', entity_id)
1660 await self.client_facade.AddCharm(None, entity_id)
1661 return entity_id
1662
1663 async def addMachines(self, params=None):
1664 """
1665 :param params dict:
1666 Dictionary specifying the machine to add. All keys are optional.
1667 Keys include:
1668
1669 series: string specifying the machine OS series.
1670
1671 constraints: string holding machine constraints, if any. We'll
1672 parse this into the json friendly dict that the juju api
1673 expects.
1674
1675 container_type: string holding the type of the container (for
1676 instance ""lxd" or kvm"). It is not specified for top level
1677 machines.
1678
1679 parent_id: string holding a placeholder pointing to another
1680 machine change or to a unit change. This value is only
1681 specified in the case this machine is a container, in
1682 which case also ContainerType is set.
1683
1684 """
1685 params = params or {}
1686
1687 # Normalize keys
1688 params = {normalize_key(k): params[k] for k in params.keys()}
1689
1690 # Fix up values, as necessary.
1691 if 'parent_id' in params:
1692 params['parent_id'] = self.resolve(params['parent_id'])
1693
1694 params['constraints'] = parse_constraints(
1695 params.get('constraints'))
1696 params['jobs'] = params.get('jobs', ['JobHostUnits'])
1697
1698 if params.get('container_type') == 'lxc':
1699 log.warning('Juju 2.0 does not support lxc containers. '
1700 'Converting containers to lxd.')
1701 params['container_type'] = 'lxd'
1702
1703 # Submit the request.
1704 params = client.AddMachineParams(**params)
1705 results = await self.client_facade.AddMachines([params])
1706 error = results.machines[0].error
1707 if error:
1708 raise ValueError("Error adding machine: %s" % error.message)
1709 machine = results.machines[0].machine
1710 log.debug('Added new machine %s', machine)
1711 return machine
1712
1713 async def addRelation(self, endpoint1, endpoint2):
1714 """
1715 :param endpoint1 string:
1716 :param endpoint2 string:
1717 Endpoint1 and Endpoint2 hold relation endpoints in the
1718 "application:interface" form, where the application is always a
1719 placeholder pointing to an application change, and the interface is
1720 optional. Examples are "$deploy-42:web" or just "$deploy-42".
1721 """
1722 endpoints = [endpoint1, endpoint2]
1723 # resolve indirect references
1724 for i in range(len(endpoints)):
1725 parts = endpoints[i].split(':')
1726 parts[0] = self.resolve(parts[0])
1727 endpoints[i] = ':'.join(parts)
1728
1729 log.info('Relating %s <-> %s', *endpoints)
1730 return await self.model.add_relation(*endpoints)
1731
1732 async def deploy(self, charm, series, application, options, constraints,
1733 storage, endpoint_bindings, resources):
1734 """
1735 :param charm string:
1736 Charm holds the URL of the charm to be used to deploy this
1737 application.
1738
1739 :param series string:
1740 Series holds the series of the application to be deployed
1741 if the charm default is not sufficient.
1742
1743 :param application string:
1744 Application holds the application name.
1745
1746 :param options map[string]interface{}:
1747 Options holds application options.
1748
1749 :param constraints string:
1750 Constraints holds the optional application constraints.
1751
1752 :param storage map[string]string:
1753 Storage holds the optional storage constraints.
1754
1755 :param endpoint_bindings map[string]string:
1756 EndpointBindings holds the optional endpoint bindings
1757
1758 :param resources map[string]int:
1759 Resources identifies the revision to use for each resource
1760 of the application's charm.
1761 """
1762 # resolve indirect references
1763 charm = self.resolve(charm)
1764 # the bundle plan doesn't actually do anything with resources, even
1765 # though it ostensibly gives us something (None) for that param
1766 if not charm.startswith('local:'):
1767 resources = await self.model._add_store_resources(application,
1768 charm)
1769 await self.model._deploy(
1770 charm_url=charm,
1771 application=application,
1772 series=series,
1773 config=options,
1774 constraints=constraints,
1775 endpoint_bindings=endpoint_bindings,
1776 resources=resources,
1777 storage=storage,
1778 )
1779 return application
1780
1781 async def addUnit(self, application, to):
1782 """
1783 :param application string:
1784 Application holds the application placeholder name for which a unit
1785 is added.
1786
1787 :param to string:
1788 To holds the optional location where to add the unit, as a
1789 placeholder pointing to another unit change or to a machine change.
1790 """
1791 application = self.resolve(application)
1792 placement = self.resolve(to)
1793 if self._units_by_app.get(application):
1794 # enough units for this application already exist;
1795 # claim one, and carry on
1796 # NB: this should probably honor placement, but the juju client
1797 # doesn't, so we're not bothering, either
1798 unit_name = self._units_by_app[application].pop()
1799 log.debug('Reusing unit %s for %s', unit_name, application)
1800 return self.model.units[unit_name]
1801
1802 log.debug('Adding new unit for %s%s', application,
1803 ' to %s' % placement if placement else '')
1804 return await self.model.applications[application].add_unit(
1805 count=1,
1806 to=placement,
1807 )
1808
1809 async def expose(self, application):
1810 """
1811 :param application string:
1812 Application holds the placeholder name of the application that must
1813 be exposed.
1814 """
1815 application = self.resolve(application)
1816 log.info('Exposing %s', application)
1817 return await self.model.applications[application].expose()
1818
1819 async def setAnnotations(self, id_, entity_type, annotations):
1820 """
1821 :param id_ string:
1822 Id is the placeholder for the application or machine change
1823 corresponding to the entity to be annotated.
1824
1825 :param entity_type EntityType:
1826 EntityType holds the type of the entity, "application" or
1827 "machine".
1828
1829 :param annotations map[string]string:
1830 Annotations holds the annotations as key/value pairs.
1831 """
1832 entity_id = self.resolve(id_)
1833 try:
1834 entity = self.model.state.get_entity(entity_type, entity_id)
1835 except KeyError:
1836 entity = await self.model._wait_for_new(entity_type, entity_id)
1837 return await entity.set_annotations(annotations)
1838
1839
1840 class CharmStore(object):
1841 """
1842 Async wrapper around theblues.charmstore.CharmStore
1843 """
1844 def __init__(self, loop):
1845 self.loop = loop
1846 self._cs = theblues.charmstore.CharmStore(timeout=5)
1847
1848 def __getattr__(self, name):
1849 """
1850 Wrap method calls in coroutines that use run_in_executor to make them
1851 async.
1852 """
1853 attr = getattr(self._cs, name)
1854 if not callable(attr):
1855 wrapper = partial(getattr, self._cs, name)
1856 setattr(self, name, wrapper)
1857 else:
1858 async def coro(*args, **kwargs):
1859 method = partial(attr, *args, **kwargs)
1860 for attempt in range(1, 4):
1861 try:
1862 return await self.loop.run_in_executor(None, method)
1863 except theblues.errors.ServerError:
1864 if attempt == 3:
1865 raise
1866 await asyncio.sleep(1, loop=self.loop)
1867 setattr(self, name, coro)
1868 wrapper = coro
1869 return wrapper
1870
1871
1872 class CharmArchiveGenerator(object):
1873 def __init__(self, path):
1874 self.path = os.path.abspath(os.path.expanduser(path))
1875
1876 def make_archive(self, path):
1877 """Create archive of directory and write to ``path``.
1878
1879 :param path: Path to archive
1880
1881 Ignored::
1882
1883 * build/\* - This is used for packing the charm itself and any
1884 similar tasks.
1885 * \*/.\* - Hidden files are all ignored for now. This will most
1886 likely be changed into a specific ignore list
1887 (.bzr, etc)
1888
1889 """
1890 zf = zipfile.ZipFile(path, 'w', zipfile.ZIP_DEFLATED)
1891 for dirpath, dirnames, filenames in os.walk(self.path):
1892 relative_path = dirpath[len(self.path) + 1:]
1893 if relative_path and not self._ignore(relative_path):
1894 zf.write(dirpath, relative_path)
1895 for name in filenames:
1896 archive_name = os.path.join(relative_path, name)
1897 if not self._ignore(archive_name):
1898 real_path = os.path.join(dirpath, name)
1899 self._check_type(real_path)
1900 if os.path.islink(real_path):
1901 self._check_link(real_path)
1902 self._write_symlink(
1903 zf, os.readlink(real_path), archive_name)
1904 else:
1905 zf.write(real_path, archive_name)
1906 zf.close()
1907 return path
1908
1909 def _check_type(self, path):
1910 """Check the path
1911 """
1912 s = os.stat(path)
1913 if stat.S_ISDIR(s.st_mode) or stat.S_ISREG(s.st_mode):
1914 return path
1915 raise ValueError("Invalid Charm at % %s" % (
1916 path, "Invalid file type for a charm"))
1917
1918 def _check_link(self, path):
1919 link_path = os.readlink(path)
1920 if link_path[0] == "/":
1921 raise ValueError(
1922 "Invalid Charm at %s: %s" % (
1923 path, "Absolute links are invalid"))
1924 path_dir = os.path.dirname(path)
1925 link_path = os.path.join(path_dir, link_path)
1926 if not link_path.startswith(os.path.abspath(self.path)):
1927 raise ValueError(
1928 "Invalid charm at %s %s" % (
1929 path, "Only internal symlinks are allowed"))
1930
1931 def _write_symlink(self, zf, link_target, link_path):
1932 """Package symlinks with appropriate zipfile metadata."""
1933 info = zipfile.ZipInfo()
1934 info.filename = link_path
1935 info.create_system = 3
1936 # Magic code for symlinks / py2/3 compat
1937 # 27166663808 = (stat.S_IFLNK | 0755) << 16
1938 info.external_attr = 2716663808
1939 zf.writestr(info, link_target)
1940
1941 def _ignore(self, path):
1942 if path == "build" or path.startswith("build/"):
1943 return True
1944 if path.startswith('.'):
1945 return True