1 # Copyright 2020 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
17 from juju
.client
import client
18 from n2vc
.exceptions
import EntityInvalidException
19 from n2vc
.n2vc_conn
import N2VCConnector
20 from juju
.model
import ModelEntity
, Model
21 from juju
.client
.overrides
import Delta
22 from juju
.status
import derive_status
23 from juju
.application
import Application
24 from websockets
.exceptions
import ConnectionClosed
# Module-level logger; it is looked up under the literal name "__main__".
27 logger
= logging
.getLogger("__main__")
def status(application: Application) -> str:
    """
    Derive the overall status of an application from its units.

    :param: application: Application whose units are inspected

    :return: Value computed by derive_status from the units' workload statuses
    """
    # Build the list locally: the function must not rely on a unit_status
    # name that already exists somewhere else.
    unit_status = [unit.workload_status for unit in application.units]
    return derive_status(unit_status)
def entity_ready(entity: ModelEntity) -> bool:
    """
    Tell whether an entity has reached one of its final states.

    :param: entity: Entity to check; its entity_type selects the rule applied

    :return: True when the entity is in a final state, False otherwise

    :raises: EntityInvalidException when the entity type is not
             "machine", "action" or "application"
    """
    kind = entity.entity_type

    if kind == "machine":
        return entity.agent_status in ("started",)

    if kind == "action":
        return entity.status in ("completed", "failed", "cancelled")

    if kind == "application":
        # Workaround for bug: https://github.com/juju/python-libjuju/issues/441
        return status(entity) in ("active", "blocked")

    raise EntityInvalidException("Unknown entity type: {}".format(kind))
# Groups the coroutines that wait on, and observe, the entities of a model.
50 class JujuModelWatcher
:
# Waits on the applications of `model`; `timeout` defaults to 3600.
# NOTE(review): no decorator is visible above this def and its first
# parameter is `model`, not `self` -- confirm how the method is bound.
52 async def wait_for_model(model
: Model
, timeout
: float = 3600):
54 Wait for all entities in model to reach its final state.
56 :param: model: Model to observe
57 :param: timeout: Timeout for the model applications to be active
59 :raises: asyncio.TimeoutError when timeout reaches
65 # Coroutine to wait until the entity reaches the final state
66 wait_for_entity
= asyncio
.ensure_future(
# entity_ready is evaluated for every value of model.applications.
# NOTE(review): the lines wrapping this generator expression are not
# visible in this excerpt -- confirm how `timeout` is applied.
70 entity_ready(entity
) for entity
in model
.applications
.values()
# Only one future is awaited; asyncio.wait is asked to return as soon as
# the first (here: the only) task completes.
77 tasks
= [wait_for_entity
]
79 await asyncio
.wait(tasks
, return_when
=asyncio
.FIRST_COMPLETED
)
# NOTE(review): the def line and the leading parameters of this method are
# not visible in this excerpt; only the parameters below can be seen.
89 progress_timeout
: float = 3600,
90 total_timeout
: float = 3600,
92 n2vc
: N2VCConnector
= None,
95 Wait for entity to reach its final state.
97 :param: model: Model to observe
98 :param: entity: Entity object
99 :param: progress_timeout: Maximum time between two updates in the model
100 :param: total_timeout: Timeout for the entity to be active
101 :param: db_dict: Dictionary with data of the DB to write the updates
102 :param: n2vc: N2VC Connector objector
104 :raises: asyncio.TimeoutError when timeout reaches
# An explicit None for either timeout is replaced by 3600.0.
107 if progress_timeout
is None:
108 progress_timeout
= 3600.0
109 if total_timeout
is None:
110 total_timeout
= 3600.0
# Reject anything that is not an application, action or machine before any
# task is scheduled.
112 entity_type
= entity
.entity_type
113 if entity_type
not in ["application", "action", "machine"]:
114 raise EntityInvalidException("Unknown entity type: {}".format(entity_type
))
116 # Coroutine to wait until the entity reaches the final state
117 wait_for_entity
= asyncio
.ensure_future(
# block_until is given a callable that re-checks entity_ready(entity);
# total_timeout is passed as the timeout of this wait.
119 model
.block_until(lambda: entity_ready(entity
)), timeout
=total_timeout
,
123 # Coroutine to watch the model for changes (and write them to DB)
124 watcher
= asyncio
.ensure_future(
# The watcher gets progress_timeout, not total_timeout, as its timeout.
125 JujuModelWatcher
.model_watcher(
127 entity_id
=entity
.entity_id
,
128 entity_type
=entity_type
,
129 timeout
=progress_timeout
,
135 tasks
= [wait_for_entity
, watcher
]
137 # Execute tasks, and stop when the first is finished
138 # The watcher task won't never finish (unless it timeouts)
139 await asyncio
.wait(tasks
, return_when
=asyncio
.FIRST_COMPLETED
)
# Reads deltas from an AllWatcher and, for deltas that match the observed
# entity, hands the status over to n2vc.write_app_status_to_db.
# NOTE(review): several parameters of this signature are not visible in
# this excerpt (the body uses model, entity_id, entity_type and timeout).
146 async def model_watcher(
151 db_dict
: dict = None,
152 n2vc
: N2VCConnector
= None,
155 Observes the changes related to an specific entity in a model
157 :param: model: Model to observe
158 :param: entity_id: ID of the entity to be observed
159 :param: entity_type: Entity Type (p.e. "application", "machine, and "action")
160 :param: timeout: Maximum time between two updates in the model
161 :param: db_dict: Dictionary with data of the DB to write the updates
162 :param: n2vc: N2VC Connector objector
164 :raises: asyncio.TimeoutError when timeout reaches
# The watcher facade is built on the model's own connection.
167 allwatcher
= client
.AllWatcherFacade
.from_connection(model
.connection())
169 # Genenerate array with entity types to listen
# For an "application" target, "unit" deltas are listened to as well.
171 [entity_type
, "unit"]
172 if entity_type
== "application" # TODO: Add "action" too
176 # Get time when it should timeout
# timeout_end is assigned again further down with the same expression.
177 timeout_end
= time
.time() + timeout
181 change
= await allwatcher
.Next()
182 for delta
in change
.deltas
:
186 # Get delta EntityType
187 delta_entity
= delta
.entity
189 if delta_entity
in entity_types
:
# When watching an application, a unit delta is identified by
# data["application"] and any other delta by data["name"].
191 if entity_type
== "application":
193 delta
.data
["application"]
194 if delta_entity
== "unit"
195 else delta
.data
["name"]
# NOTE(review): the local name `id` shadows the builtin of the same name.
198 id = delta
.data
["id"]
200 # Write if the entity id match
201 write
= True if id == entity_id
else False
204 timeout_end
= time
.time() + timeout
209 ) = JujuModelWatcher
.get_status(delta
)
# Nothing is written unless the id matched, an n2vc connector was given
# and db_dict is truthy.
211 if write
and n2vc
is not None and db_dict
:
213 status
= n2vc
.osm_status(delta_entity
, status
)
214 await n2vc
.write_app_status_to_db(
217 detailed_status
=status_message
,
218 vca_status
=vca_status
,
219 entity_type
=delta_entity
,
# Once the deadline has passed, the timeout is reported to the caller as
# asyncio.TimeoutError.
222 if time
.time() > timeout_end
:
223 raise asyncio
.TimeoutError()
224 except ConnectionClosed
:
226 # This is expected to happen when the
227 # entity reaches its final state, because
228 # the model connection is closed afterwards
# Picks three fields out of delta.data, chosen by the value of delta.entity.
# NOTE(review): the docstring below lists an entity_type parameter, but the
# signature only takes `delta`.
# NOTE(review): the lines that open and close each tuple are not visible in
# this excerpt; the field order is assumed to follow the ":return" line.
231 def get_status(delta
: Delta
) -> (str, str, str):
233 Get status from delta
235 :param: delta: Delta generated by the allwatcher
236 :param: entity_type: Entity Type (p.e. "application", "machine, and "action")
238 :return (status, message, vca_status)
# Machines: first field from "agent-status", the other two from
# "instance-status".
240 if delta
.entity
== "machine":
242 delta
.data
["agent-status"]["current"],
243 delta
.data
["instance-status"]["message"],
244 delta
.data
["instance-status"]["current"],
# Actions: the same data["status"] value fills all three positions.
246 elif delta
.entity
== "action":
248 delta
.data
["status"],
249 delta
.data
["status"],
250 delta
.data
["status"],
# Applications: "current" is used for the first and third positions.
252 elif delta
.entity
== "application":
254 delta
.data
["status"]["current"],
255 delta
.data
["status"]["message"],
256 delta
.data
["status"]["current"],
# Units: same layout as applications, read from "workload-status".
258 elif delta
.entity
== "unit":
260 delta
.data
["workload-status"]["current"],
261 delta
.data
["workload-status"]["message"],
262 delta
.data
["workload-status"]["current"],