X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=modules%2Flibjuju%2Fjuju%2Fmachine.py;fp=modules%2Flibjuju%2Fjuju%2Fmachine.py;h=0000000000000000000000000000000000000000;hp=39bfa119195b864e32500f0a8fed49f2d0509f48;hb=5aa242fe2d61c2da9fdb29ed65029f3e27b6966d;hpb=e2051cca7dac12aa09f6ed33555dcc4548c4b52b diff --git a/modules/libjuju/juju/machine.py b/modules/libjuju/juju/machine.py deleted file mode 100644 index 39bfa11..0000000 --- a/modules/libjuju/juju/machine.py +++ /dev/null @@ -1,284 +0,0 @@ -import asyncio -import logging -import os - -import pyrfc3339 - -from . import model, utils -from .client import client -from .errors import JujuError - -log = logging.getLogger(__name__) - - -class Machine(model.ModelEntity): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.model.loop.create_task(self._queue_workarounds()) - - async def _queue_workarounds(self): - model = self.model - if not model.info: - await utils.run_with_interrupt(model.get_info(), - model._watch_stopping, - loop=model.loop) - if model._watch_stopping.is_set(): - return - if model.info.agent_version < client.Number.from_json('2.2.3'): - self.on_change(self._workaround_1695335) - - async def _workaround_1695335(self, delta, old, new, model): - """ - This is a (hacky) temporary work around for a bug in Juju where the - instance status and agent version fields don't get updated properly - by the AllWatcher. - - Deltas never contain a value for `data['agent-status']['version']`, - and once the `instance-status` reaches `pending`, we no longer get - any updates for it (the deltas come in, but the `instance-status` - data is always the same after that). - - To work around this, whenever a delta comes in for this machine, we - query FullStatus and use the data from there if and only if it's newer. - Luckily, the timestamps on the `since` field does seem to be accurate. - - See https://bugs.launchpad.net/juju/+bug/1695335 - """ - if delta.data.get('synthetic', False): - # prevent infinite loops re-processing already processed deltas - return - - full_status = await utils.run_with_interrupt(model.get_status(), - model._watch_stopping, - loop=model.loop) - if model._watch_stopping.is_set(): - return - - if self.id not in full_status.machines: - return - - if not full_status.machines[self.id]['instance-status']['since']: - return - - machine = full_status.machines[self.id] - - change_log = [] - key_map = { - 'status': 'current', - 'info': 'message', - 'since': 'since', - } - - # handle agent version specially, because it's never set in - # deltas, and we don't want even a newer delta to clear it - agent_version = machine['agent-status']['version'] - if agent_version: - delta.data['agent-status']['version'] = agent_version - change_log.append(('agent-version', '', agent_version)) - - # only update (other) delta fields if status data is newer - status_since = pyrfc3339.parse(machine['instance-status']['since']) - delta_since = pyrfc3339.parse(delta.data['instance-status']['since']) - if status_since > delta_since: - for status_key in ('status', 'info', 'since'): - delta_key = key_map[status_key] - status_value = machine['instance-status'][status_key] - delta_value = delta.data['instance-status'][delta_key] - change_log.append((delta_key, delta_value, status_value)) - delta.data['instance-status'][delta_key] = status_value - - if change_log: - log.debug('Overriding machine delta with FullStatus data') - for log_item in change_log: - log.debug(' {}: {} -> {}'.format(*log_item)) - delta.data['synthetic'] = True - old_obj, new_obj = self.model.state.apply_delta(delta) - await model._notify_observers(delta, old_obj, new_obj) - - async def destroy(self, force=False): - """Remove this machine from the model. - - Blocks until the machine is actually removed. - - """ - facade = client.ClientFacade.from_connection(self.connection) - - log.debug( - 'Destroying machine %s', self.id) - - await facade.DestroyMachines(force, [self.id]) - return await self.model._wait( - 'machine', self.id, 'remove') - remove = destroy - - def run(self, command, timeout=None): - """Run command on this machine. - - :param str command: The command to run - :param int timeout: Time to wait before command is considered failed - - """ - raise NotImplementedError() - - async def set_annotations(self, annotations): - """Set annotations on this machine. - - :param annotations map[string]string: the annotations as key/value - pairs. - - """ - log.debug('Updating annotations on machine %s', self.id) - - self.ann_facade = client.AnnotationsFacade.from_connection( - self.connection) - - ann = client.EntityAnnotations( - entity=self.id, - annotations=annotations, - ) - return await self.ann_facade.Set([ann]) - - async def scp_to(self, source, destination, user='ubuntu', proxy=False, - scp_opts=''): - """Transfer files to this machine. - - :param str source: Local path of file(s) to transfer - :param str destination: Remote destination of transferred files - :param str user: Remote username - :param bool proxy: Proxy through the Juju API server - :param scp_opts: Additional options to the `scp` command - :type scp_opts: str or list - """ - if proxy: - raise NotImplementedError('proxy option is not implemented') - - address = self.dns_name - destination = '%s@%s:%s' % (user, address, destination) - await self._scp(source, destination, scp_opts) - - async def scp_from(self, source, destination, user='ubuntu', proxy=False, - scp_opts=''): - """Transfer files from this machine. - - :param str source: Remote path of file(s) to transfer - :param str destination: Local destination of transferred files - :param str user: Remote username - :param bool proxy: Proxy through the Juju API server - :param scp_opts: Additional options to the `scp` command - :type scp_opts: str or list - """ - if proxy: - raise NotImplementedError('proxy option is not implemented') - - address = self.dns_name - source = '%s@%s:%s' % (user, address, source) - await self._scp(source, destination, scp_opts) - - async def _scp(self, source, destination, scp_opts): - """ Execute an scp command. Requires a fully qualified source and - destination. - """ - cmd = [ - 'scp', - '-i', os.path.expanduser('~/.local/share/juju/ssh/juju_id_rsa'), - '-o', 'StrictHostKeyChecking=no', - '-q', - '-B' - ] - cmd.extend(scp_opts.split() if isinstance(scp_opts, str) else scp_opts) - cmd.extend([source, destination]) - loop = self.model.loop - process = await asyncio.create_subprocess_exec(*cmd, loop=loop) - await process.wait() - if process.returncode != 0: - raise JujuError("command failed: %s" % cmd) - - def ssh( - self, command, user=None, proxy=False, ssh_opts=None): - """Execute a command over SSH on this machine. - - :param str command: Command to execute - :param str user: Remote username - :param bool proxy: Proxy through the Juju API server - :param str ssh_opts: Additional options to the `ssh` command - - """ - raise NotImplementedError() - - def status_history(self, num=20, utc=False): - """Get status history for this machine. - - :param int num: Size of history backlog - :param bool utc: Display time as UTC in RFC3339 format - - """ - raise NotImplementedError() - - @property - def agent_status(self): - """Returns the current Juju agent status string. - - """ - return self.safe_data['agent-status']['current'] - - @property - def agent_status_since(self): - """Get the time when the `agent_status` was last updated. - - """ - return pyrfc3339.parse(self.safe_data['agent-status']['since']) - - @property - def agent_version(self): - """Get the version of the Juju machine agent. - - May return None if the agent is not yet available. - """ - version = self.safe_data['agent-status']['version'] - if version: - return client.Number.from_json(version) - else: - return None - - @property - def status(self): - """Returns the current machine provisioning status string. - - """ - return self.safe_data['instance-status']['current'] - - @property - def status_message(self): - """Returns the current machine provisioning status message. - - """ - return self.safe_data['instance-status']['message'] - - @property - def status_since(self): - """Get the time when the `status` was last updated. - - """ - return pyrfc3339.parse(self.safe_data['instance-status']['since']) - - @property - def dns_name(self): - """Get the DNS name for this machine. This is a best guess based on the - addresses available in current data. - - May return None if no suitable address is found. - """ - for scope in ['public', 'local-cloud']: - addresses = self.safe_data['addresses'] or [] - addresses = [address for address in addresses - if address['scope'] == scope] - if addresses: - return addresses[0]['value'] - return None - - @property - def series(self): - """Returns the series of the current machine - - """ - return self.safe_data['series']