blob: e122786ccf7e33785a69a933585957b031a876fe [file] [log] [blame]
# Copyright 2020 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14
15import asyncio
16import time
17from juju.client import client
David Garcia4fee80e2020-05-13 12:18:38 +020018from n2vc.exceptions import EntityInvalidException
19from n2vc.n2vc_conn import N2VCConnector
20from juju.model import ModelEntity, Model
21from juju.client.overrides import Delta
David Garciac38a6962020-09-16 13:31:33 +020022from juju.status import derive_status
23from juju.application import Application
David Garcia2f66c4d2020-06-19 11:40:18 +020024from websockets.exceptions import ConnectionClosed
David Garcia4fee80e2020-05-13 12:18:38 +020025import logging
26
27logger = logging.getLogger("__main__")
28
29
def status(application: Application) -> str:
    """
    Derive the status of an application from the workload status of its units.

    :param: application: Application whose units are inspected

    :return: Value computed by derive_status() over the units' workload status
    """
    return derive_status([unit.workload_status for unit in application.units])
35
36
def entity_ready(entity: ModelEntity) -> bool:
    """
    Tell whether an entity has reached a state in which waiting can stop.

    :param: entity: Machine, action or application entity to check

    :return: True when
        - a machine has agent status "started"
        - an action has status "completed", "failed" or "cancelled"
        - an application has derived status "active" or "blocked"

    :raises: EntityInvalidException for any other entity type
    """
    entity_type = entity.entity_type
    if entity_type == "machine":
        return entity.agent_status == "started"
    if entity_type == "action":
        return entity.status in ("completed", "failed", "cancelled")
    if entity_type == "application":
        # Workaround for bug: https://github.com/juju/python-libjuju/issues/441
        # The status is derived from the units instead of read from the entity
        return status(entity) in ("active", "blocked")
    raise EntityInvalidException("Unknown entity type: {}".format(entity_type))
48
49
class JujuModelWatcher:
    """
    Static helpers that wait for Juju entities to reach a final state and,
    optionally, write their status updates through an N2VC connector.
    """

    @staticmethod
    def _propagate_timeout(done) -> None:
        """
        Re-raise asyncio.TimeoutError stored in any finished task.

        asyncio.wait() never raises the exceptions of the tasks it waits for,
        so they have to be retrieved explicitly. Only the documented
        asyncio.TimeoutError is raised; any other failure is logged and not
        raised, which keeps the previous behaviour for those errors.

        :param: done: Finished tasks, as returned by asyncio.wait()

        :raises: asyncio.TimeoutError when one of the tasks timed out
        """
        for task in done:
            if task.cancelled():
                continue
            error = task.exception()
            if error is None:
                continue
            if isinstance(error, asyncio.TimeoutError):
                raise error
            logger.warning("Error while waiting for the model: %r", error)

    @staticmethod
    async def wait_for_model(model: Model, timeout: float = 3600):
        """
        Wait for all applications in the model to reach their final state.

        :param: model: Model to observe
        :param: timeout: Timeout for the model applications to be active

        :raises: asyncio.TimeoutError when timeout reaches
        """

        if timeout is None:
            timeout = 3600.0

        # Task that waits until every application in the model is ready
        wait_for_entity = asyncio.ensure_future(
            asyncio.wait_for(
                model.block_until(
                    lambda: all(
                        entity_ready(entity) for entity in model.applications.values()
                    )
                ),
                timeout=timeout,
            )
        )

        tasks = [wait_for_entity]
        try:
            done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        finally:
            # Cancel tasks (no-op for the ones already finished)
            for task in tasks:
                task.cancel()

        JujuModelWatcher._propagate_timeout(done)

    @staticmethod
    async def wait_for(
        model: Model,
        entity: ModelEntity,
        progress_timeout: float = 3600,
        total_timeout: float = 3600,
        db_dict: dict = None,
        n2vc: N2VCConnector = None,
    ):
        """
        Wait for entity to reach its final state.

        :param: model: Model to observe
        :param: entity: Entity object
        :param: progress_timeout: Maximum time between two updates in the model
        :param: total_timeout: Timeout for the entity to be active
        :param: db_dict: Dictionary with data of the DB to write the updates
        :param: n2vc: N2VC Connector object

        :raises: EntityInvalidException when the entity type is not supported
        :raises: asyncio.TimeoutError when timeout reaches
        """

        if progress_timeout is None:
            progress_timeout = 3600.0
        if total_timeout is None:
            total_timeout = 3600.0

        entity_type = entity.entity_type
        if entity_type not in ["application", "action", "machine"]:
            raise EntityInvalidException("Unknown entity type: {}".format(entity_type))

        # Task that waits until the entity reaches the final state
        wait_for_entity = asyncio.ensure_future(
            asyncio.wait_for(
                model.block_until(lambda: entity_ready(entity)),
                timeout=total_timeout,
            )
        )

        # Task that watches the model for changes (and writes them to DB)
        watcher = asyncio.ensure_future(
            JujuModelWatcher.model_watcher(
                model,
                entity_id=entity.entity_id,
                entity_type=entity_type,
                timeout=progress_timeout,
                db_dict=db_dict,
                n2vc=n2vc,
            )
        )

        tasks = [wait_for_entity, watcher]
        try:
            # Run both tasks and stop as soon as the first one finishes.
            # The watcher only finishes on timeout or when the connection closes.
            done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        finally:
            # Cancel tasks (no-op for the ones already finished)
            for task in tasks:
                task.cancel()

        JujuModelWatcher._propagate_timeout(done)

    @staticmethod
    async def model_watcher(
        model: Model,
        entity_id: str,
        entity_type: str,
        timeout: float,
        db_dict: dict = None,
        n2vc: N2VCConnector = None,
    ):
        """
        Observes the changes related to a specific entity in a model

        :param: model: Model to observe
        :param: entity_id: ID of the entity to be observed
        :param: entity_type: Entity Type (e.g. "application", "machine", "action")
        :param: timeout: Maximum time between two updates in the model
        :param: db_dict: Dictionary with data of the DB to write the updates
        :param: n2vc: N2VC Connector object

        :raises: asyncio.TimeoutError when timeout reaches
        """

        allwatcher = client.AllWatcherFacade.from_connection(model.connection())

        # Entity types to listen to: an application is also followed
        # through the deltas of its units
        entity_types = (
            [entity_type, "unit"]
            if entity_type == "application"  # TODO: Add "action" too
            else [entity_type]
        )

        # Monotonic deadline, so wall-clock adjustments cannot shift it
        deadline = time.monotonic() + timeout

        try:
            while True:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    raise asyncio.TimeoutError()

                # Bound the call: Next() blocks until the model changes, so
                # without a limit the timeout could never fire on a quiet model
                change = await asyncio.wait_for(allwatcher.Next(), timeout=remaining)

                for delta in change.deltas:
                    delta_entity = delta.entity
                    if delta_entity not in entity_types:
                        continue

                    # Get the id of the entity the delta refers to
                    if entity_type == "application":
                        delta_entity_id = (
                            delta.data["application"]
                            if delta_entity == "unit"
                            else delta.data["name"]
                        )
                    else:
                        delta_entity_id = delta.data["id"]

                    # Any delta of a watched type counts as progress
                    deadline = time.monotonic() + timeout
                    (
                        entity_status,
                        status_message,
                        vca_status,
                    ) = JujuModelWatcher.get_status(delta)

                    # Write only the updates of the observed entity
                    if delta_entity_id == entity_id and n2vc is not None and db_dict:
                        entity_status = n2vc.osm_status(delta_entity, entity_status)
                        await n2vc.write_app_status_to_db(
                            db_dict=db_dict,
                            status=entity_status,
                            detailed_status=status_message,
                            vca_status=vca_status,
                            entity_type=delta_entity,
                        )
        except ConnectionClosed:
            pass
            # This is expected to happen when the
            # entity reaches its final state, because
            # the model connection is closed afterwards

    @staticmethod
    def get_status(delta: Delta) -> (str, str, str):
        """
        Get status from delta

        :param: delta: Delta generated by the allwatcher

        :return (status, message, vca_status); None is returned implicitly
            when the delta entity is not a machine, action, application or unit
        """
        if delta.entity == "machine":
            return (
                delta.data["agent-status"]["current"],
                delta.data["instance-status"]["message"],
                delta.data["instance-status"]["current"],
            )
        elif delta.entity == "action":
            return (
                delta.data["status"],
                delta.data["status"],
                delta.data["status"],
            )
        elif delta.entity == "application":
            return (
                delta.data["status"]["current"],
                delta.data["status"]["message"],
                delta.data["status"]["current"],
            )
        elif delta.entity == "unit":
            return (
                delta.data["workload-status"]["current"],
                delta.data["workload-status"]["message"],
                delta.data["workload-status"]["current"],
            )