fix 1208: add native charm support for rhel and fix centos support
[osm/N2VC.git] / n2vc / juju_watcher.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import time
17 from juju.client import client
18 from n2vc.exceptions import EntityInvalidException
19 from n2vc.n2vc_conn import N2VCConnector
20 from juju.model import ModelEntity, Model
21 from juju.client.overrides import Delta
22 from juju.status import derive_status
23 from juju.application import Application
24 from websockets.exceptions import ConnectionClosed
25 import logging
26
27 logger = logging.getLogger("__main__")
28
29
def status(application: Application) -> str:
    """
    Get a single status for an application out of its units

    :param: application: Application entity.

    :returns: value returned by derive_status() for the workload status
              of every unit of the application
    """
    return derive_status([unit.workload_status for unit in application.units])
35
36
def entity_ready(entity: ModelEntity) -> bool:
    """
    Tell whether a model entity has reached one of its final states

    :param: entity: Model entity. It can be a machine, action, or application.

    :returns: True if the entity is in a final state, False otherwise
    :raises: EntityInvalidException if the entity type is not supported
    """
    # entity type -> (attribute holding the state, states considered final)
    final_states = {
        "machine": ("agent_status", ("started",)),
        "action": ("status", ("completed", "failed", "cancelled")),
        # Workaround for bug: https://github.com/juju/python-libjuju/issues/441
        "application": ("status", ("active", "blocked")),
    }

    kind = entity.entity_type
    if kind not in final_states:
        raise EntityInvalidException("Unknown entity type: {}".format(kind))

    attribute, states = final_states[kind]
    return getattr(entity, attribute) in states
55
56
def application_ready(application: Application) -> bool:
    """
    Check if an application and all of its units are ready

    :param: application: Application entity.

    :returns: True when the application status and the workload status of
              every unit are either "active" or "blocked", False otherwise.
    """
    accepted = ("active", "blocked")
    app_ok = application.status in accepted
    units_ok = not any(
        unit.workload_status not in accepted for unit in application.units
    )
    return app_ok and units_ok
71
72
class JujuModelWatcher:
    """
    Helpers to wait for Juju entities to reach a final state, optionally
    writing the status changes seen in the meantime to the DB.
    """

    @staticmethod
    async def wait_for_model(model: Model, timeout: float = 3600):
        """
        Wait for all the applications in the model to be ready.

        The readiness is checked twice, 10 seconds apart, to make sure the
        model is still ready after the first check succeeded.

        :param: model: Model to observe
        :param: timeout: Timeout for the model applications to be active.
                         It is applied to each of the two checks.
                         None means 3600 seconds.

        :raises: asyncio.TimeoutError when timeout reaches
        """

        if timeout is None:
            timeout = 3600.0

        # Coroutine to wait until every application reaches the final state
        async def wait_until_model_ready():
            # Awaiting wait_for() directly makes the timeout propagate to the
            # caller. Going through asyncio.wait() would leave the
            # TimeoutError stored in the task, and the caller would carry on
            # as if the model were ready.
            await asyncio.wait_for(
                model.block_until(
                    lambda: all(
                        application_ready(application)
                        for application in model.applications.values()
                    ),
                ),
                timeout=timeout,
            )

        await wait_until_model_ready()

        # Check model is still ready after 10 seconds
        await asyncio.sleep(10)
        await wait_until_model_ready()

    @staticmethod
    async def wait_for(
        model: Model,
        entity: ModelEntity,
        progress_timeout: float = 3600,
        total_timeout: float = 3600,
        db_dict: dict = None,
        n2vc: N2VCConnector = None,
    ):
        """
        Wait for entity to reach its final state.

        :param: model: Model to observe
        :param: entity: Entity object
        :param: progress_timeout: Maximum time between two updates in the model
        :param: total_timeout: Timeout for the entity to be active
        :param: db_dict: Dictionary with data of the DB to write the updates
        :param: n2vc: N2VC Connector objector

        :raises: EntityInvalidException if the entity type is not supported
        :raises: asyncio.TimeoutError when timeout reaches
        :raises: any other exception raised by the wait or by the watcher
        """

        if progress_timeout is None:
            progress_timeout = 3600.0
        if total_timeout is None:
            total_timeout = 3600.0

        entity_type = entity.entity_type
        if entity_type not in ["application", "action", "machine"]:
            raise EntityInvalidException("Unknown entity type: {}".format(entity_type))

        # Coroutine to wait until the entity reaches the final state
        wait_for_entity = asyncio.ensure_future(
            asyncio.wait_for(
                model.block_until(lambda: entity_ready(entity)),
                timeout=total_timeout,
            )
        )

        # Coroutine to watch the model for changes (and write them to DB)
        watcher = asyncio.ensure_future(
            JujuModelWatcher.model_watcher(
                model,
                entity_id=entity.entity_id,
                entity_type=entity_type,
                timeout=progress_timeout,
                db_dict=db_dict,
                n2vc=n2vc,
            )
        )

        tasks = [wait_for_entity, watcher]
        try:
            # Execute tasks, and stop when the first is finished
            # The watcher task never finishes by itself (unless it times out
            # or the connection is closed)
            done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

            # asyncio.wait() does not raise the exceptions of the tasks it
            # waits for, so they are re-raised here. Otherwise a timeout (or
            # a crash of the watcher) would look like a successful wait.
            for task in done:
                if not task.cancelled() and task.exception() is not None:
                    raise task.exception()
        finally:
            # Cancel tasks (no-op for the ones already finished)
            for task in tasks:
                task.cancel()

    @staticmethod
    async def model_watcher(
        model: Model,
        entity_id: str,
        entity_type: str,
        timeout: float,
        db_dict: dict = None,
        n2vc: N2VCConnector = None,
    ):
        """
        Observes the changes related to an specific entity in a model

        :param: model: Model to observe
        :param: entity_id: ID of the entity to be observed
        :param: entity_type: Entity Type (e.g. "application", "machine" or "action")
        :param: timeout: Maximum time between two updates in the model
        :param: db_dict: Dictionary with data of the DB to write the updates
        :param: n2vc: N2VC Connector objector

        :raises: asyncio.TimeoutError when timeout reaches
        """

        allwatcher = client.AllWatcherFacade.from_connection(model.connection())

        # Generate array with entity types to listen
        entity_types = (
            [entity_type, "unit"]
            if entity_type == "application"  # TODO: Add "action" too
            else [entity_type]
        )

        # Get time when it should timeout
        timeout_end = time.time() + timeout

        try:
            while True:
                # NOTE(review): the timeout below is only evaluated once
                # Next() returns, so it cannot fire while no change arrives
                # from the model - confirm whether Next() should be bounded.
                change = await allwatcher.Next()
                for delta in change.deltas:
                    # Get delta EntityType
                    delta_entity = delta.entity
                    if delta_entity not in entity_types:
                        continue

                    # Get entity id. Units are matched through the name of
                    # the application they belong to.
                    if entity_type == "application":
                        delta_id = (
                            delta.data["application"]
                            if delta_entity == "unit"
                            else delta.data["name"]
                        )
                    else:
                        delta_id = delta.data["id"]

                    # Write only if the entity id match
                    write = delta_id == entity_id

                    # Update timeout: a delta of a watched type counts as
                    # progress, even if it belongs to another entity
                    timeout_end = time.time() + timeout
                    (
                        current_status,
                        status_message,
                        vca_status,
                    ) = JujuModelWatcher.get_status(delta)

                    if write and n2vc is not None and db_dict:
                        # Write status to DB
                        current_status = n2vc.osm_status(delta_entity, current_status)
                        await n2vc.write_app_status_to_db(
                            db_dict=db_dict,
                            status=current_status,
                            detailed_status=status_message,
                            vca_status=vca_status,
                            entity_type=delta_entity,
                        )
                # Check if timeout
                if time.time() > timeout_end:
                    raise asyncio.TimeoutError()
        except ConnectionClosed:
            # This is expected to happen when the
            # entity reaches its final state, because
            # the model connection is closed afterwards
            pass

    @staticmethod
    def get_status(delta: Delta) -> (str, str, str):
        """
        Get status from delta

        :param: delta: Delta generated by the allwatcher. Its entity must be
                       "machine", "action", "application" or "unit"; for any
                       other entity None is returned.

        :return (status, message, vca_status)
        """
        if delta.entity == "machine":
            return (
                delta.data["agent-status"]["current"],
                delta.data["instance-status"]["message"],
                delta.data["instance-status"]["current"],
            )
        if delta.entity == "action":
            # Actions only carry a plain status, used for the three fields
            action_status = delta.data["status"]
            return (action_status, action_status, action_status)
        if delta.entity == "application":
            app_status = delta.data["status"]
            return (
                app_status["current"],
                app_status["message"],
                app_status["current"],
            )
        if delta.entity == "unit":
            workload_status = delta.data["workload-status"]
            return (
                workload_status["current"],
                workload_status["message"],
                workload_status["current"],
            )
        return None