Added observer + callback for unit.run.
[osm/N2VC.git] / juju / unit.py
1 import asyncio
2 import logging
3 from datetime import datetime
4
5 from . import model
6 from .client import client
7
8 log = logging.getLogger(__name__)
9
10
class Unit(model.ModelEntity):
    """Model entity representing a single unit.

    Status properties read from ``self.data``; the API-backed operations
    (`destroy`, `run`) talk to the controller through ``self.connection``.
    Several methods are declared but not implemented yet and currently do
    nothing.

    """

    @property
    def agent_status(self):
        """Returns the current agent status string.

        """
        return self.data['agent-status']['current']

    @property
    def agent_status_since(self):
        """Get the time when the `agent_status` was last updated.

        Returns a naive `datetime` parsed from the ``since`` field.

        """
        since = self.data['agent-status']['since']
        # Juju gives us nanoseconds, but Python only supports microseconds.
        # "YYYY-MM-DDTHH:MM:SS." is 20 characters, plus 6 fractional digits.
        since = since[:26]
        return datetime.strptime(since, "%Y-%m-%dT%H:%M:%S.%f")

    @property
    def agent_status_message(self):
        """Get the agent status message.

        """
        return self.data['agent-status']['message']

    @property
    def workload_status(self):
        """Returns the current workload status string.

        """
        return self.data['workload-status']['current']

    @property
    def workload_status_since(self):
        """Get the time when the `workload_status` was last updated.

        Returns a naive `datetime` parsed from the ``since`` field.

        """
        since = self.data['workload-status']['since']
        # Juju gives us nanoseconds, but Python only supports microseconds.
        # "YYYY-MM-DDTHH:MM:SS." is 20 characters, plus 6 fractional digits.
        since = since[:26]
        return datetime.strptime(since, "%Y-%m-%dT%H:%M:%S.%f")

    @property
    def workload_status_message(self):
        """Get the workload status message.

        """
        return self.data['workload-status']['message']

    def add_storage(self, name, constraints=None):
        """Add unit storage dynamically.

        Not implemented yet; currently a no-op that returns ``None``.

        :param str name: Storage name, as specified by the charm
        :param str constraints: Comma-separated list of constraints in the
            form 'POOL,COUNT,SIZE'

        """
        pass

    def collect_metrics(self):
        """Collect metrics on this unit.

        Not implemented yet; currently a no-op that returns ``None``.

        """
        pass

    async def destroy(self):
        """Destroy this unit.

        Returns whatever the application facade's ``DestroyUnits`` call
        returns for this unit's name.

        """
        app_facade = client.ApplicationFacade()
        app_facade.connect(self.connection)

        log.debug(
            'Destroying %s', self.name)

        return await app_facade.DestroyUnits([self.name])

    # `remove` is an alias for `destroy`.
    remove = destroy

    def get_resources(self, details=False):
        """Return resources for this unit.

        Not implemented yet; currently a no-op that returns ``None``.

        :param bool details: Include detailed info about resources used by
            each unit

        """
        pass

    def resolved(self, retry=False):
        """Mark unit errors resolved.

        Not implemented yet; currently a no-op that returns ``None``.

        :param bool retry: Re-execute failed hooks

        """
        pass

    async def run(self, command, timeout=None):
        """Run command on this unit.

        :param str command: The command to run
        :param int timeout: Time to wait before command is considered failed
            (passed through to the action facade's ``Run`` call)

        Returns a tuple containing the stdout, stderr, and return code
        from the command.

        """
        action = client.ActionFacade()
        action.connect(self.connection)

        log.debug(
            'Running `%s` on %s', command, self.name)

        # NOTE(review): the explicit ``loop`` argument is kept from the
        # original code; newer asyncio releases no longer accept it.
        action_status = asyncio.Queue(loop=self.model.loop)

        # Filled in once the Run call below has returned; the observer
        # callback may fire before then, so it has to wait for it.
        tag = None

        async def wait_for_tag():
            # Poll until `run` has stored the action tag. The sleep MUST be
            # awaited: without it this loop never yields to the event loop
            # and the Run call that sets `tag` can never complete.
            while tag is None:
                await asyncio.sleep(0.1)
            return tag

        async def callback(delta, old, new, model):
            # Wait until we have something to report
            if not new:
                return

            # Verify that we have the right action.
            action_tag = await wait_for_tag()
            if new.id not in action_tag:
                return

            # Wait until the action has completed, or errored out.
            if new.status not in ['completed', 'error']:
                return

            # Put the action in our queue, so that we can fetch it
            # with the await below.
            await action_status.put(new)

        # NOTE(review): this observer is never removed, so every call to
        # `run` leaves one more callback registered on the model.
        self.model.add_observer(callback, 'action', None)

        res = await action.Run(
            [],
            command,
            [],
            timeout,
            [self.name],
        )
        tag = res.results[0].action.tag  # Set the tag for our waiter above.
        ret = await action_status.get()  # Wait for our callback to fire
        return (
            ret.results['Stdout'],
            ret.results['Stderr'],
            ret.results['Code']
        )

    def run_action(self, action_name, **params):
        r"""Run action on this unit.

        Not implemented yet; currently a no-op that returns ``None``.

        :param str action_name: Name of action to run
        :param \*\*params: Action parameters

        """
        pass

    def scp(
            self, source_path, user=None, destination_path=None, proxy=False,
            scp_opts=None):
        """Transfer files to this unit.

        Not implemented yet; currently a no-op that returns ``None``.

        :param str source_path: Path of file(s) to transfer
        :param str user: Remote username
        :param str destination_path: Destination of transferred files on
            remote machine
        :param bool proxy: Proxy through the Juju API server
        :param str scp_opts: Additional options to the `scp` command

        """
        pass

    def set_meter_status(self):
        """Set the meter status on this unit.

        Not implemented yet; currently a no-op that returns ``None``.

        """
        pass

    def ssh(
            self, command, user=None, proxy=False, ssh_opts=None):
        """Execute a command over SSH on this unit.

        Not implemented yet; currently a no-op that returns ``None``.

        :param str command: Command to execute
        :param str user: Remote username
        :param bool proxy: Proxy through the Juju API server
        :param str ssh_opts: Additional options to the `ssh` command

        """
        pass

    def status_history(self, num=20, utc=False):
        """Get status history for this unit.

        Not implemented yet; currently a no-op that returns ``None``.

        :param int num: Size of history backlog
        :param bool utc: Display time as UTC in RFC3339 format

        """
        pass