K8s Juju connector
[osm/N2VC.git] / modules / libjuju / juju / model.py
1 import asyncio
2 import base64
3 import collections
4 import hashlib
5 import json
6 import logging
7 import os
8 import re
9 import stat
10 import tempfile
11 import weakref
12 import zipfile
13 from concurrent.futures import CancelledError
14 from functools import partial
15 from pathlib import Path
16
17 import theblues.charmstore
18 import theblues.errors
19 import websockets
20 import yaml
21
22 from . import tag, utils
23 from .client import client, connector
24 from .client.client import ConfigValue
25 from .client.client import Value
26 from .constraints import parse as parse_constraints
27 from .constraints import normalize_key
28 from .delta import get_entity_class, get_entity_delta
29 from .errors import JujuAPIError, JujuError
30 from .exceptions import DeadEntityException
31 from .placement import parse as parse_placement
32 from . import provisioner
33
34
35 log = logging.getLogger(__name__)
36
37
38 class _Observer:
39 """Wrapper around an observer callable.
40
41 This wrapper allows filter criteria to be associated with the
42 callable so that it's only called for changes that meet the criteria.
43
44 """
45 def __init__(self, callable_, entity_type, action, entity_id, predicate):
46 self.callable_ = callable_
47 self.entity_type = entity_type
48 self.action = action
49 self.entity_id = entity_id
50 self.predicate = predicate
51 if self.entity_id:
52 self.entity_id = str(self.entity_id)
53 if not self.entity_id.startswith('^'):
54 self.entity_id = '^' + self.entity_id
55 if not self.entity_id.endswith('$'):
56 self.entity_id += '$'
57
58 async def __call__(self, delta, old, new, model):
59 await self.callable_(delta, old, new, model)
60
61 def cares_about(self, delta):
62 """Return True if this observer "cares about" (i.e. wants to be
63 called) for a this delta.
64
65 """
66 if (self.entity_id and delta.get_id() and
67 not re.match(self.entity_id, str(delta.get_id()))):
68 return False
69
70 if self.entity_type and self.entity_type != delta.entity:
71 return False
72
73 if self.action and self.action != delta.type:
74 return False
75
76 if self.predicate and not self.predicate(delta):
77 return False
78
79 return True
80
81
82 class ModelObserver:
83 """
84 Base class for creating observers that react to changes in a model.
85 """
86 async def __call__(self, delta, old, new, model):
87 handler_name = 'on_{}_{}'.format(delta.entity, delta.type)
88 method = getattr(self, handler_name, self.on_change)
89 await method(delta, old, new, model)
90
91 async def on_change(self, delta, old, new, model):
92 """Generic model-change handler.
93
94 This should be overridden in a subclass.
95
96 :param delta: :class:`juju.client.overrides.Delta`
97 :param old: :class:`juju.model.ModelEntity`
98 :param new: :class:`juju.model.ModelEntity`
99 :param model: :class:`juju.model.Model`
100
101 """
102 pass
103
104
105 class ModelState:
106 """Holds the state of the model, including the delta history of all
107 entities in the model.
108
109 """
110 def __init__(self, model):
111 self.model = model
112 self.state = dict()
113
114 def _live_entity_map(self, entity_type):
115 """Return an id:Entity map of all the living entities of
116 type ``entity_type``.
117
118 """
119 return {
120 entity_id: self.get_entity(entity_type, entity_id)
121 for entity_id, history in self.state.get(entity_type, {}).items()
122 if history[-1] is not None
123 }
124
125 @property
126 def applications(self):
127 """Return a map of application-name:Application for all applications
128 currently in the model.
129
130 """
131 return self._live_entity_map('application')
132
133 @property
134 def machines(self):
135 """Return a map of machine-id:Machine for all machines currently in
136 the model.
137
138 """
139 return self._live_entity_map('machine')
140
141 @property
142 def units(self):
143 """Return a map of unit-id:Unit for all units currently in
144 the model.
145
146 """
147 return self._live_entity_map('unit')
148
149 @property
150 def relations(self):
151 """Return a map of relation-id:Relation for all relations currently in
152 the model.
153
154 """
155 return self._live_entity_map('relation')
156
157 def entity_history(self, entity_type, entity_id):
158 """Return the history deque for an entity.
159
160 """
161 return self.state[entity_type][entity_id]
162
163 def entity_data(self, entity_type, entity_id, history_index):
164 """Return the data dict for an entity at a specific index of its
165 history.
166
167 """
168 return self.entity_history(entity_type, entity_id)[history_index]
169
170 def apply_delta(self, delta):
171 """Apply delta to our state and return a copy of the
172 affected object as it was before and after the update, e.g.:
173
174 old_obj, new_obj = self.apply_delta(delta)
175
176 old_obj may be None if the delta is for the creation of a new object,
177 e.g. a new application or unit is deployed.
178
179 new_obj will never be None, but may be dead (new_obj.dead == True)
180 if the object was deleted as a result of the delta being applied.
181
182 """
183 history = (
184 self.state
185 .setdefault(delta.entity, {})
186 .setdefault(delta.get_id(), collections.deque())
187 )
188
189 history.append(delta.data)
190 if delta.type == 'remove':
191 history.append(None)
192
193 entity = self.get_entity(delta.entity, delta.get_id())
194 return entity.previous(), entity
195
196 def get_entity(
197 self, entity_type, entity_id, history_index=-1, connected=True):
198 """Return an object instance for the given entity_type and id.
199
200 By default the object state matches the most recent state from
201 Juju. To get an instance of the object in an older state, pass
202 history_index, an index into the history deque for the entity.
203
204 """
205
206 if history_index < 0 and history_index != -1:
207 history_index += len(self.entity_history(entity_type, entity_id))
208 if history_index < 0:
209 return None
210
211 try:
212 self.entity_data(entity_type, entity_id, history_index)
213 except IndexError:
214 return None
215
216 entity_class = get_entity_class(entity_type)
217 return entity_class(
218 entity_id, self.model, history_index=history_index,
219 connected=connected)
220
221
222 class ModelEntity:
223 """An object in the Model tree"""
224
225 def __init__(self, entity_id, model, history_index=-1, connected=True):
226 """Initialize a new entity
227
228 :param entity_id str: The unique id of the object in the model
229 :param model: The model instance in whose object tree this
230 entity resides
231 :history_index int: The index of this object's state in the model's
232 history deque for this entity
233 :connected bool: Flag indicating whether this object gets live updates
234 from the model.
235
236 """
237 self.entity_id = entity_id
238 self.model = model
239 self._history_index = history_index
240 self.connected = connected
241 self.connection = model.connection()
242
243 def __repr__(self):
244 return '<{} entity_id="{}">'.format(type(self).__name__,
245 self.entity_id)
246
247 def __getattr__(self, name):
248 """Fetch object attributes from the underlying data dict held in the
249 model.
250
251 """
252 try:
253 return self.safe_data[name]
254 except KeyError:
255 name = name.replace('_', '-')
256 if name in self.safe_data:
257 return self.safe_data[name]
258 else:
259 raise
260
261 def __bool__(self):
262 return bool(self.data)
263
264 def on_change(self, callable_):
265 """Add a change observer to this entity.
266
267 """
268 self.model.add_observer(
269 callable_, self.entity_type, 'change', self.entity_id)
270
271 def on_remove(self, callable_):
272 """Add a remove observer to this entity.
273
274 """
275 self.model.add_observer(
276 callable_, self.entity_type, 'remove', self.entity_id)
277
278 @property
279 def entity_type(self):
280 """A string identifying the entity type of this object, e.g.
281 'application' or 'unit', etc.
282
283 """
284 return self.__class__.__name__.lower()
285
286 @property
287 def current(self):
288 """Return True if this object represents the current state of the
289 entity in the underlying model.
290
291 This will be True except when the object represents an entity at a
292 non-latest state in history, e.g. if the object was obtained by calling
293 .previous() on another object.
294
295 """
296 return self._history_index == -1
297
298 @property
299 def dead(self):
300 """Returns True if this entity no longer exists in the underlying
301 model.
302
303 """
304 return (
305 self.data is None or
306 self.model.state.entity_data(
307 self.entity_type, self.entity_id, -1) is None
308 )
309
310 @property
311 def alive(self):
312 """Returns True if this entity still exists in the underlying
313 model.
314
315 """
316 return not self.dead
317
318 @property
319 def data(self):
320 """The data dictionary for this entity.
321
322 """
323 return self.model.state.entity_data(
324 self.entity_type, self.entity_id, self._history_index)
325
326 @property
327 def safe_data(self):
328 """The data dictionary for this entity.
329
330 If this `ModelEntity` points to the dead state, it will
331 raise `DeadEntityException`.
332
333 """
334 if self.data is None:
335 raise DeadEntityException(
336 "Entity {}:{} is dead - its attributes can no longer be "
337 "accessed. Use the .previous() method on this object to get "
338 "a copy of the object at its previous state.".format(
339 self.entity_type, self.entity_id))
340 return self.data
341
342 def previous(self):
343 """Return a copy of this object as was at its previous state in
344 history.
345
346 Returns None if this object is new (and therefore has no history).
347
348 The returned object is always "disconnected", i.e. does not receive
349 live updates.
350
351 """
352 return self.model.state.get_entity(
353 self.entity_type, self.entity_id, self._history_index - 1,
354 connected=False)
355
356 def next(self):
357 """Return a copy of this object at its next state in
358 history.
359
360 Returns None if this object is already the latest.
361
362 The returned object is "disconnected", i.e. does not receive
363 live updates, unless it is current (latest).
364
365 """
366 if self._history_index == -1:
367 return None
368
369 new_index = self._history_index + 1
370 connected = (
371 new_index == len(self.model.state.entity_history(
372 self.entity_type, self.entity_id)) - 1
373 )
374 return self.model.state.get_entity(
375 self.entity_type, self.entity_id, self._history_index - 1,
376 connected=connected)
377
378 def latest(self):
379 """Return a copy of this object at its current state in the model.
380
381 Returns self if this object is already the latest.
382
383 The returned object is always "connected", i.e. receives
384 live updates from the model.
385
386 """
387 if self._history_index == -1:
388 return self
389
390 return self.model.state.get_entity(self.entity_type, self.entity_id)
391
392
393 class Model:
394 """
395 The main API for interacting with a Juju model.
396 """
397 def __init__(
398 self,
399 loop=None,
400 max_frame_size=None,
401 bakery_client=None,
402 jujudata=None,
403 ):
404 """Instantiate a new Model.
405
406 The connect method will need to be called before this
407 object can be used for anything interesting.
408
409 If jujudata is None, jujudata.FileJujuData will be used.
410
411 :param loop: an asyncio event loop
412 :param max_frame_size: See
413 `juju.client.connection.Connection.MAX_FRAME_SIZE`
414 :param bakery_client httpbakery.Client: The bakery client to use
415 for macaroon authorization.
416 :param jujudata JujuData: The source for current controller information
417 """
418 self._connector = connector.Connector(
419 loop=loop,
420 max_frame_size=max_frame_size,
421 bakery_client=bakery_client,
422 jujudata=jujudata,
423 )
424 self._observers = weakref.WeakValueDictionary()
425 self.state = ModelState(self)
426 self._info = None
427 self._watch_stopping = asyncio.Event(loop=self._connector.loop)
428 self._watch_stopped = asyncio.Event(loop=self._connector.loop)
429 self._watch_received = asyncio.Event(loop=self._connector.loop)
430 self._watch_stopped.set()
431 self._charmstore = CharmStore(self._connector.loop)
432
433 def is_connected(self):
434 """Reports whether the Model is currently connected."""
435 return self._connector.is_connected()
436
437 @property
438 def loop(self):
439 return self._connector.loop
440
441 def connection(self):
442 """Return the current Connection object. It raises an exception
443 if the Model is disconnected"""
444 return self._connector.connection()
445
446 async def get_controller(self):
447 """Return a Controller instance for the currently connected model.
448 :return Controller:
449 """
450 from juju.controller import Controller
451 controller = Controller(jujudata=self._connector.jujudata)
452 kwargs = self.connection().connect_params()
453 kwargs.pop('uuid')
454 await controller._connect_direct(**kwargs)
455 return controller
456
457 async def __aenter__(self):
458 await self.connect()
459 return self
460
461 async def __aexit__(self, exc_type, exc, tb):
462 await self.disconnect()
463
464 async def connect(self, *args, **kwargs):
465 """Connect to a juju model.
466
467 This supports two calling conventions:
468
469 The model and (optionally) authentication information can be taken
470 from the data files created by the Juju CLI. This convention will
471 be used if a ``model_name`` is specified, or if the ``endpoint``
472 and ``uuid`` are not.
473
474 Otherwise, all of the ``endpoint``, ``uuid``, and authentication
475 information (``username`` and ``password``, or ``bakery_client`` and/or
476 ``macaroons``) are required.
477
478 If a single positional argument is given, it will be assumed to be
479 the ``model_name``. Otherwise, the first positional argument, if any,
480 must be the ``endpoint``.
481
482 Available parameters are:
483
484 :param model_name: Format [controller:][user/]model
485 :param str endpoint: The hostname:port of the controller to connect to.
486 :param str uuid: The model UUID to connect to.
487 :param str username: The username for controller-local users (or None
488 to use macaroon-based login.)
489 :param str password: The password for controller-local users.
490 :param str cacert: The CA certificate of the controller
491 (PEM formatted).
492 :param httpbakery.Client bakery_client: The macaroon bakery client to
493 to use when performing macaroon-based login. Macaroon tokens
494 acquired when logging will be saved to bakery_client.cookies.
495 If this is None, a default bakery_client will be used.
496 :param list macaroons: List of macaroons to load into the
497 ``bakery_client``.
498 :param asyncio.BaseEventLoop loop: The event loop to use for async
499 operations.
500 :param int max_frame_size: The maximum websocket frame size to allow.
501 """
502 await self.disconnect()
503 if 'endpoint' not in kwargs and len(args) < 2:
504 if args and 'model_name' in kwargs:
505 raise TypeError('connect() got multiple values for model_name')
506 elif args:
507 model_name = args[0]
508 else:
509 model_name = kwargs.pop('model_name', None)
510 await self._connector.connect_model(model_name, **kwargs)
511 else:
512 if 'model_name' in kwargs:
513 raise TypeError('connect() got values for both '
514 'model_name and endpoint')
515 if args and 'endpoint' in kwargs:
516 raise TypeError('connect() got multiple values for endpoint')
517 if len(args) < 2 and 'uuid' not in kwargs:
518 raise TypeError('connect() missing value for uuid')
519 has_userpass = (len(args) >= 4 or
520 {'username', 'password'}.issubset(kwargs))
521 has_macaroons = (len(args) >= 6 or not
522 {'bakery_client', 'macaroons'}.isdisjoint(kwargs))
523 if not (has_userpass or has_macaroons):
524 raise TypeError('connect() missing auth params')
525 arg_names = [
526 'endpoint',
527 'uuid',
528 'username',
529 'password',
530 'cacert',
531 'bakery_client',
532 'macaroons',
533 'loop',
534 'max_frame_size',
535 ]
536 for i, arg in enumerate(args):
537 kwargs[arg_names[i]] = arg
538 if not {'endpoint', 'uuid'}.issubset(kwargs):
539 raise ValueError('endpoint and uuid are required '
540 'if model_name not given')
541 if not ({'username', 'password'}.issubset(kwargs) or
542 {'bakery_client', 'macaroons'}.intersection(kwargs)):
543 raise ValueError('Authentication parameters are required '
544 'if model_name not given')
545 await self._connector.connect(**kwargs)
546 await self._after_connect()
547
548 async def connect_model(self, model_name):
549 """
550 .. deprecated:: 0.6.2
551 Use ``connect(model_name=model_name)`` instead.
552 """
553 return await self.connect(model_name=model_name)
554
555 async def connect_current(self):
556 """
557 .. deprecated:: 0.6.2
558 Use ``connect()`` instead.
559 """
560 return await self.connect()
561
562 async def _connect_direct(self, **kwargs):
563 await self.disconnect()
564 await self._connector.connect(**kwargs)
565 await self._after_connect()
566
567 async def _after_connect(self):
568 self._watch()
569
570 # Wait for the first packet of data from the AllWatcher,
571 # which contains all information on the model.
572 # TODO this means that we can't do anything until
573 # we've received all the model data, which might be
574 # a whole load of unneeded data if all the client wants
575 # to do is make one RPC call.
576 await self._watch_received.wait()
577
578 await self.get_info()
579
580 async def disconnect(self):
581 """Shut down the watcher task and close websockets.
582
583 """
584 if not self._watch_stopped.is_set():
585 log.debug('Stopping watcher task')
586 self._watch_stopping.set()
587 await self._watch_stopped.wait()
588 self._watch_stopping.clear()
589
590 if self.is_connected():
591 log.debug('Closing model connection')
592 await self._connector.disconnect()
593 self._info = None
594
595 async def add_local_charm_dir(self, charm_dir, series):
596 """Upload a local charm to the model.
597
598 This will automatically generate an archive from
599 the charm dir.
600
601 :param charm_dir: Path to the charm directory
602 :param series: Charm series
603
604 """
605 fh = tempfile.NamedTemporaryFile()
606 CharmArchiveGenerator(charm_dir).make_archive(fh.name)
607 with fh:
608 func = partial(
609 self.add_local_charm, fh, series, os.stat(fh.name).st_size)
610 charm_url = await self._connector.loop.run_in_executor(None, func)
611
612 log.debug('Uploaded local charm: %s -> %s', charm_dir, charm_url)
613 return charm_url
614
615 def add_local_charm(self, charm_file, series, size=None):
616 """Upload a local charm archive to the model.
617
618 Returns the 'local:...' url that should be used to deploy the charm.
619
620 :param charm_file: Path to charm zip archive
621 :param series: Charm series
622 :param size: Size of the archive, in bytes
623 :return str: 'local:...' url for deploying the charm
624 :raises: :class:`JujuError` if the upload fails
625
626 Uses an https endpoint at the same host:port as the wss.
627 Supports large file uploads.
628
629 .. warning::
630
631 This method will block. Consider using :meth:`add_local_charm_dir`
632 instead.
633
634 """
635 conn, headers, path_prefix = self.connection().https_connection()
636 path = "%s/charms?series=%s" % (path_prefix, series)
637 headers['Content-Type'] = 'application/zip'
638 if size:
639 headers['Content-Length'] = size
640 conn.request("POST", path, charm_file, headers)
641 response = conn.getresponse()
642 result = response.read().decode()
643 if not response.status == 200:
644 raise JujuError(result)
645 result = json.loads(result)
646 return result['charm-url']
647
648 def all_units_idle(self):
649 """Return True if all units are idle.
650
651 """
652 for unit in self.units.values():
653 unit_status = unit.data['agent-status']['current']
654 if unit_status != 'idle':
655 return False
656 return True
657
658 async def reset(self, force=False):
659 """Reset the model to a clean state.
660
661 :param bool force: Force-terminate machines.
662
663 This returns only after the model has reached a clean state. "Clean"
664 means no applications or machines exist in the model.
665
666 """
667 log.debug('Resetting model')
668 for app in self.applications.values():
669 await app.destroy()
670 for machine in self.machines.values():
671 await machine.destroy(force=force)
672 await self.block_until(
673 lambda: len(self.machines) == 0
674 )
675
676 async def block_until(self, *conditions, timeout=None, wait_period=0.5):
677 """Return only after all conditions are true.
678
679 Raises `websockets.ConnectionClosed` if disconnected.
680 """
681 def _disconnected():
682 return not (self.is_connected() and self.connection().is_open)
683
684 def done():
685 return _disconnected() or all(c() for c in conditions)
686
687 await utils.block_until(done,
688 timeout=timeout,
689 wait_period=wait_period,
690 loop=self.loop)
691 if _disconnected():
692 raise websockets.ConnectionClosed(1006, 'no reason')
693
694 @property
695 def applications(self):
696 """Return a map of application-name:Application for all applications
697 currently in the model.
698
699 """
700 return self.state.applications
701
702 @property
703 def machines(self):
704 """Return a map of machine-id:Machine for all machines currently in
705 the model.
706
707 """
708 return self.state.machines
709
710 @property
711 def units(self):
712 """Return a map of unit-id:Unit for all units currently in
713 the model.
714
715 """
716 return self.state.units
717
718 @property
719 def relations(self):
720 """Return a list of all Relations currently in the model.
721
722 """
723 return list(self.state.relations.values())
724
725 async def get_info(self):
726 """Return a client.ModelInfo object for this Model.
727
728 Retrieves latest info for this Model from the api server. The
729 return value is cached on the Model.info attribute so that the
730 valued may be accessed again without another api call, if
731 desired.
732
733 This method is called automatically when the Model is connected,
734 resulting in Model.info being initialized without requiring an
735 explicit call to this method.
736
737 """
738 facade = client.ClientFacade.from_connection(self.connection())
739
740 self._info = await facade.ModelInfo()
741 log.debug('Got ModelInfo: %s', vars(self.info))
742
743 return self.info
744
745 @property
746 def info(self):
747 """Return the cached client.ModelInfo object for this Model.
748
749 If Model.get_info() has not been called, this will return None.
750 """
751 return self._info
752
753 def add_observer(
754 self, callable_, entity_type=None, action=None, entity_id=None,
755 predicate=None):
756 """Register an "on-model-change" callback
757
758 Once the model is connected, ``callable_``
759 will be called each time the model changes. ``callable_`` should
760 be Awaitable and accept the following positional arguments:
761
762 delta - An instance of :class:`juju.delta.EntityDelta`
763 containing the raw delta data recv'd from the Juju
764 websocket.
765
766 old_obj - If the delta modifies an existing object in the model,
767 old_obj will be a copy of that object, as it was before the
768 delta was applied. Will be None if the delta creates a new
769 entity in the model.
770
771 new_obj - A copy of the new or updated object, after the delta
772 is applied. Will be None if the delta removes an entity
773 from the model.
774
775 model - The :class:`Model` itself.
776
777 Events for which ``callable_`` is called can be specified by passing
778 entity_type, action, and/or entitiy_id filter criteria, e.g.::
779
780 add_observer(
781 myfunc,
782 entity_type='application', action='add', entity_id='ubuntu')
783
784 For more complex filtering conditions, pass a predicate function. It
785 will be called with a delta as its only argument. If the predicate
786 function returns True, the ``callable_`` will be called.
787
788 """
789 observer = _Observer(
790 callable_, entity_type, action, entity_id, predicate)
791 self._observers[observer] = callable_
792
793 def _watch(self):
794 """Start an asynchronous watch against this model.
795
796 See :meth:`add_observer` to register an onchange callback.
797
798 """
799 async def _all_watcher():
800 try:
801 allwatcher = client.AllWatcherFacade.from_connection(
802 self.connection())
803 while not self._watch_stopping.is_set():
804 try:
805 results = await utils.run_with_interrupt(
806 allwatcher.Next(),
807 self._watch_stopping,
808 loop=self._connector.loop)
809 except JujuAPIError as e:
810 if 'watcher was stopped' not in str(e):
811 raise
812 if self._watch_stopping.is_set():
813 # this shouldn't ever actually happen, because
814 # the event should trigger before the controller
815 # has a chance to tell us the watcher is stopped
816 # but handle it gracefully, just in case
817 break
818 # controller stopped our watcher for some reason
819 # but we're not actually stopping, so just restart it
820 log.warning(
821 'Watcher: watcher stopped, restarting')
822 del allwatcher.Id
823 continue
824 except websockets.ConnectionClosed:
825 monitor = self.connection().monitor
826 if monitor.status == monitor.ERROR:
827 # closed unexpectedly, try to reopen
828 log.warning(
829 'Watcher: connection closed, reopening')
830 await self.connection().reconnect()
831 if monitor.status != monitor.CONNECTED:
832 # reconnect failed; abort and shutdown
833 log.error('Watcher: automatic reconnect '
834 'failed; stopping watcher')
835 break
836 del allwatcher.Id
837 continue
838 else:
839 # closed on request, go ahead and shutdown
840 break
841 if self._watch_stopping.is_set():
842 try:
843 await allwatcher.Stop()
844 except websockets.ConnectionClosed:
845 pass # can't stop on a closed conn
846 break
847 for delta in results.deltas:
848 try:
849 delta = get_entity_delta(delta)
850 old_obj, new_obj = self.state.apply_delta(delta)
851 await self._notify_observers(delta, old_obj, new_obj)
852 except KeyError as e:
853 log.debug("unknown delta type: %s", e.args[0])
854 self._watch_received.set()
855 except CancelledError:
856 pass
857 except Exception:
858 log.exception('Error in watcher')
859 raise
860 finally:
861 self._watch_stopped.set()
862
863 log.debug('Starting watcher task')
864 self._watch_received.clear()
865 self._watch_stopping.clear()
866 self._watch_stopped.clear()
867 self._connector.loop.create_task(_all_watcher())
868
869 async def _notify_observers(self, delta, old_obj, new_obj):
870 """Call observing callbacks, notifying them of a change in model state
871
872 :param delta: The raw change from the watcher
873 (:class:`juju.client.overrides.Delta`)
874 :param old_obj: The object in the model that this delta updates.
875 May be None.
876 :param new_obj: The object in the model that is created or updated
877 by applying this delta.
878
879 """
880 if new_obj and not old_obj:
881 delta.type = 'add'
882
883 log.debug(
884 'Model changed: %s %s %s',
885 delta.entity, delta.type, delta.get_id())
886
887 for o in self._observers:
888 if o.cares_about(delta):
889 asyncio.ensure_future(o(delta, old_obj, new_obj, self),
890 loop=self._connector.loop)
891
892 async def _wait(self, entity_type, entity_id, action, predicate=None):
893 """
894 Block the calling routine until a given action has happened to the
895 given entity
896
897 :param entity_type: The entity's type.
898 :param entity_id: The entity's id.
899 :param action: the type of action (e.g., 'add', 'change', or 'remove')
900 :param predicate: optional callable that must take as an
901 argument a delta, and must return a boolean, indicating
902 whether the delta contains the specific action we're looking
903 for. For example, you might check to see whether a 'change'
904 has a 'completed' status. See the _Observer class for details.
905
906 """
907 q = asyncio.Queue(loop=self._connector.loop)
908
909 async def callback(delta, old, new, model):
910 await q.put(delta.get_id())
911
912 self.add_observer(callback, entity_type, action, entity_id, predicate)
913 entity_id = await q.get()
914 # object might not be in the entity_map if we were waiting for a
915 # 'remove' action
916 return self.state._live_entity_map(entity_type).get(entity_id)
917
918 async def _wait_for_new(self, entity_type, entity_id):
919 """Wait for a new object to appear in the Model and return it.
920
921 Waits for an object of type ``entity_type`` with id ``entity_id``
922 to appear in the model. This is similar to watching for the
923 object using ``block_until``, but uses the watcher rather than
924 polling.
925
926 """
927 # if the entity is already in the model, just return it
928 if entity_id in self.state._live_entity_map(entity_type):
929 return self.state._live_entity_map(entity_type)[entity_id]
930 return await self._wait(entity_type, entity_id, None)
931
932 async def wait_for_action(self, action_id):
933 """Given an action, wait for it to complete."""
934
935 if action_id.startswith("action-"):
936 # if we've been passed action.tag, transform it into the
937 # id that the api deltas will use.
938 action_id = action_id[7:]
939
940 def predicate(delta):
941 return delta.data['status'] in ('completed', 'failed')
942
943 return await self._wait('action', action_id, None, predicate)
944
945 async def add_machine(
946 self, spec=None, constraints=None, disks=None, series=None):
947 """Start a new, empty machine and optionally a container, or add a
948 container to a machine.
949
950 :param str spec: Machine specification
951 Examples::
952
953 (None) - starts a new machine
954 'lxd' - starts a new machine with one lxd container
955 'lxd:4' - starts a new lxd container on machine 4
956 'ssh:user@10.10.0.3:/path/to/private/key' - manually provision
957 a machine with ssh and the private key used for authentication
958 'zone=us-east-1a' - starts a machine in zone us-east-1s on AWS
959 'maas2.name' - acquire machine maas2.name on MAAS
960
961 :param dict constraints: Machine constraints, which can contain the
962 the following keys::
963
964 arch : str
965 container : str
966 cores : int
967 cpu_power : int
968 instance_type : str
969 mem : int
970 root_disk : int
971 spaces : list(str)
972 tags : list(str)
973 virt_type : str
974
975 Example::
976
977 constraints={
978 'mem': 256 * MB,
979 'tags': ['virtual'],
980 }
981
982 :param list disks: List of disk constraint dictionaries, which can
983 contain the following keys::
984
985 count : int
986 pool : str
987 size : int
988
989 Example::
990
991 disks=[{
992 'pool': 'rootfs',
993 'size': 10 * GB,
994 'count': 1,
995 }]
996
997 :param str series: Series, e.g. 'xenial'
998
999 Supported container types are: lxd, kvm
1000
1001 When deploying a container to an existing machine, constraints cannot
1002 be used.
1003
1004 """
1005 params = client.AddMachineParams()
1006
1007 if spec:
1008 if spec.startswith("ssh:"):
1009 placement, target, private_key_path = spec.split(":")
1010 user, host = target.split("@")
1011
1012 sshProvisioner = provisioner.SSHProvisioner(
1013 host=host,
1014 user=user,
1015 private_key_path=private_key_path,
1016 )
1017
1018 params = sshProvisioner.provision_machine()
1019 else:
1020 placement = parse_placement(spec)
1021 if placement:
1022 params.placement = placement[0]
1023
1024 params.jobs = ['JobHostUnits']
1025
1026 if constraints:
1027 params.constraints = client.Value.from_json(constraints)
1028
1029 if disks:
1030 params.disks = [
1031 client.Constraints.from_json(o) for o in disks]
1032
1033 if series:
1034 params.series = series
1035
1036 # Submit the request.
1037 client_facade = client.ClientFacade.from_connection(self.connection())
1038 results = await client_facade.AddMachines([params])
1039 error = results.machines[0].error
1040 if error:
1041 raise ValueError("Error adding machine: %s" % error.message)
1042 machine_id = results.machines[0].machine
1043
1044 if spec:
1045 if spec.startswith("ssh:"):
1046 # Need to run this after AddMachines has been called,
1047 # as we need the machine_id
1048 await sshProvisioner.install_agent(
1049 self.connection(),
1050 params.nonce,
1051 machine_id,
1052 )
1053
1054 log.debug('Added new machine %s', machine_id)
1055 return await self._wait_for_new('machine', machine_id)
1056
1057 async def add_relation(self, relation1, relation2):
1058 """Add a relation between two applications.
1059
1060 :param str relation1: '<application>[:<relation_name>]'
1061 :param str relation2: '<application>[:<relation_name>]'
1062
1063 """
1064 connection = self.connection()
1065 app_facade = client.ApplicationFacade.from_connection(connection)
1066
1067 log.debug(
1068 'Adding relation %s <-> %s', relation1, relation2)
1069
1070 def _find_relation(*specs):
1071 for rel in self.relations:
1072 if rel.matches(*specs):
1073 return rel
1074 return None
1075
1076 try:
1077 result = await app_facade.AddRelation([relation1, relation2])
1078 except JujuAPIError as e:
1079 if 'relation already exists' not in e.message:
1080 raise
1081 rel = _find_relation(relation1, relation2)
1082 if rel:
1083 return rel
1084 raise JujuError('Relation {} {} exists but not in model'.format(
1085 relation1, relation2))
1086
1087 specs = ['{}:{}'.format(app, data['name'])
1088 for app, data in result.endpoints.items()]
1089
1090 await self.block_until(lambda: _find_relation(*specs) is not None)
1091 return _find_relation(*specs)
1092
1093 def add_space(self, name, *cidrs):
1094 """Add a new network space.
1095
1096 Adds a new space with the given name and associates the given
1097 (optional) list of existing subnet CIDRs with it.
1098
1099 :param str name: Name of the space
1100 :param *cidrs: Optional list of existing subnet CIDRs
1101
1102 """
1103 raise NotImplementedError()
1104
1105 async def add_ssh_key(self, user, key):
1106 """Add a public SSH key to this model.
1107
1108 :param str user: The username of the user
1109 :param str key: The public ssh key
1110
1111 """
1112 key_facade = client.KeyManagerFacade.from_connection(self.connection())
1113 return await key_facade.AddKeys([key], user)
1114 add_ssh_keys = add_ssh_key
1115
1116 def add_subnet(self, cidr_or_id, space, *zones):
1117 """Add an existing subnet to this model.
1118
1119 :param str cidr_or_id: CIDR or provider ID of the existing subnet
1120 :param str space: Network space with which to associate
1121 :param str *zones: Zone(s) in which the subnet resides
1122
1123 """
1124 raise NotImplementedError()
1125
1126 def get_backups(self):
1127 """Retrieve metadata for backups in this model.
1128
1129 """
1130 raise NotImplementedError()
1131
1132 def block(self, *commands):
1133 """Add a new block to this model.
1134
1135 :param str *commands: The commands to block. Valid values are
1136 'all-changes', 'destroy-model', 'remove-object'
1137
1138 """
1139 raise NotImplementedError()
1140
1141 def get_blocks(self):
1142 """List blocks for this model.
1143
1144 """
1145 raise NotImplementedError()
1146
1147 def get_cached_images(self, arch=None, kind=None, series=None):
1148 """Return a list of cached OS images.
1149
1150 :param str arch: Filter by image architecture
1151 :param str kind: Filter by image kind, e.g. 'lxd'
1152 :param str series: Filter by image series, e.g. 'xenial'
1153
1154 """
1155 raise NotImplementedError()
1156
1157 def create_backup(self, note=None, no_download=False):
1158 """Create a backup of this model.
1159
1160 :param str note: A note to store with the backup
1161 :param bool no_download: Do not download the backup archive
1162 :return str: Path to downloaded archive
1163
1164 """
1165 raise NotImplementedError()
1166
1167 def create_storage_pool(self, name, provider_type, **pool_config):
1168 """Create or define a storage pool.
1169
1170 :param str name: Name to give the storage pool
1171 :param str provider_type: Pool provider type
1172 :param **pool_config: key/value pool configuration pairs
1173
1174 """
1175 raise NotImplementedError()
1176
1177 def debug_log(
1178 self, no_tail=False, exclude_module=None, include_module=None,
1179 include=None, level=None, limit=0, lines=10, replay=False,
1180 exclude=None):
1181 """Get log messages for this model.
1182
1183 :param bool no_tail: Stop after returning existing log messages
1184 :param list exclude_module: Do not show log messages for these logging
1185 modules
1186 :param list include_module: Only show log messages for these logging
1187 modules
1188 :param list include: Only show log messages for these entities
1189 :param str level: Log level to show, valid options are 'TRACE',
1190 'DEBUG', 'INFO', 'WARNING', 'ERROR,
1191 :param int limit: Return this many of the most recent (possibly
1192 filtered) lines are shown
1193 :param int lines: Yield this many of the most recent lines, and keep
1194 yielding
1195 :param bool replay: Yield the entire log, and keep yielding
1196 :param list exclude: Do not show log messages for these entities
1197
1198 """
1199 raise NotImplementedError()
1200
1201 def _get_series(self, entity_url, entity):
1202 # try to get the series from the provided charm URL
1203 if entity_url.startswith('cs:'):
1204 parts = entity_url[3:].split('/')
1205 else:
1206 parts = entity_url.split('/')
1207 if parts[0].startswith('~'):
1208 parts.pop(0)
1209 if len(parts) > 1:
1210 # series was specified in the URL
1211 return parts[0]
1212 # series was not supplied at all, so use the newest
1213 # supported series according to the charm store
1214 ss = entity['Meta']['supported-series']
1215 return ss['SupportedSeries'][0]
1216
1217 async def deploy(
1218 self, entity_url, application_name=None, bind=None, budget=None,
1219 channel=None, config=None, constraints=None, force=False,
1220 num_units=1, plan=None, resources=None, series=None, storage=None,
1221 to=None, devices=None):
1222 """Deploy a new service or bundle.
1223
1224 :param str entity_url: Charm or bundle url
1225 :param str application_name: Name to give the service
1226 :param dict bind: <charm endpoint>:<network space> pairs
1227 :param dict budget: <budget name>:<limit> pairs
1228 :param str channel: Charm store channel from which to retrieve
1229 the charm or bundle, e.g. 'edge'
1230 :param dict config: Charm configuration dictionary
1231 :param constraints: Service constraints
1232 :type constraints: :class:`juju.Constraints`
1233 :param bool force: Allow charm to be deployed to a machine running
1234 an unsupported series
1235 :param int num_units: Number of units to deploy
1236 :param str plan: Plan under which to deploy charm
1237 :param dict resources: <resource name>:<file path> pairs
1238 :param str series: Series on which to deploy
1239 :param dict storage: Storage constraints TODO how do these look?
1240 :param to: Placement directive as a string. For example:
1241
1242 '23' - place on machine 23
1243 'lxd:7' - place in new lxd container on machine 7
1244 '24/lxd/3' - place in container 3 on machine 24
1245
1246 If None, a new machine is provisioned.
1247
1248
1249 TODO::
1250
1251 - support local resources
1252
1253 """
1254 if storage:
1255 storage = {
1256 k: client.Constraints(**v)
1257 for k, v in storage.items()
1258 }
1259
1260 entity_path = Path(entity_url.replace('local:', ''))
1261 bundle_path = entity_path / 'bundle.yaml'
1262 metadata_path = entity_path / 'metadata.yaml'
1263
1264 is_local = (
1265 entity_url.startswith('local:') or
1266 entity_path.is_dir() or
1267 entity_path.is_file()
1268 )
1269 if is_local:
1270 entity_id = entity_url.replace('local:', '')
1271 else:
1272 entity = await self.charmstore.entity(entity_url, channel=channel,
1273 include_stats=False)
1274 entity_id = entity['Id']
1275
1276 client_facade = client.ClientFacade.from_connection(self.connection())
1277
1278 is_bundle = ((is_local and
1279 (entity_id.endswith('.yaml') and entity_path.exists()) or
1280 bundle_path.exists()) or
1281 (not is_local and 'bundle/' in entity_id))
1282
1283 if is_bundle:
1284 handler = BundleHandler(self)
1285 await handler.fetch_plan(entity_id)
1286 await handler.execute_plan()
1287 extant_apps = {app for app in self.applications}
1288 pending_apps = set(handler.applications) - extant_apps
1289 if pending_apps:
1290 # new apps will usually be in the model by now, but if some
1291 # haven't made it yet we'll need to wait on them to be added
1292 await asyncio.gather(*[
1293 asyncio.ensure_future(
1294 self._wait_for_new('application', app_name),
1295 loop=self._connector.loop)
1296 for app_name in pending_apps
1297 ], loop=self._connector.loop)
1298 return [app for name, app in self.applications.items()
1299 if name in handler.applications]
1300 else:
1301 if not is_local:
1302 if not application_name:
1303 application_name = entity['Meta']['charm-metadata']['Name']
1304 if not series:
1305 series = self._get_series(entity_url, entity)
1306 await client_facade.AddCharm(channel, entity_id)
1307 # XXX: we're dropping local resources here, but we don't
1308 # actually support them yet anyway
1309 resources = await self._add_store_resources(application_name,
1310 entity_id,
1311 entity=entity)
1312 else:
1313 if not application_name:
1314 metadata = yaml.load(metadata_path.read_text())
1315 application_name = metadata['name']
1316 # We have a local charm dir that needs to be uploaded
1317 charm_dir = os.path.abspath(
1318 os.path.expanduser(entity_id))
1319 series = series or get_charm_series(charm_dir)
1320 if not series:
1321 raise JujuError(
1322 "Couldn't determine series for charm at {}. "
1323 "Pass a 'series' kwarg to Model.deploy().".format(
1324 charm_dir))
1325 entity_id = await self.add_local_charm_dir(charm_dir, series)
1326 return await self._deploy(
1327 charm_url=entity_id,
1328 application=application_name,
1329 series=series,
1330 config=config or {},
1331 constraints=constraints,
1332 endpoint_bindings=bind,
1333 resources=resources,
1334 storage=storage,
1335 channel=channel,
1336 num_units=num_units,
1337 placement=parse_placement(to),
1338 devices=devices,
1339 )
1340
1341 async def _add_store_resources(self, application, entity_url,
1342 overrides=None, entity=None):
1343 if not entity:
1344 # avoid extra charm store call if one was already made
1345 entity = await self.charmstore.entity(entity_url,
1346 include_stats=False)
1347 resources = [
1348 {
1349 'description': resource['Description'],
1350 'fingerprint': resource['Fingerprint'],
1351 'name': resource['Name'],
1352 'path': resource['Path'],
1353 'revision': resource['Revision'],
1354 'size': resource['Size'],
1355 'type_': resource['Type'],
1356 'origin': 'store',
1357 } for resource in entity['Meta']['resources']
1358 ]
1359
1360 if overrides:
1361 names = {r['name'] for r in resources}
1362 unknown = overrides.keys() - names
1363 if unknown:
1364 raise JujuError('Unrecognized resource{}: {}'.format(
1365 's' if len(unknown) > 1 else '',
1366 ', '.join(unknown)))
1367 for resource in resources:
1368 if resource['name'] in overrides:
1369 resource['revision'] = overrides[resource['name']]
1370
1371 if not resources:
1372 return None
1373
1374 resources_facade = client.ResourcesFacade.from_connection(
1375 self.connection())
1376 response = await resources_facade.AddPendingResources(
1377 tag.application(application),
1378 entity_url,
1379 [client.CharmResource(**resource) for resource in resources])
1380 resource_map = {resource['name']: pid
1381 for resource, pid
1382 in zip(resources, response.pending_ids)}
1383 return resource_map
1384
1385 async def _deploy(self, charm_url, application, series, config,
1386 constraints, endpoint_bindings, resources, storage,
1387 channel=None, num_units=None, placement=None,
1388 devices=None):
1389 """Logic shared between `Model.deploy` and `BundleHandler.deploy`.
1390 """
1391 log.info('Deploying %s', charm_url)
1392
1393 # stringify all config values for API, and convert to YAML
1394 config = {k: str(v) for k, v in config.items()}
1395 config = yaml.dump({application: config},
1396 default_flow_style=False)
1397
1398 app_facade = client.ApplicationFacade.from_connection(
1399 self.connection())
1400
1401 app = client.ApplicationDeploy(
1402 charm_url=charm_url,
1403 application=application,
1404 series=series,
1405 channel=channel,
1406 config_yaml=config,
1407 constraints=parse_constraints(constraints),
1408 endpoint_bindings=endpoint_bindings,
1409 num_units=num_units,
1410 resources=resources,
1411 storage=storage,
1412 placement=placement,
1413 devices=devices,
1414 )
1415 result = await app_facade.Deploy([app])
1416 errors = [r.error.message for r in result.results if r.error]
1417 if errors:
1418 raise JujuError('\n'.join(errors))
1419 return await self._wait_for_new('application', application)
1420
1421 async def destroy(self):
1422 """Terminate all machines and resources for this model.
1423 Is already implemented in controller.py.
1424 """
1425 raise NotImplementedError()
1426
1427 async def destroy_unit(self, *unit_names):
1428 """Destroy units by name.
1429
1430 """
1431 connection = self.connection()
1432 app_facade = client.ApplicationFacade.from_connection(connection)
1433
1434 log.debug(
1435 'Destroying unit%s %s',
1436 's' if len(unit_names) == 1 else '',
1437 ' '.join(unit_names))
1438
1439 return await app_facade.DestroyUnits(list(unit_names))
1440 destroy_units = destroy_unit
1441
1442 def get_backup(self, archive_id):
1443 """Download a backup archive file.
1444
1445 :param str archive_id: The id of the archive to download
1446 :return str: Path to the archive file
1447
1448 """
1449 raise NotImplementedError()
1450
1451 def enable_ha(
1452 self, num_controllers=0, constraints=None, series=None, to=None):
1453 """Ensure sufficient controllers exist to provide redundancy.
1454
1455 :param int num_controllers: Number of controllers to make available
1456 :param constraints: Constraints to apply to the controller machines
1457 :type constraints: :class:`juju.Constraints`
1458 :param str series: Series of the controller machines
1459 :param list to: Placement directives for controller machines, e.g.::
1460
1461 '23' - machine 23
1462 'lxc:7' - new lxc container on machine 7
1463 '24/lxc/3' - lxc container 3 or machine 24
1464
1465 If None, a new machine is provisioned.
1466
1467 """
1468 raise NotImplementedError()
1469
1470 async def get_config(self):
1471 """Return the configuration settings for this model.
1472
1473 :returns: A ``dict`` mapping keys to `ConfigValue` instances,
1474 which have `source` and `value` attributes.
1475 """
1476 config_facade = client.ModelConfigFacade.from_connection(
1477 self.connection()
1478 )
1479 result = await config_facade.ModelGet()
1480 config = result.config
1481 for key, value in config.items():
1482 config[key] = ConfigValue.from_json(value)
1483 return config
1484
1485 async def get_constraints(self):
1486 """Return the machine constraints for this model.
1487
1488 :returns: A ``dict`` of constraints.
1489 """
1490 constraints = {}
1491 client_facade = client.ClientFacade.from_connection(self.connection())
1492 result = await client_facade.GetModelConstraints()
1493
1494 # GetModelConstraints returns GetConstraintsResults which has a
1495 # 'constraints' attribute. If no constraints have been set
1496 # GetConstraintsResults.constraints is None. Otherwise
1497 # GetConstraintsResults.constraints has an attribute for each possible
1498 # constraint, each of these in turn will be None if they have not been
1499 # set.
1500 if result.constraints:
1501 constraint_types = [a for a in dir(result.constraints)
1502 if a in Value._toSchema.keys()]
1503 for constraint in constraint_types:
1504 value = getattr(result.constraints, constraint)
1505 if value is not None:
1506 constraints[constraint] = getattr(result.constraints,
1507 constraint)
1508 return constraints
1509
1510 def import_ssh_key(self, identity):
1511 """Add a public SSH key from a trusted indentity source to this model.
1512
1513 :param str identity: User identity in the form <lp|gh>:<username>
1514
1515 """
1516 raise NotImplementedError()
1517 import_ssh_keys = import_ssh_key
1518
1519 async def get_machines(self):
1520 """Return list of machines in this model.
1521
1522 """
1523 return list(self.state.machines.keys())
1524
1525 def get_shares(self):
1526 """Return list of all users with access to this model.
1527
1528 """
1529 raise NotImplementedError()
1530
1531 def get_spaces(self):
1532 """Return list of all known spaces, including associated subnets.
1533
1534 """
1535 raise NotImplementedError()
1536
1537 async def get_ssh_key(self, raw_ssh=False):
1538 """Return known SSH keys for this model.
1539 :param bool raw_ssh: if True, returns the raw ssh key,
1540 else it's fingerprint
1541
1542 """
1543 key_facade = client.KeyManagerFacade.from_connection(self.connection())
1544 entity = {'tag': tag.model(self.info.uuid)}
1545 entities = client.Entities([entity])
1546 return await key_facade.ListKeys(entities, raw_ssh)
1547 get_ssh_keys = get_ssh_key
1548
1549 def get_storage(self, filesystem=False, volume=False):
1550 """Return details of storage instances.
1551
1552 :param bool filesystem: Include filesystem storage
1553 :param bool volume: Include volume storage
1554
1555 """
1556 raise NotImplementedError()
1557
1558 def get_storage_pools(self, names=None, providers=None):
1559 """Return list of storage pools.
1560
1561 :param list names: Only include pools with these names
1562 :param list providers: Only include pools for these providers
1563
1564 """
1565 raise NotImplementedError()
1566
1567 def get_subnets(self, space=None, zone=None):
1568 """Return list of known subnets.
1569
1570 :param str space: Only include subnets in this space
1571 :param str zone: Only include subnets in this zone
1572
1573 """
1574 raise NotImplementedError()
1575
1576 def remove_blocks(self):
1577 """Remove all blocks from this model.
1578
1579 """
1580 raise NotImplementedError()
1581
1582 def remove_backup(self, backup_id):
1583 """Delete a backup.
1584
1585 :param str backup_id: The id of the backup to remove
1586
1587 """
1588 raise NotImplementedError()
1589
1590 def remove_cached_images(self, arch=None, kind=None, series=None):
1591 """Remove cached OS images.
1592
1593 :param str arch: Architecture of the images to remove
1594 :param str kind: Image kind to remove, e.g. 'lxd'
1595 :param str series: Image series to remove, e.g. 'xenial'
1596
1597 """
1598 raise NotImplementedError()
1599
1600 def remove_machine(self, *machine_ids):
1601 """Remove a machine from this model.
1602
1603 :param str *machine_ids: Ids of the machines to remove
1604
1605 """
1606 raise NotImplementedError()
1607 remove_machines = remove_machine
1608
1609 async def remove_ssh_key(self, user, key):
1610 """Remove a public SSH key(s) from this model.
1611
1612 :param str key: Full ssh key
1613 :param str user: Juju user to which the key is registered
1614
1615 """
1616 key_facade = client.KeyManagerFacade.from_connection(self.connection())
1617 key = base64.b64decode(bytes(key.strip().split()[1].encode('ascii')))
1618 key = hashlib.md5(key).hexdigest()
1619 key = ':'.join(a + b for a, b in zip(key[::2], key[1::2]))
1620 await key_facade.DeleteKeys([key], user)
1621 remove_ssh_keys = remove_ssh_key
1622
1623 def restore_backup(
1624 self, bootstrap=False, constraints=None, archive=None,
1625 backup_id=None, upload_tools=False):
1626 """Restore a backup archive to a new controller.
1627
1628 :param bool bootstrap: Bootstrap a new state machine
1629 :param constraints: Model constraints
1630 :type constraints: :class:`juju.Constraints`
1631 :param str archive: Path to backup archive to restore
1632 :param str backup_id: Id of backup to restore
1633 :param bool upload_tools: Upload tools if bootstrapping a new machine
1634
1635 """
1636 raise NotImplementedError()
1637
1638 def retry_provisioning(self):
1639 """Retry provisioning for failed machines.
1640
1641 """
1642 raise NotImplementedError()
1643
1644 def run(self, command, timeout=None):
1645 """Run command on all machines in this model.
1646
1647 :param str command: The command to run
1648 :param int timeout: Time to wait before command is considered failed
1649
1650 """
1651 raise NotImplementedError()
1652
1653 async def set_config(self, config):
1654 """Set configuration keys on this model.
1655
1656 :param dict config: Mapping of config keys to either string values or
1657 `ConfigValue` instances, as returned by `get_config`.
1658 """
1659 config_facade = client.ModelConfigFacade.from_connection(
1660 self.connection()
1661 )
1662 for key, value in config.items():
1663 if isinstance(value, ConfigValue):
1664 config[key] = value.value
1665 await config_facade.ModelSet(config)
1666
1667 async def set_constraints(self, constraints):
1668 """Set machine constraints on this model.
1669
1670 :param dict config: Mapping of model constraints
1671 """
1672 client_facade = client.ClientFacade.from_connection(self.connection())
1673 await client_facade.SetModelConstraints(
1674 application='',
1675 constraints=constraints)
1676
1677 async def get_action_output(self, action_uuid, wait=None):
1678 """Get the results of an action by ID.
1679
1680 :param str action_uuid: Id of the action
1681 :param int wait: Time in seconds to wait for action to complete.
1682 :return dict: Output from action
1683 :raises: :class:`JujuError` if invalid action_uuid
1684 """
1685 action_facade = client.ActionFacade.from_connection(
1686 self.connection()
1687 )
1688 entity = [{'tag': tag.action(action_uuid)}]
1689 # Cannot use self.wait_for_action as the action event has probably
1690 # already happened and self.wait_for_action works by processing
1691 # model deltas and checking if they match our type. If the action
1692 # has already occured then the delta has gone.
1693
1694 async def _wait_for_action_status():
1695 while True:
1696 action_output = await action_facade.Actions(entity)
1697 if action_output.results[0].status in ('completed', 'failed'):
1698 return
1699 else:
1700 await asyncio.sleep(1)
1701 await asyncio.wait_for(
1702 _wait_for_action_status(),
1703 timeout=wait)
1704 action_output = await action_facade.Actions(entity)
1705 # ActionResult.output is None if the action produced no output
1706 if action_output.results[0].output is None:
1707 output = {}
1708 else:
1709 output = action_output.results[0].output
1710 return output
1711
1712 async def get_action_status(self, uuid_or_prefix=None, name=None):
1713 """Get the status of all actions, filtered by ID, ID prefix, or name.
1714
1715 :param str uuid_or_prefix: Filter by action uuid or prefix
1716 :param str name: Filter by action name
1717
1718 """
1719 results = {}
1720 action_results = []
1721 action_facade = client.ActionFacade.from_connection(
1722 self.connection()
1723 )
1724 if name:
1725 name_results = await action_facade.FindActionsByNames([name])
1726 action_results.extend(name_results.actions[0].actions)
1727 if uuid_or_prefix:
1728 # Collect list of actions matching uuid or prefix
1729 matching_actions = await action_facade.FindActionTagsByPrefix(
1730 [uuid_or_prefix])
1731 entities = []
1732 for actions in matching_actions.matches.values():
1733 entities = [{'tag': a.tag} for a in actions]
1734 # Get action results matching action tags
1735 uuid_results = await action_facade.Actions(entities)
1736 action_results.extend(uuid_results.results)
1737 for a in action_results:
1738 results[tag.untag('action-', a.action.tag)] = a.status
1739 return results
1740
1741 def get_budget(self, budget_name):
1742 """Get budget usage info.
1743
1744 :param str budget_name: Name of budget
1745
1746 """
1747 raise NotImplementedError()
1748
1749 async def get_status(self, filters=None, utc=False):
1750 """Return the status of the model.
1751
1752 :param str filters: Optional list of applications, units, or machines
1753 to include, which can use wildcards ('*').
1754 :param bool utc: Display time as UTC in RFC3339 format
1755
1756 """
1757 client_facade = client.ClientFacade.from_connection(self.connection())
1758 return await client_facade.FullStatus(filters)
1759
1760 def sync_tools(
1761 self, all_=False, destination=None, dry_run=False, public=False,
1762 source=None, stream=None, version=None):
1763 """Copy Juju tools into this model.
1764
1765 :param bool all_: Copy all versions, not just the latest
1766 :param str destination: Path to local destination directory
1767 :param bool dry_run: Don't do the actual copy
1768 :param bool public: Tools are for a public cloud, so generate mirrors
1769 information
1770 :param str source: Path to local source directory
1771 :param str stream: Simplestreams stream for which to sync metadata
1772 :param str version: Copy a specific major.minor version
1773
1774 """
1775 raise NotImplementedError()
1776
1777 def unblock(self, *commands):
1778 """Unblock an operation that would alter this model.
1779
1780 :param str *commands: The commands to unblock. Valid values are
1781 'all-changes', 'destroy-model', 'remove-object'
1782
1783 """
1784 raise NotImplementedError()
1785
1786 def unset_config(self, *keys):
1787 """Unset configuration on this model.
1788
1789 :param str *keys: The keys to unset
1790
1791 """
1792 raise NotImplementedError()
1793
1794 def upgrade_gui(self):
1795 """Upgrade the Juju GUI for this model.
1796
1797 """
1798 raise NotImplementedError()
1799
1800 def upgrade_juju(
1801 self, dry_run=False, reset_previous_upgrade=False,
1802 upload_tools=False, version=None):
1803 """Upgrade Juju on all machines in a model.
1804
1805 :param bool dry_run: Don't do the actual upgrade
1806 :param bool reset_previous_upgrade: Clear the previous (incomplete)
1807 upgrade status
1808 :param bool upload_tools: Upload local version of tools
1809 :param str version: Upgrade to a specific version
1810
1811 """
1812 raise NotImplementedError()
1813
1814 def upload_backup(self, archive_path):
1815 """Store a backup archive remotely in Juju.
1816
1817 :param str archive_path: Path to local archive
1818
1819 """
1820 raise NotImplementedError()
1821
1822 @property
1823 def charmstore(self):
1824 return self._charmstore
1825
1826 async def get_metrics(self, *tags):
1827 """Retrieve metrics.
1828
1829 :param str *tags: Tags of entities from which to retrieve metrics.
1830 No tags retrieves the metrics of all units in the model.
1831 :return: Dictionary of unit_name:metrics
1832
1833 """
1834 log.debug("Retrieving metrics for %s",
1835 ', '.join(tags) if tags else "all units")
1836
1837 metrics_facade = client.MetricsDebugFacade.from_connection(
1838 self.connection())
1839
1840 entities = [client.Entity(tag) for tag in tags]
1841 metrics_result = await metrics_facade.GetMetrics(entities)
1842
1843 metrics = collections.defaultdict(list)
1844
1845 for entity_metrics in metrics_result.results:
1846 error = entity_metrics.error
1847 if error:
1848 if "is not a valid tag" in error:
1849 raise ValueError(error.message)
1850 else:
1851 raise Exception(error.message)
1852
1853 for metric in entity_metrics.metrics:
1854 metrics[metric.unit].append(vars(metric))
1855
1856 return metrics
1857
1858
1859 def get_charm_series(path):
1860 """Inspects the charm directory at ``path`` and returns a default
1861 series from its metadata.yaml (the first item in the 'series' list).
1862
1863 Returns None if no series can be determined.
1864
1865 """
1866 md = Path(path) / "metadata.yaml"
1867 if not md.exists():
1868 return None
1869 data = yaml.load(md.open())
1870 series = data.get('series')
1871 return series[0] if series else None
1872
1873
1874 class BundleHandler:
1875 """
1876 Handle bundles by using the API to translate bundle YAML into a plan of
1877 steps and then dispatching each of those using the API.
1878 """
1879 def __init__(self, model):
1880 self.model = model
1881 self.charmstore = model.charmstore
1882 self.plan = []
1883 self.references = {}
1884 self._units_by_app = {}
1885 for unit_name, unit in model.units.items():
1886 app_units = self._units_by_app.setdefault(unit.application, [])
1887 app_units.append(unit_name)
1888 self.bundle_facade = client.BundleFacade.from_connection(
1889 model.connection())
1890 self.client_facade = client.ClientFacade.from_connection(
1891 model.connection())
1892 self.app_facade = client.ApplicationFacade.from_connection(
1893 model.connection())
1894 self.ann_facade = client.AnnotationsFacade.from_connection(
1895 model.connection())
1896
1897 async def _handle_local_charms(self, bundle):
1898 """Search for references to local charms (i.e. filesystem paths)
1899 in the bundle. Upload the local charms to the model, and replace
1900 the filesystem paths with appropriate 'local:' paths in the bundle.
1901
1902 Return the modified bundle.
1903
1904 :param dict bundle: Bundle dictionary
1905 :return: Modified bundle dictionary
1906
1907 """
1908 apps, args = [], []
1909
1910 default_series = bundle.get('series')
1911 apps_dict = bundle.get('applications', bundle.get('services', {}))
1912 for app_name in self.applications:
1913 app_dict = apps_dict[app_name]
1914 charm_dir = os.path.abspath(os.path.expanduser(app_dict['charm']))
1915 if not os.path.isdir(charm_dir):
1916 continue
1917 series = (
1918 app_dict.get('series') or
1919 default_series or
1920 get_charm_series(charm_dir)
1921 )
1922 if not series:
1923 raise JujuError(
1924 "Couldn't determine series for charm at {}. "
1925 "Add a 'series' key to the bundle.".format(charm_dir))
1926
1927 # Keep track of what we need to update. We keep a list of apps
1928 # that need to be updated, and a corresponding list of args
1929 # needed to update those apps.
1930 apps.append(app_name)
1931 args.append((charm_dir, series))
1932
1933 if apps:
1934 # If we have apps to update, spawn all the coroutines concurrently
1935 # and wait for them to finish.
1936 charm_urls = await asyncio.gather(*[
1937 self.model.add_local_charm_dir(*params)
1938 for params in args
1939 ], loop=self.model.loop)
1940 # Update the 'charm:' entry for each app with the new 'local:' url.
1941 for app_name, charm_url in zip(apps, charm_urls):
1942 apps_dict[app_name]['charm'] = charm_url
1943
1944 return bundle
1945
1946 async def fetch_plan(self, entity_id):
1947 is_store_url = entity_id.startswith('cs:')
1948
1949 if not is_store_url and os.path.isfile(entity_id):
1950 bundle_yaml = Path(entity_id).read_text()
1951 elif not is_store_url and os.path.isdir(entity_id):
1952 bundle_yaml = (Path(entity_id) / "bundle.yaml").read_text()
1953 else:
1954 bundle_yaml = await self.charmstore.files(entity_id,
1955 filename='bundle.yaml',
1956 read_file=True)
1957 self.bundle = yaml.safe_load(bundle_yaml)
1958 self.bundle = await self._handle_local_charms(self.bundle)
1959
1960 self.plan = await self.bundle_facade.GetChanges(
1961 yaml.dump(self.bundle))
1962
1963 if self.plan.errors:
1964 raise JujuError(self.plan.errors)
1965
1966 async def execute_plan(self):
1967 for step in self.plan.changes:
1968 method = getattr(self, step.method)
1969 result = await method(*step.args)
1970 self.references[step.id_] = result
1971
1972 @property
1973 def applications(self):
1974 apps_dict = self.bundle.get('applications',
1975 self.bundle.get('services', {}))
1976 return list(apps_dict.keys())
1977
1978 def resolve(self, reference):
1979 if reference and reference.startswith('$'):
1980 reference = self.references[reference[1:]]
1981 return reference
1982
1983 async def addCharm(self, charm, series):
1984 """
1985 :param charm string:
1986 Charm holds the URL of the charm to be added.
1987
1988 :param series string:
1989 Series holds the series of the charm to be added
1990 if the charm default is not sufficient.
1991 """
1992 # We don't add local charms because they've already been added
1993 # by self._handle_local_charms
1994 if charm.startswith('local:'):
1995 return charm
1996
1997 entity_id = await self.charmstore.entityId(charm)
1998 log.debug('Adding %s', entity_id)
1999 await self.client_facade.AddCharm(None, entity_id)
2000 return entity_id
2001
2002 async def addMachines(self, params=None):
2003 """
2004 :param params dict:
2005 Dictionary specifying the machine to add. All keys are optional.
2006 Keys include:
2007
2008 series: string specifying the machine OS series.
2009
2010 constraints: string holding machine constraints, if any. We'll
2011 parse this into the json friendly dict that the juju api
2012 expects.
2013
2014 container_type: string holding the type of the container (for
2015 instance ""lxd" or kvm"). It is not specified for top level
2016 machines.
2017
2018 parent_id: string holding a placeholder pointing to another
2019 machine change or to a unit change. This value is only
2020 specified in the case this machine is a container, in
2021 which case also ContainerType is set.
2022
2023 """
2024 params = params or {}
2025
2026 # Normalize keys
2027 params = {normalize_key(k): params[k] for k in params.keys()}
2028
2029 # Fix up values, as necessary.
2030 if 'parent_id' in params:
2031 if params['parent_id'].startswith('$addUnit'):
2032 unit = self.resolve(params['parent_id'])[0]
2033 params['parent_id'] = unit.machine.entity_id
2034 else:
2035 params['parent_id'] = self.resolve(params['parent_id'])
2036
2037 params['constraints'] = parse_constraints(
2038 params.get('constraints'))
2039 params['jobs'] = params.get('jobs', ['JobHostUnits'])
2040
2041 if params.get('container_type') == 'lxc':
2042 log.warning('Juju 2.0 does not support lxc containers. '
2043 'Converting containers to lxd.')
2044 params['container_type'] = 'lxd'
2045
2046 # Submit the request.
2047 params = client.AddMachineParams(**params)
2048 results = await self.client_facade.AddMachines([params])
2049 error = results.machines[0].error
2050 if error:
2051 raise ValueError("Error adding machine: %s" % error.message)
2052 machine = results.machines[0].machine
2053 log.debug('Added new machine %s', machine)
2054 return machine
2055
2056 async def addRelation(self, endpoint1, endpoint2):
2057 """
2058 :param endpoint1 string:
2059 :param endpoint2 string:
2060 Endpoint1 and Endpoint2 hold relation endpoints in the
2061 "application:interface" form, where the application is always a
2062 placeholder pointing to an application change, and the interface is
2063 optional. Examples are "$deploy-42:web" or just "$deploy-42".
2064 """
2065 endpoints = [endpoint1, endpoint2]
2066 # resolve indirect references
2067 for i in range(len(endpoints)):
2068 parts = endpoints[i].split(':')
2069 parts[0] = self.resolve(parts[0])
2070 endpoints[i] = ':'.join(parts)
2071
2072 log.info('Relating %s <-> %s', *endpoints)
2073 return await self.model.add_relation(*endpoints)
2074
2075 async def deploy(self, charm, series, application, options, constraints,
2076 storage, endpoint_bindings, *args):
2077 """
2078 :param charm string:
2079 Charm holds the URL of the charm to be used to deploy this
2080 application.
2081
2082 :param series string:
2083 Series holds the series of the application to be deployed
2084 if the charm default is not sufficient.
2085
2086 :param application string:
2087 Application holds the application name.
2088
2089 :param options map[string]interface{}:
2090 Options holds application options.
2091
2092 :param constraints string:
2093 Constraints holds the optional application constraints.
2094
2095 :param storage map[string]string:
2096 Storage holds the optional storage constraints.
2097
2098 :param endpoint_bindings map[string]string:
2099 EndpointBindings holds the optional endpoint bindings
2100
2101 :param devices map[string]string:
2102 Devices holds the optional devices constraints.
2103 (Only given on Juju 2.5+)
2104
2105 :param resources map[string]int:
2106 Resources identifies the revision to use for each resource
2107 of the application's charm.
2108
2109 :param num_units int:
2110 NumUnits holds the number of units required. For IAAS models, this
2111 will be 0 and separate AddUnitChanges will be used. For Kubernetes
2112 models, this will be used to scale the application.
2113 (Only given on Juju 2.5+)
2114 """
2115 # resolve indirect references
2116 charm = self.resolve(charm)
2117
2118 if len(args) == 1:
2119 # Juju 2.4 and below only sends the resources
2120 resources = args[0]
2121 devices, num_units = None, None
2122 else:
2123 # Juju 2.5+ sends devices before resources, as well as num_units
2124 # There might be placement but we need to ignore that.
2125 devices, resources, num_units = args[:3]
2126
2127 if not charm.startswith('local:'):
2128 resources = await self.model._add_store_resources(
2129 application, charm, overrides=resources)
2130 await self.model._deploy(
2131 charm_url=charm,
2132 application=application,
2133 series=series,
2134 config=options,
2135 constraints=constraints,
2136 endpoint_bindings=endpoint_bindings,
2137 resources=resources,
2138 storage=storage,
2139 devices=devices,
2140 num_units=num_units,
2141 )
2142 return application
2143
2144 async def addUnit(self, application, to):
2145 """
2146 :param application string:
2147 Application holds the application placeholder name for which a unit
2148 is added.
2149
2150 :param to string:
2151 To holds the optional location where to add the unit, as a
2152 placeholder pointing to another unit change or to a machine change.
2153 """
2154 application = self.resolve(application)
2155 placement = self.resolve(to)
2156 if self._units_by_app.get(application):
2157 # enough units for this application already exist;
2158 # claim one, and carry on
2159 # NB: this should probably honor placement, but the juju client
2160 # doesn't, so we're not bothering, either
2161 unit_name = self._units_by_app[application].pop()
2162 log.debug('Reusing unit %s for %s', unit_name, application)
2163 return self.model.units[unit_name]
2164
2165 log.debug('Adding new unit for %s%s', application,
2166 ' to %s' % placement if placement else '')
2167 return await self.model.applications[application].add_unit(
2168 count=1,
2169 to=placement,
2170 )
2171
2172 async def scale(self, application, scale):
2173 """
2174 Handle a change of scale to a k8s application.
2175
2176 :param string application:
2177 Application holds the application placeholder name for which a unit
2178 is added.
2179
2180 :param int scale:
2181 New scale value to use.
2182 """
2183 application = self.resolve(application)
2184 return await self.model.applications[application].scale(scale=scale)
2185
2186 async def expose(self, application):
2187 """
2188 :param application string:
2189 Application holds the placeholder name of the application that must
2190 be exposed.
2191 """
2192 application = self.resolve(application)
2193 log.info('Exposing %s', application)
2194 return await self.model.applications[application].expose()
2195
2196 async def setAnnotations(self, id_, entity_type, annotations):
2197 """
2198 :param id_ string:
2199 Id is the placeholder for the application or machine change
2200 corresponding to the entity to be annotated.
2201
2202 :param entity_type EntityType:
2203 EntityType holds the type of the entity, "application" or
2204 "machine".
2205
2206 :param annotations map[string]string:
2207 Annotations holds the annotations as key/value pairs.
2208 """
2209 entity_id = self.resolve(id_)
2210 try:
2211 entity = self.model.state.get_entity(entity_type, entity_id)
2212 except KeyError:
2213 entity = await self.model._wait_for_new(entity_type, entity_id)
2214 return await entity.set_annotations(annotations)
2215
2216
2217 class CharmStore:
2218 """
2219 Async wrapper around theblues.charmstore.CharmStore
2220 """
2221 def __init__(self, loop, cs_timeout=20):
2222 self.loop = loop
2223 self._cs = theblues.charmstore.CharmStore(timeout=cs_timeout)
2224
2225 def __getattr__(self, name):
2226 """
2227 Wrap method calls in coroutines that use run_in_executor to make them
2228 async.
2229 """
2230 attr = getattr(self._cs, name)
2231 if not callable(attr):
2232 wrapper = partial(getattr, self._cs, name)
2233 setattr(self, name, wrapper)
2234 else:
2235 async def coro(*args, **kwargs):
2236 method = partial(attr, *args, **kwargs)
2237 for attempt in range(1, 4):
2238 try:
2239 return await self.loop.run_in_executor(None, method)
2240 except theblues.errors.ServerError:
2241 if attempt == 3:
2242 raise
2243 await asyncio.sleep(1, loop=self.loop)
2244 setattr(self, name, coro)
2245 wrapper = coro
2246 return wrapper
2247
2248
2249 class CharmArchiveGenerator:
2250 """
2251 Create a Zip archive of a local charm directory for upload to a controller.
2252
2253 This is used automatically by
2254 `Model.add_local_charm_dir <#juju.model.Model.add_local_charm_dir>`_.
2255 """
2256 def __init__(self, path):
2257 self.path = os.path.abspath(os.path.expanduser(path))
2258
2259 def make_archive(self, path):
2260 """Create archive of directory and write to ``path``.
2261
2262 :param path: Path to archive
2263
2264 Ignored::
2265
2266 * build/* - This is used for packing the charm itself and any
2267 similar tasks.
2268 * */.* - Hidden files are all ignored for now. This will most
2269 likely be changed into a specific ignore list
2270 (.bzr, etc)
2271
2272 """
2273 zf = zipfile.ZipFile(path, 'w', zipfile.ZIP_DEFLATED)
2274 for dirpath, dirnames, filenames in os.walk(self.path):
2275 relative_path = dirpath[len(self.path) + 1:]
2276 if relative_path and not self._ignore(relative_path):
2277 zf.write(dirpath, relative_path)
2278 for name in filenames:
2279 archive_name = os.path.join(relative_path, name)
2280 if not self._ignore(archive_name):
2281 real_path = os.path.join(dirpath, name)
2282 self._check_type(real_path)
2283 if os.path.islink(real_path):
2284 self._check_link(real_path)
2285 self._write_symlink(
2286 zf, os.readlink(real_path), archive_name)
2287 else:
2288 zf.write(real_path, archive_name)
2289 zf.close()
2290 return path
2291
2292 def _check_type(self, path):
2293 """Check the path
2294 """
2295 s = os.stat(path)
2296 if stat.S_ISDIR(s.st_mode) or stat.S_ISREG(s.st_mode):
2297 return path
2298 raise ValueError("Invalid Charm at % %s" % (
2299 path, "Invalid file type for a charm"))
2300
2301 def _check_link(self, path):
2302 link_path = os.readlink(path)
2303 if link_path[0] == "/":
2304 raise ValueError(
2305 "Invalid Charm at %s: %s" % (
2306 path, "Absolute links are invalid"))
2307 path_dir = os.path.dirname(path)
2308 link_path = os.path.join(path_dir, link_path)
2309 if not link_path.startswith(os.path.abspath(self.path)):
2310 raise ValueError(
2311 "Invalid charm at %s %s" % (
2312 path, "Only internal symlinks are allowed"))
2313
2314 def _write_symlink(self, zf, link_target, link_path):
2315 """Package symlinks with appropriate zipfile metadata."""
2316 info = zipfile.ZipInfo()
2317 info.filename = link_path
2318 info.create_system = 3
2319 # Magic code for symlinks / py2/3 compat
2320 # 27166663808 = (stat.S_IFLNK | 0755) << 16
2321 info.external_attr = 2716663808
2322 zf.writestr(info, link_target)
2323
2324 def _ignore(self, path):
2325 if path == "build" or path.startswith("build/"):
2326 return True
2327 if path.startswith('.'):
2328 return True