Feature 10947: Add methods to create certificates
[osm/N2VC.git] / n2vc / juju_watcher.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import time
17
18 from juju.client import client
19 from n2vc.exceptions import EntityInvalidException
20 from n2vc.n2vc_conn import N2VCConnector
21 from juju.model import ModelEntity, Model
22 from juju.client.overrides import Delta
23 from juju.status import derive_status
24 from juju.application import Application
25 from websockets.exceptions import ConnectionClosed
26 import logging
27
# Module-level logger shared by the helpers in this file.
# NOTE(review): this binds to the logger named "__main__" rather than this
# module's own __name__ -- confirm that is intentional.
logger = logging.getLogger("__main__")
29
30
def status(application: Application) -> str:
    """
    Derive a single status for an application from its units

    :param: application: Application entity whose units are inspected.

    :returns: value produced by derive_status() for the list of the units'
              workload statuses
    """
    workload_statuses = [unit.workload_status for unit in application.units]
    return derive_status(workload_statuses)
36
37
def entity_ready(entity: ModelEntity) -> bool:
    """
    Tell whether an entity has reached one of its final states

    :param: entity: Model entity. It can be a machine, action, application
                    or unit.

    :returns: True if the attribute checked for the entity type holds one of
              the values considered final for that type, False otherwise

    :raises: EntityInvalidException if the entity type is not supported
    """

    # entity type -> (attribute to inspect, values considered final)
    final_states_by_type = {
        "machine": ("agent_status", ("started",)),
        "action": ("status", ("completed", "failed", "cancelled")),
        # Workaround for bug: https://github.com/juju/python-libjuju/issues/441
        "application": ("status", ("active", "blocked")),
        "unit": ("agent_status", ("idle",)),
    }

    kind = entity.entity_type
    rule = final_states_by_type.get(kind)
    if rule is None:
        raise EntityInvalidException("Unknown entity type: {}".format(kind))

    attribute_name, final_states = rule
    return getattr(entity, attribute_name) in final_states
59
60
def application_ready(application: Application) -> bool:
    """
    Check if an application and all of its units are settled

    An application counts as ready when its own status and the workload
    status of every one of its units are either "active" or "blocked".

    :param: application: Application entity.

    :returns: boolean saying if the application and all its units are in one
              of the accepted statuses.
    """
    accepted_statuses = ("active", "blocked")

    app_settled = application.status in accepted_statuses

    units_settled = True
    for unit in application.units:
        if unit.workload_status not in accepted_statuses:
            units_settled = False
            break

    return app_settled and units_settled
75
76
class JujuModelWatcher:
    """
    Static coroutines that wait on, or observe, the entities of a Juju model.

    The class keeps no state: every method is a staticmethod that receives the
    model (and, where relevant, the entity) it has to work with.
    """

    @staticmethod
    async def wait_for_model(model: Model, timeout: float = 3600):
        """
        Wait for all entities in model to reach its final state.

        The readiness check is run twice, 10 seconds apart, so that a model
        that is only momentarily ready is not reported as ready.

        :param: model: Model to observe
        :param: timeout: Timeout for the model applications to be active.
                         None is treated as 3600 seconds.

        NOTE(review): the inner task is awaited through asyncio.wait(), which
        does not re-raise exceptions of the awaited tasks, and its result is
        never retrieved here. An asyncio.TimeoutError produced by
        asyncio.wait_for() therefore does not appear to reach the caller --
        confirm whether callers rely on this.
        """

        if timeout is None:
            timeout = 3600.0

        # Coroutine to wait until the entity reaches the final state
        async def wait_until_model_ready():
            wait_for_entity = asyncio.ensure_future(
                asyncio.wait_for(
                    model.block_until(
                        lambda: all(
                            application_ready(application)
                            for application in model.applications.values()
                        ),
                    ),
                    timeout=timeout,
                )
            )

            tasks = [wait_for_entity]
            try:
                await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
            finally:
                # Cancel tasks
                for task in tasks:
                    task.cancel()

        await wait_until_model_ready()

        # Check model is still ready after 10 seconds
        await asyncio.sleep(10)
        await wait_until_model_ready()

    @staticmethod
    async def wait_for(
        model: Model,
        entity: ModelEntity,
        progress_timeout: float = 3600,
        total_timeout: float = 3600,
        db_dict: dict = None,
        n2vc: N2VCConnector = None,
        vca_id: str = None,
    ):
        """
        Wait for entity to reach its final state.

        Two tasks run concurrently: one blocks until entity_ready(entity) is
        true, the other watches the model and writes the updates to the DB.
        As soon as either of them finishes, both are cancelled.

        :param: model: Model to observe
        :param: entity: Entity object
        :param: progress_timeout: Maximum time between two updates in the model.
                                  None is treated as 3600 seconds.
        :param: total_timeout: Timeout for the entity to be active.
                               None is treated as 3600 seconds.
        :param: db_dict: Dictionary with data of the DB to write the updates
        :param: n2vc: N2VC Connector objector
        :param: vca_id: VCA ID

        :raises: EntityInvalidException if the entity type is not one of
                 "application", "action", "machine" or "unit"

        NOTE(review): the tasks are awaited through asyncio.wait(), which does
        not re-raise exceptions of the awaited tasks, and their results are
        never retrieved here. An asyncio.TimeoutError raised inside either
        task therefore does not appear to reach the caller -- confirm whether
        callers rely on this.
        """

        if progress_timeout is None:
            progress_timeout = 3600.0
        if total_timeout is None:
            total_timeout = 3600.0

        entity_type = entity.entity_type
        if entity_type not in ["application", "action", "machine", "unit"]:
            raise EntityInvalidException("Unknown entity type: {}".format(entity_type))

        # Coroutine to wait until the entity reaches the final state
        wait_for_entity = asyncio.ensure_future(
            asyncio.wait_for(
                model.block_until(lambda: entity_ready(entity)),
                timeout=total_timeout,
            )
        )

        # Coroutine to watch the model for changes (and write them to DB)
        watcher = asyncio.ensure_future(
            JujuModelWatcher.model_watcher(
                model,
                entity_id=entity.entity_id,
                entity_type=entity_type,
                timeout=progress_timeout,
                db_dict=db_dict,
                n2vc=n2vc,
                vca_id=vca_id,
            )
        )

        tasks = [wait_for_entity, watcher]
        try:
            # Execute tasks, and stop when the first is finished
            # The watcher task will never finish (unless it timeouts)
            await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        finally:
            # Cancel tasks
            for task in tasks:
                task.cancel()

    @staticmethod
    async def wait_for_units_idle(
        model: Model, application: Application, timeout: float = 60
    ):
        """
        Waits for the application and all its units to transition back to idle

        :param: model: Model to observe
        :param: application: The application to be observed
        :param: timeout: Maximum time, in seconds, to wait for the units

        :raises: TimeoutError when the wait expires while the task is still
                 pending. Any exception raised inside the task itself
                 (including the asyncio.TimeoutError of asyncio.wait_for) is
                 re-raised when the task result is retrieved.
        """

        ensure_units_idle = asyncio.ensure_future(
            asyncio.wait_for(
                JujuModelWatcher.ensure_units_idle(model, application), timeout
            )
        )
        tasks = [
            ensure_units_idle,
        ]
        (_done, pending) = await asyncio.wait(
            tasks, timeout=timeout, return_when=asyncio.FIRST_COMPLETED
        )

        if ensure_units_idle in pending:
            ensure_units_idle.cancel()
            raise TimeoutError(
                "Application's units failed to return to idle after {} seconds".format(
                    timeout
                )
            )

        # The returned value is not used: retrieving the result is what makes
        # an exception raised inside the task propagate to the caller.
        ensure_units_idle.result()

    @staticmethod
    async def ensure_units_idle(model: Model, application: Application):
        """
        Waits forever until the application's units to transition back to idle

        A unit is considered back to idle once its agent status has been seen
        as "executing" and, afterwards, as "idle". The wait finishes when that
        holds for every unit and each unit's workload status is "active" or
        "error".

        :param: model: Model to observe
        :param: application: The application to be observed
        """

        allwatcher = client.AllWatcherFacade.from_connection(model.connection())
        unit_wanted_state = "executing"
        final_state_reached = False

        # NOTE(review): this is read once; if application.units returns a new
        # list on every access, units added later are never noticed by the
        # "new units" check below -- confirm.
        units = application.units
        final_state_seen = {unit.entity_id: False for unit in units}
        agent_state_seen = {unit.entity_id: False for unit in units}
        workload_state = {unit.entity_id: False for unit in units}

        try:
            while not final_state_reached:
                change = await allwatcher.Next()

                # Keep checking to see if new units were added during the change
                for unit in units:
                    if unit.entity_id not in final_state_seen:
                        final_state_seen[unit.entity_id] = False
                        agent_state_seen[unit.entity_id] = False
                        workload_state[unit.entity_id] = False

                for delta in change.deltas:
                    # Yield control to the event loop between deltas
                    await asyncio.sleep(0)

                    # Only deltas of the units' entity type are relevant.
                    # NOTE(review): units[0] raises IndexError when the
                    # application has no units -- confirm callers never pass
                    # such an application.
                    if delta.entity != units[0].entity_type:
                        continue

                    # Recomputed from scratch for every relevant delta: it
                    # stays True only if every unit satisfies the condition.
                    final_state_reached = True
                    for unit in units:
                        if delta.data["name"] == unit.entity_id:
                            agent_status = delta.data["agent-status"]["current"]
                            workload_state[unit.entity_id] = delta.data[
                                "workload-status"
                            ]["current"]

                            if agent_status == unit_wanted_state:
                                agent_state_seen[unit.entity_id] = True
                                final_state_seen[unit.entity_id] = False

                            if (
                                agent_status == "idle"
                                and agent_state_seen[unit.entity_id]
                            ):
                                final_state_seen[unit.entity_id] = True

                        final_state_reached = (
                            final_state_reached
                            and final_state_seen[unit.entity_id]
                            and workload_state[unit.entity_id]
                            in [
                                "active",
                                "error",
                            ]
                        )

        except ConnectionClosed:
            pass
            # This is expected to happen when the
            # entity reaches its final state, because
            # the model connection is closed afterwards

    @staticmethod
    async def model_watcher(
        model: Model,
        entity_id: str,
        entity_type: str,
        timeout: float,
        db_dict: dict = None,
        n2vc: N2VCConnector = None,
        vca_id: str = None,
    ):
        """
        Observes the changes related to an specific entity in a model

        :param: model: Model to observe
        :param: entity_id: ID of the entity to be observed
        :param: entity_type: Entity Type (e.g. "application", "machine", and "action")
        :param: timeout: Maximum time between two updates in the model
        :param: db_dict: Dictionary with data of the DB to write the updates
        :param: n2vc: N2VC Connector objector
        :param: vca_id: VCA ID

        :raises: asyncio.TimeoutError when timeout reaches

        NOTE(review): the timeout is only evaluated after allwatcher.Next()
        returns, so it cannot fire while no change at all is being delivered
        -- confirm this is acceptable.
        """

        allwatcher = client.AllWatcherFacade.from_connection(model.connection())

        # Generate array with entity types to listen
        entity_types = (
            [entity_type, "unit"]
            if entity_type == "application"  # TODO: Add "action" too
            else [entity_type]
        )

        # Get time when it should timeout
        timeout_end = time.time() + timeout

        try:
            while True:
                change = await allwatcher.Next()
                for delta in change.deltas:
                    # Get delta EntityType
                    delta_entity = delta.entity
                    if delta_entity not in entity_types:
                        continue

                    # Get entity id
                    delta_entity_id = None
                    if entity_type == "application":
                        # Unit deltas are matched through the application
                        # they belong to
                        delta_entity_id = (
                            delta.data["application"]
                            if delta_entity == "unit"
                            else delta.data["name"]
                        )
                    elif "id" in delta.data:
                        delta_entity_id = delta.data["id"]
                    else:
                        logger.warning("No id %s", delta.data)

                    # Write if the entity id match
                    write = delta_entity_id == entity_id

                    # Update timeout
                    timeout_end = time.time() + timeout
                    (
                        delta_status,
                        status_message,
                        vca_status,
                    ) = JujuModelWatcher.get_status(delta)

                    if write and n2vc is not None and db_dict:
                        # Write status to DB
                        osm_status = n2vc.osm_status(delta_entity, delta_status)
                        await n2vc.write_app_status_to_db(
                            db_dict=db_dict,
                            status=osm_status,
                            detailed_status=status_message,
                            vca_status=vca_status,
                            entity_type=delta_entity,
                            vca_id=vca_id,
                        )
                # Check if timeout
                if time.time() > timeout_end:
                    raise asyncio.TimeoutError()
        except ConnectionClosed:
            pass
            # This is expected to happen when the
            # entity reaches its final state, because
            # the model connection is closed afterwards

    @staticmethod
    def get_status(delta: Delta) -> (str, str, str):
        """
        Get status from delta

        :param: delta: Delta generated by the allwatcher

        :return (status, message, vca_status)

        :raises: EntityInvalidException if the delta belongs to an entity type
                 other than "machine", "action", "application" or "unit"
        """
        if delta.entity == "machine":
            return (
                delta.data["agent-status"]["current"],
                delta.data["instance-status"]["message"],
                delta.data["instance-status"]["current"],
            )
        elif delta.entity == "action":
            # Action deltas only carry a plain status, used for all three values
            return (
                delta.data["status"],
                delta.data["status"],
                delta.data["status"],
            )
        elif delta.entity == "application":
            return (
                delta.data["status"]["current"],
                delta.data["status"]["message"],
                delta.data["status"]["current"],
            )
        elif delta.entity == "unit":
            return (
                delta.data["workload-status"]["current"],
                delta.data["workload-status"]["message"],
                delta.data["workload-status"]["current"],
            )
        # Fail explicitly instead of returning None, which callers would only
        # notice as an unpacking error
        raise EntityInvalidException("Unknown entity type: {}".format(delta.entity))