1 # Copyright 2020 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
18 from juju
.client
import client
19 from n2vc
.exceptions
import EntityInvalidException
20 from n2vc
.n2vc_conn
import N2VCConnector
21 from juju
.model
import ModelEntity
, Model
22 from juju
.client
.overrides
import Delta
23 from juju
.status
import derive_status
24 from juju
.application
import Application
25 from websockets
.exceptions
import ConnectionClosed
# Module-level logger. It is registered under the "__main__" logger name
# rather than under this module's own name.
logger = logging.getLogger("__main__")
def status(application: Application) -> str:
    """
    Derive a single status for an application from its units

    :param: application: Application whose units are inspected.

    :returns: the value returned by derive_status() for the list with the
              workload status of every unit of the application
    """
    unit_status = [unit.workload_status for unit in application.units]
    return derive_status(unit_status)
def entity_ready(entity: ModelEntity) -> bool:
    """
    Check if the entity is ready

    :param: entity: Model entity. It can be a machine, action, application,
                    or unit.

    :returns: boolean saying if the entity is ready or not

    :raises: EntityInvalidException if the entity type is not supported
    """
    entity_type = entity.entity_type
    if entity_type == "machine":
        return entity.agent_status in ("started",)
    if entity_type == "action":
        # Any terminal state counts as ready, not only a successful one
        return entity.status in ("completed", "failed", "cancelled")
    if entity_type == "application":
        # Workaround for bug: https://github.com/juju/python-libjuju/issues/441
        return entity.status in ("active", "blocked")
    if entity_type == "unit":
        return entity.agent_status in ("idle",)
    raise EntityInvalidException("Unknown entity type: {}".format(entity_type))
def application_ready(application: Application) -> bool:
    """
    Check if an application and all of its units are ready

    The application is ready when its own status, and the workload status of
    every one of its units, is either "active" or "blocked".

    :param: application: Application entity.

    :returns: boolean saying if the application and all its units are ready.
    """
    ready_status_list = ("active", "blocked")
    if application.status not in ready_status_list:
        return False
    return all(
        unit.workload_status in ready_status_list for unit in application.units
    )
class JujuModelWatcher:
    """
    Helpers to wait for the entities of a Juju model to reach a final state

    The class is only used as a namespace: every method is static.
    """

    @staticmethod
    async def _wait_first_completed(tasks: list):
        """
        Wait until the first of the tasks finishes and cancel the others

        asyncio.wait() never raises the exceptions of the tasks it waits for,
        so the result of each finished task is retrieved here in order to
        propagate them (e.g. asyncio.TimeoutError) to the caller.

        :param: tasks: Tasks to wait for

        :raises: the exception of a task that finished with an error
        """
        try:
            done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                task.result()
        finally:
            # Cancel tasks (it has no effect on the ones already finished)
            for task in tasks:
                task.cancel()

    @staticmethod
    async def wait_for_model(model: Model, timeout: float = 3600):
        """
        Wait for all entities in model to reach its final state.

        :param: model: Model to observe
        :param: timeout: Timeout for the model applications to be active

        :raises: asyncio.TimeoutError when timeout reaches
        """
        if timeout is None:
            timeout = 3600.0

        def all_applications_ready() -> bool:
            return all(
                application_ready(application)
                for application in model.applications.values()
            )

        # Coroutine to wait until the entity reaches the final state
        async def wait_until_model_ready():
            await asyncio.wait_for(
                model.block_until(all_applications_ready), timeout=timeout
            )

        await wait_until_model_ready()
        # Check model is still ready after 10 seconds
        await asyncio.sleep(10)
        await wait_until_model_ready()

    @staticmethod
    async def wait_for(
        model: Model,
        entity: ModelEntity,
        progress_timeout: float = 3600,
        total_timeout: float = 3600,
        db_dict: dict = None,
        n2vc: N2VCConnector = None,
        vca_id: str = None,
    ):
        """
        Wait for entity to reach its final state.

        :param: model: Model to observe
        :param: entity: Entity object
        :param: progress_timeout: Maximum time between two updates in the model
        :param: total_timeout: Timeout for the entity to be active
        :param: db_dict: Dictionary with data of the DB to write the updates
        :param: n2vc: N2VC Connector object
        :param: vca_id: VCA ID

        :raises: asyncio.TimeoutError when timeout reaches
        :raises: EntityInvalidException if the entity type is not supported
        """
        if progress_timeout is None:
            progress_timeout = 3600.0
        if total_timeout is None:
            total_timeout = 3600.0

        entity_type = entity.entity_type
        if entity_type not in ["application", "action", "machine", "unit"]:
            raise EntityInvalidException("Unknown entity type: {}".format(entity_type))

        # Coroutine to wait until the entity reaches the final state
        wait_for_entity = asyncio.ensure_future(
            asyncio.wait_for(
                model.block_until(lambda: entity_ready(entity)),
                timeout=total_timeout,
            )
        )

        # Coroutine to watch the model for changes (and write them to DB)
        watcher = asyncio.ensure_future(
            JujuModelWatcher.model_watcher(
                model,
                entity_id=entity.entity_id,
                entity_type=entity_type,
                timeout=progress_timeout,
                db_dict=db_dict,
                n2vc=n2vc,
                vca_id=vca_id,
            )
        )

        # Execute tasks, and stop when the first is finished
        # The watcher task never finishes on its own (unless it times out or
        # the model connection is closed)
        await JujuModelWatcher._wait_first_completed([wait_for_entity, watcher])

    @staticmethod
    async def wait_for_units_idle(
        model: Model, application: Application, timeout: float = 60
    ):
        """
        Waits for the application and all its units to transition back to idle

        :param: model: Model to observe
        :param: application: The application to be observed
        :param: timeout: Maximum time to wait for the units to become idle

        :raises: asyncio.TimeoutError when timeout reaches
        """
        try:
            # wait_for() cancels the inner coroutine when the timeout expires
            # and lets any other exception raised by it propagate
            await asyncio.wait_for(
                JujuModelWatcher.ensure_units_idle(model, application), timeout
            )
        except asyncio.TimeoutError:
            raise asyncio.TimeoutError(
                "Application's units failed to return to idle after {} seconds".format(
                    timeout
                )
            )

    @staticmethod
    async def ensure_units_idle(model: Model, application: Application):
        """
        Waits forever until the application's units to transition back to idle

        A unit is considered back to idle once its agent has been seen
        "executing" and, afterwards, "idle".

        :param: model: Model to observe
        :param: application: The application to be observed
        """
        allwatcher = client.AllWatcherFacade.from_connection(model.connection())
        unit_wanted_state = "executing"
        final_state_reached = False

        units = application.units
        final_state_seen = {unit.entity_id: False for unit in units}
        agent_state_seen = {unit.entity_id: False for unit in units}
        workload_state = {unit.entity_id: False for unit in units}

        try:
            while not final_state_reached:
                change = await allwatcher.Next()

                # Keep checking to see if new units were added during the change
                # NOTE(review): `units` is read only once above, so this only
                # helps if application.units returns a live list - confirm
                for unit in units:
                    if unit.entity_id not in final_state_seen:
                        final_state_seen[unit.entity_id] = False
                        agent_state_seen[unit.entity_id] = False
                        workload_state[unit.entity_id] = False

                for delta in change.deltas:
                    # Give other tasks the chance to run between deltas
                    await asyncio.sleep(0)
                    if delta.entity != units[0].entity_type:
                        continue

                    final_state_reached = True
                    for unit in units:
                        unit_id = unit.entity_id
                        if delta.data["name"] == unit_id:
                            agent_status = delta.data["agent-status"]["current"]
                            workload_state[unit_id] = delta.data["workload-status"][
                                "current"
                            ]

                            if agent_status == unit_wanted_state:
                                agent_state_seen[unit_id] = True
                                final_state_seen[unit_id] = False

                            if agent_status == "idle" and agent_state_seen[unit_id]:
                                final_state_seen[unit_id] = True

                        # NOTE(review): the accepted workload states were not
                        # visible in the reviewed source - confirm the list
                        final_state_reached = (
                            final_state_reached
                            and final_state_seen[unit_id]
                            and workload_state[unit_id] in ["active", "error"]
                        )
        except ConnectionClosed:
            # This is expected to happen when the
            # entity reaches its final state, because
            # the model connection is closed afterwards
            pass

    @staticmethod
    async def model_watcher(
        model: Model,
        entity_id: str,
        entity_type: str,
        timeout: float,
        db_dict: dict = None,
        n2vc: N2VCConnector = None,
        vca_id: str = None,
    ):
        """
        Observes the changes related to an specific entity in a model

        :param: model: Model to observe
        :param: entity_id: ID of the entity to be observed
        :param: entity_type: Entity Type (e.g. "application", "machine", and "action")
        :param: timeout: Maximum time between two updates in the model
        :param: db_dict: Dictionary with data of the DB to write the updates
        :param: n2vc: N2VC Connector object
        :param: vca_id: VCA ID

        :raises: asyncio.TimeoutError when timeout reaches
        """
        allwatcher = client.AllWatcherFacade.from_connection(model.connection())

        # Generate array with entity types to listen
        entity_types = (
            [entity_type, "unit"]
            if entity_type == "application"  # TODO: Add "action" too
            else [entity_type]
        )

        # Get time when it should timeout
        timeout_end = time.time() + timeout

        try:
            while True:
                change = await allwatcher.Next()
                for delta in change.deltas:
                    # Get delta EntityType
                    delta_entity = delta.entity
                    if delta_entity not in entity_types:
                        continue

                    # Get entity id
                    delta_entity_id = None
                    if entity_type == "application":
                        # Unit deltas carry the owning application's name
                        delta_entity_id = (
                            delta.data["application"]
                            if delta_entity == "unit"
                            else delta.data["name"]
                        )
                    elif "id" in delta.data:
                        delta_entity_id = delta.data["id"]
                    else:
                        logger.warning("No id {}".format(delta.data))

                    # Write if the entity id match
                    write = delta_entity_id == entity_id

                    # Update timeout
                    timeout_end = time.time() + timeout
                    (
                        status,
                        status_message,
                        vca_status,
                    ) = JujuModelWatcher.get_status(delta)

                    if write and n2vc is not None and db_dict:
                        # Write status to DB
                        status = n2vc.osm_status(delta_entity, status)
                        await n2vc.write_app_status_to_db(
                            db_dict=db_dict,
                            status=status,
                            detailed_status=status_message,
                            vca_status=vca_status,
                            entity_type=delta_entity,
                            vca_id=vca_id,
                        )
                # Check if timeout. It is only evaluated after the watcher
                # delivers a change, not while waiting for the next one.
                if time.time() > timeout_end:
                    raise asyncio.TimeoutError()
        except ConnectionClosed:
            # This is expected to happen when the
            # entity reaches its final state, because
            # the model connection is closed afterwards
            pass

    @staticmethod
    def get_status(delta: Delta) -> (str, str, str):
        """
        Get status from delta

        :param: delta: Delta generated by the allwatcher

        :return (status, message, vca_status), or None when the entity of the
                delta is not a machine, action, application or unit
        """
        data = delta.data
        if delta.entity == "machine":
            return (
                data["agent-status"]["current"],
                data["instance-status"]["message"],
                data["instance-status"]["current"],
            )
        if delta.entity == "action":
            # Actions only expose one status string, used for the three fields
            return (data["status"], data["status"], data["status"])
        if delta.entity == "application":
            return (
                data["status"]["current"],
                data["status"]["message"],
                data["status"]["current"],
            )
        if delta.entity == "unit":
            return (
                data["workload-status"]["current"],
                data["workload-status"]["message"],
                data["workload-status"]["current"],
            )
        return None