7 from . import model
, utils
8 from .client
import client
9 from .errors
import JujuError
11 log
= logging
.getLogger(__name__
)
14 class Machine(model
.ModelEntity
):
def __init__(self, *args, **kwargs):
    """Initialise the entity, then schedule the workaround setup.

    All positional and keyword arguments are forwarded unchanged to the
    parent initialiser.
    """
    super().__init__(*args, **kwargs)
    # `_queue_workarounds` is a coroutine, so it cannot be awaited from
    # here; hand it to the model's event loop instead.
    pending = self._queue_workarounds()
    self.model.loop.create_task(pending)
async def _queue_workarounds(self):
    """Register version-specific workarounds for this machine.

    Registers `_workaround_1695335` as a change handler when the model's
    agent version is older than 2.2.3. Does nothing if the model's watcher
    is stopping.
    """
    # Bind the owning model locally; the name deliberately shadows the
    # imported `model` module inside this method.
    model = self.model
    # NOTE(review): the "only fetch when info is missing" guard is
    # reconstructed from a gap in the source -- confirm.
    if not model.info:
        # Interruptible wait, so a stopping watcher does not leave this
        # coroutine hanging on `get_info()`.
        await utils.run_with_interrupt(model.get_info(),
                                       model._watch_stopping,
                                       loop=model.loop)
    if model._watch_stopping.is_set():
        # The wait may have been interrupted; `model.info` may be unset.
        return
    if model.info.agent_version < client.Number.from_json('2.2.3'):
        self.on_change(self._workaround_1695335)
async def _workaround_1695335(self, delta, old, new, model):
    """Patch a machine delta with fresher data from FullStatus.

    This is a (hacky) temporary work around for a bug in Juju where the
    instance status and agent version fields don't get updated properly.

    Deltas never contain a value for `data['agent-status']['version']`,
    and once the `instance-status` reaches `pending`, we no longer get
    any updates for it (the deltas come in, but the `instance-status`
    data is always the same after that).

    To work around this, whenever a delta comes in for this machine, we
    query FullStatus and use the data from there if and only if it's newer.
    Luckily, the timestamp in the `since` field does seem to be accurate.

    See https://bugs.launchpad.net/juju/+bug/1695335

    :param delta: The incoming delta; its `data` dict is patched in place
    :param old: Unused
    :param new: Unused
    :param model: The model used to query status and notify observers
    """
    if delta.data.get('synthetic', False):
        # prevent infinite loops re-processing already processed deltas
        return

    full_status = await utils.run_with_interrupt(model.get_status(),
                                                 model._watch_stopping,
                                                 loop=model.loop)
    if model._watch_stopping.is_set():
        # The status query may have been interrupted.
        return

    if self.id not in full_status.machines:
        return

    if not full_status.machines[self.id]['instance-status']['since']:
        # No timestamp to compare against, so nothing can be judged newer.
        return

    machine = full_status.machines[self.id]

    # (field, old value, new value) for every override applied below.
    change_log = []
    # FullStatus field name -> delta field name.
    # NOTE(review): mapping reconstructed from the 'current' / 'message' /
    # 'since' keys read from instance-status data elsewhere -- confirm.
    key_map = {
        'status': 'current',
        'info': 'message',
        'since': 'since',
    }

    # handle agent version specially, because it's never set in
    # deltas, and we don't want even a newer delta to clear it
    agent_version = machine['agent-status']['version']
    if agent_version:
        delta.data['agent-status']['version'] = agent_version
        change_log.append(('agent-version', '', agent_version))

    # only update (other) delta fields if status data is newer
    status_since = pyrfc3339.parse(machine['instance-status']['since'])
    delta_since = pyrfc3339.parse(delta.data['instance-status']['since'])
    if status_since > delta_since:
        for status_key in ('status', 'info', 'since'):
            delta_key = key_map[status_key]
            status_value = machine['instance-status'][status_key]
            delta_value = delta.data['instance-status'][delta_key]
            change_log.append((delta_key, delta_value, status_value))
            delta.data['instance-status'][delta_key] = status_value

    if change_log:
        log.debug('Overriding machine delta with FullStatus data')
        for log_item in change_log:
            log.debug(' {}: {} -> {}'.format(*log_item))
        # Mark the delta so the re-notification below is ignored by the
        # guard at the top of this handler.
        delta.data['synthetic'] = True
        old_obj, new_obj = self.model.state.apply_delta(delta)
        await model._notify_observers(delta, old_obj, new_obj)
async def destroy(self, force=False):
    """Remove this machine from the model.

    Blocks until the machine is actually removed.

    :param bool force: Passed through to the `DestroyMachines` API call
    :return: The result of waiting for this machine's 'remove' event
    """
    facade = client.ClientFacade.from_connection(self.connection)

    log.debug('Destroying machine %s', self.id)

    await facade.DestroyMachines(force, [self.id])
    # The API call only requests removal; wait for the model to report it.
    return await self.model._wait('machine', self.id, 'remove')
def run(self, command, timeout=None):
    """Run command on this machine.

    :param str command: The command to run
    :param int timeout: Time to wait before command is considered failed
    :raises NotImplementedError: Always; not implemented yet
    """
    raise NotImplementedError()
async def set_annotations(self, annotations):
    """Set annotations on this machine.

    :param annotations map[string]string: the annotations as key/value
        pairs.
    :return: The result of the Annotations facade `Set` call
    """
    log.debug('Updating annotations on machine %s', self.id)

    self.ann_facade = client.AnnotationsFacade.from_connection(
        self.connection)

    ann = client.EntityAnnotations(
        # NOTE(review): the `entity` argument is reconstructed from a gap
        # in the source -- confirm the expected identifier/tag format.
        entity=self.id,
        annotations=annotations,
    )
    return await self.ann_facade.Set([ann])
# NOTE(review): the `scp_opts` default is reconstructed from a gap in the
# source; '' is the neutral value for the str-or-list handling in `_scp`.
async def scp_to(self, source, destination, user='ubuntu', proxy=False,
                 scp_opts=''):
    """Transfer files to this machine.

    :param str source: Local path of file(s) to transfer
    :param str destination: Remote destination of transferred files
    :param str user: Remote username
    :param bool proxy: Proxy through the Juju API server
    :param scp_opts: Additional options to the `scp` command
    :type scp_opts: str or list
    :raises NotImplementedError: If `proxy` is requested
    """
    if proxy:
        raise NotImplementedError('proxy option is not implemented')

    address = self.dns_name
    # Only the remote side is qualified with user@host.
    destination = '%s@%s:%s' % (user, address, destination)
    await self._scp(source, destination, scp_opts)
# NOTE(review): the `scp_opts` default is reconstructed from a gap in the
# source; '' is the neutral value for the str-or-list handling in `_scp`.
async def scp_from(self, source, destination, user='ubuntu', proxy=False,
                   scp_opts=''):
    """Transfer files from this machine.

    :param str source: Remote path of file(s) to transfer
    :param str destination: Local destination of transferred files
    :param str user: Remote username
    :param bool proxy: Proxy through the Juju API server
    :param scp_opts: Additional options to the `scp` command
    :type scp_opts: str or list
    :raises NotImplementedError: If `proxy` is requested
    """
    if proxy:
        raise NotImplementedError('proxy option is not implemented')

    address = self.dns_name
    # Only the remote side is qualified with user@host.
    source = '%s@%s:%s' % (user, address, source)
    await self._scp(source, destination, scp_opts)
async def _scp(self, source, destination, scp_opts):
    """Execute an scp command.

    Requires a fully qualified source and destination.

    :param str source: Path to copy from
    :param str destination: Path to copy to
    :param scp_opts: Extra `scp` options; a str is split on whitespace,
        anything else is appended to the command as-is
    :type scp_opts: str or list
    :raises JujuError: If the command exits with a non-zero status
    """
    cmd = [
        'scp',
        '-i', os.path.expanduser('~/.local/share/juju/ssh/juju_id_rsa'),
        '-o', 'StrictHostKeyChecking=no',
        # NOTE(review): the two flags below are reconstructed from a gap
        # in the source (quiet + batch mode) -- confirm.
        '-q',
        '-B',
    ]
    cmd.extend(scp_opts.split() if isinstance(scp_opts, str) else scp_opts)
    cmd.extend([source, destination])
    # No `loop=` argument: it was removed from the asyncio subprocess API
    # in Python 3.10, and the running loop is used when it is omitted.
    process = await asyncio.create_subprocess_exec(*cmd)
    # `returncode` stays None until the process has been waited on.
    await process.wait()
    if process.returncode != 0:
        raise JujuError("command failed: %s" % cmd)
197 self
, command
, user
=None, proxy
=False, ssh_opts
=None):
198 """Execute a command over SSH on this machine.
200 :param str command: Command to execute
201 :param str user: Remote username
202 :param bool proxy: Proxy through the Juju API server
203 :param str ssh_opts: Additional options to the `ssh` command
206 raise NotImplementedError()
def status_history(self, num=20, utc=False):
    """Get status history for this machine.

    :param int num: Size of history backlog
    :param bool utc: Display time as UTC in RFC3339 format
    :raises NotImplementedError: Always; not implemented yet
    """
    raise NotImplementedError()
def agent_status(self):
    """Returns the current Juju agent status string.

    Read from the 'current' field of this machine's 'agent-status' data.
    """
    return self.safe_data['agent-status']['current']
def agent_status_since(self):
    """Get the time when the `agent_status` was last updated.

    :return: The 'since' field of the 'agent-status' data, parsed with
        `pyrfc3339.parse`
    """
    return pyrfc3339.parse(self.safe_data['agent-status']['since'])
def agent_version(self):
    """Get the version of the Juju machine agent.

    May return None if the agent is not yet available.

    :return: The 'version' field of the 'agent-status' data converted
        with `client.Number.from_json`, or None when that field is empty
    """
    version = self.safe_data['agent-status']['version']
    if not version:
        # No version reported yet; there is nothing to parse.
        return None
    return client.Number.from_json(version)
245 """Returns the current machine provisioning status string.
248 return self
.safe_data
['instance-status']['current']
def status_message(self):
    """Returns the current machine provisioning status message.

    Read from the 'message' field of this machine's 'instance-status'
    data.
    """
    return self.safe_data['instance-status']['message']
def status_since(self):
    """Get the time when the `status` was last updated.

    :return: The 'since' field of the 'instance-status' data, parsed with
        `pyrfc3339.parse`
    """
    return pyrfc3339.parse(self.safe_data['instance-status']['since'])
266 """Get the DNS name for this machine. This is a best guess based on the
267 addresses available in current data.
269 May return None if no suitable address is found.
271 for scope
in ['public', 'local-cloud']:
272 addresses
= self
.safe_data
['addresses'] or []
273 addresses
= [address
for address
in addresses
274 if address
['scope'] == scope
]
276 return addresses
[0]['value']
281 """Returns the series of the current machine
284 return self
.safe_data
['series']