Revert "Remove vendored libjuju"
[osm/N2VC.git] / modules / libjuju / juju / machine.py
1 import asyncio
2 import logging
3 import os
4
5 import pyrfc3339
6
7 from . import model, utils
8 from .client import client
9 from .errors import JujuError
10
11 log = logging.getLogger(__name__)
12
13
class Machine(model.ModelEntity):
    """A machine in a Juju model.

    Status, address and series information is read from the entity's
    ``safe_data``; operations such as destroying the machine or setting
    annotations go through API facades built from ``self.connection``.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Deciding which workarounds to register may require awaiting the
        # model info, so it is scheduled as a task on the model's loop
        # rather than done inline in the constructor.
        self.model.loop.create_task(self._queue_workarounds())

    async def _queue_workarounds(self):
        """Register change handlers that work around known Juju bugs.

        Fetches the model info first if it is not available yet, and does
        nothing if the model's watcher is stopping.  The handler for bug
        1695335 is only registered when the model's agent version is older
        than 2.2.3.
        """
        model = self.model
        if not model.info:
            await utils.run_with_interrupt(model.get_info(),
                                           model._watch_stopping,
                                           loop=model.loop)
        if model._watch_stopping.is_set():
            return
        if model.info.agent_version < client.Number.from_json('2.2.3'):
            self.on_change(self._workaround_1695335)

    async def _workaround_1695335(self, delta, old, new, model):
        """
        This is a (hacky) temporary work around for a bug in Juju where the
        instance status and agent version fields don't get updated properly
        by the AllWatcher.

        Deltas never contain a value for `data['agent-status']['version']`,
        and once the `instance-status` reaches `pending`, we no longer get
        any updates for it (the deltas come in, but the `instance-status`
        data is always the same after that).

        To work around this, whenever a delta comes in for this machine, we
        query FullStatus and use the data from there if and only if it's
        newer.  Luckily, the timestamps on the `since` field do seem to be
        accurate.

        See https://bugs.launchpad.net/juju/+bug/1695335
        """
        if delta.data.get('synthetic', False):
            # prevent infinite loops re-processing already processed deltas
            return

        full_status = await utils.run_with_interrupt(model.get_status(),
                                                     model._watch_stopping,
                                                     loop=model.loop)
        if model._watch_stopping.is_set():
            return

        if self.id not in full_status.machines:
            return

        machine = full_status.machines[self.id]
        if not machine['instance-status']['since']:
            return

        change_log = []
        # FullStatus key -> delta key for the instance-status fields
        key_map = {
            'status': 'current',
            'info': 'message',
            'since': 'since',
        }

        # handle agent version specially, because it's never set in
        # deltas, and we don't want even a newer delta to clear it
        agent_version = machine['agent-status']['version']
        if agent_version:
            delta.data['agent-status']['version'] = agent_version
            change_log.append(('agent-version', '', agent_version))

        # only update (other) delta fields if status data is newer
        status_since = pyrfc3339.parse(machine['instance-status']['since'])
        delta_since = pyrfc3339.parse(delta.data['instance-status']['since'])
        if status_since > delta_since:
            for status_key in ('status', 'info', 'since'):
                delta_key = key_map[status_key]
                status_value = machine['instance-status'][status_key]
                delta_value = delta.data['instance-status'][delta_key]
                change_log.append((delta_key, delta_value, status_value))
                delta.data['instance-status'][delta_key] = status_value

        if change_log:
            log.debug('Overriding machine delta with FullStatus data')
            for log_item in change_log:
                # lazy %-style arguments: only formatted if DEBUG is enabled
                log.debug(' %s: %s -> %s', *log_item)
            # mark the delta so that this handler skips it when the
            # observers are notified again below
            delta.data['synthetic'] = True
            old_obj, new_obj = self.model.state.apply_delta(delta)
            await model._notify_observers(delta, old_obj, new_obj)

    async def destroy(self, force=False):
        """Remove this machine from the model.

        Blocks until the machine is actually removed.

        :param bool force: Passed through to the ``DestroyMachines`` API call

        """
        facade = client.ClientFacade.from_connection(self.connection)

        log.debug(
            'Destroying machine %s', self.id)

        await facade.DestroyMachines(force, [self.id])
        return await self.model._wait(
            'machine', self.id, 'remove')
    remove = destroy

    def run(self, command, timeout=None):
        """Run command on this machine.

        :param str command: The command to run
        :param int timeout: Time to wait before command is considered failed

        Not implemented: always raises ``NotImplementedError``.
        """
        raise NotImplementedError()

    async def set_annotations(self, annotations):
        """Set annotations on this machine.

        :param annotations map[string]string: the annotations as key/value
            pairs.

        """
        log.debug('Updating annotations on machine %s', self.id)

        # The facade is kept on the instance, as in the original code.
        self.ann_facade = client.AnnotationsFacade.from_connection(
            self.connection)

        ann = client.EntityAnnotations(
            entity=self.id,
            annotations=annotations,
        )
        return await self.ann_facade.Set([ann])

    def _remote_path(self, user, path):
        """Return the ``user@address:path`` scp argument for this machine.

        :param str user: Remote username
        :param str path: Path on the machine
        :raises JujuError: if ``dns_name`` has no address for this machine
        """
        address = self.dns_name
        if address is None:
            # Fail early with a clear message instead of handing scp the
            # literal host name "None".
            raise JujuError(
                'no suitable address found for machine %s' % self.id)
        return '%s@%s:%s' % (user, address, path)

    async def scp_to(self, source, destination, user='ubuntu', proxy=False,
                     scp_opts=''):
        """Transfer files to this machine.

        :param str source: Local path of file(s) to transfer
        :param str destination: Remote destination of transferred files
        :param str user: Remote username
        :param bool proxy: Proxy through the Juju API server
        :param scp_opts: Additional options to the `scp` command
        :type scp_opts: str or list
        :raises NotImplementedError: if `proxy` is true
        :raises JujuError: if the machine has no usable address or the
            `scp` command fails
        """
        if proxy:
            raise NotImplementedError('proxy option is not implemented')

        await self._scp(source, self._remote_path(user, destination),
                        scp_opts)

    async def scp_from(self, source, destination, user='ubuntu', proxy=False,
                       scp_opts=''):
        """Transfer files from this machine.

        :param str source: Remote path of file(s) to transfer
        :param str destination: Local destination of transferred files
        :param str user: Remote username
        :param bool proxy: Proxy through the Juju API server
        :param scp_opts: Additional options to the `scp` command
        :type scp_opts: str or list
        :raises NotImplementedError: if `proxy` is true
        :raises JujuError: if the machine has no usable address or the
            `scp` command fails
        """
        if proxy:
            raise NotImplementedError('proxy option is not implemented')

        await self._scp(self._remote_path(user, source), destination,
                        scp_opts)

    async def _scp(self, source, destination, scp_opts):
        """ Execute an scp command. Requires a fully qualified source and
        destination.

        :param scp_opts: Extra `scp` options; a string is split on
            whitespace, any other value is used as a sequence of arguments
        :raises JujuError: if `scp` exits with a non-zero return code
        """
        cmd = [
            'scp',
            '-i', os.path.expanduser('~/.local/share/juju/ssh/juju_id_rsa'),
            '-o', 'StrictHostKeyChecking=no',
            '-q',
            '-B'
        ]
        cmd.extend(scp_opts.split() if isinstance(scp_opts, str) else scp_opts)
        cmd.extend([source, destination])
        # No ``loop=`` argument: it was deprecated in Python 3.8 and removed
        # in 3.10.  The subprocess is attached to the running event loop.
        process = await asyncio.create_subprocess_exec(*cmd)
        await process.wait()
        if process.returncode != 0:
            raise JujuError("command failed: %s" % cmd)

    def ssh(
            self, command, user=None, proxy=False, ssh_opts=None):
        """Execute a command over SSH on this machine.

        :param str command: Command to execute
        :param str user: Remote username
        :param bool proxy: Proxy through the Juju API server
        :param str ssh_opts: Additional options to the `ssh` command

        Not implemented: always raises ``NotImplementedError``.
        """
        raise NotImplementedError()

    def status_history(self, num=20, utc=False):
        """Get status history for this machine.

        :param int num: Size of history backlog
        :param bool utc: Display time as UTC in RFC3339 format

        Not implemented: always raises ``NotImplementedError``.
        """
        raise NotImplementedError()

    @property
    def agent_status(self):
        """Returns the current Juju agent status string.

        """
        return self.safe_data['agent-status']['current']

    @property
    def agent_status_since(self):
        """Get the time when the `agent_status` was last updated.

        """
        return pyrfc3339.parse(self.safe_data['agent-status']['since'])

    @property
    def agent_version(self):
        """Get the version of the Juju machine agent.

        May return None if the agent is not yet available.
        """
        version = self.safe_data['agent-status']['version']
        if version:
            return client.Number.from_json(version)
        else:
            return None

    @property
    def status(self):
        """Returns the current machine provisioning status string.

        """
        return self.safe_data['instance-status']['current']

    @property
    def status_message(self):
        """Returns the current machine provisioning status message.

        """
        return self.safe_data['instance-status']['message']

    @property
    def status_since(self):
        """Get the time when the `status` was last updated.

        """
        return pyrfc3339.parse(self.safe_data['instance-status']['since'])

    @property
    def dns_name(self):
        """Get the DNS name for this machine. This is a best guess based on the
        addresses available in current data.

        Addresses with scope ``public`` are preferred over ``local-cloud``
        ones; within a scope the first listed address wins.

        May return None if no suitable address is found.
        """
        # Read once; the address list does not depend on the scope tried.
        addresses = self.safe_data['addresses'] or []
        for scope in ('public', 'local-cloud'):
            for address in addresses:
                if address['scope'] == scope:
                    return address['value']
        return None

    @property
    def series(self):
        """Returns the series of the current machine

        """
        return self.safe_data['series']