ac225992fc8fd82209271d07d837e683e93aedf5
[osm/N2VC.git] / modules / libjuju / juju / model.py
1 import asyncio
2 import base64
3 import collections
4 import hashlib
5 import json
6 import logging
7 import os
8 import re
9 import stat
10 import tempfile
11 import weakref
12 import zipfile
13 from concurrent.futures import CancelledError
14 from functools import partial
15 from pathlib import Path
16
17 import theblues.charmstore
18 import theblues.errors
19 import websockets
20 import yaml
21
22 from . import tag, utils
23 from .client import client, connector
24 from .client.client import ConfigValue
25 from .constraints import parse as parse_constraints
26 from .constraints import normalize_key
27 from .delta import get_entity_class, get_entity_delta
28 from .errors import JujuAPIError, JujuError
29 from .exceptions import DeadEntityException
30 from .placement import parse as parse_placement
31
32 log = logging.getLogger(__name__)
33
34
35 class _Observer:
36 """Wrapper around an observer callable.
37
38 This wrapper allows filter criteria to be associated with the
39 callable so that it's only called for changes that meet the criteria.
40
41 """
42 def __init__(self, callable_, entity_type, action, entity_id, predicate):
43 self.callable_ = callable_
44 self.entity_type = entity_type
45 self.action = action
46 self.entity_id = entity_id
47 self.predicate = predicate
48 if self.entity_id:
49 self.entity_id = str(self.entity_id)
50 if not self.entity_id.startswith('^'):
51 self.entity_id = '^' + self.entity_id
52 if not self.entity_id.endswith('$'):
53 self.entity_id += '$'
54
55 async def __call__(self, delta, old, new, model):
56 await self.callable_(delta, old, new, model)
57
58 def cares_about(self, delta):
59 """Return True if this observer "cares about" (i.e. wants to be
60 called) for a this delta.
61
62 """
63 if (self.entity_id and delta.get_id() and
64 not re.match(self.entity_id, str(delta.get_id()))):
65 return False
66
67 if self.entity_type and self.entity_type != delta.entity:
68 return False
69
70 if self.action and self.action != delta.type:
71 return False
72
73 if self.predicate and not self.predicate(delta):
74 return False
75
76 return True
77
78
79 class ModelObserver:
80 """
81 Base class for creating observers that react to changes in a model.
82 """
83 async def __call__(self, delta, old, new, model):
84 handler_name = 'on_{}_{}'.format(delta.entity, delta.type)
85 method = getattr(self, handler_name, self.on_change)
86 await method(delta, old, new, model)
87
88 async def on_change(self, delta, old, new, model):
89 """Generic model-change handler.
90
91 This should be overridden in a subclass.
92
93 :param delta: :class:`juju.client.overrides.Delta`
94 :param old: :class:`juju.model.ModelEntity`
95 :param new: :class:`juju.model.ModelEntity`
96 :param model: :class:`juju.model.Model`
97
98 """
99 pass
100
101
102 class ModelState:
103 """Holds the state of the model, including the delta history of all
104 entities in the model.
105
106 """
107 def __init__(self, model):
108 self.model = model
109 self.state = dict()
110
111 def _live_entity_map(self, entity_type):
112 """Return an id:Entity map of all the living entities of
113 type ``entity_type``.
114
115 """
116 return {
117 entity_id: self.get_entity(entity_type, entity_id)
118 for entity_id, history in self.state.get(entity_type, {}).items()
119 if history[-1] is not None
120 }
121
122 @property
123 def applications(self):
124 """Return a map of application-name:Application for all applications
125 currently in the model.
126
127 """
128 return self._live_entity_map('application')
129
130 @property
131 def machines(self):
132 """Return a map of machine-id:Machine for all machines currently in
133 the model.
134
135 """
136 return self._live_entity_map('machine')
137
138 @property
139 def units(self):
140 """Return a map of unit-id:Unit for all units currently in
141 the model.
142
143 """
144 return self._live_entity_map('unit')
145
146 @property
147 def relations(self):
148 """Return a map of relation-id:Relation for all relations currently in
149 the model.
150
151 """
152 return self._live_entity_map('relation')
153
154 def entity_history(self, entity_type, entity_id):
155 """Return the history deque for an entity.
156
157 """
158 return self.state[entity_type][entity_id]
159
160 def entity_data(self, entity_type, entity_id, history_index):
161 """Return the data dict for an entity at a specific index of its
162 history.
163
164 """
165 return self.entity_history(entity_type, entity_id)[history_index]
166
167 def apply_delta(self, delta):
168 """Apply delta to our state and return a copy of the
169 affected object as it was before and after the update, e.g.:
170
171 old_obj, new_obj = self.apply_delta(delta)
172
173 old_obj may be None if the delta is for the creation of a new object,
174 e.g. a new application or unit is deployed.
175
176 new_obj will never be None, but may be dead (new_obj.dead == True)
177 if the object was deleted as a result of the delta being applied.
178
179 """
180 history = (
181 self.state
182 .setdefault(delta.entity, {})
183 .setdefault(delta.get_id(), collections.deque())
184 )
185
186 history.append(delta.data)
187 if delta.type == 'remove':
188 history.append(None)
189
190 entity = self.get_entity(delta.entity, delta.get_id())
191 return entity.previous(), entity
192
193 def get_entity(
194 self, entity_type, entity_id, history_index=-1, connected=True):
195 """Return an object instance for the given entity_type and id.
196
197 By default the object state matches the most recent state from
198 Juju. To get an instance of the object in an older state, pass
199 history_index, an index into the history deque for the entity.
200
201 """
202
203 if history_index < 0 and history_index != -1:
204 history_index += len(self.entity_history(entity_type, entity_id))
205 if history_index < 0:
206 return None
207
208 try:
209 self.entity_data(entity_type, entity_id, history_index)
210 except IndexError:
211 return None
212
213 entity_class = get_entity_class(entity_type)
214 return entity_class(
215 entity_id, self.model, history_index=history_index,
216 connected=connected)
217
218
219 class ModelEntity:
220 """An object in the Model tree"""
221
222 def __init__(self, entity_id, model, history_index=-1, connected=True):
223 """Initialize a new entity
224
225 :param entity_id str: The unique id of the object in the model
226 :param model: The model instance in whose object tree this
227 entity resides
228 :history_index int: The index of this object's state in the model's
229 history deque for this entity
230 :connected bool: Flag indicating whether this object gets live updates
231 from the model.
232
233 """
234 self.entity_id = entity_id
235 self.model = model
236 self._history_index = history_index
237 self.connected = connected
238 self.connection = model.connection()
239
240 def __repr__(self):
241 return '<{} entity_id="{}">'.format(type(self).__name__,
242 self.entity_id)
243
244 def __getattr__(self, name):
245 """Fetch object attributes from the underlying data dict held in the
246 model.
247
248 """
249 try:
250 return self.safe_data[name]
251 except KeyError:
252 name = name.replace('_', '-')
253 if name in self.safe_data:
254 return self.safe_data[name]
255 else:
256 raise
257
258 def __bool__(self):
259 return bool(self.data)
260
261 def on_change(self, callable_):
262 """Add a change observer to this entity.
263
264 """
265 self.model.add_observer(
266 callable_, self.entity_type, 'change', self.entity_id)
267
268 def on_remove(self, callable_):
269 """Add a remove observer to this entity.
270
271 """
272 self.model.add_observer(
273 callable_, self.entity_type, 'remove', self.entity_id)
274
275 @property
276 def entity_type(self):
277 """A string identifying the entity type of this object, e.g.
278 'application' or 'unit', etc.
279
280 """
281 return self.__class__.__name__.lower()
282
283 @property
284 def current(self):
285 """Return True if this object represents the current state of the
286 entity in the underlying model.
287
288 This will be True except when the object represents an entity at a
289 non-latest state in history, e.g. if the object was obtained by calling
290 .previous() on another object.
291
292 """
293 return self._history_index == -1
294
295 @property
296 def dead(self):
297 """Returns True if this entity no longer exists in the underlying
298 model.
299
300 """
301 return (
302 self.data is None or
303 self.model.state.entity_data(
304 self.entity_type, self.entity_id, -1) is None
305 )
306
307 @property
308 def alive(self):
309 """Returns True if this entity still exists in the underlying
310 model.
311
312 """
313 return not self.dead
314
315 @property
316 def data(self):
317 """The data dictionary for this entity.
318
319 """
320 return self.model.state.entity_data(
321 self.entity_type, self.entity_id, self._history_index)
322
323 @property
324 def safe_data(self):
325 """The data dictionary for this entity.
326
327 If this `ModelEntity` points to the dead state, it will
328 raise `DeadEntityException`.
329
330 """
331 if self.data is None:
332 raise DeadEntityException(
333 "Entity {}:{} is dead - its attributes can no longer be "
334 "accessed. Use the .previous() method on this object to get "
335 "a copy of the object at its previous state.".format(
336 self.entity_type, self.entity_id))
337 return self.data
338
339 def previous(self):
340 """Return a copy of this object as was at its previous state in
341 history.
342
343 Returns None if this object is new (and therefore has no history).
344
345 The returned object is always "disconnected", i.e. does not receive
346 live updates.
347
348 """
349 return self.model.state.get_entity(
350 self.entity_type, self.entity_id, self._history_index - 1,
351 connected=False)
352
353 def next(self):
354 """Return a copy of this object at its next state in
355 history.
356
357 Returns None if this object is already the latest.
358
359 The returned object is "disconnected", i.e. does not receive
360 live updates, unless it is current (latest).
361
362 """
363 if self._history_index == -1:
364 return None
365
366 new_index = self._history_index + 1
367 connected = (
368 new_index == len(self.model.state.entity_history(
369 self.entity_type, self.entity_id)) - 1
370 )
371 return self.model.state.get_entity(
372 self.entity_type, self.entity_id, self._history_index - 1,
373 connected=connected)
374
375 def latest(self):
376 """Return a copy of this object at its current state in the model.
377
378 Returns self if this object is already the latest.
379
380 The returned object is always "connected", i.e. receives
381 live updates from the model.
382
383 """
384 if self._history_index == -1:
385 return self
386
387 return self.model.state.get_entity(self.entity_type, self.entity_id)
388
389
390 class Model:
391 """
392 The main API for interacting with a Juju model.
393 """
394 def __init__(
395 self,
396 loop=None,
397 max_frame_size=None,
398 bakery_client=None,
399 jujudata=None,
400 ):
401 """Instantiate a new Model.
402
403 The connect method will need to be called before this
404 object can be used for anything interesting.
405
406 If jujudata is None, jujudata.FileJujuData will be used.
407
408 :param loop: an asyncio event loop
409 :param max_frame_size: See
410 `juju.client.connection.Connection.MAX_FRAME_SIZE`
411 :param bakery_client httpbakery.Client: The bakery client to use
412 for macaroon authorization.
413 :param jujudata JujuData: The source for current controller information.
414 """
415 self._connector = connector.Connector(
416 loop=loop,
417 max_frame_size=max_frame_size,
418 bakery_client=bakery_client,
419 jujudata=jujudata,
420 )
421 self._observers = weakref.WeakValueDictionary()
422 self.state = ModelState(self)
423 self._info = None
424 self._watch_stopping = asyncio.Event(loop=self._connector.loop)
425 self._watch_stopped = asyncio.Event(loop=self._connector.loop)
426 self._watch_received = asyncio.Event(loop=self._connector.loop)
427 self._watch_stopped.set()
428 self._charmstore = CharmStore(self._connector.loop)
429
430 def is_connected(self):
431 """Reports whether the Model is currently connected."""
432 return self._connector.is_connected()
433
434 @property
435 def loop(self):
436 return self._connector.loop
437
438 def connection(self):
439 """Return the current Connection object. It raises an exception
440 if the Model is disconnected"""
441 return self._connector.connection()
442
443 async def get_controller(self):
444 """Return a Controller instance for the currently connected model.
445 :return Controller:
446 """
447 from juju.controller import Controller
448 controller = Controller(jujudata=self._connector.jujudata)
449 kwargs = self.connection().connect_params()
450 kwargs.pop('uuid')
451 await controller._connect_direct(**kwargs)
452 return controller
453
454 async def __aenter__(self):
455 await self.connect()
456 return self
457
458 async def __aexit__(self, exc_type, exc, tb):
459 await self.disconnect()
460
461 async def connect(self, model_name=None, **kwargs):
462 """Connect to a juju model.
463
464 If any arguments are specified other than model_name, then
465 model_name must be None and an explicit connection will be made
466 using Connection.connect using those parameters (the 'uuid'
467 parameter must be specified).
468
469 Otherwise, if model_name is None, connect to the current model.
470
471 Otherwise, model_name must specify the name of a known
472 model.
473
474 :param model_name: Format [controller:][user/]model
475
476 """
477 await self.disconnect()
478 if not kwargs:
479 await self._connector.connect_model(model_name)
480 else:
481 if kwargs.get('uuid') is None:
482 raise ValueError('no UUID specified when connecting to model')
483 await self._connector.connect(**kwargs)
484 await self._after_connect()
485
486 async def connect_model(self, model_name):
487 """
488 .. deprecated:: 0.6.2
489 Use connect(model_name=model_name) instead.
490 """
491 return await self.connect(model_name=model_name)
492
493 async def connect_current(self):
494 """
495 .. deprecated:: 0.6.2
496 Use connect instead.
497 """
498 return await self.connect()
499
500 async def _connect_direct(self, **kwargs):
501 await self.disconnect()
502 await self._connector.connect(**kwargs)
503 await self._after_connect()
504
505 async def _after_connect(self):
506 self._watch()
507
508 # Wait for the first packet of data from the AllWatcher,
509 # which contains all information on the model.
510 # TODO this means that we can't do anything until
511 # we've received all the model data, which might be
512 # a whole load of unneeded data if all the client wants
513 # to do is make one RPC call.
514 await self._watch_received.wait()
515
516 await self.get_info()
517
518 async def disconnect(self):
519 """Shut down the watcher task and close websockets.
520
521 """
522 if not self._watch_stopped.is_set():
523 log.debug('Stopping watcher task')
524 self._watch_stopping.set()
525 await self._watch_stopped.wait()
526 self._watch_stopping.clear()
527
528 if self.is_connected():
529 log.debug('Closing model connection')
530 await self._connector.disconnect()
531 self.info = None
532
533 async def add_local_charm_dir(self, charm_dir, series):
534 """Upload a local charm to the model.
535
536 This will automatically generate an archive from
537 the charm dir.
538
539 :param charm_dir: Path to the charm directory
540 :param series: Charm series
541
542 """
543 fh = tempfile.NamedTemporaryFile()
544 CharmArchiveGenerator(charm_dir).make_archive(fh.name)
545 with fh:
546 func = partial(
547 self.add_local_charm, fh, series, os.stat(fh.name).st_size)
548 charm_url = await self._connector.loop.run_in_executor(None, func)
549
550 log.debug('Uploaded local charm: %s -> %s', charm_dir, charm_url)
551 return charm_url
552
553 def add_local_charm(self, charm_file, series, size=None):
554 """Upload a local charm archive to the model.
555
556 Returns the 'local:...' url that should be used to deploy the charm.
557
558 :param charm_file: Path to charm zip archive
559 :param series: Charm series
560 :param size: Size of the archive, in bytes
561 :return str: 'local:...' url for deploying the charm
562 :raises: :class:`JujuError` if the upload fails
563
564 Uses an https endpoint at the same host:port as the wss.
565 Supports large file uploads.
566
567 .. warning::
568
569 This method will block. Consider using :meth:`add_local_charm_dir`
570 instead.
571
572 """
573 conn, headers, path_prefix = self.connection().https_connection()
574 path = "%s/charms?series=%s" % (path_prefix, series)
575 headers['Content-Type'] = 'application/zip'
576 if size:
577 headers['Content-Length'] = size
578 conn.request("POST", path, charm_file, headers)
579 response = conn.getresponse()
580 result = response.read().decode()
581 if not response.status == 200:
582 raise JujuError(result)
583 result = json.loads(result)
584 return result['charm-url']
585
586 def all_units_idle(self):
587 """Return True if all units are idle.
588
589 """
590 for unit in self.units.values():
591 unit_status = unit.data['agent-status']['current']
592 if unit_status != 'idle':
593 return False
594 return True
595
596 async def reset(self, force=False):
597 """Reset the model to a clean state.
598
599 :param bool force: Force-terminate machines.
600
601 This returns only after the model has reached a clean state. "Clean"
602 means no applications or machines exist in the model.
603
604 """
605 log.debug('Resetting model')
606 for app in self.applications.values():
607 await app.destroy()
608 for machine in self.machines.values():
609 await machine.destroy(force=force)
610 await self.block_until(
611 lambda: len(self.machines) == 0
612 )
613
614 async def block_until(self, *conditions, timeout=None, wait_period=0.5):
615 """Return only after all conditions are true.
616
617 Raises `websockets.ConnectionClosed` if disconnected.
618 """
619 def _disconnected():
620 return not (self.is_connected() and self.connection().is_open)
621
622 def done():
623 return _disconnected() or all(c() for c in conditions)
624
625 await utils.block_until(done,
626 timeout=timeout,
627 wait_period=wait_period,
628 loop=self.loop)
629 if _disconnected():
630 raise websockets.ConnectionClosed(1006, 'no reason')
631
632 @property
633 def applications(self):
634 """Return a map of application-name:Application for all applications
635 currently in the model.
636
637 """
638 return self.state.applications
639
640 @property
641 def machines(self):
642 """Return a map of machine-id:Machine for all machines currently in
643 the model.
644
645 """
646 return self.state.machines
647
648 @property
649 def units(self):
650 """Return a map of unit-id:Unit for all units currently in
651 the model.
652
653 """
654 return self.state.units
655
656 @property
657 def relations(self):
658 """Return a list of all Relations currently in the model.
659
660 """
661 return list(self.state.relations.values())
662
663 async def get_info(self):
664 """Return a client.ModelInfo object for this Model.
665
666 Retrieves latest info for this Model from the api server. The
667 return value is cached on the Model.info attribute so that the
668 valued may be accessed again without another api call, if
669 desired.
670
671 This method is called automatically when the Model is connected,
672 resulting in Model.info being initialized without requiring an
673 explicit call to this method.
674
675 """
676 facade = client.ClientFacade.from_connection(self.connection())
677
678 self.info = await facade.ModelInfo()
679 log.debug('Got ModelInfo: %s', vars(self.info))
680
681 return self.info
682
683 def add_observer(
684 self, callable_, entity_type=None, action=None, entity_id=None,
685 predicate=None):
686 """Register an "on-model-change" callback
687
688 Once the model is connected, ``callable_``
689 will be called each time the model changes. ``callable_`` should
690 be Awaitable and accept the following positional arguments:
691
692 delta - An instance of :class:`juju.delta.EntityDelta`
693 containing the raw delta data recv'd from the Juju
694 websocket.
695
696 old_obj - If the delta modifies an existing object in the model,
697 old_obj will be a copy of that object, as it was before the
698 delta was applied. Will be None if the delta creates a new
699 entity in the model.
700
701 new_obj - A copy of the new or updated object, after the delta
702 is applied. Will be None if the delta removes an entity
703 from the model.
704
705 model - The :class:`Model` itself.
706
707 Events for which ``callable_`` is called can be specified by passing
708 entity_type, action, and/or entitiy_id filter criteria, e.g.::
709
710 add_observer(
711 myfunc,
712 entity_type='application', action='add', entity_id='ubuntu')
713
714 For more complex filtering conditions, pass a predicate function. It
715 will be called with a delta as its only argument. If the predicate
716 function returns True, the ``callable_`` will be called.
717
718 """
719 observer = _Observer(
720 callable_, entity_type, action, entity_id, predicate)
721 self._observers[observer] = callable_
722
723 def _watch(self):
724 """Start an asynchronous watch against this model.
725
726 See :meth:`add_observer` to register an onchange callback.
727
728 """
729 async def _all_watcher():
730 try:
731 allwatcher = client.AllWatcherFacade.from_connection(
732 self.connection())
733 while not self._watch_stopping.is_set():
734 try:
735 results = await utils.run_with_interrupt(
736 allwatcher.Next(),
737 self._watch_stopping,
738 self._connector.loop)
739 except JujuAPIError as e:
740 if 'watcher was stopped' not in str(e):
741 raise
742 if self._watch_stopping.is_set():
743 # this shouldn't ever actually happen, because
744 # the event should trigger before the controller
745 # has a chance to tell us the watcher is stopped
746 # but handle it gracefully, just in case
747 break
748 # controller stopped our watcher for some reason
749 # but we're not actually stopping, so just restart it
750 log.warning(
751 'Watcher: watcher stopped, restarting')
752 del allwatcher.Id
753 continue
754 except websockets.ConnectionClosed:
755 monitor = self.connection().monitor
756 if monitor.status == monitor.ERROR:
757 # closed unexpectedly, try to reopen
758 log.warning(
759 'Watcher: connection closed, reopening')
760 await self.connection().reconnect()
761 if monitor.status != monitor.CONNECTED:
762 # reconnect failed; abort and shutdown
763 log.error('Watcher: automatic reconnect '
764 'failed; stopping watcher')
765 break
766 del allwatcher.Id
767 continue
768 else:
769 # closed on request, go ahead and shutdown
770 break
771 if self._watch_stopping.is_set():
772 try:
773 await allwatcher.Stop()
774 except websockets.ConnectionClosed:
775 pass # can't stop on a closed conn
776 break
777 for delta in results.deltas:
778 delta = get_entity_delta(delta)
779 old_obj, new_obj = self.state.apply_delta(delta)
780 await self._notify_observers(delta, old_obj, new_obj)
781 self._watch_received.set()
782 except CancelledError:
783 pass
784 except Exception:
785 log.exception('Error in watcher')
786 raise
787 finally:
788 self._watch_stopped.set()
789
790 log.debug('Starting watcher task')
791 self._watch_received.clear()
792 self._watch_stopping.clear()
793 self._watch_stopped.clear()
794 self._connector.loop.create_task(_all_watcher())
795
796 async def _notify_observers(self, delta, old_obj, new_obj):
797 """Call observing callbacks, notifying them of a change in model state
798
799 :param delta: The raw change from the watcher
800 (:class:`juju.client.overrides.Delta`)
801 :param old_obj: The object in the model that this delta updates.
802 May be None.
803 :param new_obj: The object in the model that is created or updated
804 by applying this delta.
805
806 """
807 if new_obj and not old_obj:
808 delta.type = 'add'
809
810 log.debug(
811 'Model changed: %s %s %s',
812 delta.entity, delta.type, delta.get_id())
813
814 for o in self._observers:
815 if o.cares_about(delta):
816 asyncio.ensure_future(o(delta, old_obj, new_obj, self),
817 loop=self._connector.loop)
818
819 async def _wait(self, entity_type, entity_id, action, predicate=None):
820 """
821 Block the calling routine until a given action has happened to the
822 given entity
823
824 :param entity_type: The entity's type.
825 :param entity_id: The entity's id.
826 :param action: the type of action (e.g., 'add', 'change', or 'remove')
827 :param predicate: optional callable that must take as an
828 argument a delta, and must return a boolean, indicating
829 whether the delta contains the specific action we're looking
830 for. For example, you might check to see whether a 'change'
831 has a 'completed' status. See the _Observer class for details.
832
833 """
834 q = asyncio.Queue(loop=self._connector.loop)
835
836 async def callback(delta, old, new, model):
837 await q.put(delta.get_id())
838
839 self.add_observer(callback, entity_type, action, entity_id, predicate)
840 entity_id = await q.get()
841 # object might not be in the entity_map if we were waiting for a
842 # 'remove' action
843 return self.state._live_entity_map(entity_type).get(entity_id)
844
845 async def _wait_for_new(self, entity_type, entity_id):
846 """Wait for a new object to appear in the Model and return it.
847
848 Waits for an object of type ``entity_type`` with id ``entity_id``
849 to appear in the model. This is similar to watching for the
850 object using ``block_until``, but uses the watcher rather than
851 polling.
852
853 """
854 # if the entity is already in the model, just return it
855 if entity_id in self.state._live_entity_map(entity_type):
856 return self.state._live_entity_map(entity_type)[entity_id]
857 return await self._wait(entity_type, entity_id, None)
858
859 async def wait_for_action(self, action_id):
860 """Given an action, wait for it to complete."""
861
862 if action_id.startswith("action-"):
863 # if we've been passed action.tag, transform it into the
864 # id that the api deltas will use.
865 action_id = action_id[7:]
866
867 def predicate(delta):
868 return delta.data['status'] in ('completed', 'failed')
869
870 return await self._wait('action', action_id, None, predicate)
871
872 async def add_machine(
873 self, spec=None, constraints=None, disks=None, series=None):
874 """Start a new, empty machine and optionally a container, or add a
875 container to a machine.
876
877 :param str spec: Machine specification
878 Examples::
879
880 (None) - starts a new machine
881 'lxd' - starts a new machine with one lxd container
882 'lxd:4' - starts a new lxd container on machine 4
883 'ssh:user@10.10.0.3' - manually provisions a machine with ssh
884 'zone=us-east-1a' - starts a machine in zone us-east-1s on AWS
885 'maas2.name' - acquire machine maas2.name on MAAS
886
887 :param dict constraints: Machine constraints, which can contain the
888 the following keys::
889
890 arch : str
891 container : str
892 cores : int
893 cpu_power : int
894 instance_type : str
895 mem : int
896 root_disk : int
897 spaces : list(str)
898 tags : list(str)
899 virt_type : str
900
901 Example::
902
903 constraints={
904 'mem': 256 * MB,
905 'tags': ['virtual'],
906 }
907
908 :param list disks: List of disk constraint dictionaries, which can
909 contain the following keys::
910
911 count : int
912 pool : str
913 size : int
914
915 Example::
916
917 disks=[{
918 'pool': 'rootfs',
919 'size': 10 * GB,
920 'count': 1,
921 }]
922
923 :param str series: Series, e.g. 'xenial'
924
925 Supported container types are: lxd, kvm
926
927 When deploying a container to an existing machine, constraints cannot
928 be used.
929
930 """
931 params = client.AddMachineParams()
932 params.jobs = ['JobHostUnits']
933
934 if spec:
935 placement = parse_placement(spec)
936 if placement:
937 params.placement = placement[0]
938
939 if constraints:
940 params.constraints = client.Value.from_json(constraints)
941
942 if disks:
943 params.disks = [
944 client.Constraints.from_json(o) for o in disks]
945
946 if series:
947 params.series = series
948
949 # Submit the request.
950 client_facade = client.ClientFacade.from_connection(self.connection())
951 results = await client_facade.AddMachines([params])
952 error = results.machines[0].error
953 if error:
954 raise ValueError("Error adding machine: %s" % error.message)
955 machine_id = results.machines[0].machine
956 log.debug('Added new machine %s', machine_id)
957 return await self._wait_for_new('machine', machine_id)
958
959 async def add_relation(self, relation1, relation2):
960 """Add a relation between two applications.
961
962 :param str relation1: '<application>[:<relation_name>]'
963 :param str relation2: '<application>[:<relation_name>]'
964
965 """
966 app_facade = client.ApplicationFacade.from_connection(self.connection())
967
968 log.debug(
969 'Adding relation %s <-> %s', relation1, relation2)
970
971 def _find_relation(*specs):
972 for rel in self.relations:
973 if rel.matches(*specs):
974 return rel
975 return None
976
977 try:
978 result = await app_facade.AddRelation([relation1, relation2])
979 except JujuAPIError as e:
980 if 'relation already exists' not in e.message:
981 raise
982 rel = _find_relation(relation1, relation2)
983 if rel:
984 return rel
985 raise JujuError('Relation {} {} exists but not in model'.format(
986 relation1, relation2))
987
988 specs = ['{}:{}'.format(app, data['name'])
989 for app, data in result.endpoints.items()]
990
991 await self.block_until(lambda: _find_relation(*specs) is not None)
992 return _find_relation(*specs)
993
994 def add_space(self, name, *cidrs):
995 """Add a new network space.
996
997 Adds a new space with the given name and associates the given
998 (optional) list of existing subnet CIDRs with it.
999
1000 :param str name: Name of the space
1001 :param \*cidrs: Optional list of existing subnet CIDRs
1002
1003 """
1004 raise NotImplementedError()
1005
1006 async def add_ssh_key(self, user, key):
1007 """Add a public SSH key to this model.
1008
1009 :param str user: The username of the user
1010 :param str key: The public ssh key
1011
1012 """
1013 key_facade = client.KeyManagerFacade.from_connection(self.connection())
1014 return await key_facade.AddKeys([key], user)
1015 add_ssh_keys = add_ssh_key
1016
1017 def add_subnet(self, cidr_or_id, space, *zones):
1018 """Add an existing subnet to this model.
1019
1020 :param str cidr_or_id: CIDR or provider ID of the existing subnet
1021 :param str space: Network space with which to associate
1022 :param str \*zones: Zone(s) in which the subnet resides
1023
1024 """
1025 raise NotImplementedError()
1026
1027 def get_backups(self):
1028 """Retrieve metadata for backups in this model.
1029
1030 """
1031 raise NotImplementedError()
1032
1033 def block(self, *commands):
1034 """Add a new block to this model.
1035
1036 :param str \*commands: The commands to block. Valid values are
1037 'all-changes', 'destroy-model', 'remove-object'
1038
1039 """
1040 raise NotImplementedError()
1041
1042 def get_blocks(self):
1043 """List blocks for this model.
1044
1045 """
1046 raise NotImplementedError()
1047
1048 def get_cached_images(self, arch=None, kind=None, series=None):
1049 """Return a list of cached OS images.
1050
1051 :param str arch: Filter by image architecture
1052 :param str kind: Filter by image kind, e.g. 'lxd'
1053 :param str series: Filter by image series, e.g. 'xenial'
1054
1055 """
1056 raise NotImplementedError()
1057
1058 def create_backup(self, note=None, no_download=False):
1059 """Create a backup of this model.
1060
1061 :param str note: A note to store with the backup
1062 :param bool no_download: Do not download the backup archive
1063 :return str: Path to downloaded archive
1064
1065 """
1066 raise NotImplementedError()
1067
1068 def create_storage_pool(self, name, provider_type, **pool_config):
1069 """Create or define a storage pool.
1070
1071 :param str name: Name to give the storage pool
1072 :param str provider_type: Pool provider type
1073 :param \*\*pool_config: key/value pool configuration pairs
1074
1075 """
1076 raise NotImplementedError()
1077
1078 def debug_log(
1079 self, no_tail=False, exclude_module=None, include_module=None,
1080 include=None, level=None, limit=0, lines=10, replay=False,
1081 exclude=None):
1082 """Get log messages for this model.
1083
1084 :param bool no_tail: Stop after returning existing log messages
1085 :param list exclude_module: Do not show log messages for these logging
1086 modules
1087 :param list include_module: Only show log messages for these logging
1088 modules
1089 :param list include: Only show log messages for these entities
1090 :param str level: Log level to show, valid options are 'TRACE',
1091 'DEBUG', 'INFO', 'WARNING', 'ERROR,
1092 :param int limit: Return this many of the most recent (possibly
1093 filtered) lines are shown
1094 :param int lines: Yield this many of the most recent lines, and keep
1095 yielding
1096 :param bool replay: Yield the entire log, and keep yielding
1097 :param list exclude: Do not show log messages for these entities
1098
1099 """
1100 raise NotImplementedError()
1101
1102 def _get_series(self, entity_url, entity):
1103 # try to get the series from the provided charm URL
1104 if entity_url.startswith('cs:'):
1105 parts = entity_url[3:].split('/')
1106 else:
1107 parts = entity_url.split('/')
1108 if parts[0].startswith('~'):
1109 parts.pop(0)
1110 if len(parts) > 1:
1111 # series was specified in the URL
1112 return parts[0]
1113 # series was not supplied at all, so use the newest
1114 # supported series according to the charm store
1115 ss = entity['Meta']['supported-series']
1116 return ss['SupportedSeries'][0]
1117
1118 async def deploy(
1119 self, entity_url, application_name=None, bind=None, budget=None,
1120 channel=None, config=None, constraints=None, force=False,
1121 num_units=1, plan=None, resources=None, series=None, storage=None,
1122 to=None):
1123 """Deploy a new service or bundle.
1124
1125 :param str entity_url: Charm or bundle url
1126 :param str application_name: Name to give the service
1127 :param dict bind: <charm endpoint>:<network space> pairs
1128 :param dict budget: <budget name>:<limit> pairs
1129 :param str channel: Charm store channel from which to retrieve
1130 the charm or bundle, e.g. 'edge'
1131 :param dict config: Charm configuration dictionary
1132 :param constraints: Service constraints
1133 :type constraints: :class:`juju.Constraints`
1134 :param bool force: Allow charm to be deployed to a machine running
1135 an unsupported series
1136 :param int num_units: Number of units to deploy
1137 :param str plan: Plan under which to deploy charm
1138 :param dict resources: <resource name>:<file path> pairs
1139 :param str series: Series on which to deploy
1140 :param dict storage: Storage constraints TODO how do these look?
1141 :param to: Placement directive as a string. For example:
1142
1143 '23' - place on machine 23
1144 'lxd:7' - place in new lxd container on machine 7
1145 '24/lxd/3' - place in container 3 on machine 24
1146
1147 If None, a new machine is provisioned.
1148
1149
1150 TODO::
1151
1152 - support local resources
1153
1154 """
1155 if storage:
1156 storage = {
1157 k: client.Constraints(**v)
1158 for k, v in storage.items()
1159 }
1160
1161 entity_path = Path(entity_url.replace('local:', ''))
1162 bundle_path = entity_path / 'bundle.yaml'
1163 metadata_path = entity_path / 'metadata.yaml'
1164
1165 is_local = (
1166 entity_url.startswith('local:') or
1167 entity_path.is_dir() or
1168 entity_path.is_file()
1169 )
1170 if is_local:
1171 entity_id = entity_url.replace('local:', '')
1172 else:
1173 entity = await self.charmstore.entity(entity_url, channel=channel)
1174 entity_id = entity['Id']
1175
1176 client_facade = client.ClientFacade.from_connection(self.connection())
1177
1178 is_bundle = ((is_local and
1179 (entity_id.endswith('.yaml') and entity_path.exists()) or
1180 bundle_path.exists()) or
1181 (not is_local and 'bundle/' in entity_id))
1182
1183 if is_bundle:
1184 handler = BundleHandler(self)
1185 await handler.fetch_plan(entity_id)
1186 await handler.execute_plan()
1187 extant_apps = {app for app in self.applications}
1188 pending_apps = set(handler.applications) - extant_apps
1189 if pending_apps:
1190 # new apps will usually be in the model by now, but if some
1191 # haven't made it yet we'll need to wait on them to be added
1192 await asyncio.gather(*[
1193 asyncio.ensure_future(
1194 self._wait_for_new('application', app_name),
1195 loop=self._connector.loop)
1196 for app_name in pending_apps
1197 ], loop=self._connector.loop)
1198 return [app for name, app in self.applications.items()
1199 if name in handler.applications]
1200 else:
1201 if not is_local:
1202 if not application_name:
1203 application_name = entity['Meta']['charm-metadata']['Name']
1204 if not series:
1205 series = self._get_series(entity_url, entity)
1206 await client_facade.AddCharm(channel, entity_id)
1207 # XXX: we're dropping local resources here, but we don't
1208 # actually support them yet anyway
1209 resources = await self._add_store_resources(application_name,
1210 entity_id,
1211 entity)
1212 else:
1213 if not application_name:
1214 metadata = yaml.load(metadata_path.read_text())
1215 application_name = metadata['name']
1216 # We have a local charm dir that needs to be uploaded
1217 charm_dir = os.path.abspath(
1218 os.path.expanduser(entity_id))
1219 series = series or get_charm_series(charm_dir)
1220 if not series:
1221 raise JujuError(
1222 "Couldn't determine series for charm at {}. "
1223 "Pass a 'series' kwarg to Model.deploy().".format(
1224 charm_dir))
1225 entity_id = await self.add_local_charm_dir(charm_dir, series)
1226 return await self._deploy(
1227 charm_url=entity_id,
1228 application=application_name,
1229 series=series,
1230 config=config or {},
1231 constraints=constraints,
1232 endpoint_bindings=bind,
1233 resources=resources,
1234 storage=storage,
1235 channel=channel,
1236 num_units=num_units,
1237 placement=parse_placement(to)
1238 )
1239
1240 async def _add_store_resources(self, application, entity_url, entity=None):
1241 if not entity:
1242 # avoid extra charm store call if one was already made
1243 entity = await self.charmstore.entity(entity_url)
1244 resources = [
1245 {
1246 'description': resource['Description'],
1247 'fingerprint': resource['Fingerprint'],
1248 'name': resource['Name'],
1249 'path': resource['Path'],
1250 'revision': resource['Revision'],
1251 'size': resource['Size'],
1252 'type_': resource['Type'],
1253 'origin': 'store',
1254 } for resource in entity['Meta']['resources']
1255 ]
1256
1257 if not resources:
1258 return None
1259
1260 resources_facade = client.ResourcesFacade.from_connection(
1261 self.connection())
1262 response = await resources_facade.AddPendingResources(
1263 tag.application(application),
1264 entity_url,
1265 [client.CharmResource(**resource) for resource in resources])
1266 resource_map = {resource['name']: pid
1267 for resource, pid
1268 in zip(resources, response.pending_ids)}
1269 return resource_map
1270
1271 async def _deploy(self, charm_url, application, series, config,
1272 constraints, endpoint_bindings, resources, storage,
1273 channel=None, num_units=None, placement=None):
1274 """Logic shared between `Model.deploy` and `BundleHandler.deploy`.
1275 """
1276 log.info('Deploying %s', charm_url)
1277
1278 # stringify all config values for API, and convert to YAML
1279 config = {k: str(v) for k, v in config.items()}
1280 config = yaml.dump({application: config},
1281 default_flow_style=False)
1282
1283 app_facade = client.ApplicationFacade.from_connection(
1284 self.connection())
1285
1286 app = client.ApplicationDeploy(
1287 charm_url=charm_url,
1288 application=application,
1289 series=series,
1290 channel=channel,
1291 config_yaml=config,
1292 constraints=parse_constraints(constraints),
1293 endpoint_bindings=endpoint_bindings,
1294 num_units=num_units,
1295 resources=resources,
1296 storage=storage,
1297 placement=placement
1298 )
1299 result = await app_facade.Deploy([app])
1300 errors = [r.error.message for r in result.results if r.error]
1301 if errors:
1302 raise JujuError('\n'.join(errors))
1303 return await self._wait_for_new('application', application)
1304
1305 async def destroy(self):
1306 """Terminate all machines and resources for this model.
1307 Is already implemented in controller.py.
1308 """
1309 raise NotImplementedError()
1310
1311 async def destroy_unit(self, *unit_names):
1312 """Destroy units by name.
1313
1314 """
1315 app_facade = client.ApplicationFacade.from_connection(self.connection())
1316
1317 log.debug(
1318 'Destroying unit%s %s',
1319 's' if len(unit_names) == 1 else '',
1320 ' '.join(unit_names))
1321
1322 return await app_facade.DestroyUnits(list(unit_names))
1323 destroy_units = destroy_unit
1324
1325 def get_backup(self, archive_id):
1326 """Download a backup archive file.
1327
1328 :param str archive_id: The id of the archive to download
1329 :return str: Path to the archive file
1330
1331 """
1332 raise NotImplementedError()
1333
1334 def enable_ha(
1335 self, num_controllers=0, constraints=None, series=None, to=None):
1336 """Ensure sufficient controllers exist to provide redundancy.
1337
1338 :param int num_controllers: Number of controllers to make available
1339 :param constraints: Constraints to apply to the controller machines
1340 :type constraints: :class:`juju.Constraints`
1341 :param str series: Series of the controller machines
1342 :param list to: Placement directives for controller machines, e.g.::
1343
1344 '23' - machine 23
1345 'lxc:7' - new lxc container on machine 7
1346 '24/lxc/3' - lxc container 3 or machine 24
1347
1348 If None, a new machine is provisioned.
1349
1350 """
1351 raise NotImplementedError()
1352
1353 async def get_config(self):
1354 """Return the configuration settings for this model.
1355
1356 :returns: A ``dict`` mapping keys to `ConfigValue` instances,
1357 which have `source` and `value` attributes.
1358 """
1359 config_facade = client.ModelConfigFacade.from_connection(
1360 self.connection()
1361 )
1362 result = await config_facade.ModelGet()
1363 config = result.config
1364 for key, value in config.items():
1365 config[key] = ConfigValue.from_json(value)
1366 return config
1367
1368 def get_constraints(self):
1369 """Return the machine constraints for this model.
1370
1371 """
1372 raise NotImplementedError()
1373
1374 def import_ssh_key(self, identity):
1375 """Add a public SSH key from a trusted indentity source to this model.
1376
1377 :param str identity: User identity in the form <lp|gh>:<username>
1378
1379 """
1380 raise NotImplementedError()
1381 import_ssh_keys = import_ssh_key
1382
1383 async def get_machines(self):
1384 """Return list of machines in this model.
1385
1386 """
1387 return list(self.state.machines.keys())
1388
1389 def get_shares(self):
1390 """Return list of all users with access to this model.
1391
1392 """
1393 raise NotImplementedError()
1394
1395 def get_spaces(self):
1396 """Return list of all known spaces, including associated subnets.
1397
1398 """
1399 raise NotImplementedError()
1400
1401 async def get_ssh_key(self, raw_ssh=False):
1402 """Return known SSH keys for this model.
1403 :param bool raw_ssh: if True, returns the raw ssh key,
1404 else it's fingerprint
1405
1406 """
1407 key_facade = client.KeyManagerFacade.from_connection(self.connection())
1408 entity = {'tag': tag.model(self.info.uuid)}
1409 entities = client.Entities([entity])
1410 return await key_facade.ListKeys(entities, raw_ssh)
1411 get_ssh_keys = get_ssh_key
1412
1413 def get_storage(self, filesystem=False, volume=False):
1414 """Return details of storage instances.
1415
1416 :param bool filesystem: Include filesystem storage
1417 :param bool volume: Include volume storage
1418
1419 """
1420 raise NotImplementedError()
1421
1422 def get_storage_pools(self, names=None, providers=None):
1423 """Return list of storage pools.
1424
1425 :param list names: Only include pools with these names
1426 :param list providers: Only include pools for these providers
1427
1428 """
1429 raise NotImplementedError()
1430
1431 def get_subnets(self, space=None, zone=None):
1432 """Return list of known subnets.
1433
1434 :param str space: Only include subnets in this space
1435 :param str zone: Only include subnets in this zone
1436
1437 """
1438 raise NotImplementedError()
1439
1440 def remove_blocks(self):
1441 """Remove all blocks from this model.
1442
1443 """
1444 raise NotImplementedError()
1445
1446 def remove_backup(self, backup_id):
1447 """Delete a backup.
1448
1449 :param str backup_id: The id of the backup to remove
1450
1451 """
1452 raise NotImplementedError()
1453
1454 def remove_cached_images(self, arch=None, kind=None, series=None):
1455 """Remove cached OS images.
1456
1457 :param str arch: Architecture of the images to remove
1458 :param str kind: Image kind to remove, e.g. 'lxd'
1459 :param str series: Image series to remove, e.g. 'xenial'
1460
1461 """
1462 raise NotImplementedError()
1463
1464 def remove_machine(self, *machine_ids):
1465 """Remove a machine from this model.
1466
1467 :param str \*machine_ids: Ids of the machines to remove
1468
1469 """
1470 raise NotImplementedError()
1471 remove_machines = remove_machine
1472
1473 async def remove_ssh_key(self, user, key):
1474 """Remove a public SSH key(s) from this model.
1475
1476 :param str key: Full ssh key
1477 :param str user: Juju user to which the key is registered
1478
1479 """
1480 key_facade = client.KeyManagerFacade.from_connection(self.connection())
1481 key = base64.b64decode(bytes(key.strip().split()[1].encode('ascii')))
1482 key = hashlib.md5(key).hexdigest()
1483 key = ':'.join(a + b for a, b in zip(key[::2], key[1::2]))
1484 await key_facade.DeleteKeys([key], user)
1485 remove_ssh_keys = remove_ssh_key
1486
1487 def restore_backup(
1488 self, bootstrap=False, constraints=None, archive=None,
1489 backup_id=None, upload_tools=False):
1490 """Restore a backup archive to a new controller.
1491
1492 :param bool bootstrap: Bootstrap a new state machine
1493 :param constraints: Model constraints
1494 :type constraints: :class:`juju.Constraints`
1495 :param str archive: Path to backup archive to restore
1496 :param str backup_id: Id of backup to restore
1497 :param bool upload_tools: Upload tools if bootstrapping a new machine
1498
1499 """
1500 raise NotImplementedError()
1501
1502 def retry_provisioning(self):
1503 """Retry provisioning for failed machines.
1504
1505 """
1506 raise NotImplementedError()
1507
1508 def run(self, command, timeout=None):
1509 """Run command on all machines in this model.
1510
1511 :param str command: The command to run
1512 :param int timeout: Time to wait before command is considered failed
1513
1514 """
1515 raise NotImplementedError()
1516
1517 async def set_config(self, config):
1518 """Set configuration keys on this model.
1519
1520 :param dict config: Mapping of config keys to either string values or
1521 `ConfigValue` instances, as returned by `get_config`.
1522 """
1523 config_facade = client.ModelConfigFacade.from_connection(
1524 self.connection()
1525 )
1526 for key, value in config.items():
1527 if isinstance(value, ConfigValue):
1528 config[key] = value.value
1529 await config_facade.ModelSet(config)
1530
1531 def set_constraints(self, constraints):
1532 """Set machine constraints on this model.
1533
1534 :param :class:`juju.Constraints` constraints: Machine constraints
1535
1536 """
1537 raise NotImplementedError()
1538
1539 def get_action_output(self, action_uuid, wait=-1):
1540 """Get the results of an action by ID.
1541
1542 :param str action_uuid: Id of the action
1543 :param int wait: Time in seconds to wait for action to complete
1544
1545 """
1546 raise NotImplementedError()
1547
1548 def get_action_status(self, uuid_or_prefix=None, name=None):
1549 """Get the status of all actions, filtered by ID, ID prefix, or action name.
1550
1551 :param str uuid_or_prefix: Filter by action uuid or prefix
1552 :param str name: Filter by action name
1553
1554 """
1555 raise NotImplementedError()
1556
1557 def get_budget(self, budget_name):
1558 """Get budget usage info.
1559
1560 :param str budget_name: Name of budget
1561
1562 """
1563 raise NotImplementedError()
1564
1565 async def get_status(self, filters=None, utc=False):
1566 """Return the status of the model.
1567
1568 :param str filters: Optional list of applications, units, or machines
1569 to include, which can use wildcards ('*').
1570 :param bool utc: Display time as UTC in RFC3339 format
1571
1572 """
1573 client_facade = client.ClientFacade.from_connection(self.connection())
1574 return await client_facade.FullStatus(filters)
1575
1576 def sync_tools(
1577 self, all_=False, destination=None, dry_run=False, public=False,
1578 source=None, stream=None, version=None):
1579 """Copy Juju tools into this model.
1580
1581 :param bool all_: Copy all versions, not just the latest
1582 :param str destination: Path to local destination directory
1583 :param bool dry_run: Don't do the actual copy
1584 :param bool public: Tools are for a public cloud, so generate mirrors
1585 information
1586 :param str source: Path to local source directory
1587 :param str stream: Simplestreams stream for which to sync metadata
1588 :param str version: Copy a specific major.minor version
1589
1590 """
1591 raise NotImplementedError()
1592
1593 def unblock(self, *commands):
1594 """Unblock an operation that would alter this model.
1595
1596 :param str \*commands: The commands to unblock. Valid values are
1597 'all-changes', 'destroy-model', 'remove-object'
1598
1599 """
1600 raise NotImplementedError()
1601
1602 def unset_config(self, *keys):
1603 """Unset configuration on this model.
1604
1605 :param str \*keys: The keys to unset
1606
1607 """
1608 raise NotImplementedError()
1609
1610 def upgrade_gui(self):
1611 """Upgrade the Juju GUI for this model.
1612
1613 """
1614 raise NotImplementedError()
1615
1616 def upgrade_juju(
1617 self, dry_run=False, reset_previous_upgrade=False,
1618 upload_tools=False, version=None):
1619 """Upgrade Juju on all machines in a model.
1620
1621 :param bool dry_run: Don't do the actual upgrade
1622 :param bool reset_previous_upgrade: Clear the previous (incomplete)
1623 upgrade status
1624 :param bool upload_tools: Upload local version of tools
1625 :param str version: Upgrade to a specific version
1626
1627 """
1628 raise NotImplementedError()
1629
1630 def upload_backup(self, archive_path):
1631 """Store a backup archive remotely in Juju.
1632
1633 :param str archive_path: Path to local archive
1634
1635 """
1636 raise NotImplementedError()
1637
1638 @property
1639 def charmstore(self):
1640 return self._charmstore
1641
1642 async def get_metrics(self, *tags):
1643 """Retrieve metrics.
1644
1645 :param str \*tags: Tags of entities from which to retrieve metrics.
1646 No tags retrieves the metrics of all units in the model.
1647 :return: Dictionary of unit_name:metrics
1648
1649 """
1650 log.debug("Retrieving metrics for %s",
1651 ', '.join(tags) if tags else "all units")
1652
1653 metrics_facade = client.MetricsDebugFacade.from_connection(
1654 self.connection())
1655
1656 entities = [client.Entity(tag) for tag in tags]
1657 metrics_result = await metrics_facade.GetMetrics(entities)
1658
1659 metrics = collections.defaultdict(list)
1660
1661 for entity_metrics in metrics_result.results:
1662 error = entity_metrics.error
1663 if error:
1664 if "is not a valid tag" in error:
1665 raise ValueError(error.message)
1666 else:
1667 raise Exception(error.message)
1668
1669 for metric in entity_metrics.metrics:
1670 metrics[metric.unit].append(vars(metric))
1671
1672 return metrics
1673
1674
1675 def get_charm_series(path):
1676 """Inspects the charm directory at ``path`` and returns a default
1677 series from its metadata.yaml (the first item in the 'series' list).
1678
1679 Returns None if no series can be determined.
1680
1681 """
1682 md = Path(path) / "metadata.yaml"
1683 if not md.exists():
1684 return None
1685 data = yaml.load(md.open())
1686 series = data.get('series')
1687 return series[0] if series else None
1688
1689
1690 class BundleHandler:
1691 """
1692 Handle bundles by using the API to translate bundle YAML into a plan of
1693 steps and then dispatching each of those using the API.
1694 """
1695 def __init__(self, model):
1696 self.model = model
1697 self.charmstore = model.charmstore
1698 self.plan = []
1699 self.references = {}
1700 self._units_by_app = {}
1701 for unit_name, unit in model.units.items():
1702 app_units = self._units_by_app.setdefault(unit.application, [])
1703 app_units.append(unit_name)
1704 self.client_facade = client.ClientFacade.from_connection(
1705 model.connection())
1706 self.app_facade = client.ApplicationFacade.from_connection(
1707 model.connection())
1708 self.ann_facade = client.AnnotationsFacade.from_connection(
1709 model.connection())
1710
1711 async def _handle_local_charms(self, bundle):
1712 """Search for references to local charms (i.e. filesystem paths)
1713 in the bundle. Upload the local charms to the model, and replace
1714 the filesystem paths with appropriate 'local:' paths in the bundle.
1715
1716 Return the modified bundle.
1717
1718 :param dict bundle: Bundle dictionary
1719 :return: Modified bundle dictionary
1720
1721 """
1722 apps, args = [], []
1723
1724 default_series = bundle.get('series')
1725 apps_dict = bundle.get('applications', bundle.get('services', {}))
1726 for app_name in self.applications:
1727 app_dict = apps_dict[app_name]
1728 charm_dir = os.path.abspath(os.path.expanduser(app_dict['charm']))
1729 if not os.path.isdir(charm_dir):
1730 continue
1731 series = (
1732 app_dict.get('series') or
1733 default_series or
1734 get_charm_series(charm_dir)
1735 )
1736 if not series:
1737 raise JujuError(
1738 "Couldn't determine series for charm at {}. "
1739 "Add a 'series' key to the bundle.".format(charm_dir))
1740
1741 # Keep track of what we need to update. We keep a list of apps
1742 # that need to be updated, and a corresponding list of args
1743 # needed to update those apps.
1744 apps.append(app_name)
1745 args.append((charm_dir, series))
1746
1747 if apps:
1748 # If we have apps to update, spawn all the coroutines concurrently
1749 # and wait for them to finish.
1750 charm_urls = await asyncio.gather(*[
1751 self.model.add_local_charm_dir(*params)
1752 for params in args
1753 ], loop=self.model.loop)
1754 # Update the 'charm:' entry for each app with the new 'local:' url.
1755 for app_name, charm_url in zip(apps, charm_urls):
1756 apps_dict[app_name]['charm'] = charm_url
1757
1758 return bundle
1759
1760 async def fetch_plan(self, entity_id):
1761 is_local = not entity_id.startswith('cs:')
1762
1763 if is_local and os.path.isfile(entity_id):
1764 bundle_yaml = Path(entity_id).read_text()
1765 elif is_local and os.path.isdir(entity_id):
1766 bundle_yaml = (Path(entity_id) / "bundle.yaml").read_text()
1767 else:
1768 bundle_yaml = await self.charmstore.files(entity_id,
1769 filename='bundle.yaml',
1770 read_file=True)
1771 self.bundle = yaml.safe_load(bundle_yaml)
1772 self.bundle = await self._handle_local_charms(self.bundle)
1773
1774 self.plan = await self.client_facade.GetBundleChanges(
1775 yaml.dump(self.bundle))
1776
1777 async def execute_plan(self):
1778 for step in self.plan.changes:
1779 method = getattr(self, step.method)
1780 result = await method(*step.args)
1781 self.references[step.id_] = result
1782
1783 @property
1784 def applications(self):
1785 apps_dict = self.bundle.get('applications',
1786 self.bundle.get('services', {}))
1787 return list(apps_dict.keys())
1788
1789 def resolve(self, reference):
1790 if reference and reference.startswith('$'):
1791 reference = self.references[reference[1:]]
1792 return reference
1793
1794 async def addCharm(self, charm, series):
1795 """
1796 :param charm string:
1797 Charm holds the URL of the charm to be added.
1798
1799 :param series string:
1800 Series holds the series of the charm to be added
1801 if the charm default is not sufficient.
1802 """
1803 # We don't add local charms because they've already been added
1804 # by self._handle_local_charms
1805 if charm.startswith('local:'):
1806 return charm
1807
1808 entity_id = await self.charmstore.entityId(charm)
1809 log.debug('Adding %s', entity_id)
1810 await self.client_facade.AddCharm(None, entity_id)
1811 return entity_id
1812
1813 async def addMachines(self, params=None):
1814 """
1815 :param params dict:
1816 Dictionary specifying the machine to add. All keys are optional.
1817 Keys include:
1818
1819 series: string specifying the machine OS series.
1820
1821 constraints: string holding machine constraints, if any. We'll
1822 parse this into the json friendly dict that the juju api
1823 expects.
1824
1825 container_type: string holding the type of the container (for
1826 instance ""lxd" or kvm"). It is not specified for top level
1827 machines.
1828
1829 parent_id: string holding a placeholder pointing to another
1830 machine change or to a unit change. This value is only
1831 specified in the case this machine is a container, in
1832 which case also ContainerType is set.
1833
1834 """
1835 params = params or {}
1836
1837 # Normalize keys
1838 params = {normalize_key(k): params[k] for k in params.keys()}
1839
1840 # Fix up values, as necessary.
1841 if 'parent_id' in params:
1842 params['parent_id'] = self.resolve(params['parent_id'])
1843
1844 params['constraints'] = parse_constraints(
1845 params.get('constraints'))
1846 params['jobs'] = params.get('jobs', ['JobHostUnits'])
1847
1848 if params.get('container_type') == 'lxc':
1849 log.warning('Juju 2.0 does not support lxc containers. '
1850 'Converting containers to lxd.')
1851 params['container_type'] = 'lxd'
1852
1853 # Submit the request.
1854 params = client.AddMachineParams(**params)
1855 results = await self.client_facade.AddMachines([params])
1856 error = results.machines[0].error
1857 if error:
1858 raise ValueError("Error adding machine: %s" % error.message)
1859 machine = results.machines[0].machine
1860 log.debug('Added new machine %s', machine)
1861 return machine
1862
1863 async def addRelation(self, endpoint1, endpoint2):
1864 """
1865 :param endpoint1 string:
1866 :param endpoint2 string:
1867 Endpoint1 and Endpoint2 hold relation endpoints in the
1868 "application:interface" form, where the application is always a
1869 placeholder pointing to an application change, and the interface is
1870 optional. Examples are "$deploy-42:web" or just "$deploy-42".
1871 """
1872 endpoints = [endpoint1, endpoint2]
1873 # resolve indirect references
1874 for i in range(len(endpoints)):
1875 parts = endpoints[i].split(':')
1876 parts[0] = self.resolve(parts[0])
1877 endpoints[i] = ':'.join(parts)
1878
1879 log.info('Relating %s <-> %s', *endpoints)
1880 return await self.model.add_relation(*endpoints)
1881
1882 async def deploy(self, charm, series, application, options, constraints,
1883 storage, endpoint_bindings, resources):
1884 """
1885 :param charm string:
1886 Charm holds the URL of the charm to be used to deploy this
1887 application.
1888
1889 :param series string:
1890 Series holds the series of the application to be deployed
1891 if the charm default is not sufficient.
1892
1893 :param application string:
1894 Application holds the application name.
1895
1896 :param options map[string]interface{}:
1897 Options holds application options.
1898
1899 :param constraints string:
1900 Constraints holds the optional application constraints.
1901
1902 :param storage map[string]string:
1903 Storage holds the optional storage constraints.
1904
1905 :param endpoint_bindings map[string]string:
1906 EndpointBindings holds the optional endpoint bindings
1907
1908 :param resources map[string]int:
1909 Resources identifies the revision to use for each resource
1910 of the application's charm.
1911 """
1912 # resolve indirect references
1913 charm = self.resolve(charm)
1914 # the bundle plan doesn't actually do anything with resources, even
1915 # though it ostensibly gives us something (None) for that param
1916 if not charm.startswith('local:'):
1917 resources = await self.model._add_store_resources(application,
1918 charm)
1919 await self.model._deploy(
1920 charm_url=charm,
1921 application=application,
1922 series=series,
1923 config=options,
1924 constraints=constraints,
1925 endpoint_bindings=endpoint_bindings,
1926 resources=resources,
1927 storage=storage,
1928 )
1929 return application
1930
1931 async def addUnit(self, application, to):
1932 """
1933 :param application string:
1934 Application holds the application placeholder name for which a unit
1935 is added.
1936
1937 :param to string:
1938 To holds the optional location where to add the unit, as a
1939 placeholder pointing to another unit change or to a machine change.
1940 """
1941 application = self.resolve(application)
1942 placement = self.resolve(to)
1943 if self._units_by_app.get(application):
1944 # enough units for this application already exist;
1945 # claim one, and carry on
1946 # NB: this should probably honor placement, but the juju client
1947 # doesn't, so we're not bothering, either
1948 unit_name = self._units_by_app[application].pop()
1949 log.debug('Reusing unit %s for %s', unit_name, application)
1950 return self.model.units[unit_name]
1951
1952 log.debug('Adding new unit for %s%s', application,
1953 ' to %s' % placement if placement else '')
1954 return await self.model.applications[application].add_unit(
1955 count=1,
1956 to=placement,
1957 )
1958
1959 async def expose(self, application):
1960 """
1961 :param application string:
1962 Application holds the placeholder name of the application that must
1963 be exposed.
1964 """
1965 application = self.resolve(application)
1966 log.info('Exposing %s', application)
1967 return await self.model.applications[application].expose()
1968
1969 async def setAnnotations(self, id_, entity_type, annotations):
1970 """
1971 :param id_ string:
1972 Id is the placeholder for the application or machine change
1973 corresponding to the entity to be annotated.
1974
1975 :param entity_type EntityType:
1976 EntityType holds the type of the entity, "application" or
1977 "machine".
1978
1979 :param annotations map[string]string:
1980 Annotations holds the annotations as key/value pairs.
1981 """
1982 entity_id = self.resolve(id_)
1983 try:
1984 entity = self.model.state.get_entity(entity_type, entity_id)
1985 except KeyError:
1986 entity = await self.model._wait_for_new(entity_type, entity_id)
1987 return await entity.set_annotations(annotations)
1988
1989
1990 class CharmStore:
1991 """
1992 Async wrapper around theblues.charmstore.CharmStore
1993 """
1994 def __init__(self, loop):
1995 self.loop = loop
1996 self._cs = theblues.charmstore.CharmStore(timeout=5)
1997
1998 def __getattr__(self, name):
1999 """
2000 Wrap method calls in coroutines that use run_in_executor to make them
2001 async.
2002 """
2003 attr = getattr(self._cs, name)
2004 if not callable(attr):
2005 wrapper = partial(getattr, self._cs, name)
2006 setattr(self, name, wrapper)
2007 else:
2008 async def coro(*args, **kwargs):
2009 method = partial(attr, *args, **kwargs)
2010 for attempt in range(1, 4):
2011 try:
2012 return await self.loop.run_in_executor(None, method)
2013 except theblues.errors.ServerError:
2014 if attempt == 3:
2015 raise
2016 await asyncio.sleep(1, loop=self.loop)
2017 setattr(self, name, coro)
2018 wrapper = coro
2019 return wrapper
2020
2021
2022 class CharmArchiveGenerator:
2023 """
2024 Create a Zip archive of a local charm directory for upload to a controller.
2025
2026 This is used automatically by
2027 `Model.add_local_charm_dir <#juju.model.Model.add_local_charm_dir>`_.
2028 """
2029 def __init__(self, path):
2030 self.path = os.path.abspath(os.path.expanduser(path))
2031
2032 def make_archive(self, path):
2033 """Create archive of directory and write to ``path``.
2034
2035 :param path: Path to archive
2036
2037 Ignored::
2038
2039 * build/\* - This is used for packing the charm itself and any
2040 similar tasks.
2041 * \*/.\* - Hidden files are all ignored for now. This will most
2042 likely be changed into a specific ignore list
2043 (.bzr, etc)
2044
2045 """
2046 zf = zipfile.ZipFile(path, 'w', zipfile.ZIP_DEFLATED)
2047 for dirpath, dirnames, filenames in os.walk(self.path):
2048 relative_path = dirpath[len(self.path) + 1:]
2049 if relative_path and not self._ignore(relative_path):
2050 zf.write(dirpath, relative_path)
2051 for name in filenames:
2052 archive_name = os.path.join(relative_path, name)
2053 if not self._ignore(archive_name):
2054 real_path = os.path.join(dirpath, name)
2055 self._check_type(real_path)
2056 if os.path.islink(real_path):
2057 self._check_link(real_path)
2058 self._write_symlink(
2059 zf, os.readlink(real_path), archive_name)
2060 else:
2061 zf.write(real_path, archive_name)
2062 zf.close()
2063 return path
2064
2065 def _check_type(self, path):
2066 """Check the path
2067 """
2068 s = os.stat(path)
2069 if stat.S_ISDIR(s.st_mode) or stat.S_ISREG(s.st_mode):
2070 return path
2071 raise ValueError("Invalid Charm at % %s" % (
2072 path, "Invalid file type for a charm"))
2073
2074 def _check_link(self, path):
2075 link_path = os.readlink(path)
2076 if link_path[0] == "/":
2077 raise ValueError(
2078 "Invalid Charm at %s: %s" % (
2079 path, "Absolute links are invalid"))
2080 path_dir = os.path.dirname(path)
2081 link_path = os.path.join(path_dir, link_path)
2082 if not link_path.startswith(os.path.abspath(self.path)):
2083 raise ValueError(
2084 "Invalid charm at %s %s" % (
2085 path, "Only internal symlinks are allowed"))
2086
2087 def _write_symlink(self, zf, link_target, link_path):
2088 """Package symlinks with appropriate zipfile metadata."""
2089 info = zipfile.ZipInfo()
2090 info.filename = link_path
2091 info.create_system = 3
2092 # Magic code for symlinks / py2/3 compat
2093 # 27166663808 = (stat.S_IFLNK | 0755) << 16
2094 info.external_attr = 2716663808
2095 zf.writestr(info, link_target)
2096
2097 def _ignore(self, path):
2098 if path == "build" or path.startswith("build/"):
2099 return True
2100 if path.startswith('.'):
2101 return True