1 # Copyright 2020 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
17 from juju
.client
import client
18 from n2vc
.exceptions
import EntityInvalidException
19 from n2vc
.n2vc_conn
import N2VCConnector
20 from juju
.model
import ModelEntity
, Model
21 from juju
.client
.overrides
import Delta
22 from juju
.status
import derive_status
23 from juju
.application
import Application
24 from websockets
.exceptions
import ConnectionClosed
# Module-level logger. It is bound to the "__main__" logger name rather than
# to this module's own name.
logger = logging.getLogger("__main__")
def status(application: Application) -> str:
    """
    Derive a single status for an application from its units

    :param: application: Application entity whose units are inspected.

    :returns: the value produced by derive_status for the list of the
        units' workload statuses
    """
    # Build the list here so it is always defined before it is handed to
    # derive_status (one entry per unit, in the order of application.units).
    unit_status = [unit.workload_status for unit in application.units]
    return derive_status(unit_status)
def entity_ready(entity: ModelEntity) -> bool:
    """
    Tell whether an entity has reached one of its final states

    :param: entity: Model entity. Supported types are "machine", "action"
        and "application".

    :returns: True if the status of the entity is one of the states that
        are treated as final for its type, False otherwise
    :raises: EntityInvalidException if the entity type is not supported
    """
    kind = entity.entity_type

    if kind == "machine":
        # Machines are judged by their agent status
        return entity.agent_status in ("started",)

    if kind == "action":
        # Any terminal outcome counts, successful or not
        return entity.status in ("completed", "failed", "cancelled")

    if kind == "application":
        # Workaround for bug: https://github.com/juju/python-libjuju/issues/441
        return entity.status in ("active", "blocked")

    raise EntityInvalidException("Unknown entity type: {}".format(kind))
def application_ready(application: Application) -> bool:
    """
    Check if an application and its units are in a ready status

    The application status and the workload status of each unit are compared
    against the ready statuses ("active" and "blocked").

    :param: application: Application entity.

    :returns: boolean saying if the application status and the workload
        status of all its units are ready.
    """
    ready_status_list = ["active", "blocked"]

    # Named app_ready so the local does not shadow this function's own name
    app_ready = application.status in ready_status_list

    # Evaluated unconditionally, independently of the application status
    units_ready = all(
        unit.workload_status in ready_status_list for unit in application.units
    )
    return app_ready and units_ready
73 class JujuModelWatcher
:
75 async def wait_for_model(
80 Wait for all entities in model to reach its final state.
82 :param: model: Model to observe
83 :param: timeout: Timeout for the model applications to be active
85 :raises: asyncio.TimeoutError when timeout reaches
91 # Coroutine to wait until the entity reaches the final state
92 async def wait_until_model_ready():
93 wait_for_entity
= asyncio
.ensure_future(
97 application_ready(application
)
98 for application
in model
.applications
.values()
105 tasks
= [wait_for_entity
]
107 await asyncio
.wait(tasks
, return_when
=asyncio
.FIRST_COMPLETED
)
113 await wait_until_model_ready()
114 # Check model is still ready after 10 seconds
116 await asyncio
.sleep(10)
117 await wait_until_model_ready()
123 progress_timeout
: float = 3600,
124 total_timeout
: float = 3600,
125 db_dict
: dict = None,
126 n2vc
: N2VCConnector
= None,
130 Wait for entity to reach its final state.
132 :param: model: Model to observe
133 :param: entity: Entity object
134 :param: progress_timeout: Maximum time between two updates in the model
135 :param: total_timeout: Timeout for the entity to be active
136 :param: db_dict: Dictionary with data of the DB to write the updates
:param: n2vc: N2VC Connector object
138 :param: vca_id: VCA ID
140 :raises: asyncio.TimeoutError when timeout reaches
143 if progress_timeout
is None:
144 progress_timeout
= 3600.0
145 if total_timeout
is None:
146 total_timeout
= 3600.0
148 entity_type
= entity
.entity_type
149 if entity_type
not in ["application", "action", "machine"]:
150 raise EntityInvalidException("Unknown entity type: {}".format(entity_type
))
152 # Coroutine to wait until the entity reaches the final state
153 wait_for_entity
= asyncio
.ensure_future(
155 model
.block_until(lambda: entity_ready(entity
)),
156 timeout
=total_timeout
,
160 # Coroutine to watch the model for changes (and write them to DB)
161 watcher
= asyncio
.ensure_future(
162 JujuModelWatcher
.model_watcher(
164 entity_id
=entity
.entity_id
,
165 entity_type
=entity_type
,
166 timeout
=progress_timeout
,
173 tasks
= [wait_for_entity
, watcher
]
175 # Execute tasks, and stop when the first is finished
# The watcher task will never finish (unless it times out)
177 await asyncio
.wait(tasks
, return_when
=asyncio
.FIRST_COMPLETED
)
184 async def model_watcher(
189 db_dict
: dict = None,
190 n2vc
: N2VCConnector
= None,
Observes the changes related to a specific entity in a model
196 :param: model: Model to observe
197 :param: entity_id: ID of the entity to be observed
:param: entity_type: Entity Type (e.g. "application", "machine", and "action")
199 :param: timeout: Maximum time between two updates in the model
200 :param: db_dict: Dictionary with data of the DB to write the updates
:param: n2vc: N2VC Connector object
202 :param: vca_id: VCA ID
204 :raises: asyncio.TimeoutError when timeout reaches
207 allwatcher
= client
.AllWatcherFacade
.from_connection(model
.connection())
# Generate array with entity types to listen to
211 [entity_type
, "unit"]
212 if entity_type
== "application" # TODO: Add "action" too
216 # Get time when it should timeout
217 timeout_end
= time
.time() + timeout
221 change
= await allwatcher
.Next()
222 for delta
in change
.deltas
:
226 # Get delta EntityType
227 delta_entity
= delta
.entity
229 if delta_entity
in entity_types
:
231 if entity_type
== "application":
233 delta
.data
["application"]
234 if delta_entity
== "unit"
235 else delta
.data
["name"]
238 id = delta
.data
["id"]
240 # Write if the entity id match
241 write
= True if id == entity_id
else False
244 timeout_end
= time
.time() + timeout
249 ) = JujuModelWatcher
.get_status(delta
)
251 if write
and n2vc
is not None and db_dict
:
253 status
= n2vc
.osm_status(delta_entity
, status
)
254 await n2vc
.write_app_status_to_db(
257 detailed_status
=status_message
,
258 vca_status
=vca_status
,
259 entity_type
=delta_entity
,
263 if time
.time() > timeout_end
:
264 raise asyncio
.TimeoutError()
265 except ConnectionClosed
:
267 # This is expected to happen when the
268 # entity reaches its final state, because
269 # the model connection is closed afterwards
def get_status(delta: Delta) -> (str, str, str):
    """
    Get status from delta

    :param: delta: Delta generated by the allwatcher

    :return (status, message, vca_status) for "machine", "action",
        "application" and "unit" deltas; None for any other entity
    """
    kind = delta.entity

    # Actions carry a flat "status" value that fills all three fields
    if kind == "action":
        action_status = delta.data["status"]
        return (action_status, action_status, action_status)

    # For the remaining entities: (entity name, key holding the status,
    # key holding both the message and the vca status)
    layouts = (
        ("machine", "agent-status", "instance-status"),
        ("application", "status", "status"),
        ("unit", "workload-status", "workload-status"),
    )
    for entity, status_key, detail_key in layouts:
        if kind == entity:
            data = delta.data
            return (
                data[status_key]["current"],
                data[detail_key]["message"],
                data[detail_key]["current"],
            )

    # Unrecognised entity: no status tuple is produced
    return None