Code Coverage

Cobertura Coverage Report > n2vc >

juju_watcher.py

Trend

File Coverage summary

Name | Classes | Lines | Conditionals
juju_watcher.py
100%
1/1
93%
141/152
100%
0/0

Coverage Breakdown by Class

Name | Lines | Conditionals
juju_watcher.py
93%
141/152
N/A

Source

n2vc/juju_watcher.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #     Unless required by applicable law or agreed to in writing, software
10 #     distributed under the License is distributed on an "AS IS" BASIS,
11 #     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 #     See the License for the specific language governing permissions and
13 #     limitations under the License.
14
15 1 import asyncio
16 1 import time
17
18 1 from juju.client import client
19 1 from n2vc.exceptions import EntityInvalidException
20 1 from n2vc.n2vc_conn import N2VCConnector
21 1 from juju.model import ModelEntity, Model
22 1 from juju.client.overrides import Delta
23 1 from juju.status import derive_status
24 1 from juju.application import Application
25 1 from websockets.exceptions import ConnectionClosed
26 1 import logging
27
28 1 logger = logging.getLogger("__main__")
29
30
def status(application: Application) -> str:
    """
    Derive a single status for an application from its units

    :param: application: Application whose units are inspected.

    :returns: value produced by derive_status for the list with the workload
              status of every unit of the application
    """
    return derive_status([unit.workload_status for unit in application.units])
36
37
def entity_ready(entity: ModelEntity) -> bool:
    """
    Tell whether a model entity has reached a state considered ready

    :param: entity: Model entity. It can be a machine, action, application,
                    or unit.

    :returns: True if the status attribute relevant for the entity type holds
              one of the accepted values, False otherwise

    :raises: EntityInvalidException if the entity type is not supported
    """
    # entity type -> (attribute holding the state, values considered ready)
    ready_criteria = {
        "machine": ("agent_status", ("started",)),
        "action": ("status", ("completed", "failed", "cancelled")),
        # Workaround for bug: https://github.com/juju/python-libjuju/issues/441
        "application": ("status", ("active", "blocked")),
        "unit": ("agent_status", ("idle",)),
    }

    kind = entity.entity_type
    if kind not in ready_criteria:
        raise EntityInvalidException("Unknown entity type: {}".format(kind))

    attribute, accepted = ready_criteria[kind]
    return getattr(entity, attribute) in accepted
59
60
def application_ready(application: Application) -> bool:
    """
    Check if an application and all of its units are in a ready status

    The application is considered ready when its own status and the workload
    status of every one of its units is either "active" or "blocked". An
    application without units is ready as soon as its own status is.

    :param: application: Application entity.

    :returns: boolean saying if the application and all its units are ready.
    """
    ready_status_list = ("active", "blocked")

    # No need to look at the units when the application itself is not ready
    if application.status not in ready_status_list:
        return False

    return all(
        unit.workload_status in ready_status_list for unit in application.units
    )
75
76
class JujuModelWatcher:
    """
    Collection of static coroutines to wait for, and observe, juju entities

    All the methods are static: the class only acts as a namespace.
    """

    @staticmethod
    def _raise_if_timed_out(done):
        """
        Surface the timeouts of a set of finished tasks

        asyncio.wait() never re-raises the exceptions of the tasks it waits
        for, so they must be retrieved explicitly. An asyncio.TimeoutError is
        re-raised to the caller; any other failure is only logged, so that it
        is tolerated as before but no longer lost.

        :param: done:   Finished tasks, as returned by asyncio.wait()

        :raises: asyncio.TimeoutError if any of the tasks timed out
        """
        for task in done:
            if task.cancelled():
                continue
            error = task.exception()
            if error is None:
                continue
            if isinstance(error, asyncio.TimeoutError):
                raise error
            logger.warning("Task finished with error: {!r}".format(error))

    @staticmethod
    async def wait_for_model(model: Model, timeout: float = 3600):
        """
        Wait for all entities in model to reach its final state.

        The model is considered ready when application_ready() holds for every
        application. The check is performed twice, 10 seconds apart, to make
        sure that the model is still ready.

        :param: model:              Model to observe
        :param: timeout:            Timeout for the model applications to be active
                                    (None means 3600 seconds)

        :raises: asyncio.TimeoutError when timeout reaches
        """

        if timeout is None:
            timeout = 3600.0

        # Coroutine to wait until the entity reaches the final state
        async def wait_until_model_ready():
            wait_for_entity = asyncio.ensure_future(
                asyncio.wait_for(
                    model.block_until(
                        lambda: all(
                            application_ready(application)
                            for application in model.applications.values()
                        ),
                    ),
                    timeout=timeout,
                )
            )

            tasks = [wait_for_entity]
            try:
                done, _ = await asyncio.wait(
                    tasks, return_when=asyncio.FIRST_COMPLETED
                )
                # asyncio.wait() swallows the task exceptions: propagate timeouts
                JujuModelWatcher._raise_if_timed_out(done)
            finally:
                # Cancel tasks
                for task in tasks:
                    task.cancel()

        await wait_until_model_ready()

        # Check model is still ready after 10 seconds
        await asyncio.sleep(10)
        await wait_until_model_ready()

    @staticmethod
    async def wait_for(
        model: Model,
        entity: ModelEntity,
        progress_timeout: float = 3600,
        total_timeout: float = 3600,
        db_dict: dict = None,
        n2vc: N2VCConnector = None,
        vca_id: str = None,
    ):
        """
        Wait for entity to reach its final state.

        Two tasks run concurrently: one blocks until entity_ready() holds for
        the entity, the other one watches the model changes (and writes them
        to the DB). The wait finishes as soon as any of them finishes.

        :param: model:              Model to observe
        :param: entity:             Entity object
        :param: progress_timeout:   Maximum time between two updates in the model
                                    (None means 3600 seconds)
        :param: total_timeout:      Timeout for the entity to be active
                                    (None means 3600 seconds)
        :param: db_dict:            Dictionary with data of the DB to write the updates
        :param: n2vc:               N2VC Connector object
        :param: vca_id:             VCA ID

        :raises: EntityInvalidException if the entity type is not supported
        :raises: asyncio.TimeoutError when timeout reaches
        """

        if progress_timeout is None:
            progress_timeout = 3600.0
        if total_timeout is None:
            total_timeout = 3600.0

        entity_type = entity.entity_type
        if entity_type not in ["application", "action", "machine", "unit"]:
            raise EntityInvalidException("Unknown entity type: {}".format(entity_type))

        # Coroutine to wait until the entity reaches the final state
        wait_for_entity = asyncio.ensure_future(
            asyncio.wait_for(
                model.block_until(lambda: entity_ready(entity)),
                timeout=total_timeout,
            )
        )

        # Coroutine to watch the model for changes (and write them to DB)
        watcher = asyncio.ensure_future(
            JujuModelWatcher.model_watcher(
                model,
                entity_id=entity.entity_id,
                entity_type=entity_type,
                timeout=progress_timeout,
                db_dict=db_dict,
                n2vc=n2vc,
                vca_id=vca_id,
            )
        )

        tasks = [wait_for_entity, watcher]
        try:
            # Execute tasks, and stop when the first is finished
            # The watcher task will never finish (unless it timeouts)
            done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
            # asyncio.wait() swallows the task exceptions: propagate timeouts
            JujuModelWatcher._raise_if_timed_out(done)
        finally:
            # Cancel tasks
            for task in tasks:
                task.cancel()

    @staticmethod
    async def wait_for_units_idle(
        model: Model, application: Application, timeout: float = 60
    ):
        """
        Waits for the application and all its units to transition back to idle

        :param: model:          Model to observe
        :param: application:    The application to be observed
        :param: timeout:        Maximum time to wait for the units to be idle

        :raises: TimeoutError when the wait is still pending after the timeout
        :raises: asyncio.TimeoutError when the inner wait_for expires first
        """

        ensure_units_idle = asyncio.ensure_future(
            asyncio.wait_for(
                JujuModelWatcher.ensure_units_idle(model, application), timeout
            )
        )
        try:
            (done, pending) = await asyncio.wait(
                [ensure_units_idle],
                timeout=timeout,
                return_when=asyncio.FIRST_COMPLETED,
            )

            if ensure_units_idle in pending:
                raise TimeoutError(
                    "Application's units failed to return to idle after {} seconds".format(
                        timeout
                    )
                )

            # Propagate any exception raised while waiting for the units
            ensure_units_idle.result()
        finally:
            # No-op when the task is done; avoids leaking it when this
            # coroutine times out or is cancelled while waiting
            ensure_units_idle.cancel()

    @staticmethod
    async def ensure_units_idle(model: Model, application: Application):
        """
        Waits forever until the application's units to transition back to idle

        A unit reaches its final state when its agent status has been seen as
        "executing" and afterwards as "idle". The wait ends once every unit
        has reached its final state with a workload status of "active" or
        "error", or when the connection with the model is closed.

        :param: model:          Model to observe
        :param: application:    The application to be observed
        """

        allwatcher = client.AllWatcherFacade.from_connection(model.connection())
        unit_wanted_state = "executing"
        final_state_reached = False

        units = application.units
        final_state_seen = {unit.entity_id: False for unit in units}
        agent_state_seen = {unit.entity_id: False for unit in units}
        workload_state = {unit.entity_id: False for unit in units}

        try:
            while not final_state_reached:
                change = await allwatcher.Next()

                # Keep checking to see if new units were added during the change
                # NOTE(review): `units` is read only once above, so new units
                # are only noticed if application.units is a live list - confirm
                for unit in units:
                    if unit.entity_id not in final_state_seen:
                        final_state_seen[unit.entity_id] = False
                        agent_state_seen[unit.entity_id] = False
                        workload_state[unit.entity_id] = False

                for delta in change.deltas:
                    # Give other tasks the chance to run between deltas
                    await asyncio.sleep(0)
                    if delta.entity != units[0].entity_type:
                        continue

                    final_state_reached = True
                    for unit in units:
                        if delta.data["name"] == unit.entity_id:
                            agent_status = delta.data["agent-status"]["current"]
                            workload_state[unit.entity_id] = delta.data[
                                "workload-status"
                            ]["current"]

                            if agent_status == unit_wanted_state:
                                agent_state_seen[unit.entity_id] = True
                                final_state_seen[unit.entity_id] = False

                            if (
                                agent_status == "idle"
                                and agent_state_seen[unit.entity_id]
                            ):
                                final_state_seen[unit.entity_id] = True

                        # Every unit must be in its final state, not only the
                        # one this delta refers to
                        final_state_reached = (
                            final_state_reached
                            and final_state_seen[unit.entity_id]
                            and workload_state[unit.entity_id]
                            in [
                                "active",
                                "error",
                            ]
                        )

        except ConnectionClosed:
            pass
            # This is expected to happen when the
            # entity reaches its final state, because
            # the model connection is closed afterwards

    @staticmethod
    async def model_watcher(
        model: Model,
        entity_id: str,
        entity_type: str,
        timeout: float,
        db_dict: dict = None,
        n2vc: N2VCConnector = None,
        vca_id: str = None,
    ):
        """
        Observes the changes related to a specific entity in a model

        Every change of the observed entity types restarts the timeout. The
        changes that belong to the observed entity are written to the DB when
        both n2vc and db_dict are provided.

        :param: model:          Model to observe
        :param: entity_id:      ID of the entity to be observed
        :param: entity_type:    Entity Type (p.e. "application", "machine", and "action")
        :param: timeout:        Maximum time between two updates in the model
        :param: db_dict:        Dictionary with data of the DB to write the updates
        :param: n2vc:           N2VC Connector object
        :param: vca_id:         VCA ID

        :raises: asyncio.TimeoutError when timeout reaches
        """

        allwatcher = client.AllWatcherFacade.from_connection(model.connection())

        # Generate array with entity types to listen
        entity_types = (
            [entity_type, "unit"]
            if entity_type == "application"  # TODO: Add "action" too
            else [entity_type]
        )

        # Get time when it should timeout
        timeout_end = time.time() + timeout

        try:
            while True:
                change = await allwatcher.Next()
                for delta in change.deltas:
                    # Get delta EntityType
                    delta_entity = delta.entity
                    if delta_entity not in entity_types:
                        continue

                    # Get entity id
                    delta_entity_id = None
                    if entity_type == "application":
                        delta_entity_id = (
                            delta.data["application"]
                            if delta_entity == "unit"
                            else delta.data["name"]
                        )
                    elif "id" in delta.data:
                        delta_entity_id = delta.data["id"]
                    else:
                        logger.warning("No id {}".format(delta.data))

                    # Write if the entity id match
                    write = delta_entity_id == entity_id

                    # Update timeout
                    timeout_end = time.time() + timeout
                    (
                        entity_status,
                        status_message,
                        vca_status,
                    ) = JujuModelWatcher.get_status(delta)

                    if write and n2vc is not None and db_dict:
                        # Write status to DB
                        entity_status = n2vc.osm_status(delta_entity, entity_status)
                        await n2vc.write_app_status_to_db(
                            db_dict=db_dict,
                            status=entity_status,
                            detailed_status=status_message,
                            vca_status=vca_status,
                            entity_type=delta_entity,
                            vca_id=vca_id,
                        )
                # Check if timeout
                # NOTE: only evaluated after a change has been received
                if time.time() > timeout_end:
                    raise asyncio.TimeoutError()
        except ConnectionClosed:
            pass
            # This is expected to happen when the
            # entity reaches its final state, because
            # the model connection is closed afterwards

    @staticmethod
    def get_status(delta: Delta) -> (str, str, str):
        """
        Get status from delta

        :param: delta:          Delta generated by the allwatcher

        :return (status, message, vca_status), or None if the entity of the
                delta is not a machine, action, application, or unit
        """
        if delta.entity == "machine":
            return (
                delta.data["agent-status"]["current"],
                delta.data["instance-status"]["message"],
                delta.data["instance-status"]["current"],
            )
        elif delta.entity == "action":
            return (
                delta.data["status"],
                delta.data["status"],
                delta.data["status"],
            )
        elif delta.entity == "application":
            return (
                delta.data["status"]["current"],
                delta.data["status"]["message"],
                delta.data["status"]["current"],
            )
        elif delta.entity == "unit":
            return (
                delta.data["workload-status"]["current"],
                delta.data["workload-status"]["message"],
                delta.data["workload-status"]["current"],
            )