| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 1 | import asyncio |
| 2 | import logging |
| 3 | import os |
| 4 | |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 5 | import pyrfc3339 |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 6 | |
| 7 | from . import model, utils |
| 8 | from .client import client |
| 9 | from .errors import JujuError |
| 10 | |
| 11 | log = logging.getLogger(__name__) |
| 12 | |
| 13 | |
| 14 | class Machine(model.ModelEntity): |
| 15 | def __init__(self, *args, **kwargs): |
| 16 | super().__init__(*args, **kwargs) |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 17 | self.model.loop.create_task(self._queue_workarounds()) |
| 18 | |
| 19 | async def _queue_workarounds(self): |
| 20 | model = self.model |
| 21 | if not model.info: |
| 22 | await utils.run_with_interrupt(model.get_info(), |
| 23 | model._watch_stopping, |
| 24 | loop=model.loop) |
| 25 | if model._watch_stopping.is_set(): |
| 26 | return |
| 27 | if model.info.agent_version < client.Number.from_json('2.2.3'): |
| 28 | self.on_change(self._workaround_1695335) |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 29 | |
| 30 | async def _workaround_1695335(self, delta, old, new, model): |
| 31 | """ |
| 32 | This is a (hacky) temporary work around for a bug in Juju where the |
| 33 | instance status and agent version fields don't get updated properly |
| 34 | by the AllWatcher. |
| 35 | |
| 36 | Deltas never contain a value for `data['agent-status']['version']`, |
| 37 | and once the `instance-status` reaches `pending`, we no longer get |
| 38 | any updates for it (the deltas come in, but the `instance-status` |
| 39 | data is always the same after that). |
| 40 | |
| 41 | To work around this, whenever a delta comes in for this machine, we |
| 42 | query FullStatus and use the data from there if and only if it's newer. |
| 43 | Luckily, the timestamps on the `since` field does seem to be accurate. |
| 44 | |
| 45 | See https://bugs.launchpad.net/juju/+bug/1695335 |
| 46 | """ |
| 47 | if delta.data.get('synthetic', False): |
| 48 | # prevent infinite loops re-processing already processed deltas |
| 49 | return |
| 50 | |
| 51 | full_status = await utils.run_with_interrupt(model.get_status(), |
| 52 | model._watch_stopping, |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 53 | loop=model.loop) |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 54 | if model._watch_stopping.is_set(): |
| 55 | return |
| 56 | |
| 57 | if self.id not in full_status.machines: |
| 58 | return |
| 59 | |
| 60 | if not full_status.machines[self.id]['instance-status']['since']: |
| 61 | return |
| 62 | |
| 63 | machine = full_status.machines[self.id] |
| 64 | |
| 65 | change_log = [] |
| 66 | key_map = { |
| 67 | 'status': 'current', |
| 68 | 'info': 'message', |
| 69 | 'since': 'since', |
| 70 | } |
| 71 | |
| 72 | # handle agent version specially, because it's never set in |
| 73 | # deltas, and we don't want even a newer delta to clear it |
| 74 | agent_version = machine['agent-status']['version'] |
| 75 | if agent_version: |
| 76 | delta.data['agent-status']['version'] = agent_version |
| 77 | change_log.append(('agent-version', '', agent_version)) |
| 78 | |
| 79 | # only update (other) delta fields if status data is newer |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 80 | status_since = pyrfc3339.parse(machine['instance-status']['since']) |
| 81 | delta_since = pyrfc3339.parse(delta.data['instance-status']['since']) |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 82 | if status_since > delta_since: |
| 83 | for status_key in ('status', 'info', 'since'): |
| 84 | delta_key = key_map[status_key] |
| 85 | status_value = machine['instance-status'][status_key] |
| 86 | delta_value = delta.data['instance-status'][delta_key] |
| 87 | change_log.append((delta_key, delta_value, status_value)) |
| 88 | delta.data['instance-status'][delta_key] = status_value |
| 89 | |
| 90 | if change_log: |
| 91 | log.debug('Overriding machine delta with FullStatus data') |
| 92 | for log_item in change_log: |
| 93 | log.debug(' {}: {} -> {}'.format(*log_item)) |
| 94 | delta.data['synthetic'] = True |
| 95 | old_obj, new_obj = self.model.state.apply_delta(delta) |
| 96 | await model._notify_observers(delta, old_obj, new_obj) |
| 97 | |
| 98 | async def destroy(self, force=False): |
| 99 | """Remove this machine from the model. |
| 100 | |
| 101 | Blocks until the machine is actually removed. |
| 102 | |
| 103 | """ |
| 104 | facade = client.ClientFacade.from_connection(self.connection) |
| 105 | |
| 106 | log.debug( |
| 107 | 'Destroying machine %s', self.id) |
| 108 | |
| 109 | await facade.DestroyMachines(force, [self.id]) |
| 110 | return await self.model._wait( |
| 111 | 'machine', self.id, 'remove') |
| 112 | remove = destroy |
| 113 | |
| 114 | def run(self, command, timeout=None): |
| 115 | """Run command on this machine. |
| 116 | |
| 117 | :param str command: The command to run |
| 118 | :param int timeout: Time to wait before command is considered failed |
| 119 | |
| 120 | """ |
| 121 | raise NotImplementedError() |
| 122 | |
| 123 | async def set_annotations(self, annotations): |
| 124 | """Set annotations on this machine. |
| 125 | |
| 126 | :param annotations map[string]string: the annotations as key/value |
| 127 | pairs. |
| 128 | |
| 129 | """ |
| 130 | log.debug('Updating annotations on machine %s', self.id) |
| 131 | |
| 132 | self.ann_facade = client.AnnotationsFacade.from_connection( |
| 133 | self.connection) |
| 134 | |
| 135 | ann = client.EntityAnnotations( |
| 136 | entity=self.id, |
| 137 | annotations=annotations, |
| 138 | ) |
| 139 | return await self.ann_facade.Set([ann]) |
| 140 | |
| 141 | async def scp_to(self, source, destination, user='ubuntu', proxy=False, |
| 142 | scp_opts=''): |
| 143 | """Transfer files to this machine. |
| 144 | |
| 145 | :param str source: Local path of file(s) to transfer |
| 146 | :param str destination: Remote destination of transferred files |
| 147 | :param str user: Remote username |
| 148 | :param bool proxy: Proxy through the Juju API server |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 149 | :param scp_opts: Additional options to the `scp` command |
| 150 | :type scp_opts: str or list |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 151 | """ |
| 152 | if proxy: |
| 153 | raise NotImplementedError('proxy option is not implemented') |
| 154 | |
| 155 | address = self.dns_name |
| 156 | destination = '%s@%s:%s' % (user, address, destination) |
| 157 | await self._scp(source, destination, scp_opts) |
| 158 | |
| 159 | async def scp_from(self, source, destination, user='ubuntu', proxy=False, |
| 160 | scp_opts=''): |
| 161 | """Transfer files from this machine. |
| 162 | |
| 163 | :param str source: Remote path of file(s) to transfer |
| 164 | :param str destination: Local destination of transferred files |
| 165 | :param str user: Remote username |
| 166 | :param bool proxy: Proxy through the Juju API server |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 167 | :param scp_opts: Additional options to the `scp` command |
| 168 | :type scp_opts: str or list |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 169 | """ |
| 170 | if proxy: |
| 171 | raise NotImplementedError('proxy option is not implemented') |
| 172 | |
| 173 | address = self.dns_name |
| 174 | source = '%s@%s:%s' % (user, address, source) |
| 175 | await self._scp(source, destination, scp_opts) |
| 176 | |
| 177 | async def _scp(self, source, destination, scp_opts): |
| 178 | """ Execute an scp command. Requires a fully qualified source and |
| 179 | destination. |
| 180 | """ |
| 181 | cmd = [ |
| 182 | 'scp', |
| 183 | '-i', os.path.expanduser('~/.local/share/juju/ssh/juju_id_rsa'), |
| 184 | '-o', 'StrictHostKeyChecking=no', |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 185 | '-q', |
| 186 | '-B' |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 187 | ] |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 188 | cmd.extend(scp_opts.split() if isinstance(scp_opts, str) else scp_opts) |
| 189 | cmd.extend([source, destination]) |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 190 | loop = self.model.loop |
| 191 | process = await asyncio.create_subprocess_exec(*cmd, loop=loop) |
| 192 | await process.wait() |
| 193 | if process.returncode != 0: |
| 194 | raise JujuError("command failed: %s" % cmd) |
| 195 | |
| 196 | def ssh( |
| 197 | self, command, user=None, proxy=False, ssh_opts=None): |
| 198 | """Execute a command over SSH on this machine. |
| 199 | |
| 200 | :param str command: Command to execute |
| 201 | :param str user: Remote username |
| 202 | :param bool proxy: Proxy through the Juju API server |
| 203 | :param str ssh_opts: Additional options to the `ssh` command |
| 204 | |
| 205 | """ |
| 206 | raise NotImplementedError() |
| 207 | |
| 208 | def status_history(self, num=20, utc=False): |
| 209 | """Get status history for this machine. |
| 210 | |
| 211 | :param int num: Size of history backlog |
| 212 | :param bool utc: Display time as UTC in RFC3339 format |
| 213 | |
| 214 | """ |
| 215 | raise NotImplementedError() |
| 216 | |
| 217 | @property |
| 218 | def agent_status(self): |
| 219 | """Returns the current Juju agent status string. |
| 220 | |
| 221 | """ |
| 222 | return self.safe_data['agent-status']['current'] |
| 223 | |
| 224 | @property |
| 225 | def agent_status_since(self): |
| 226 | """Get the time when the `agent_status` was last updated. |
| 227 | |
| 228 | """ |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 229 | return pyrfc3339.parse(self.safe_data['agent-status']['since']) |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 230 | |
| 231 | @property |
| 232 | def agent_version(self): |
| 233 | """Get the version of the Juju machine agent. |
| 234 | |
| 235 | May return None if the agent is not yet available. |
| 236 | """ |
| 237 | version = self.safe_data['agent-status']['version'] |
| 238 | if version: |
| 239 | return client.Number.from_json(version) |
| 240 | else: |
| 241 | return None |
| 242 | |
| 243 | @property |
| 244 | def status(self): |
| 245 | """Returns the current machine provisioning status string. |
| 246 | |
| 247 | """ |
| 248 | return self.safe_data['instance-status']['current'] |
| 249 | |
| 250 | @property |
| 251 | def status_message(self): |
| 252 | """Returns the current machine provisioning status message. |
| 253 | |
| 254 | """ |
| 255 | return self.safe_data['instance-status']['message'] |
| 256 | |
| 257 | @property |
| 258 | def status_since(self): |
| 259 | """Get the time when the `status` was last updated. |
| 260 | |
| 261 | """ |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 262 | return pyrfc3339.parse(self.safe_data['instance-status']['since']) |
| Adam Israel | dcdf82b | 2017-08-15 15:26:43 -0400 | [diff] [blame] | 263 | |
| 264 | @property |
| 265 | def dns_name(self): |
| 266 | """Get the DNS name for this machine. This is a best guess based on the |
| 267 | addresses available in current data. |
| 268 | |
| 269 | May return None if no suitable address is found. |
| 270 | """ |
| 271 | for scope in ['public', 'local-cloud']: |
| 272 | addresses = self.safe_data['addresses'] or [] |
| 273 | addresses = [address for address in addresses |
| 274 | if address['scope'] == scope] |
| 275 | if addresses: |
| 276 | return addresses[0]['value'] |
| 277 | return None |
| Adam Israel | b8a8281 | 2019-03-27 14:50:11 -0400 | [diff] [blame] | 278 | |
| 279 | @property |
| 280 | def series(self): |
| 281 | """Returns the series of the current machine |
| 282 | |
| 283 | """ |
| 284 | return self.safe_data['series'] |