1 # Copyright 2020 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
17 from juju
.client
import client
18 from n2vc
.exceptions
import EntityInvalidException
19 from n2vc
.n2vc_conn
import N2VCConnector
20 from juju
.model
import ModelEntity
, Model
21 from juju
.client
.overrides
import Delta
22 from juju
.status
import derive_status
23 from juju
.application
import Application
24 from websockets
.exceptions
import ConnectionClosed
# Module logger. It is bound to the "__main__" logger, not to this module's own name.
logger = logging.getLogger("__main__")
def status(application: Application) -> str:
    """
    Derive the status of an application from the workload status of its units.

    :param: application: Application whose units are inspected

    :return: Value returned by ``derive_status`` for the list of unit workload statuses
    """
    # Collect the statuses first: derive_status() receives the whole list at once.
    unit_status = [unit.workload_status for unit in application.units]
    return derive_status(unit_status)
def entity_ready(entity: ModelEntity) -> bool:
    """
    Tell whether an entity has reached one of its final states.

    :param: entity: Entity object; its ``entity_type`` selects which attribute is checked

    :return: True when
        - a machine has agent status "started",
        - an action has status "completed", "failed" or "cancelled",
        - an application has derived status "active" or "blocked"

    :raises: EntityInvalidException when the entity type is none of the above
    """
    entity_type = entity.entity_type

    if entity_type == "machine":
        return entity.agent_status == "started"

    if entity_type == "action":
        return entity.status in ("completed", "failed", "cancelled")

    if entity_type == "application":
        # Workaround for bug: https://github.com/juju/python-libjuju/issues/441
        # The status is derived from the units instead of read from the application.
        return status(entity) in ("active", "blocked")

    raise EntityInvalidException("Unknown entity type: {}".format(entity_type))
class JujuModelWatcher:
    """
    Static helpers to wait for a Juju entity to reach its final state while
    watching the model for status changes and (optionally) writing them to the DB.
    """

    @staticmethod
    async def wait_for(
        model,
        entity: ModelEntity,
        progress_timeout: float = 3600,
        total_timeout: float = 3600,
        db_dict: dict = None,
        n2vc: N2VCConnector = None,
    ):
        """
        Wait for entity to reach its final state.

        :param: model: Model to observe
        :param: entity: Entity object
        :param: progress_timeout: Maximum time between two updates in the model
        :param: total_timeout: Timeout for the entity to be active
        :param: db_dict: Dictionary with data of the DB to write the updates
        :param: n2vc: N2VC Connector object

        :raises: asyncio.TimeoutError when timeout reaches
        :raises: EntityInvalidException when the entity type is not
                 "application", "action" or "machine"
        """

        # Callers may pass None explicitly; fall back to the default timeouts
        if progress_timeout is None:
            progress_timeout = 3600.0
        if total_timeout is None:
            total_timeout = 3600.0

        entity_type = entity.entity_type
        if entity_type not in ["application", "action", "machine"]:
            raise EntityInvalidException("Unknown entity type: {}".format(entity_type))

        # Coroutine to wait until the entity reaches the final state
        wait_for_entity = asyncio.ensure_future(
            asyncio.wait_for(
                model.block_until(lambda: entity_ready(entity)),
                timeout=total_timeout,
            )
        )

        # Coroutine to watch the model for changes (and write them to DB)
        watcher = asyncio.ensure_future(
            JujuModelWatcher.model_watcher(
                model,
                entity_id=entity.entity_id,
                entity_type=entity_type,
                timeout=progress_timeout,
                db_dict=db_dict,
                n2vc=n2vc,
            )
        )

        tasks = [wait_for_entity, watcher]
        try:
            # Execute tasks, and stop when the first is finished
            # The watcher task never finishes on its own (unless it times out)
            done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

            # asyncio.wait() does not propagate the exceptions of the tasks it
            # waits for: retrieve the results so that a timeout (or any other
            # failure) of a finished task reaches the caller.
            for task in done:
                task.result()
        finally:
            # Never leave a task running behind; cancelling a finished task is a no-op
            for task in tasks:
                task.cancel()

    @staticmethod
    async def model_watcher(
        model,
        entity_id: str,
        entity_type: str,
        timeout: float,
        db_dict: dict = None,
        n2vc: N2VCConnector = None,
    ):
        """
        Observes the changes related to a specific entity in a model

        :param: model: Model to observe
        :param: entity_id: ID of the entity to be observed
        :param: entity_type: Entity Type (e.g. "application", "machine", and "action")
        :param: timeout: Maximum time between two updates in the model
        :param: db_dict: Dictionary with data of the DB to write the updates
        :param: n2vc: N2VC Connector object

        :raises: asyncio.TimeoutError when timeout reaches
        """

        allwatcher = client.AllWatcherFacade.from_connection(model.connection())

        # Generate array with entity types to listen to:
        # for an application, the deltas of its units are observed as well
        entity_types = (
            [entity_type, "unit"]
            if entity_type == "application"  # TODO: Add "action" too
            else [entity_type]
        )

        # Get time when it should timeout
        timeout_end = time.time() + timeout

        try:
            while True:
                change = await allwatcher.Next()
                for delta in change.deltas:
                    # Get delta EntityType
                    delta_entity = delta.entity
                    if delta_entity not in entity_types:
                        continue

                    # Get entity id: a unit delta is matched through the
                    # application it belongs to
                    if entity_type == "application":
                        changed_id = (
                            delta.data["application"]
                            if delta_entity == "unit"
                            else delta.data["name"]
                        )
                    else:
                        changed_id = delta.data["id"]

                    # Write if the entity id match
                    write = changed_id == entity_id

                    # Update timeout: any delta of an observed type counts as progress
                    timeout_end = time.time() + timeout
                    (
                        status,
                        status_message,
                        vca_status,
                    ) = JujuModelWatcher.get_status(delta)

                    if write and n2vc is not None and db_dict:
                        # Write status to DB
                        status = n2vc.osm_status(delta_entity, status)
                        await n2vc.write_app_status_to_db(
                            db_dict=db_dict,
                            status=status,
                            detailed_status=status_message,
                            vca_status=vca_status,
                            entity_type=delta_entity,
                        )

                # Check if timeout
                # NOTE(review): the deadline is only evaluated after Next() returns,
                # so it is not enforced while no change is reported at all.
                if time.time() > timeout_end:
                    raise asyncio.TimeoutError()
        except ConnectionClosed:
            # This is expected to happen when the
            # entity reaches its final state, because
            # the model connection is closed afterwards
            pass

    @staticmethod
    def get_status(delta: Delta) -> (str, str, str):
        """
        Get status from delta

        :param: delta: Delta generated by the allwatcher

        :return (status, message, vca_status); None when the delta entity is
                not "machine", "action", "application" or "unit"
        """
        data = delta.data
        if delta.entity == "machine":
            return (
                data["agent-status"]["current"],
                data["instance-status"]["message"],
                data["instance-status"]["current"],
            )
        elif delta.entity == "action":
            # Actions only carry a plain status string, used for all three values
            return (
                data["status"],
                data["status"],
                data["status"],
            )
        elif delta.entity == "application":
            return (
                data["status"]["current"],
                data["status"]["message"],
                data["status"]["current"],
            )
        elif delta.entity == "unit":
            return (
                data["workload-status"]["current"],
                data["workload-status"]["message"],
                data["workload-status"]["current"],
            )
        return None