1 # Copyright 2020 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
17 from juju
.client
import client
18 from n2vc
.exceptions
import EntityInvalidException
19 from n2vc
.n2vc_conn
import N2VCConnector
20 from juju
.model
import ModelEntity
, Model
21 from juju
.client
.overrides
import Delta
22 from juju
.status
import derive_status
23 from juju
.application
import Application
24 from websockets
.exceptions
import ConnectionClosed
# Module-level logger. It is registered under the fixed name "__main__",
# not under this module's own __name__.
logger = logging.getLogger("__main__")
def status(application: Application) -> str:
    """
    Derive a single status for an application from its units

    :param: application: Application entity whose units are inspected.

    :returns: value returned by ``derive_status`` for the list with the
              workload status of every unit of the application
    """
    # Build the list in one expression so it is always defined before use
    unit_statuses = [unit.workload_status for unit in application.units]
    return derive_status(unit_statuses)
def entity_ready(entity: ModelEntity) -> bool:
    """
    Tell whether a model entity is ready

    The attribute that is inspected depends on ``entity.entity_type``:
    ``agent_status`` for machines, ``status`` for actions and applications.

    :param: entity: Model entity. It can be a machine, action, or application.

    :returns: boolean saying if the entity is ready or not
    :raises: EntityInvalidException for any other entity type
    """
    kind = entity.entity_type

    if kind == "machine":
        return entity.agent_status in ("started",)

    if kind == "action":
        # An action is done no matter how it ended
        return entity.status in ("completed", "failed", "cancelled")

    if kind == "application":
        # Workaround for bug: https://github.com/juju/python-libjuju/issues/441
        return entity.status in ("active", "blocked")

    raise EntityInvalidException("Unknown entity type: {}".format(kind))
def application_ready(application: Application) -> bool:
    """
    Check if an application and all of its units are ready

    :param: application: Application entity.

    :returns: boolean saying if the application status and the workload status
              of every unit are either "active" or "blocked". An application
              without units only depends on its own status.
    """
    ready_status_list = ("active", "blocked")
    # Local names differ from the function name on purpose: the function
    # must not be shadowed inside its own body.
    app_is_ready = application.status in ready_status_list
    units_are_ready = all(
        unit.workload_status in ready_status_list for unit in application.units
    )
    return app_is_ready and units_are_ready
class JujuModelWatcher:
    """
    Coroutines to wait for Juju entities while watching the model changes

    Every method is static; the class keeps no state.
    """

    @staticmethod
    async def wait_for_model(model: Model, timeout: float = 3600):
        """
        Wait for all entities in model to reach its final state.

        The model is checked twice, with a pause of 10 seconds in between, so
        that a model that leaves the ready state right away is waited again.

        :param: model:              Model to observe
        :param: timeout:            Timeout for the model applications to be active.
                                    None is replaced by 3600 seconds.

        NOTE(review): the timeout stops the wait, but asyncio.wait() does not
        re-raise the asyncio.TimeoutError stored in the finished task, so this
        coroutine returns normally when the timeout is reached.
        """

        if timeout is None:
            timeout = 3600.0

        # Coroutine to wait until every application of the model is ready
        async def wait_until_model_ready():
            wait_for_entity = asyncio.ensure_future(
                asyncio.wait_for(
                    model.block_until(
                        lambda: all(
                            application_ready(application)
                            for application in model.applications.values()
                        ),
                    ),
                    timeout=timeout,
                )
            )

            tasks = [wait_for_entity]
            try:
                await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
            finally:
                # Do not leave the task running if this coroutine is interrupted
                for task in tasks:
                    task.cancel()

        await wait_until_model_ready()
        # Check model is still ready after 10 seconds
        await asyncio.sleep(10)
        await wait_until_model_ready()

    @staticmethod
    async def wait_for(
        model: Model,
        entity: ModelEntity,
        progress_timeout: float = 3600,
        total_timeout: float = 3600,
        db_dict: dict = None,
        n2vc: N2VCConnector = None,
        vca_id: str = None,
    ):
        """
        Wait for entity to reach its final state.

        Two tasks run concurrently: one blocks until ``entity_ready(entity)``
        is true, the other one watches the model and reports the changes. The
        wait ends as soon as one of them finishes, and both are cancelled.

        :param: model:              Model to observe
        :param: entity:             Entity object
        :param: progress_timeout:   Maximum time between two updates in the model
        :param: total_timeout:      Timeout for the entity to be active
        :param: db_dict:            Dictionary with data of the DB to write the updates
        :param: n2vc:               N2VC Connector object
        :param: vca_id:             VCA ID

        :raises: EntityInvalidException if the entity is not an application,
                 an action or a machine

        NOTE(review): asyncio.wait() only waits for the first task to finish;
        an asyncio.TimeoutError stored in a finished task is not re-raised, so
        this coroutine returns normally when a timeout is reached.
        """

        if progress_timeout is None:
            progress_timeout = 3600.0
        if total_timeout is None:
            total_timeout = 3600.0

        entity_type = entity.entity_type
        if entity_type not in ["application", "action", "machine"]:
            raise EntityInvalidException("Unknown entity type: {}".format(entity_type))

        # Coroutine to wait until the entity reaches the final state
        wait_for_entity = asyncio.ensure_future(
            asyncio.wait_for(
                model.block_until(lambda: entity_ready(entity)),
                timeout=total_timeout,
            )
        )

        # Coroutine to watch the model for changes (and write them to DB)
        watcher = asyncio.ensure_future(
            JujuModelWatcher.model_watcher(
                model,
                entity_id=entity.entity_id,
                entity_type=entity_type,
                timeout=progress_timeout,
                db_dict=db_dict,
                n2vc=n2vc,
                vca_id=vca_id,
            )
        )

        tasks = [wait_for_entity, watcher]
        try:
            # Execute tasks, and stop when the first is finished
            # The watcher task never finishes on its own (unless it times out
            # or the model connection is closed)
            await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        finally:
            # Cancel whatever is still running
            for task in tasks:
                task.cancel()

    @staticmethod
    async def model_watcher(
        model: Model,
        entity_id: str,
        entity_type: str,
        timeout: float,
        db_dict: dict = None,
        n2vc: N2VCConnector = None,
        vca_id: str = None,
    ):
        """
        Observes the changes related to a specific entity in a model

        :param: model:          Model to observe
        :param: entity_id:      ID of the entity to be observed
        :param: entity_type:    Entity Type (e.g. "application", "machine", and "action")
        :param: timeout:        Maximum time between two updates in the model
        :param: db_dict:        Dictionary with data of the DB to write the updates
        :param: n2vc:           N2VC Connector object
        :param: vca_id:         VCA ID

        :raises: asyncio.TimeoutError when timeout reaches. The deadline is
                 only checked after the allwatcher delivers a change.
        """

        allwatcher = client.AllWatcherFacade.from_connection(model.connection())

        # Generate array with entity types to listen
        entity_types = (
            [entity_type, "unit"]
            if entity_type == "application"  # TODO: Add "action" too
            else [entity_type]
        )

        # Get time when it should timeout
        timeout_end = time.time() + timeout

        try:
            while True:
                change = await allwatcher.Next()
                for delta in change.deltas:
                    # Get delta EntityType
                    delta_entity = delta.entity
                    if delta_entity not in entity_types:
                        continue

                    # Get entity id. The deltas of a unit are matched by the
                    # application they belong to.
                    if entity_type == "application":
                        delta_entity_id = (
                            delta.data["application"]
                            if delta_entity == "unit"
                            else delta.data["name"]
                        )
                    else:
                        delta_entity_id = delta.data["id"]

                    # Write only if the entity id matches
                    write = delta_entity_id == entity_id

                    # Update timeout: any delta of the listened types counts
                    # as progress, even if it belongs to another entity
                    timeout_end = time.time() + timeout
                    (
                        status,
                        status_message,
                        vca_status,
                    ) = JujuModelWatcher.get_status(delta)

                    if write and n2vc is not None and db_dict:
                        # Write status to DB
                        status = n2vc.osm_status(delta_entity, status)
                        await n2vc.write_app_status_to_db(
                            db_dict=db_dict,
                            status=status,
                            detailed_status=status_message,
                            vca_status=vca_status,
                            entity_type=delta_entity,
                            vca_id=vca_id,
                        )
                # Check if timeout
                if time.time() > timeout_end:
                    raise asyncio.TimeoutError()
        except ConnectionClosed:
            # This is expected to happen when the
            # entity reaches its final state, because
            # the model connection is closed afterwards
            pass

    @staticmethod
    def get_status(delta: Delta) -> (str, str, str):
        """
        Get status from delta

        :param: delta:          Delta generated by the allwatcher

        :return (status, message, vca_status), or None when ``delta.entity``
                is not "machine", "action", "application" or "unit"
        """
        entity = delta.entity
        data = delta.data

        if entity == "machine":
            return (
                data["agent-status"]["current"],
                data["instance-status"]["message"],
                data["instance-status"]["current"],
            )
        if entity == "action":
            # Actions only carry one value; it is used for the three fields
            return (
                data["status"],
                data["status"],
                data["status"],
            )
        if entity == "application":
            return (
                data["status"]["current"],
                data["status"]["message"],
                data["status"]["current"],
            )
        if entity == "unit":
            return (
                data["workload-status"]["current"],
                data["workload-status"]["message"],
                data["workload-status"]["current"],
            )
        # Any other entity type has no status to report
        return None