blob: bd7d43077d9e238cf40d5170a99850a732c34b23 [file] [log] [blame]
Adam Israeldcdf82b2017-08-15 15:26:43 -04001import asyncio
2import base64
3import collections
4import hashlib
5import json
6import logging
7import os
8import re
9import stat
10import tempfile
11import weakref
12import zipfile
13from concurrent.futures import CancelledError
14from functools import partial
15from pathlib import Path
16
Adam Israeldcdf82b2017-08-15 15:26:43 -040017import theblues.charmstore
18import theblues.errors
Adam Israelb8a82812019-03-27 14:50:11 -040019import websockets
20import yaml
Adam Israeldcdf82b2017-08-15 15:26:43 -040021
22from . import tag, utils
Adam Israelb8a82812019-03-27 14:50:11 -040023from .client import client, connector
Adam Israeldcdf82b2017-08-15 15:26:43 -040024from .client.client import ConfigValue
Adam Israelb8a82812019-03-27 14:50:11 -040025from .client.client import Value
26from .constraints import parse as parse_constraints
27from .constraints import normalize_key
28from .delta import get_entity_class, get_entity_delta
29from .errors import JujuAPIError, JujuError
Adam Israeldcdf82b2017-08-15 15:26:43 -040030from .exceptions import DeadEntityException
Adam Israeldcdf82b2017-08-15 15:26:43 -040031from .placement import parse as parse_placement
Adam Israelb8a82812019-03-27 14:50:11 -040032from . import provisioner
33
Adam Israeldcdf82b2017-08-15 15:26:43 -040034
35log = logging.getLogger(__name__)
36
37
Adam Israelb8a82812019-03-27 14:50:11 -040038class _Observer:
Adam Israeldcdf82b2017-08-15 15:26:43 -040039 """Wrapper around an observer callable.
40
41 This wrapper allows filter criteria to be associated with the
42 callable so that it's only called for changes that meet the criteria.
43
44 """
45 def __init__(self, callable_, entity_type, action, entity_id, predicate):
46 self.callable_ = callable_
47 self.entity_type = entity_type
48 self.action = action
49 self.entity_id = entity_id
50 self.predicate = predicate
51 if self.entity_id:
52 self.entity_id = str(self.entity_id)
53 if not self.entity_id.startswith('^'):
54 self.entity_id = '^' + self.entity_id
55 if not self.entity_id.endswith('$'):
56 self.entity_id += '$'
57
58 async def __call__(self, delta, old, new, model):
59 await self.callable_(delta, old, new, model)
60
61 def cares_about(self, delta):
62 """Return True if this observer "cares about" (i.e. wants to be
63 called) for a this delta.
64
65 """
66 if (self.entity_id and delta.get_id() and
67 not re.match(self.entity_id, str(delta.get_id()))):
68 return False
69
70 if self.entity_type and self.entity_type != delta.entity:
71 return False
72
73 if self.action and self.action != delta.type:
74 return False
75
76 if self.predicate and not self.predicate(delta):
77 return False
78
79 return True
80
81
Adam Israelb8a82812019-03-27 14:50:11 -040082class ModelObserver:
Adam Israeldcdf82b2017-08-15 15:26:43 -040083 """
84 Base class for creating observers that react to changes in a model.
85 """
86 async def __call__(self, delta, old, new, model):
87 handler_name = 'on_{}_{}'.format(delta.entity, delta.type)
88 method = getattr(self, handler_name, self.on_change)
89 await method(delta, old, new, model)
90
91 async def on_change(self, delta, old, new, model):
92 """Generic model-change handler.
93
94 This should be overridden in a subclass.
95
96 :param delta: :class:`juju.client.overrides.Delta`
97 :param old: :class:`juju.model.ModelEntity`
98 :param new: :class:`juju.model.ModelEntity`
99 :param model: :class:`juju.model.Model`
100
101 """
102 pass
103
104
Adam Israelb8a82812019-03-27 14:50:11 -0400105class ModelState:
Adam Israeldcdf82b2017-08-15 15:26:43 -0400106 """Holds the state of the model, including the delta history of all
107 entities in the model.
108
109 """
110 def __init__(self, model):
111 self.model = model
112 self.state = dict()
113
114 def _live_entity_map(self, entity_type):
115 """Return an id:Entity map of all the living entities of
116 type ``entity_type``.
117
118 """
119 return {
120 entity_id: self.get_entity(entity_type, entity_id)
121 for entity_id, history in self.state.get(entity_type, {}).items()
122 if history[-1] is not None
123 }
124
125 @property
126 def applications(self):
127 """Return a map of application-name:Application for all applications
128 currently in the model.
129
130 """
131 return self._live_entity_map('application')
132
133 @property
134 def machines(self):
135 """Return a map of machine-id:Machine for all machines currently in
136 the model.
137
138 """
139 return self._live_entity_map('machine')
140
141 @property
142 def units(self):
143 """Return a map of unit-id:Unit for all units currently in
144 the model.
145
146 """
147 return self._live_entity_map('unit')
148
Adam Israelb8a82812019-03-27 14:50:11 -0400149 @property
150 def relations(self):
151 """Return a map of relation-id:Relation for all relations currently in
152 the model.
153
154 """
155 return self._live_entity_map('relation')
156
Adam Israeldcdf82b2017-08-15 15:26:43 -0400157 def entity_history(self, entity_type, entity_id):
158 """Return the history deque for an entity.
159
160 """
161 return self.state[entity_type][entity_id]
162
163 def entity_data(self, entity_type, entity_id, history_index):
164 """Return the data dict for an entity at a specific index of its
165 history.
166
167 """
168 return self.entity_history(entity_type, entity_id)[history_index]
169
170 def apply_delta(self, delta):
171 """Apply delta to our state and return a copy of the
172 affected object as it was before and after the update, e.g.:
173
174 old_obj, new_obj = self.apply_delta(delta)
175
176 old_obj may be None if the delta is for the creation of a new object,
177 e.g. a new application or unit is deployed.
178
179 new_obj will never be None, but may be dead (new_obj.dead == True)
180 if the object was deleted as a result of the delta being applied.
181
182 """
183 history = (
184 self.state
185 .setdefault(delta.entity, {})
186 .setdefault(delta.get_id(), collections.deque())
187 )
188
189 history.append(delta.data)
190 if delta.type == 'remove':
191 history.append(None)
192
193 entity = self.get_entity(delta.entity, delta.get_id())
194 return entity.previous(), entity
195
196 def get_entity(
197 self, entity_type, entity_id, history_index=-1, connected=True):
198 """Return an object instance for the given entity_type and id.
199
200 By default the object state matches the most recent state from
201 Juju. To get an instance of the object in an older state, pass
202 history_index, an index into the history deque for the entity.
203
204 """
205
206 if history_index < 0 and history_index != -1:
207 history_index += len(self.entity_history(entity_type, entity_id))
208 if history_index < 0:
209 return None
210
211 try:
212 self.entity_data(entity_type, entity_id, history_index)
213 except IndexError:
214 return None
215
216 entity_class = get_entity_class(entity_type)
217 return entity_class(
218 entity_id, self.model, history_index=history_index,
219 connected=connected)
220
221
Adam Israelb8a82812019-03-27 14:50:11 -0400222class ModelEntity:
Adam Israeldcdf82b2017-08-15 15:26:43 -0400223 """An object in the Model tree"""
224
225 def __init__(self, entity_id, model, history_index=-1, connected=True):
226 """Initialize a new entity
227
228 :param entity_id str: The unique id of the object in the model
229 :param model: The model instance in whose object tree this
230 entity resides
231 :history_index int: The index of this object's state in the model's
232 history deque for this entity
233 :connected bool: Flag indicating whether this object gets live updates
234 from the model.
235
236 """
237 self.entity_id = entity_id
238 self.model = model
239 self._history_index = history_index
240 self.connected = connected
Adam Israelb8a82812019-03-27 14:50:11 -0400241 self.connection = model.connection()
Adam Israeldcdf82b2017-08-15 15:26:43 -0400242
243 def __repr__(self):
244 return '<{} entity_id="{}">'.format(type(self).__name__,
245 self.entity_id)
246
247 def __getattr__(self, name):
248 """Fetch object attributes from the underlying data dict held in the
249 model.
250
251 """
252 try:
253 return self.safe_data[name]
254 except KeyError:
255 name = name.replace('_', '-')
256 if name in self.safe_data:
257 return self.safe_data[name]
258 else:
259 raise
260
261 def __bool__(self):
262 return bool(self.data)
263
264 def on_change(self, callable_):
265 """Add a change observer to this entity.
266
267 """
268 self.model.add_observer(
269 callable_, self.entity_type, 'change', self.entity_id)
270
271 def on_remove(self, callable_):
272 """Add a remove observer to this entity.
273
274 """
275 self.model.add_observer(
276 callable_, self.entity_type, 'remove', self.entity_id)
277
278 @property
279 def entity_type(self):
280 """A string identifying the entity type of this object, e.g.
281 'application' or 'unit', etc.
282
283 """
284 return self.__class__.__name__.lower()
285
286 @property
287 def current(self):
288 """Return True if this object represents the current state of the
289 entity in the underlying model.
290
291 This will be True except when the object represents an entity at a
292 non-latest state in history, e.g. if the object was obtained by calling
293 .previous() on another object.
294
295 """
296 return self._history_index == -1
297
298 @property
299 def dead(self):
300 """Returns True if this entity no longer exists in the underlying
301 model.
302
303 """
304 return (
305 self.data is None or
306 self.model.state.entity_data(
307 self.entity_type, self.entity_id, -1) is None
308 )
309
310 @property
311 def alive(self):
312 """Returns True if this entity still exists in the underlying
313 model.
314
315 """
316 return not self.dead
317
318 @property
319 def data(self):
320 """The data dictionary for this entity.
321
322 """
323 return self.model.state.entity_data(
324 self.entity_type, self.entity_id, self._history_index)
325
326 @property
327 def safe_data(self):
328 """The data dictionary for this entity.
329
330 If this `ModelEntity` points to the dead state, it will
331 raise `DeadEntityException`.
332
333 """
334 if self.data is None:
335 raise DeadEntityException(
336 "Entity {}:{} is dead - its attributes can no longer be "
337 "accessed. Use the .previous() method on this object to get "
338 "a copy of the object at its previous state.".format(
339 self.entity_type, self.entity_id))
340 return self.data
341
342 def previous(self):
343 """Return a copy of this object as was at its previous state in
344 history.
345
346 Returns None if this object is new (and therefore has no history).
347
348 The returned object is always "disconnected", i.e. does not receive
349 live updates.
350
351 """
352 return self.model.state.get_entity(
353 self.entity_type, self.entity_id, self._history_index - 1,
354 connected=False)
355
356 def next(self):
357 """Return a copy of this object at its next state in
358 history.
359
360 Returns None if this object is already the latest.
361
362 The returned object is "disconnected", i.e. does not receive
363 live updates, unless it is current (latest).
364
365 """
366 if self._history_index == -1:
367 return None
368
369 new_index = self._history_index + 1
370 connected = (
371 new_index == len(self.model.state.entity_history(
372 self.entity_type, self.entity_id)) - 1
373 )
374 return self.model.state.get_entity(
375 self.entity_type, self.entity_id, self._history_index - 1,
376 connected=connected)
377
378 def latest(self):
379 """Return a copy of this object at its current state in the model.
380
381 Returns self if this object is already the latest.
382
383 The returned object is always "connected", i.e. receives
384 live updates from the model.
385
386 """
387 if self._history_index == -1:
388 return self
389
390 return self.model.state.get_entity(self.entity_type, self.entity_id)
391
392
Adam Israelb8a82812019-03-27 14:50:11 -0400393class Model:
Adam Israeldcdf82b2017-08-15 15:26:43 -0400394 """
395 The main API for interacting with a Juju model.
396 """
Adam Israelb8a82812019-03-27 14:50:11 -0400397 def __init__(
398 self,
399 loop=None,
400 max_frame_size=None,
401 bakery_client=None,
402 jujudata=None,
403 ):
404 """Instantiate a new Model.
405
406 The connect method will need to be called before this
407 object can be used for anything interesting.
408
409 If jujudata is None, jujudata.FileJujuData will be used.
Adam Israeldcdf82b2017-08-15 15:26:43 -0400410
411 :param loop: an asyncio event loop
412 :param max_frame_size: See
413 `juju.client.connection.Connection.MAX_FRAME_SIZE`
Adam Israelb8a82812019-03-27 14:50:11 -0400414 :param bakery_client httpbakery.Client: The bakery client to use
415 for macaroon authorization.
416 :param jujudata JujuData: The source for current controller information
Adam Israeldcdf82b2017-08-15 15:26:43 -0400417 """
Adam Israelb8a82812019-03-27 14:50:11 -0400418 self._connector = connector.Connector(
419 loop=loop,
420 max_frame_size=max_frame_size,
421 bakery_client=bakery_client,
422 jujudata=jujudata,
423 )
424 self._observers = weakref.WeakValueDictionary()
Adam Israeldcdf82b2017-08-15 15:26:43 -0400425 self.state = ModelState(self)
Adam Israelb8a82812019-03-27 14:50:11 -0400426 self._info = None
427 self._watch_stopping = asyncio.Event(loop=self._connector.loop)
428 self._watch_stopped = asyncio.Event(loop=self._connector.loop)
429 self._watch_received = asyncio.Event(loop=self._connector.loop)
430 self._watch_stopped.set()
431 self._charmstore = CharmStore(self._connector.loop)
432
433 def is_connected(self):
434 """Reports whether the Model is currently connected."""
435 return self._connector.is_connected()
436
437 @property
438 def loop(self):
439 return self._connector.loop
440
441 def connection(self):
442 """Return the current Connection object. It raises an exception
443 if the Model is disconnected"""
444 return self._connector.connection()
445
446 async def get_controller(self):
447 """Return a Controller instance for the currently connected model.
448 :return Controller:
449 """
450 from juju.controller import Controller
451 controller = Controller(jujudata=self._connector.jujudata)
452 kwargs = self.connection().connect_params()
453 kwargs.pop('uuid')
454 await controller._connect_direct(**kwargs)
455 return controller
Adam Israeldcdf82b2017-08-15 15:26:43 -0400456
457 async def __aenter__(self):
Adam Israelb8a82812019-03-27 14:50:11 -0400458 await self.connect()
Adam Israeldcdf82b2017-08-15 15:26:43 -0400459 return self
460
461 async def __aexit__(self, exc_type, exc, tb):
462 await self.disconnect()
463
Adam Israelb8a82812019-03-27 14:50:11 -0400464 async def connect(self, *args, **kwargs):
465 """Connect to a juju model.
Adam Israeldcdf82b2017-08-15 15:26:43 -0400466
Adam Israelb8a82812019-03-27 14:50:11 -0400467 This supports two calling conventions:
Adam Israeldcdf82b2017-08-15 15:26:43 -0400468
Adam Israelb8a82812019-03-27 14:50:11 -0400469 The model and (optionally) authentication information can be taken
470 from the data files created by the Juju CLI. This convention will
471 be used if a ``model_name`` is specified, or if the ``endpoint``
472 and ``uuid`` are not.
Adam Israeldcdf82b2017-08-15 15:26:43 -0400473
Adam Israelb8a82812019-03-27 14:50:11 -0400474 Otherwise, all of the ``endpoint``, ``uuid``, and authentication
475 information (``username`` and ``password``, or ``bakery_client`` and/or
476 ``macaroons``) are required.
477
478 If a single positional argument is given, it will be assumed to be
479 the ``model_name``. Otherwise, the first positional argument, if any,
480 must be the ``endpoint``.
481
482 Available parameters are:
483
484 :param model_name: Format [controller:][user/]model
485 :param str endpoint: The hostname:port of the controller to connect to.
486 :param str uuid: The model UUID to connect to.
487 :param str username: The username for controller-local users (or None
488 to use macaroon-based login.)
489 :param str password: The password for controller-local users.
490 :param str cacert: The CA certificate of the controller
491 (PEM formatted).
492 :param httpbakery.Client bakery_client: The macaroon bakery client to
493 to use when performing macaroon-based login. Macaroon tokens
494 acquired when logging will be saved to bakery_client.cookies.
495 If this is None, a default bakery_client will be used.
496 :param list macaroons: List of macaroons to load into the
497 ``bakery_client``.
498 :param asyncio.BaseEventLoop loop: The event loop to use for async
499 operations.
500 :param int max_frame_size: The maximum websocket frame size to allow.
Adam Israeldcdf82b2017-08-15 15:26:43 -0400501 """
Adam Israelb8a82812019-03-27 14:50:11 -0400502 await self.disconnect()
503 if 'endpoint' not in kwargs and len(args) < 2:
504 if args and 'model_name' in kwargs:
505 raise TypeError('connect() got multiple values for model_name')
506 elif args:
507 model_name = args[0]
508 else:
509 model_name = kwargs.pop('model_name', None)
510 await self._connector.connect_model(model_name, **kwargs)
511 else:
512 if 'model_name' in kwargs:
513 raise TypeError('connect() got values for both '
514 'model_name and endpoint')
515 if args and 'endpoint' in kwargs:
516 raise TypeError('connect() got multiple values for endpoint')
517 if len(args) < 2 and 'uuid' not in kwargs:
518 raise TypeError('connect() missing value for uuid')
519 has_userpass = (len(args) >= 4 or
520 {'username', 'password'}.issubset(kwargs))
521 has_macaroons = (len(args) >= 6 or not
522 {'bakery_client', 'macaroons'}.isdisjoint(kwargs))
523 if not (has_userpass or has_macaroons):
524 raise TypeError('connect() missing auth params')
525 arg_names = [
526 'endpoint',
527 'uuid',
528 'username',
529 'password',
530 'cacert',
531 'bakery_client',
532 'macaroons',
533 'loop',
534 'max_frame_size',
535 ]
536 for i, arg in enumerate(args):
537 kwargs[arg_names[i]] = arg
538 if not {'endpoint', 'uuid'}.issubset(kwargs):
539 raise ValueError('endpoint and uuid are required '
540 'if model_name not given')
541 if not ({'username', 'password'}.issubset(kwargs) or
542 {'bakery_client', 'macaroons'}.intersection(kwargs)):
543 raise ValueError('Authentication parameters are required '
544 'if model_name not given')
545 await self._connector.connect(**kwargs)
Adam Israeldcdf82b2017-08-15 15:26:43 -0400546 await self._after_connect()
547
548 async def connect_model(self, model_name):
Adam Israeldcdf82b2017-08-15 15:26:43 -0400549 """
Adam Israelb8a82812019-03-27 14:50:11 -0400550 .. deprecated:: 0.6.2
551 Use ``connect(model_name=model_name)`` instead.
552 """
553 return await self.connect(model_name=model_name)
554
555 async def connect_current(self):
556 """
557 .. deprecated:: 0.6.2
558 Use ``connect()`` instead.
559 """
560 return await self.connect()
561
562 async def _connect_direct(self, **kwargs):
563 await self.disconnect()
564 await self._connector.connect(**kwargs)
Adam Israeldcdf82b2017-08-15 15:26:43 -0400565 await self._after_connect()
566
567 async def _after_connect(self):
Adam Israeldcdf82b2017-08-15 15:26:43 -0400568 self._watch()
Adam Israelb8a82812019-03-27 14:50:11 -0400569
570 # Wait for the first packet of data from the AllWatcher,
571 # which contains all information on the model.
572 # TODO this means that we can't do anything until
573 # we've received all the model data, which might be
574 # a whole load of unneeded data if all the client wants
575 # to do is make one RPC call.
Adam Israeldcdf82b2017-08-15 15:26:43 -0400576 await self._watch_received.wait()
Adam Israelb8a82812019-03-27 14:50:11 -0400577
Adam Israeldcdf82b2017-08-15 15:26:43 -0400578 await self.get_info()
579
580 async def disconnect(self):
581 """Shut down the watcher task and close websockets.
582
583 """
Adam Israelb8a82812019-03-27 14:50:11 -0400584 if not self._watch_stopped.is_set():
Adam Israeldcdf82b2017-08-15 15:26:43 -0400585 log.debug('Stopping watcher task')
586 self._watch_stopping.set()
587 await self._watch_stopped.wait()
Adam Israelb8a82812019-03-27 14:50:11 -0400588 self._watch_stopping.clear()
589
590 if self.is_connected():
Adam Israeldcdf82b2017-08-15 15:26:43 -0400591 log.debug('Closing model connection')
Adam Israelb8a82812019-03-27 14:50:11 -0400592 await self._connector.disconnect()
593 self._info = None
Adam Israeldcdf82b2017-08-15 15:26:43 -0400594
595 async def add_local_charm_dir(self, charm_dir, series):
596 """Upload a local charm to the model.
597
598 This will automatically generate an archive from
599 the charm dir.
600
601 :param charm_dir: Path to the charm directory
602 :param series: Charm series
603
604 """
605 fh = tempfile.NamedTemporaryFile()
606 CharmArchiveGenerator(charm_dir).make_archive(fh.name)
607 with fh:
608 func = partial(
609 self.add_local_charm, fh, series, os.stat(fh.name).st_size)
Adam Israelb8a82812019-03-27 14:50:11 -0400610 charm_url = await self._connector.loop.run_in_executor(None, func)
Adam Israeldcdf82b2017-08-15 15:26:43 -0400611
612 log.debug('Uploaded local charm: %s -> %s', charm_dir, charm_url)
613 return charm_url
614
615 def add_local_charm(self, charm_file, series, size=None):
616 """Upload a local charm archive to the model.
617
618 Returns the 'local:...' url that should be used to deploy the charm.
619
620 :param charm_file: Path to charm zip archive
621 :param series: Charm series
622 :param size: Size of the archive, in bytes
623 :return str: 'local:...' url for deploying the charm
624 :raises: :class:`JujuError` if the upload fails
625
626 Uses an https endpoint at the same host:port as the wss.
627 Supports large file uploads.
628
629 .. warning::
630
631 This method will block. Consider using :meth:`add_local_charm_dir`
632 instead.
633
634 """
Adam Israelb8a82812019-03-27 14:50:11 -0400635 conn, headers, path_prefix = self.connection().https_connection()
Adam Israeldcdf82b2017-08-15 15:26:43 -0400636 path = "%s/charms?series=%s" % (path_prefix, series)
637 headers['Content-Type'] = 'application/zip'
638 if size:
639 headers['Content-Length'] = size
640 conn.request("POST", path, charm_file, headers)
641 response = conn.getresponse()
642 result = response.read().decode()
643 if not response.status == 200:
644 raise JujuError(result)
645 result = json.loads(result)
646 return result['charm-url']
647
648 def all_units_idle(self):
649 """Return True if all units are idle.
650
651 """
652 for unit in self.units.values():
653 unit_status = unit.data['agent-status']['current']
654 if unit_status != 'idle':
655 return False
656 return True
657
658 async def reset(self, force=False):
659 """Reset the model to a clean state.
660
661 :param bool force: Force-terminate machines.
662
663 This returns only after the model has reached a clean state. "Clean"
664 means no applications or machines exist in the model.
665
666 """
667 log.debug('Resetting model')
668 for app in self.applications.values():
669 await app.destroy()
670 for machine in self.machines.values():
671 await machine.destroy(force=force)
672 await self.block_until(
673 lambda: len(self.machines) == 0
674 )
675
676 async def block_until(self, *conditions, timeout=None, wait_period=0.5):
677 """Return only after all conditions are true.
678
Adam Israelb8a82812019-03-27 14:50:11 -0400679 Raises `websockets.ConnectionClosed` if disconnected.
Adam Israeldcdf82b2017-08-15 15:26:43 -0400680 """
Adam Israelb8a82812019-03-27 14:50:11 -0400681 def _disconnected():
682 return not (self.is_connected() and self.connection().is_open)
683
684 def done():
685 return _disconnected() or all(c() for c in conditions)
686
687 await utils.block_until(done,
688 timeout=timeout,
689 wait_period=wait_period,
690 loop=self.loop)
691 if _disconnected():
692 raise websockets.ConnectionClosed(1006, 'no reason')
Adam Israeldcdf82b2017-08-15 15:26:43 -0400693
694 @property
695 def applications(self):
696 """Return a map of application-name:Application for all applications
697 currently in the model.
698
699 """
700 return self.state.applications
701
702 @property
703 def machines(self):
704 """Return a map of machine-id:Machine for all machines currently in
705 the model.
706
707 """
708 return self.state.machines
709
710 @property
711 def units(self):
712 """Return a map of unit-id:Unit for all units currently in
713 the model.
714
715 """
716 return self.state.units
717
Adam Israelb8a82812019-03-27 14:50:11 -0400718 @property
719 def relations(self):
720 """Return a list of all Relations currently in the model.
721
722 """
723 return list(self.state.relations.values())
724
Adam Israeldcdf82b2017-08-15 15:26:43 -0400725 async def get_info(self):
726 """Return a client.ModelInfo object for this Model.
727
728 Retrieves latest info for this Model from the api server. The
729 return value is cached on the Model.info attribute so that the
730 valued may be accessed again without another api call, if
731 desired.
732
733 This method is called automatically when the Model is connected,
734 resulting in Model.info being initialized without requiring an
735 explicit call to this method.
736
737 """
Adam Israelb8a82812019-03-27 14:50:11 -0400738 facade = client.ClientFacade.from_connection(self.connection())
Adam Israeldcdf82b2017-08-15 15:26:43 -0400739
Adam Israelb8a82812019-03-27 14:50:11 -0400740 self._info = await facade.ModelInfo()
Adam Israeldcdf82b2017-08-15 15:26:43 -0400741 log.debug('Got ModelInfo: %s', vars(self.info))
742
743 return self.info
744
Adam Israelb8a82812019-03-27 14:50:11 -0400745 @property
746 def info(self):
747 """Return the cached client.ModelInfo object for this Model.
748
749 If Model.get_info() has not been called, this will return None.
750 """
751 return self._info
752
Adam Israeldcdf82b2017-08-15 15:26:43 -0400753 def add_observer(
754 self, callable_, entity_type=None, action=None, entity_id=None,
755 predicate=None):
756 """Register an "on-model-change" callback
757
758 Once the model is connected, ``callable_``
759 will be called each time the model changes. ``callable_`` should
760 be Awaitable and accept the following positional arguments:
761
762 delta - An instance of :class:`juju.delta.EntityDelta`
763 containing the raw delta data recv'd from the Juju
764 websocket.
765
766 old_obj - If the delta modifies an existing object in the model,
767 old_obj will be a copy of that object, as it was before the
768 delta was applied. Will be None if the delta creates a new
769 entity in the model.
770
771 new_obj - A copy of the new or updated object, after the delta
772 is applied. Will be None if the delta removes an entity
773 from the model.
774
775 model - The :class:`Model` itself.
776
777 Events for which ``callable_`` is called can be specified by passing
778 entity_type, action, and/or entitiy_id filter criteria, e.g.::
779
780 add_observer(
781 myfunc,
782 entity_type='application', action='add', entity_id='ubuntu')
783
784 For more complex filtering conditions, pass a predicate function. It
785 will be called with a delta as its only argument. If the predicate
786 function returns True, the ``callable_`` will be called.
787
788 """
789 observer = _Observer(
790 callable_, entity_type, action, entity_id, predicate)
Adam Israelb8a82812019-03-27 14:50:11 -0400791 self._observers[observer] = callable_
Adam Israeldcdf82b2017-08-15 15:26:43 -0400792
793 def _watch(self):
794 """Start an asynchronous watch against this model.
795
796 See :meth:`add_observer` to register an onchange callback.
797
798 """
799 async def _all_watcher():
800 try:
801 allwatcher = client.AllWatcherFacade.from_connection(
Adam Israelb8a82812019-03-27 14:50:11 -0400802 self.connection())
Adam Israeldcdf82b2017-08-15 15:26:43 -0400803 while not self._watch_stopping.is_set():
804 try:
805 results = await utils.run_with_interrupt(
806 allwatcher.Next(),
807 self._watch_stopping,
Adam Israelb8a82812019-03-27 14:50:11 -0400808 loop=self._connector.loop)
Adam Israeldcdf82b2017-08-15 15:26:43 -0400809 except JujuAPIError as e:
810 if 'watcher was stopped' not in str(e):
811 raise
812 if self._watch_stopping.is_set():
813 # this shouldn't ever actually happen, because
814 # the event should trigger before the controller
815 # has a chance to tell us the watcher is stopped
816 # but handle it gracefully, just in case
817 break
818 # controller stopped our watcher for some reason
819 # but we're not actually stopping, so just restart it
820 log.warning(
821 'Watcher: watcher stopped, restarting')
822 del allwatcher.Id
823 continue
824 except websockets.ConnectionClosed:
Adam Israelb8a82812019-03-27 14:50:11 -0400825 monitor = self.connection().monitor
Adam Israeldcdf82b2017-08-15 15:26:43 -0400826 if monitor.status == monitor.ERROR:
827 # closed unexpectedly, try to reopen
828 log.warning(
829 'Watcher: connection closed, reopening')
Adam Israelb8a82812019-03-27 14:50:11 -0400830 await self.connection().reconnect()
831 if monitor.status != monitor.CONNECTED:
832 # reconnect failed; abort and shutdown
833 log.error('Watcher: automatic reconnect '
834 'failed; stopping watcher')
835 break
Adam Israeldcdf82b2017-08-15 15:26:43 -0400836 del allwatcher.Id
837 continue
838 else:
839 # closed on request, go ahead and shutdown
840 break
841 if self._watch_stopping.is_set():
Adam Israelb8a82812019-03-27 14:50:11 -0400842 try:
843 await allwatcher.Stop()
844 except websockets.ConnectionClosed:
845 pass # can't stop on a closed conn
Adam Israeldcdf82b2017-08-15 15:26:43 -0400846 break
847 for delta in results.deltas:
848 delta = get_entity_delta(delta)
849 old_obj, new_obj = self.state.apply_delta(delta)
850 await self._notify_observers(delta, old_obj, new_obj)
851 self._watch_received.set()
852 except CancelledError:
853 pass
854 except Exception:
855 log.exception('Error in watcher')
856 raise
857 finally:
858 self._watch_stopped.set()
859
860 log.debug('Starting watcher task')
861 self._watch_received.clear()
862 self._watch_stopping.clear()
863 self._watch_stopped.clear()
Adam Israelb8a82812019-03-27 14:50:11 -0400864 self._connector.loop.create_task(_all_watcher())
Adam Israeldcdf82b2017-08-15 15:26:43 -0400865
866 async def _notify_observers(self, delta, old_obj, new_obj):
867 """Call observing callbacks, notifying them of a change in model state
868
869 :param delta: The raw change from the watcher
870 (:class:`juju.client.overrides.Delta`)
871 :param old_obj: The object in the model that this delta updates.
872 May be None.
873 :param new_obj: The object in the model that is created or updated
874 by applying this delta.
875
876 """
877 if new_obj and not old_obj:
878 delta.type = 'add'
879
880 log.debug(
881 'Model changed: %s %s %s',
882 delta.entity, delta.type, delta.get_id())
883
Adam Israelb8a82812019-03-27 14:50:11 -0400884 for o in self._observers:
Adam Israeldcdf82b2017-08-15 15:26:43 -0400885 if o.cares_about(delta):
886 asyncio.ensure_future(o(delta, old_obj, new_obj, self),
Adam Israelb8a82812019-03-27 14:50:11 -0400887 loop=self._connector.loop)
Adam Israeldcdf82b2017-08-15 15:26:43 -0400888
889 async def _wait(self, entity_type, entity_id, action, predicate=None):
890 """
891 Block the calling routine until a given action has happened to the
892 given entity
893
894 :param entity_type: The entity's type.
895 :param entity_id: The entity's id.
896 :param action: the type of action (e.g., 'add', 'change', or 'remove')
897 :param predicate: optional callable that must take as an
898 argument a delta, and must return a boolean, indicating
899 whether the delta contains the specific action we're looking
900 for. For example, you might check to see whether a 'change'
901 has a 'completed' status. See the _Observer class for details.
902
903 """
Adam Israelb8a82812019-03-27 14:50:11 -0400904 q = asyncio.Queue(loop=self._connector.loop)
Adam Israeldcdf82b2017-08-15 15:26:43 -0400905
906 async def callback(delta, old, new, model):
907 await q.put(delta.get_id())
908
909 self.add_observer(callback, entity_type, action, entity_id, predicate)
910 entity_id = await q.get()
911 # object might not be in the entity_map if we were waiting for a
912 # 'remove' action
913 return self.state._live_entity_map(entity_type).get(entity_id)
914
Adam Israelb8a82812019-03-27 14:50:11 -0400915 async def _wait_for_new(self, entity_type, entity_id):
Adam Israeldcdf82b2017-08-15 15:26:43 -0400916 """Wait for a new object to appear in the Model and return it.
917
Adam Israelb8a82812019-03-27 14:50:11 -0400918 Waits for an object of type ``entity_type`` with id ``entity_id``
919 to appear in the model. This is similar to watching for the
920 object using ``block_until``, but uses the watcher rather than
921 polling.
Adam Israeldcdf82b2017-08-15 15:26:43 -0400922
923 """
924 # if the entity is already in the model, just return it
925 if entity_id in self.state._live_entity_map(entity_type):
926 return self.state._live_entity_map(entity_type)[entity_id]
Adam Israelb8a82812019-03-27 14:50:11 -0400927 return await self._wait(entity_type, entity_id, None)
Adam Israeldcdf82b2017-08-15 15:26:43 -0400928
929 async def wait_for_action(self, action_id):
930 """Given an action, wait for it to complete."""
931
932 if action_id.startswith("action-"):
933 # if we've been passed action.tag, transform it into the
934 # id that the api deltas will use.
935 action_id = action_id[7:]
936
937 def predicate(delta):
938 return delta.data['status'] in ('completed', 'failed')
939
Adam Israelb8a82812019-03-27 14:50:11 -0400940 return await self._wait('action', action_id, None, predicate)
Adam Israeldcdf82b2017-08-15 15:26:43 -0400941
942 async def add_machine(
943 self, spec=None, constraints=None, disks=None, series=None):
944 """Start a new, empty machine and optionally a container, or add a
945 container to a machine.
946
947 :param str spec: Machine specification
948 Examples::
949
950 (None) - starts a new machine
951 'lxd' - starts a new machine with one lxd container
952 'lxd:4' - starts a new lxd container on machine 4
Adam Israelb8a82812019-03-27 14:50:11 -0400953 'ssh:user@10.10.0.3:/path/to/private/key' - manually provision
954 a machine with ssh and the private key used for authentication
Adam Israeldcdf82b2017-08-15 15:26:43 -0400955 'zone=us-east-1a' - starts a machine in zone us-east-1s on AWS
956 'maas2.name' - acquire machine maas2.name on MAAS
957
958 :param dict constraints: Machine constraints, which can contain the
959 the following keys::
960
961 arch : str
962 container : str
963 cores : int
964 cpu_power : int
965 instance_type : str
966 mem : int
967 root_disk : int
968 spaces : list(str)
969 tags : list(str)
970 virt_type : str
971
972 Example::
973
974 constraints={
975 'mem': 256 * MB,
976 'tags': ['virtual'],
977 }
978
979 :param list disks: List of disk constraint dictionaries, which can
980 contain the following keys::
981
982 count : int
983 pool : str
984 size : int
985
986 Example::
987
988 disks=[{
989 'pool': 'rootfs',
990 'size': 10 * GB,
991 'count': 1,
992 }]
993
994 :param str series: Series, e.g. 'xenial'
995
996 Supported container types are: lxd, kvm
997
998 When deploying a container to an existing machine, constraints cannot
999 be used.
1000
1001 """
1002 params = client.AddMachineParams()
Adam Israeldcdf82b2017-08-15 15:26:43 -04001003
1004 if spec:
Adam Israelb8a82812019-03-27 14:50:11 -04001005 if spec.startswith("ssh:"):
1006 placement, target, private_key_path = spec.split(":")
1007 user, host = target.split("@")
1008
1009 sshProvisioner = provisioner.SSHProvisioner(
1010 host=host,
1011 user=user,
1012 private_key_path=private_key_path,
1013 )
1014
1015 params = sshProvisioner.provision_machine()
1016 else:
1017 placement = parse_placement(spec)
1018 if placement:
1019 params.placement = placement[0]
1020
1021 params.jobs = ['JobHostUnits']
Adam Israeldcdf82b2017-08-15 15:26:43 -04001022
1023 if constraints:
1024 params.constraints = client.Value.from_json(constraints)
1025
1026 if disks:
1027 params.disks = [
1028 client.Constraints.from_json(o) for o in disks]
1029
1030 if series:
1031 params.series = series
1032
1033 # Submit the request.
Adam Israelb8a82812019-03-27 14:50:11 -04001034 client_facade = client.ClientFacade.from_connection(self.connection())
Adam Israeldcdf82b2017-08-15 15:26:43 -04001035 results = await client_facade.AddMachines([params])
1036 error = results.machines[0].error
1037 if error:
1038 raise ValueError("Error adding machine: %s" % error.message)
1039 machine_id = results.machines[0].machine
Adam Israelb8a82812019-03-27 14:50:11 -04001040
1041 if spec:
1042 if spec.startswith("ssh:"):
1043 # Need to run this after AddMachines has been called,
1044 # as we need the machine_id
1045 await sshProvisioner.install_agent(
1046 self.connection(),
1047 params.nonce,
1048 machine_id,
1049 )
1050
Adam Israeldcdf82b2017-08-15 15:26:43 -04001051 log.debug('Added new machine %s', machine_id)
1052 return await self._wait_for_new('machine', machine_id)
1053
1054 async def add_relation(self, relation1, relation2):
1055 """Add a relation between two applications.
1056
1057 :param str relation1: '<application>[:<relation_name>]'
1058 :param str relation2: '<application>[:<relation_name>]'
1059
1060 """
Adam Israelb8a82812019-03-27 14:50:11 -04001061 connection = self.connection()
1062 app_facade = client.ApplicationFacade.from_connection(connection)
Adam Israeldcdf82b2017-08-15 15:26:43 -04001063
1064 log.debug(
1065 'Adding relation %s <-> %s', relation1, relation2)
1066
Adam Israelb8a82812019-03-27 14:50:11 -04001067 def _find_relation(*specs):
1068 for rel in self.relations:
1069 if rel.matches(*specs):
1070 return rel
1071 return None
1072
Adam Israeldcdf82b2017-08-15 15:26:43 -04001073 try:
1074 result = await app_facade.AddRelation([relation1, relation2])
1075 except JujuAPIError as e:
1076 if 'relation already exists' not in e.message:
1077 raise
Adam Israelb8a82812019-03-27 14:50:11 -04001078 rel = _find_relation(relation1, relation2)
1079 if rel:
1080 return rel
1081 raise JujuError('Relation {} {} exists but not in model'.format(
1082 relation1, relation2))
Adam Israeldcdf82b2017-08-15 15:26:43 -04001083
Adam Israelb8a82812019-03-27 14:50:11 -04001084 specs = ['{}:{}'.format(app, data['name'])
1085 for app, data in result.endpoints.items()]
Adam Israeldcdf82b2017-08-15 15:26:43 -04001086
Adam Israelb8a82812019-03-27 14:50:11 -04001087 await self.block_until(lambda: _find_relation(*specs) is not None)
1088 return _find_relation(*specs)
Adam Israeldcdf82b2017-08-15 15:26:43 -04001089
1090 def add_space(self, name, *cidrs):
1091 """Add a new network space.
1092
1093 Adds a new space with the given name and associates the given
1094 (optional) list of existing subnet CIDRs with it.
1095
1096 :param str name: Name of the space
Adam Israelb8a82812019-03-27 14:50:11 -04001097 :param *cidrs: Optional list of existing subnet CIDRs
Adam Israeldcdf82b2017-08-15 15:26:43 -04001098
1099 """
1100 raise NotImplementedError()
1101
1102 async def add_ssh_key(self, user, key):
1103 """Add a public SSH key to this model.
1104
1105 :param str user: The username of the user
1106 :param str key: The public ssh key
1107
1108 """
Adam Israelb8a82812019-03-27 14:50:11 -04001109 key_facade = client.KeyManagerFacade.from_connection(self.connection())
Adam Israeldcdf82b2017-08-15 15:26:43 -04001110 return await key_facade.AddKeys([key], user)
1111 add_ssh_keys = add_ssh_key
1112
1113 def add_subnet(self, cidr_or_id, space, *zones):
1114 """Add an existing subnet to this model.
1115
1116 :param str cidr_or_id: CIDR or provider ID of the existing subnet
1117 :param str space: Network space with which to associate
Adam Israelb8a82812019-03-27 14:50:11 -04001118 :param str *zones: Zone(s) in which the subnet resides
Adam Israeldcdf82b2017-08-15 15:26:43 -04001119
1120 """
1121 raise NotImplementedError()
1122
1123 def get_backups(self):
1124 """Retrieve metadata for backups in this model.
1125
1126 """
1127 raise NotImplementedError()
1128
1129 def block(self, *commands):
1130 """Add a new block to this model.
1131
Adam Israelb8a82812019-03-27 14:50:11 -04001132 :param str *commands: The commands to block. Valid values are
Adam Israeldcdf82b2017-08-15 15:26:43 -04001133 'all-changes', 'destroy-model', 'remove-object'
1134
1135 """
1136 raise NotImplementedError()
1137
1138 def get_blocks(self):
1139 """List blocks for this model.
1140
1141 """
1142 raise NotImplementedError()
1143
1144 def get_cached_images(self, arch=None, kind=None, series=None):
1145 """Return a list of cached OS images.
1146
1147 :param str arch: Filter by image architecture
1148 :param str kind: Filter by image kind, e.g. 'lxd'
1149 :param str series: Filter by image series, e.g. 'xenial'
1150
1151 """
1152 raise NotImplementedError()
1153
1154 def create_backup(self, note=None, no_download=False):
1155 """Create a backup of this model.
1156
1157 :param str note: A note to store with the backup
1158 :param bool no_download: Do not download the backup archive
1159 :return str: Path to downloaded archive
1160
1161 """
1162 raise NotImplementedError()
1163
1164 def create_storage_pool(self, name, provider_type, **pool_config):
1165 """Create or define a storage pool.
1166
1167 :param str name: Name to give the storage pool
1168 :param str provider_type: Pool provider type
Adam Israelb8a82812019-03-27 14:50:11 -04001169 :param **pool_config: key/value pool configuration pairs
Adam Israeldcdf82b2017-08-15 15:26:43 -04001170
1171 """
1172 raise NotImplementedError()
1173
1174 def debug_log(
1175 self, no_tail=False, exclude_module=None, include_module=None,
1176 include=None, level=None, limit=0, lines=10, replay=False,
1177 exclude=None):
1178 """Get log messages for this model.
1179
1180 :param bool no_tail: Stop after returning existing log messages
1181 :param list exclude_module: Do not show log messages for these logging
1182 modules
1183 :param list include_module: Only show log messages for these logging
1184 modules
1185 :param list include: Only show log messages for these entities
1186 :param str level: Log level to show, valid options are 'TRACE',
1187 'DEBUG', 'INFO', 'WARNING', 'ERROR,
1188 :param int limit: Return this many of the most recent (possibly
1189 filtered) lines are shown
1190 :param int lines: Yield this many of the most recent lines, and keep
1191 yielding
1192 :param bool replay: Yield the entire log, and keep yielding
1193 :param list exclude: Do not show log messages for these entities
1194
1195 """
1196 raise NotImplementedError()
1197
1198 def _get_series(self, entity_url, entity):
1199 # try to get the series from the provided charm URL
1200 if entity_url.startswith('cs:'):
1201 parts = entity_url[3:].split('/')
1202 else:
1203 parts = entity_url.split('/')
1204 if parts[0].startswith('~'):
1205 parts.pop(0)
1206 if len(parts) > 1:
1207 # series was specified in the URL
1208 return parts[0]
1209 # series was not supplied at all, so use the newest
1210 # supported series according to the charm store
1211 ss = entity['Meta']['supported-series']
1212 return ss['SupportedSeries'][0]
1213
1214 async def deploy(
1215 self, entity_url, application_name=None, bind=None, budget=None,
1216 channel=None, config=None, constraints=None, force=False,
1217 num_units=1, plan=None, resources=None, series=None, storage=None,
Adam Israelb8a82812019-03-27 14:50:11 -04001218 to=None, devices=None):
Adam Israeldcdf82b2017-08-15 15:26:43 -04001219 """Deploy a new service or bundle.
1220
1221 :param str entity_url: Charm or bundle url
1222 :param str application_name: Name to give the service
1223 :param dict bind: <charm endpoint>:<network space> pairs
1224 :param dict budget: <budget name>:<limit> pairs
1225 :param str channel: Charm store channel from which to retrieve
1226 the charm or bundle, e.g. 'edge'
1227 :param dict config: Charm configuration dictionary
1228 :param constraints: Service constraints
1229 :type constraints: :class:`juju.Constraints`
1230 :param bool force: Allow charm to be deployed to a machine running
1231 an unsupported series
1232 :param int num_units: Number of units to deploy
1233 :param str plan: Plan under which to deploy charm
1234 :param dict resources: <resource name>:<file path> pairs
1235 :param str series: Series on which to deploy
1236 :param dict storage: Storage constraints TODO how do these look?
1237 :param to: Placement directive as a string. For example:
1238
1239 '23' - place on machine 23
1240 'lxd:7' - place in new lxd container on machine 7
1241 '24/lxd/3' - place in container 3 on machine 24
1242
1243 If None, a new machine is provisioned.
1244
1245
1246 TODO::
1247
1248 - support local resources
1249
1250 """
1251 if storage:
1252 storage = {
1253 k: client.Constraints(**v)
1254 for k, v in storage.items()
1255 }
1256
Adam Israelb8a82812019-03-27 14:50:11 -04001257 entity_path = Path(entity_url.replace('local:', ''))
1258 bundle_path = entity_path / 'bundle.yaml'
1259 metadata_path = entity_path / 'metadata.yaml'
1260
Adam Israeldcdf82b2017-08-15 15:26:43 -04001261 is_local = (
1262 entity_url.startswith('local:') or
Adam Israelb8a82812019-03-27 14:50:11 -04001263 entity_path.is_dir() or
1264 entity_path.is_file()
Adam Israeldcdf82b2017-08-15 15:26:43 -04001265 )
1266 if is_local:
1267 entity_id = entity_url.replace('local:', '')
1268 else:
Adam Israelb8a82812019-03-27 14:50:11 -04001269 entity = await self.charmstore.entity(entity_url, channel=channel,
1270 include_stats=False)
Adam Israeldcdf82b2017-08-15 15:26:43 -04001271 entity_id = entity['Id']
1272
Adam Israelb8a82812019-03-27 14:50:11 -04001273 client_facade = client.ClientFacade.from_connection(self.connection())
Adam Israeldcdf82b2017-08-15 15:26:43 -04001274
1275 is_bundle = ((is_local and
Adam Israelb8a82812019-03-27 14:50:11 -04001276 (entity_id.endswith('.yaml') and entity_path.exists()) or
1277 bundle_path.exists()) or
Adam Israeldcdf82b2017-08-15 15:26:43 -04001278 (not is_local and 'bundle/' in entity_id))
1279
1280 if is_bundle:
1281 handler = BundleHandler(self)
1282 await handler.fetch_plan(entity_id)
1283 await handler.execute_plan()
1284 extant_apps = {app for app in self.applications}
1285 pending_apps = set(handler.applications) - extant_apps
1286 if pending_apps:
1287 # new apps will usually be in the model by now, but if some
1288 # haven't made it yet we'll need to wait on them to be added
1289 await asyncio.gather(*[
1290 asyncio.ensure_future(
1291 self._wait_for_new('application', app_name),
Adam Israelb8a82812019-03-27 14:50:11 -04001292 loop=self._connector.loop)
Adam Israeldcdf82b2017-08-15 15:26:43 -04001293 for app_name in pending_apps
Adam Israelb8a82812019-03-27 14:50:11 -04001294 ], loop=self._connector.loop)
Adam Israeldcdf82b2017-08-15 15:26:43 -04001295 return [app for name, app in self.applications.items()
1296 if name in handler.applications]
1297 else:
1298 if not is_local:
1299 if not application_name:
1300 application_name = entity['Meta']['charm-metadata']['Name']
1301 if not series:
1302 series = self._get_series(entity_url, entity)
1303 await client_facade.AddCharm(channel, entity_id)
1304 # XXX: we're dropping local resources here, but we don't
1305 # actually support them yet anyway
1306 resources = await self._add_store_resources(application_name,
1307 entity_id,
Adam Israelb8a82812019-03-27 14:50:11 -04001308 entity=entity)
Adam Israeldcdf82b2017-08-15 15:26:43 -04001309 else:
Adam Israelb8a82812019-03-27 14:50:11 -04001310 if not application_name:
1311 metadata = yaml.load(metadata_path.read_text())
1312 application_name = metadata['name']
Adam Israeldcdf82b2017-08-15 15:26:43 -04001313 # We have a local charm dir that needs to be uploaded
1314 charm_dir = os.path.abspath(
1315 os.path.expanduser(entity_id))
1316 series = series or get_charm_series(charm_dir)
1317 if not series:
1318 raise JujuError(
1319 "Couldn't determine series for charm at {}. "
1320 "Pass a 'series' kwarg to Model.deploy().".format(
1321 charm_dir))
1322 entity_id = await self.add_local_charm_dir(charm_dir, series)
1323 return await self._deploy(
1324 charm_url=entity_id,
1325 application=application_name,
1326 series=series,
1327 config=config or {},
1328 constraints=constraints,
1329 endpoint_bindings=bind,
1330 resources=resources,
1331 storage=storage,
1332 channel=channel,
1333 num_units=num_units,
Adam Israelb8a82812019-03-27 14:50:11 -04001334 placement=parse_placement(to),
1335 devices=devices,
Adam Israeldcdf82b2017-08-15 15:26:43 -04001336 )
1337
Adam Israelb8a82812019-03-27 14:50:11 -04001338 async def _add_store_resources(self, application, entity_url,
1339 overrides=None, entity=None):
Adam Israeldcdf82b2017-08-15 15:26:43 -04001340 if not entity:
1341 # avoid extra charm store call if one was already made
Adam Israelb8a82812019-03-27 14:50:11 -04001342 entity = await self.charmstore.entity(entity_url,
1343 include_stats=False)
Adam Israeldcdf82b2017-08-15 15:26:43 -04001344 resources = [
1345 {
1346 'description': resource['Description'],
1347 'fingerprint': resource['Fingerprint'],
1348 'name': resource['Name'],
1349 'path': resource['Path'],
1350 'revision': resource['Revision'],
1351 'size': resource['Size'],
1352 'type_': resource['Type'],
1353 'origin': 'store',
1354 } for resource in entity['Meta']['resources']
1355 ]
1356
Adam Israelb8a82812019-03-27 14:50:11 -04001357 if overrides:
1358 names = {r['name'] for r in resources}
1359 unknown = overrides.keys() - names
1360 if unknown:
1361 raise JujuError('Unrecognized resource{}: {}'.format(
1362 's' if len(unknown) > 1 else '',
1363 ', '.join(unknown)))
1364 for resource in resources:
1365 if resource['name'] in overrides:
1366 resource['revision'] = overrides[resource['name']]
1367
Adam Israeldcdf82b2017-08-15 15:26:43 -04001368 if not resources:
1369 return None
1370
1371 resources_facade = client.ResourcesFacade.from_connection(
Adam Israelb8a82812019-03-27 14:50:11 -04001372 self.connection())
Adam Israeldcdf82b2017-08-15 15:26:43 -04001373 response = await resources_facade.AddPendingResources(
1374 tag.application(application),
1375 entity_url,
1376 [client.CharmResource(**resource) for resource in resources])
1377 resource_map = {resource['name']: pid
1378 for resource, pid
1379 in zip(resources, response.pending_ids)}
1380 return resource_map
1381
1382 async def _deploy(self, charm_url, application, series, config,
1383 constraints, endpoint_bindings, resources, storage,
Adam Israelb8a82812019-03-27 14:50:11 -04001384 channel=None, num_units=None, placement=None,
1385 devices=None):
Adam Israeldcdf82b2017-08-15 15:26:43 -04001386 """Logic shared between `Model.deploy` and `BundleHandler.deploy`.
1387 """
1388 log.info('Deploying %s', charm_url)
1389
1390 # stringify all config values for API, and convert to YAML
1391 config = {k: str(v) for k, v in config.items()}
1392 config = yaml.dump({application: config},
1393 default_flow_style=False)
1394
1395 app_facade = client.ApplicationFacade.from_connection(
Adam Israelb8a82812019-03-27 14:50:11 -04001396 self.connection())
Adam Israeldcdf82b2017-08-15 15:26:43 -04001397
1398 app = client.ApplicationDeploy(
1399 charm_url=charm_url,
1400 application=application,
1401 series=series,
1402 channel=channel,
1403 config_yaml=config,
1404 constraints=parse_constraints(constraints),
1405 endpoint_bindings=endpoint_bindings,
1406 num_units=num_units,
1407 resources=resources,
1408 storage=storage,
Adam Israelb8a82812019-03-27 14:50:11 -04001409 placement=placement,
1410 devices=devices,
Adam Israeldcdf82b2017-08-15 15:26:43 -04001411 )
Adam Israeldcdf82b2017-08-15 15:26:43 -04001412 result = await app_facade.Deploy([app])
1413 errors = [r.error.message for r in result.results if r.error]
1414 if errors:
1415 raise JujuError('\n'.join(errors))
1416 return await self._wait_for_new('application', application)
1417
1418 async def destroy(self):
1419 """Terminate all machines and resources for this model.
1420 Is already implemented in controller.py.
1421 """
1422 raise NotImplementedError()
1423
1424 async def destroy_unit(self, *unit_names):
1425 """Destroy units by name.
1426
1427 """
Adam Israelb8a82812019-03-27 14:50:11 -04001428 connection = self.connection()
1429 app_facade = client.ApplicationFacade.from_connection(connection)
Adam Israeldcdf82b2017-08-15 15:26:43 -04001430
1431 log.debug(
1432 'Destroying unit%s %s',
1433 's' if len(unit_names) == 1 else '',
1434 ' '.join(unit_names))
1435
1436 return await app_facade.DestroyUnits(list(unit_names))
1437 destroy_units = destroy_unit
1438
1439 def get_backup(self, archive_id):
1440 """Download a backup archive file.
1441
1442 :param str archive_id: The id of the archive to download
1443 :return str: Path to the archive file
1444
1445 """
1446 raise NotImplementedError()
1447
1448 def enable_ha(
1449 self, num_controllers=0, constraints=None, series=None, to=None):
1450 """Ensure sufficient controllers exist to provide redundancy.
1451
1452 :param int num_controllers: Number of controllers to make available
1453 :param constraints: Constraints to apply to the controller machines
1454 :type constraints: :class:`juju.Constraints`
1455 :param str series: Series of the controller machines
1456 :param list to: Placement directives for controller machines, e.g.::
1457
1458 '23' - machine 23
1459 'lxc:7' - new lxc container on machine 7
1460 '24/lxc/3' - lxc container 3 or machine 24
1461
1462 If None, a new machine is provisioned.
1463
1464 """
1465 raise NotImplementedError()
1466
1467 async def get_config(self):
1468 """Return the configuration settings for this model.
1469
1470 :returns: A ``dict`` mapping keys to `ConfigValue` instances,
1471 which have `source` and `value` attributes.
1472 """
1473 config_facade = client.ModelConfigFacade.from_connection(
Adam Israelb8a82812019-03-27 14:50:11 -04001474 self.connection()
Adam Israeldcdf82b2017-08-15 15:26:43 -04001475 )
1476 result = await config_facade.ModelGet()
1477 config = result.config
1478 for key, value in config.items():
1479 config[key] = ConfigValue.from_json(value)
1480 return config
1481
Adam Israelb8a82812019-03-27 14:50:11 -04001482 async def get_constraints(self):
Adam Israeldcdf82b2017-08-15 15:26:43 -04001483 """Return the machine constraints for this model.
1484
Adam Israelb8a82812019-03-27 14:50:11 -04001485 :returns: A ``dict`` of constraints.
Adam Israeldcdf82b2017-08-15 15:26:43 -04001486 """
Adam Israelb8a82812019-03-27 14:50:11 -04001487 constraints = {}
1488 client_facade = client.ClientFacade.from_connection(self.connection())
1489 result = await client_facade.GetModelConstraints()
Adam Israeldcdf82b2017-08-15 15:26:43 -04001490
Adam Israelb8a82812019-03-27 14:50:11 -04001491 # GetModelConstraints returns GetConstraintsResults which has a
1492 # 'constraints' attribute. If no constraints have been set
1493 # GetConstraintsResults.constraints is None. Otherwise
1494 # GetConstraintsResults.constraints has an attribute for each possible
1495 # constraint, each of these in turn will be None if they have not been
1496 # set.
1497 if result.constraints:
1498 constraint_types = [a for a in dir(result.constraints)
1499 if a in Value._toSchema.keys()]
1500 for constraint in constraint_types:
1501 value = getattr(result.constraints, constraint)
1502 if value is not None:
1503 constraints[constraint] = getattr(result.constraints,
1504 constraint)
1505 return constraints
Adam Israeldcdf82b2017-08-15 15:26:43 -04001506
1507 def import_ssh_key(self, identity):
1508 """Add a public SSH key from a trusted indentity source to this model.
1509
1510 :param str identity: User identity in the form <lp|gh>:<username>
1511
1512 """
1513 raise NotImplementedError()
1514 import_ssh_keys = import_ssh_key
1515
1516 async def get_machines(self):
1517 """Return list of machines in this model.
1518
1519 """
1520 return list(self.state.machines.keys())
1521
1522 def get_shares(self):
1523 """Return list of all users with access to this model.
1524
1525 """
1526 raise NotImplementedError()
1527
1528 def get_spaces(self):
1529 """Return list of all known spaces, including associated subnets.
1530
1531 """
1532 raise NotImplementedError()
1533
1534 async def get_ssh_key(self, raw_ssh=False):
1535 """Return known SSH keys for this model.
1536 :param bool raw_ssh: if True, returns the raw ssh key,
1537 else it's fingerprint
1538
1539 """
Adam Israelb8a82812019-03-27 14:50:11 -04001540 key_facade = client.KeyManagerFacade.from_connection(self.connection())
Adam Israeldcdf82b2017-08-15 15:26:43 -04001541 entity = {'tag': tag.model(self.info.uuid)}
1542 entities = client.Entities([entity])
1543 return await key_facade.ListKeys(entities, raw_ssh)
1544 get_ssh_keys = get_ssh_key
1545
1546 def get_storage(self, filesystem=False, volume=False):
1547 """Return details of storage instances.
1548
1549 :param bool filesystem: Include filesystem storage
1550 :param bool volume: Include volume storage
1551
1552 """
1553 raise NotImplementedError()
1554
1555 def get_storage_pools(self, names=None, providers=None):
1556 """Return list of storage pools.
1557
1558 :param list names: Only include pools with these names
1559 :param list providers: Only include pools for these providers
1560
1561 """
1562 raise NotImplementedError()
1563
1564 def get_subnets(self, space=None, zone=None):
1565 """Return list of known subnets.
1566
1567 :param str space: Only include subnets in this space
1568 :param str zone: Only include subnets in this zone
1569
1570 """
1571 raise NotImplementedError()
1572
1573 def remove_blocks(self):
1574 """Remove all blocks from this model.
1575
1576 """
1577 raise NotImplementedError()
1578
1579 def remove_backup(self, backup_id):
1580 """Delete a backup.
1581
1582 :param str backup_id: The id of the backup to remove
1583
1584 """
1585 raise NotImplementedError()
1586
1587 def remove_cached_images(self, arch=None, kind=None, series=None):
1588 """Remove cached OS images.
1589
1590 :param str arch: Architecture of the images to remove
1591 :param str kind: Image kind to remove, e.g. 'lxd'
1592 :param str series: Image series to remove, e.g. 'xenial'
1593
1594 """
1595 raise NotImplementedError()
1596
1597 def remove_machine(self, *machine_ids):
1598 """Remove a machine from this model.
1599
Adam Israelb8a82812019-03-27 14:50:11 -04001600 :param str *machine_ids: Ids of the machines to remove
Adam Israeldcdf82b2017-08-15 15:26:43 -04001601
1602 """
1603 raise NotImplementedError()
1604 remove_machines = remove_machine
1605
1606 async def remove_ssh_key(self, user, key):
1607 """Remove a public SSH key(s) from this model.
1608
1609 :param str key: Full ssh key
1610 :param str user: Juju user to which the key is registered
1611
1612 """
Adam Israelb8a82812019-03-27 14:50:11 -04001613 key_facade = client.KeyManagerFacade.from_connection(self.connection())
Adam Israeldcdf82b2017-08-15 15:26:43 -04001614 key = base64.b64decode(bytes(key.strip().split()[1].encode('ascii')))
1615 key = hashlib.md5(key).hexdigest()
Adam Israelb8a82812019-03-27 14:50:11 -04001616 key = ':'.join(a + b for a, b in zip(key[::2], key[1::2]))
Adam Israeldcdf82b2017-08-15 15:26:43 -04001617 await key_facade.DeleteKeys([key], user)
1618 remove_ssh_keys = remove_ssh_key
1619
1620 def restore_backup(
1621 self, bootstrap=False, constraints=None, archive=None,
1622 backup_id=None, upload_tools=False):
1623 """Restore a backup archive to a new controller.
1624
1625 :param bool bootstrap: Bootstrap a new state machine
1626 :param constraints: Model constraints
1627 :type constraints: :class:`juju.Constraints`
1628 :param str archive: Path to backup archive to restore
1629 :param str backup_id: Id of backup to restore
1630 :param bool upload_tools: Upload tools if bootstrapping a new machine
1631
1632 """
1633 raise NotImplementedError()
1634
1635 def retry_provisioning(self):
1636 """Retry provisioning for failed machines.
1637
1638 """
1639 raise NotImplementedError()
1640
Adam Israeldcdf82b2017-08-15 15:26:43 -04001641 def run(self, command, timeout=None):
1642 """Run command on all machines in this model.
1643
1644 :param str command: The command to run
1645 :param int timeout: Time to wait before command is considered failed
1646
1647 """
1648 raise NotImplementedError()
1649
1650 async def set_config(self, config):
1651 """Set configuration keys on this model.
1652
1653 :param dict config: Mapping of config keys to either string values or
1654 `ConfigValue` instances, as returned by `get_config`.
1655 """
1656 config_facade = client.ModelConfigFacade.from_connection(
Adam Israelb8a82812019-03-27 14:50:11 -04001657 self.connection()
Adam Israeldcdf82b2017-08-15 15:26:43 -04001658 )
1659 for key, value in config.items():
1660 if isinstance(value, ConfigValue):
1661 config[key] = value.value
1662 await config_facade.ModelSet(config)
1663
Adam Israelb8a82812019-03-27 14:50:11 -04001664 async def set_constraints(self, constraints):
Adam Israeldcdf82b2017-08-15 15:26:43 -04001665 """Set machine constraints on this model.
1666
Adam Israelb8a82812019-03-27 14:50:11 -04001667 :param dict config: Mapping of model constraints
Adam Israeldcdf82b2017-08-15 15:26:43 -04001668 """
Adam Israelb8a82812019-03-27 14:50:11 -04001669 client_facade = client.ClientFacade.from_connection(self.connection())
1670 await client_facade.SetModelConstraints(
1671 application='',
1672 constraints=constraints)
Adam Israeldcdf82b2017-08-15 15:26:43 -04001673
Adam Israelb8a82812019-03-27 14:50:11 -04001674 async def get_action_output(self, action_uuid, wait=None):
Adam Israeldcdf82b2017-08-15 15:26:43 -04001675 """Get the results of an action by ID.
1676
1677 :param str action_uuid: Id of the action
Adam Israelb8a82812019-03-27 14:50:11 -04001678 :param int wait: Time in seconds to wait for action to complete.
1679 :return dict: Output from action
1680 :raises: :class:`JujuError` if invalid action_uuid
Adam Israeldcdf82b2017-08-15 15:26:43 -04001681 """
Adam Israelb8a82812019-03-27 14:50:11 -04001682 action_facade = client.ActionFacade.from_connection(
1683 self.connection()
1684 )
1685 entity = [{'tag': tag.action(action_uuid)}]
1686 # Cannot use self.wait_for_action as the action event has probably
1687 # already happened and self.wait_for_action works by processing
1688 # model deltas and checking if they match our type. If the action
1689 # has already occured then the delta has gone.
Adam Israeldcdf82b2017-08-15 15:26:43 -04001690
Adam Israelb8a82812019-03-27 14:50:11 -04001691 async def _wait_for_action_status():
1692 while True:
1693 action_output = await action_facade.Actions(entity)
1694 if action_output.results[0].status in ('completed', 'failed'):
1695 return
1696 else:
1697 await asyncio.sleep(1)
1698 await asyncio.wait_for(
1699 _wait_for_action_status(),
1700 timeout=wait)
1701 action_output = await action_facade.Actions(entity)
1702 # ActionResult.output is None if the action produced no output
1703 if action_output.results[0].output is None:
1704 output = {}
1705 else:
1706 output = action_output.results[0].output
1707 return output
1708
1709 async def get_action_status(self, uuid_or_prefix=None, name=None):
1710 """Get the status of all actions, filtered by ID, ID prefix, or name.
Adam Israeldcdf82b2017-08-15 15:26:43 -04001711
1712 :param str uuid_or_prefix: Filter by action uuid or prefix
1713 :param str name: Filter by action name
1714
1715 """
Adam Israelb8a82812019-03-27 14:50:11 -04001716 results = {}
1717 action_results = []
1718 action_facade = client.ActionFacade.from_connection(
1719 self.connection()
1720 )
1721 if name:
1722 name_results = await action_facade.FindActionsByNames([name])
1723 action_results.extend(name_results.actions[0].actions)
1724 if uuid_or_prefix:
1725 # Collect list of actions matching uuid or prefix
1726 matching_actions = await action_facade.FindActionTagsByPrefix(
1727 [uuid_or_prefix])
1728 entities = []
1729 for actions in matching_actions.matches.values():
1730 entities = [{'tag': a.tag} for a in actions]
1731 # Get action results matching action tags
1732 uuid_results = await action_facade.Actions(entities)
1733 action_results.extend(uuid_results.results)
1734 for a in action_results:
1735 results[tag.untag('action-', a.action.tag)] = a.status
1736 return results
Adam Israeldcdf82b2017-08-15 15:26:43 -04001737
1738 def get_budget(self, budget_name):
1739 """Get budget usage info.
1740
1741 :param str budget_name: Name of budget
1742
1743 """
1744 raise NotImplementedError()
1745
1746 async def get_status(self, filters=None, utc=False):
1747 """Return the status of the model.
1748
1749 :param str filters: Optional list of applications, units, or machines
1750 to include, which can use wildcards ('*').
1751 :param bool utc: Display time as UTC in RFC3339 format
1752
1753 """
Adam Israelb8a82812019-03-27 14:50:11 -04001754 client_facade = client.ClientFacade.from_connection(self.connection())
Adam Israeldcdf82b2017-08-15 15:26:43 -04001755 return await client_facade.FullStatus(filters)
1756
1757 def sync_tools(
1758 self, all_=False, destination=None, dry_run=False, public=False,
1759 source=None, stream=None, version=None):
1760 """Copy Juju tools into this model.
1761
1762 :param bool all_: Copy all versions, not just the latest
1763 :param str destination: Path to local destination directory
1764 :param bool dry_run: Don't do the actual copy
1765 :param bool public: Tools are for a public cloud, so generate mirrors
1766 information
1767 :param str source: Path to local source directory
1768 :param str stream: Simplestreams stream for which to sync metadata
1769 :param str version: Copy a specific major.minor version
1770
1771 """
1772 raise NotImplementedError()
1773
1774 def unblock(self, *commands):
1775 """Unblock an operation that would alter this model.
1776
Adam Israelb8a82812019-03-27 14:50:11 -04001777 :param str *commands: The commands to unblock. Valid values are
Adam Israeldcdf82b2017-08-15 15:26:43 -04001778 'all-changes', 'destroy-model', 'remove-object'
1779
1780 """
1781 raise NotImplementedError()
1782
1783 def unset_config(self, *keys):
1784 """Unset configuration on this model.
1785
Adam Israelb8a82812019-03-27 14:50:11 -04001786 :param str *keys: The keys to unset
Adam Israeldcdf82b2017-08-15 15:26:43 -04001787
1788 """
1789 raise NotImplementedError()
1790
1791 def upgrade_gui(self):
1792 """Upgrade the Juju GUI for this model.
1793
1794 """
1795 raise NotImplementedError()
1796
1797 def upgrade_juju(
1798 self, dry_run=False, reset_previous_upgrade=False,
1799 upload_tools=False, version=None):
1800 """Upgrade Juju on all machines in a model.
1801
1802 :param bool dry_run: Don't do the actual upgrade
1803 :param bool reset_previous_upgrade: Clear the previous (incomplete)
1804 upgrade status
1805 :param bool upload_tools: Upload local version of tools
1806 :param str version: Upgrade to a specific version
1807
1808 """
1809 raise NotImplementedError()
1810
1811 def upload_backup(self, archive_path):
1812 """Store a backup archive remotely in Juju.
1813
1814 :param str archive_path: Path to local archive
1815
1816 """
1817 raise NotImplementedError()
1818
1819 @property
1820 def charmstore(self):
1821 return self._charmstore
1822
1823 async def get_metrics(self, *tags):
1824 """Retrieve metrics.
1825
Adam Israelb8a82812019-03-27 14:50:11 -04001826 :param str *tags: Tags of entities from which to retrieve metrics.
Adam Israeldcdf82b2017-08-15 15:26:43 -04001827 No tags retrieves the metrics of all units in the model.
1828 :return: Dictionary of unit_name:metrics
1829
1830 """
1831 log.debug("Retrieving metrics for %s",
1832 ', '.join(tags) if tags else "all units")
1833
1834 metrics_facade = client.MetricsDebugFacade.from_connection(
Adam Israelb8a82812019-03-27 14:50:11 -04001835 self.connection())
Adam Israeldcdf82b2017-08-15 15:26:43 -04001836
1837 entities = [client.Entity(tag) for tag in tags]
1838 metrics_result = await metrics_facade.GetMetrics(entities)
1839
1840 metrics = collections.defaultdict(list)
1841
1842 for entity_metrics in metrics_result.results:
1843 error = entity_metrics.error
1844 if error:
1845 if "is not a valid tag" in error:
1846 raise ValueError(error.message)
1847 else:
1848 raise Exception(error.message)
1849
1850 for metric in entity_metrics.metrics:
1851 metrics[metric.unit].append(vars(metric))
1852
1853 return metrics
1854
1855
1856def get_charm_series(path):
1857 """Inspects the charm directory at ``path`` and returns a default
1858 series from its metadata.yaml (the first item in the 'series' list).
1859
1860 Returns None if no series can be determined.
1861
1862 """
1863 md = Path(path) / "metadata.yaml"
1864 if not md.exists():
1865 return None
1866 data = yaml.load(md.open())
1867 series = data.get('series')
1868 return series[0] if series else None
1869
1870
Adam Israelb8a82812019-03-27 14:50:11 -04001871class BundleHandler:
Adam Israeldcdf82b2017-08-15 15:26:43 -04001872 """
1873 Handle bundles by using the API to translate bundle YAML into a plan of
1874 steps and then dispatching each of those using the API.
1875 """
1876 def __init__(self, model):
1877 self.model = model
1878 self.charmstore = model.charmstore
1879 self.plan = []
1880 self.references = {}
1881 self._units_by_app = {}
1882 for unit_name, unit in model.units.items():
1883 app_units = self._units_by_app.setdefault(unit.application, [])
1884 app_units.append(unit_name)
Adam Israelb8a82812019-03-27 14:50:11 -04001885 self.bundle_facade = client.BundleFacade.from_connection(
1886 model.connection())
Adam Israeldcdf82b2017-08-15 15:26:43 -04001887 self.client_facade = client.ClientFacade.from_connection(
Adam Israelb8a82812019-03-27 14:50:11 -04001888 model.connection())
Adam Israeldcdf82b2017-08-15 15:26:43 -04001889 self.app_facade = client.ApplicationFacade.from_connection(
Adam Israelb8a82812019-03-27 14:50:11 -04001890 model.connection())
Adam Israeldcdf82b2017-08-15 15:26:43 -04001891 self.ann_facade = client.AnnotationsFacade.from_connection(
Adam Israelb8a82812019-03-27 14:50:11 -04001892 model.connection())
Adam Israeldcdf82b2017-08-15 15:26:43 -04001893
1894 async def _handle_local_charms(self, bundle):
1895 """Search for references to local charms (i.e. filesystem paths)
1896 in the bundle. Upload the local charms to the model, and replace
1897 the filesystem paths with appropriate 'local:' paths in the bundle.
1898
1899 Return the modified bundle.
1900
1901 :param dict bundle: Bundle dictionary
1902 :return: Modified bundle dictionary
1903
1904 """
1905 apps, args = [], []
1906
1907 default_series = bundle.get('series')
Adam Israelb8a82812019-03-27 14:50:11 -04001908 apps_dict = bundle.get('applications', bundle.get('services', {}))
Adam Israeldcdf82b2017-08-15 15:26:43 -04001909 for app_name in self.applications:
Adam Israelb8a82812019-03-27 14:50:11 -04001910 app_dict = apps_dict[app_name]
Adam Israeldcdf82b2017-08-15 15:26:43 -04001911 charm_dir = os.path.abspath(os.path.expanduser(app_dict['charm']))
1912 if not os.path.isdir(charm_dir):
1913 continue
1914 series = (
1915 app_dict.get('series') or
1916 default_series or
1917 get_charm_series(charm_dir)
1918 )
1919 if not series:
1920 raise JujuError(
1921 "Couldn't determine series for charm at {}. "
1922 "Add a 'series' key to the bundle.".format(charm_dir))
1923
1924 # Keep track of what we need to update. We keep a list of apps
1925 # that need to be updated, and a corresponding list of args
1926 # needed to update those apps.
1927 apps.append(app_name)
1928 args.append((charm_dir, series))
1929
1930 if apps:
1931 # If we have apps to update, spawn all the coroutines concurrently
1932 # and wait for them to finish.
1933 charm_urls = await asyncio.gather(*[
1934 self.model.add_local_charm_dir(*params)
1935 for params in args
1936 ], loop=self.model.loop)
1937 # Update the 'charm:' entry for each app with the new 'local:' url.
1938 for app_name, charm_url in zip(apps, charm_urls):
Adam Israelb8a82812019-03-27 14:50:11 -04001939 apps_dict[app_name]['charm'] = charm_url
Adam Israeldcdf82b2017-08-15 15:26:43 -04001940
1941 return bundle
1942
1943 async def fetch_plan(self, entity_id):
Adam Israelb8a82812019-03-27 14:50:11 -04001944 is_store_url = entity_id.startswith('cs:')
1945
1946 if not is_store_url and os.path.isfile(entity_id):
1947 bundle_yaml = Path(entity_id).read_text()
1948 elif not is_store_url and os.path.isdir(entity_id):
Adam Israeldcdf82b2017-08-15 15:26:43 -04001949 bundle_yaml = (Path(entity_id) / "bundle.yaml").read_text()
1950 else:
1951 bundle_yaml = await self.charmstore.files(entity_id,
1952 filename='bundle.yaml',
1953 read_file=True)
1954 self.bundle = yaml.safe_load(bundle_yaml)
1955 self.bundle = await self._handle_local_charms(self.bundle)
1956
Adam Israelb8a82812019-03-27 14:50:11 -04001957 self.plan = await self.bundle_facade.GetChanges(
Adam Israeldcdf82b2017-08-15 15:26:43 -04001958 yaml.dump(self.bundle))
1959
Adam Israelb8a82812019-03-27 14:50:11 -04001960 if self.plan.errors:
1961 raise JujuError(self.plan.errors)
1962
Adam Israeldcdf82b2017-08-15 15:26:43 -04001963 async def execute_plan(self):
1964 for step in self.plan.changes:
1965 method = getattr(self, step.method)
1966 result = await method(*step.args)
1967 self.references[step.id_] = result
1968
1969 @property
1970 def applications(self):
Adam Israelb8a82812019-03-27 14:50:11 -04001971 apps_dict = self.bundle.get('applications',
1972 self.bundle.get('services', {}))
1973 return list(apps_dict.keys())
Adam Israeldcdf82b2017-08-15 15:26:43 -04001974
1975 def resolve(self, reference):
1976 if reference and reference.startswith('$'):
1977 reference = self.references[reference[1:]]
1978 return reference
1979
1980 async def addCharm(self, charm, series):
1981 """
1982 :param charm string:
1983 Charm holds the URL of the charm to be added.
1984
1985 :param series string:
1986 Series holds the series of the charm to be added
1987 if the charm default is not sufficient.
1988 """
1989 # We don't add local charms because they've already been added
1990 # by self._handle_local_charms
1991 if charm.startswith('local:'):
1992 return charm
1993
1994 entity_id = await self.charmstore.entityId(charm)
1995 log.debug('Adding %s', entity_id)
1996 await self.client_facade.AddCharm(None, entity_id)
1997 return entity_id
1998
1999 async def addMachines(self, params=None):
2000 """
2001 :param params dict:
2002 Dictionary specifying the machine to add. All keys are optional.
2003 Keys include:
2004
2005 series: string specifying the machine OS series.
2006
2007 constraints: string holding machine constraints, if any. We'll
2008 parse this into the json friendly dict that the juju api
2009 expects.
2010
2011 container_type: string holding the type of the container (for
2012 instance ""lxd" or kvm"). It is not specified for top level
2013 machines.
2014
2015 parent_id: string holding a placeholder pointing to another
2016 machine change or to a unit change. This value is only
2017 specified in the case this machine is a container, in
2018 which case also ContainerType is set.
2019
2020 """
2021 params = params or {}
2022
2023 # Normalize keys
2024 params = {normalize_key(k): params[k] for k in params.keys()}
2025
2026 # Fix up values, as necessary.
2027 if 'parent_id' in params:
Adam Israelb8a82812019-03-27 14:50:11 -04002028 if params['parent_id'].startswith('$addUnit'):
2029 unit = self.resolve(params['parent_id'])[0]
2030 params['parent_id'] = unit.machine.entity_id
2031 else:
2032 params['parent_id'] = self.resolve(params['parent_id'])
Adam Israeldcdf82b2017-08-15 15:26:43 -04002033
2034 params['constraints'] = parse_constraints(
2035 params.get('constraints'))
2036 params['jobs'] = params.get('jobs', ['JobHostUnits'])
2037
2038 if params.get('container_type') == 'lxc':
2039 log.warning('Juju 2.0 does not support lxc containers. '
2040 'Converting containers to lxd.')
2041 params['container_type'] = 'lxd'
2042
2043 # Submit the request.
2044 params = client.AddMachineParams(**params)
2045 results = await self.client_facade.AddMachines([params])
2046 error = results.machines[0].error
2047 if error:
2048 raise ValueError("Error adding machine: %s" % error.message)
2049 machine = results.machines[0].machine
2050 log.debug('Added new machine %s', machine)
2051 return machine
2052
2053 async def addRelation(self, endpoint1, endpoint2):
2054 """
2055 :param endpoint1 string:
2056 :param endpoint2 string:
2057 Endpoint1 and Endpoint2 hold relation endpoints in the
2058 "application:interface" form, where the application is always a
2059 placeholder pointing to an application change, and the interface is
2060 optional. Examples are "$deploy-42:web" or just "$deploy-42".
2061 """
2062 endpoints = [endpoint1, endpoint2]
2063 # resolve indirect references
2064 for i in range(len(endpoints)):
2065 parts = endpoints[i].split(':')
2066 parts[0] = self.resolve(parts[0])
2067 endpoints[i] = ':'.join(parts)
2068
2069 log.info('Relating %s <-> %s', *endpoints)
2070 return await self.model.add_relation(*endpoints)
2071
2072 async def deploy(self, charm, series, application, options, constraints,
Adam Israelb8a82812019-03-27 14:50:11 -04002073 storage, endpoint_bindings, *args):
Adam Israeldcdf82b2017-08-15 15:26:43 -04002074 """
2075 :param charm string:
2076 Charm holds the URL of the charm to be used to deploy this
2077 application.
2078
2079 :param series string:
2080 Series holds the series of the application to be deployed
2081 if the charm default is not sufficient.
2082
2083 :param application string:
2084 Application holds the application name.
2085
2086 :param options map[string]interface{}:
2087 Options holds application options.
2088
2089 :param constraints string:
2090 Constraints holds the optional application constraints.
2091
2092 :param storage map[string]string:
2093 Storage holds the optional storage constraints.
2094
2095 :param endpoint_bindings map[string]string:
2096 EndpointBindings holds the optional endpoint bindings
2097
Adam Israelb8a82812019-03-27 14:50:11 -04002098 :param devices map[string]string:
2099 Devices holds the optional devices constraints.
2100 (Only given on Juju 2.5+)
2101
Adam Israeldcdf82b2017-08-15 15:26:43 -04002102 :param resources map[string]int:
2103 Resources identifies the revision to use for each resource
2104 of the application's charm.
Adam Israelb8a82812019-03-27 14:50:11 -04002105
2106 :param num_units int:
2107 NumUnits holds the number of units required. For IAAS models, this
2108 will be 0 and separate AddUnitChanges will be used. For Kubernetes
2109 models, this will be used to scale the application.
2110 (Only given on Juju 2.5+)
Adam Israeldcdf82b2017-08-15 15:26:43 -04002111 """
2112 # resolve indirect references
2113 charm = self.resolve(charm)
Adam Israelb8a82812019-03-27 14:50:11 -04002114
2115 if len(args) == 1:
2116 # Juju 2.4 and below only sends the resources
2117 resources = args[0]
2118 devices, num_units = None, None
2119 else:
2120 # Juju 2.5+ sends devices before resources, as well as num_units
2121 # There might be placement but we need to ignore that.
2122 devices, resources, num_units = args[:3]
2123
Adam Israeldcdf82b2017-08-15 15:26:43 -04002124 if not charm.startswith('local:'):
Adam Israelb8a82812019-03-27 14:50:11 -04002125 resources = await self.model._add_store_resources(
2126 application, charm, overrides=resources)
Adam Israeldcdf82b2017-08-15 15:26:43 -04002127 await self.model._deploy(
2128 charm_url=charm,
2129 application=application,
2130 series=series,
2131 config=options,
2132 constraints=constraints,
2133 endpoint_bindings=endpoint_bindings,
2134 resources=resources,
2135 storage=storage,
Adam Israelb8a82812019-03-27 14:50:11 -04002136 devices=devices,
2137 num_units=num_units,
Adam Israeldcdf82b2017-08-15 15:26:43 -04002138 )
2139 return application
2140
2141 async def addUnit(self, application, to):
2142 """
2143 :param application string:
2144 Application holds the application placeholder name for which a unit
2145 is added.
2146
2147 :param to string:
2148 To holds the optional location where to add the unit, as a
2149 placeholder pointing to another unit change or to a machine change.
2150 """
2151 application = self.resolve(application)
2152 placement = self.resolve(to)
2153 if self._units_by_app.get(application):
2154 # enough units for this application already exist;
2155 # claim one, and carry on
2156 # NB: this should probably honor placement, but the juju client
2157 # doesn't, so we're not bothering, either
2158 unit_name = self._units_by_app[application].pop()
2159 log.debug('Reusing unit %s for %s', unit_name, application)
2160 return self.model.units[unit_name]
2161
2162 log.debug('Adding new unit for %s%s', application,
2163 ' to %s' % placement if placement else '')
2164 return await self.model.applications[application].add_unit(
2165 count=1,
2166 to=placement,
2167 )
2168
Adam Israelb8a82812019-03-27 14:50:11 -04002169 async def scale(self, application, scale):
2170 """
2171 Handle a change of scale to a k8s application.
2172
2173 :param string application:
2174 Application holds the application placeholder name for which a unit
2175 is added.
2176
2177 :param int scale:
2178 New scale value to use.
2179 """
2180 application = self.resolve(application)
2181 return await self.model.applications[application].scale(scale=scale)
2182
Adam Israeldcdf82b2017-08-15 15:26:43 -04002183 async def expose(self, application):
2184 """
2185 :param application string:
2186 Application holds the placeholder name of the application that must
2187 be exposed.
2188 """
2189 application = self.resolve(application)
2190 log.info('Exposing %s', application)
2191 return await self.model.applications[application].expose()
2192
2193 async def setAnnotations(self, id_, entity_type, annotations):
2194 """
2195 :param id_ string:
2196 Id is the placeholder for the application or machine change
2197 corresponding to the entity to be annotated.
2198
2199 :param entity_type EntityType:
2200 EntityType holds the type of the entity, "application" or
2201 "machine".
2202
2203 :param annotations map[string]string:
2204 Annotations holds the annotations as key/value pairs.
2205 """
2206 entity_id = self.resolve(id_)
2207 try:
2208 entity = self.model.state.get_entity(entity_type, entity_id)
2209 except KeyError:
2210 entity = await self.model._wait_for_new(entity_type, entity_id)
2211 return await entity.set_annotations(annotations)
2212
2213
Adam Israelb8a82812019-03-27 14:50:11 -04002214class CharmStore:
Adam Israeldcdf82b2017-08-15 15:26:43 -04002215 """
2216 Async wrapper around theblues.charmstore.CharmStore
2217 """
Adam Israelb8a82812019-03-27 14:50:11 -04002218 def __init__(self, loop, cs_timeout=20):
Adam Israeldcdf82b2017-08-15 15:26:43 -04002219 self.loop = loop
Adam Israelb8a82812019-03-27 14:50:11 -04002220 self._cs = theblues.charmstore.CharmStore(timeout=cs_timeout)
Adam Israeldcdf82b2017-08-15 15:26:43 -04002221
2222 def __getattr__(self, name):
2223 """
2224 Wrap method calls in coroutines that use run_in_executor to make them
2225 async.
2226 """
2227 attr = getattr(self._cs, name)
2228 if not callable(attr):
2229 wrapper = partial(getattr, self._cs, name)
2230 setattr(self, name, wrapper)
2231 else:
2232 async def coro(*args, **kwargs):
2233 method = partial(attr, *args, **kwargs)
2234 for attempt in range(1, 4):
2235 try:
2236 return await self.loop.run_in_executor(None, method)
2237 except theblues.errors.ServerError:
2238 if attempt == 3:
2239 raise
2240 await asyncio.sleep(1, loop=self.loop)
2241 setattr(self, name, coro)
2242 wrapper = coro
2243 return wrapper
2244
2245
Adam Israelb8a82812019-03-27 14:50:11 -04002246class CharmArchiveGenerator:
Adam Israeldcdf82b2017-08-15 15:26:43 -04002247 """
2248 Create a Zip archive of a local charm directory for upload to a controller.
2249
2250 This is used automatically by
2251 `Model.add_local_charm_dir <#juju.model.Model.add_local_charm_dir>`_.
2252 """
2253 def __init__(self, path):
2254 self.path = os.path.abspath(os.path.expanduser(path))
2255
2256 def make_archive(self, path):
2257 """Create archive of directory and write to ``path``.
2258
2259 :param path: Path to archive
2260
2261 Ignored::
2262
Adam Israelb8a82812019-03-27 14:50:11 -04002263 * build/* - This is used for packing the charm itself and any
Adam Israeldcdf82b2017-08-15 15:26:43 -04002264 similar tasks.
Adam Israelb8a82812019-03-27 14:50:11 -04002265 * */.* - Hidden files are all ignored for now. This will most
Adam Israeldcdf82b2017-08-15 15:26:43 -04002266 likely be changed into a specific ignore list
2267 (.bzr, etc)
2268
2269 """
2270 zf = zipfile.ZipFile(path, 'w', zipfile.ZIP_DEFLATED)
2271 for dirpath, dirnames, filenames in os.walk(self.path):
2272 relative_path = dirpath[len(self.path) + 1:]
2273 if relative_path and not self._ignore(relative_path):
2274 zf.write(dirpath, relative_path)
2275 for name in filenames:
2276 archive_name = os.path.join(relative_path, name)
2277 if not self._ignore(archive_name):
2278 real_path = os.path.join(dirpath, name)
2279 self._check_type(real_path)
2280 if os.path.islink(real_path):
2281 self._check_link(real_path)
2282 self._write_symlink(
2283 zf, os.readlink(real_path), archive_name)
2284 else:
2285 zf.write(real_path, archive_name)
2286 zf.close()
2287 return path
2288
2289 def _check_type(self, path):
2290 """Check the path
2291 """
2292 s = os.stat(path)
2293 if stat.S_ISDIR(s.st_mode) or stat.S_ISREG(s.st_mode):
2294 return path
2295 raise ValueError("Invalid Charm at % %s" % (
2296 path, "Invalid file type for a charm"))
2297
2298 def _check_link(self, path):
2299 link_path = os.readlink(path)
2300 if link_path[0] == "/":
2301 raise ValueError(
2302 "Invalid Charm at %s: %s" % (
2303 path, "Absolute links are invalid"))
2304 path_dir = os.path.dirname(path)
2305 link_path = os.path.join(path_dir, link_path)
2306 if not link_path.startswith(os.path.abspath(self.path)):
2307 raise ValueError(
2308 "Invalid charm at %s %s" % (
2309 path, "Only internal symlinks are allowed"))
2310
2311 def _write_symlink(self, zf, link_target, link_path):
2312 """Package symlinks with appropriate zipfile metadata."""
2313 info = zipfile.ZipInfo()
2314 info.filename = link_path
2315 info.create_system = 3
2316 # Magic code for symlinks / py2/3 compat
2317 # 27166663808 = (stat.S_IFLNK | 0755) << 16
2318 info.external_attr = 2716663808
2319 zf.writestr(info, link_target)
2320
2321 def _ignore(self, path):
2322 if path == "build" or path.startswith("build/"):
2323 return True
2324 if path.startswith('.'):
2325 return True