blob: a46135cdf2f5df403e3a607f40b0da0c72759084 [file] [log] [blame]
import asyncio
import logging
import os
import shlex

import pyrfc3339

from . import model, utils
from .client import client
from .errors import JujuError
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
class Machine(model.ModelEntity):
    """A machine entity within a Juju model.

    Exposes status/address accessors backed by ``safe_data`` and a few
    operations (destroy, annotations, scp) that talk to the API connection
    or shell out to local tools.
    """

    def __init__(self, *args, **kwargs):
        """Initialise the entity and schedule the workaround check.

        All arguments are forwarded unchanged to the parent constructor.
        """
        super().__init__(*args, **kwargs)
        # Fire-and-forget: the task decides later whether the workaround
        # observer needs to be registered for this machine.
        self.model.loop.create_task(self._queue_workarounds())

    async def _queue_workarounds(self):
        """Register `_workaround_1695335` when the agent version needs it.

        Fetches the model info first if it is not available yet, and gives
        up silently when the model's watcher is being stopped.
        """
        model = self.model
        if not model.info:
            await utils.run_with_interrupt(model.get_info(),
                                           model._watch_stopping,
                                           model.loop)
        if model._watch_stopping.is_set():
            return
        # The workaround is only registered for agent versions below 2.2.3.
        if model.info.agent_version < client.Number.from_json('2.2.3'):
            self.on_change(self._workaround_1695335)

    async def _workaround_1695335(self, delta, old, new, model):
        """
        This is a (hacky) temporary work around for a bug in Juju where the
        instance status and agent version fields don't get updated properly
        by the AllWatcher.

        Deltas never contain a value for `data['agent-status']['version']`,
        and once the `instance-status` reaches `pending`, we no longer get
        any updates for it (the deltas come in, but the `instance-status`
        data is always the same after that).

        To work around this, whenever a delta comes in for this machine, we
        query FullStatus and use the data from there if and only if it's
        newer. Luckily, the timestamps on the `since` field do seem to be
        accurate.

        See https://bugs.launchpad.net/juju/+bug/1695335

        :param delta: The incoming delta; its `data` dict is updated in place
        :param old: Unused here
        :param new: Unused here
        :param model: The model used to query FullStatus and notify observers
        """
        if delta.data.get('synthetic', False):
            # prevent infinite loops re-processing already processed deltas
            return

        full_status = await utils.run_with_interrupt(model.get_status(),
                                                     model._watch_stopping,
                                                     model.loop)
        if model._watch_stopping.is_set():
            return

        if self.id not in full_status.machines:
            return

        machine = full_status.machines[self.id]
        if not machine['instance-status']['since']:
            return

        change_log = []
        # (FullStatus key, delta key) pairs for the instance-status fields.
        key_pairs = (
            ('status', 'current'),
            ('info', 'message'),
            ('since', 'since'),
        )

        # handle agent version specially, because it's never set in
        # deltas, and we don't want even a newer delta to clear it
        agent_version = machine['agent-status']['version']
        if agent_version:
            delta.data['agent-status']['version'] = agent_version
            change_log.append(('agent-version', '', agent_version))

        # only update (other) delta fields if status data is newer
        status_since = pyrfc3339.parse(machine['instance-status']['since'])
        delta_since = pyrfc3339.parse(delta.data['instance-status']['since'])
        if status_since > delta_since:
            for status_key, delta_key in key_pairs:
                status_value = machine['instance-status'][status_key]
                delta_value = delta.data['instance-status'][delta_key]
                change_log.append((delta_key, delta_value, status_value))
                delta.data['instance-status'][delta_key] = status_value

        if change_log:
            log.debug('Overriding machine delta with FullStatus data')
            for log_item in change_log:
                log.debug(' {}: {} -> {}'.format(*log_item))
            # Mark the delta so the re-notification below is ignored by
            # this same handler.
            delta.data['synthetic'] = True
            old_obj, new_obj = self.model.state.apply_delta(delta)
            await model._notify_observers(delta, old_obj, new_obj)

    async def destroy(self, force=False):
        """Remove this machine from the model.

        Blocks until the machine is actually removed.

        :param bool force: Passed through to the `DestroyMachines` API call
        """
        facade = client.ClientFacade.from_connection(self.connection)

        log.debug(
            'Destroying machine %s', self.id)

        await facade.DestroyMachines(force, [self.id])
        return await self.model._wait(
            'machine', self.id, 'remove')

    # Alias kept so callers can use either name.
    remove = destroy

    def run(self, command, timeout=None):
        """Run command on this machine.

        Not implemented; always raises NotImplementedError.

        :param str command: The command to run
        :param int timeout: Time to wait before command is considered failed
        """
        raise NotImplementedError()

    async def set_annotations(self, annotations):
        """Set annotations on this machine.

        :param dict annotations: the annotations as string key/value pairs.
        :return: the result of the annotations facade `Set` call
        """
        log.debug('Updating annotations on machine %s', self.id)

        # The facade is stored on the instance, as the original code did.
        self.ann_facade = client.AnnotationsFacade.from_connection(
            self.connection)

        ann = client.EntityAnnotations(
            entity=self.id,
            annotations=annotations,
        )
        return await self.ann_facade.Set([ann])

    async def scp_to(self, source, destination, user='ubuntu', proxy=False,
                     scp_opts=''):
        """Transfer files to this machine.

        :param str source: Local path of file(s) to transfer
        :param str destination: Remote destination of transferred files
        :param str user: Remote username
        :param bool proxy: Proxy through the Juju API server
        :param str scp_opts: Additional options to the `scp` command
        :raises NotImplementedError: if `proxy` is requested
        """
        if proxy:
            raise NotImplementedError('proxy option is not implemented')

        address = self.dns_name
        destination = '%s@%s:%s' % (user, address, destination)
        await self._scp(source, destination, scp_opts)

    async def scp_from(self, source, destination, user='ubuntu', proxy=False,
                       scp_opts=''):
        """Transfer files from this machine.

        :param str source: Remote path of file(s) to transfer
        :param str destination: Local destination of transferred files
        :param str user: Remote username
        :param bool proxy: Proxy through the Juju API server
        :param str scp_opts: Additional options to the `scp` command
        :raises NotImplementedError: if `proxy` is requested
        """
        if proxy:
            raise NotImplementedError('proxy option is not implemented')

        address = self.dns_name
        source = '%s@%s:%s' % (user, address, source)
        await self._scp(source, destination, scp_opts)

    async def _scp(self, source, destination, scp_opts):
        """Execute an scp command.

        Requires a fully qualified source and destination.

        :param str source: Fully qualified source path
        :param str destination: Fully qualified destination path
        :param scp_opts: Additional `scp` options, either a single string
            (split with shell-style quoting rules) or a sequence of
            already-split arguments; may be empty or None
        :raises JujuError: if `scp` exits with a non-zero return code
        """
        if not scp_opts:
            extra_opts = []
        elif isinstance(scp_opts, str):
            # shlex keeps quoted option values (e.g. a ProxyCommand
            # containing spaces) together as one argument.
            extra_opts = shlex.split(scp_opts)
        else:
            extra_opts = list(scp_opts)

        cmd = [
            'scp',
            '-i', os.path.expanduser('~/.local/share/juju/ssh/juju_id_rsa'),
            '-o', 'StrictHostKeyChecking=no',
            '-q',
            '-B',
        ]
        # Options must come before the operands: option parsers that do not
        # permute arguments would otherwise treat them as file names.
        cmd += extra_opts
        cmd += [source, destination]

        loop = self.model.loop
        # NOTE(review): the `loop` argument of create_subprocess_exec was
        # deprecated in Python 3.8 and removed in 3.10 -- confirm which
        # Python versions this module has to support.
        process = await asyncio.create_subprocess_exec(*cmd, loop=loop)
        await process.wait()
        if process.returncode != 0:
            raise JujuError("command failed: %s" % cmd)

    def ssh(
            self, command, user=None, proxy=False, ssh_opts=None):
        """Execute a command over SSH on this machine.

        Not implemented; always raises NotImplementedError.

        :param str command: Command to execute
        :param str user: Remote username
        :param bool proxy: Proxy through the Juju API server
        :param str ssh_opts: Additional options to the `ssh` command
        """
        raise NotImplementedError()

    def status_history(self, num=20, utc=False):
        """Get status history for this machine.

        Not implemented; always raises NotImplementedError.

        :param int num: Size of history backlog
        :param bool utc: Display time as UTC in RFC3339 format
        """
        raise NotImplementedError()

    @property
    def agent_status(self):
        """Returns the current Juju agent status string.
        """
        return self.safe_data['agent-status']['current']

    @property
    def agent_status_since(self):
        """Get the time when the `agent_status` was last updated.

        The stored `since` value is parsed with `pyrfc3339.parse`.
        """
        return pyrfc3339.parse(self.safe_data['agent-status']['since'])

    @property
    def agent_version(self):
        """Get the version of the Juju machine agent.

        May return None if the agent is not yet available.
        """
        version = self.safe_data['agent-status']['version']
        return client.Number.from_json(version) if version else None

    @property
    def status(self):
        """Returns the current machine provisioning status string.
        """
        return self.safe_data['instance-status']['current']

    @property
    def status_message(self):
        """Returns the current machine provisioning status message.
        """
        return self.safe_data['instance-status']['message']

    @property
    def status_since(self):
        """Get the time when the `status` was last updated.

        The stored `since` value is parsed with `pyrfc3339.parse`.
        """
        return pyrfc3339.parse(self.safe_data['instance-status']['since'])

    @property
    def dns_name(self):
        """Get the DNS name for this machine. This is a best guess based on
        the addresses available in current data.

        Addresses with scope `public` are preferred over `local-cloud`.

        May return None if no suitable address is found.
        """
        addresses = self.safe_data['addresses'] or []
        for scope in ('public', 'local-cloud'):
            for address in addresses:
                if address['scope'] == scope:
                    return address['value']
        return None

    @property
    def series(self):
        """Returns the series of the current machine
        """
        return self.safe_data['series']