Fix missing await
[osm/N2VC.git] / n2vc / juju_observer.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22
23 import asyncio
24 import time
25
26 from juju.action import Action
27 from juju.application import Application
28 from juju.machine import Machine
29 from juju.model import ModelObserver, Model
30
31 from n2vc.exceptions import N2VCTimeoutException
32 from n2vc.n2vc_conn import N2VCConnector, juju_status_2_osm_status
33
34
class _Entity:
    """Bookkeeping record for a single observed entity.

    Groups together the identifier and type label of the entity, the object
    being watched, the ``db_dict`` supplied at registration time, and an
    ``asyncio.Event`` (initially unset) used to signal that the entity
    changed.
    """

    def __init__(self, entity_id: str, entity_type: str, obj: object, db_dict: dict):
        # Identifier and type label exactly as given by the caller.
        self.entity_id, self.entity_type = entity_id, entity_type
        # The watched object and the database descriptor that goes with it.
        self.obj = obj
        self.db_dict = db_dict
        # Change signal; starts in the "not set" state.
        self.event = asyncio.Event()
42
43
class JujuModelObserver(ModelObserver):
    """Observer that relays juju model changes for registered entities.

    Machines, applications and actions are registered together with a
    ``db_dict``. Every change delivered to :meth:`on_change` for a registered
    entity is forwarded to ``n2vc.write_app_status_to_db`` and then wakes any
    coroutine blocked in one of the ``wait_for_*`` methods.
    """

    def __init__(self, n2vc: N2VCConnector, model: Model):
        """Create the registries and subscribe this observer to ``model``."""
        self.n2vc = n2vc
        self.model = model
        # Registries map entity id -> _Entity. They are created before
        # subscribing so they already exist when notifications start.
        self.machines = dict()
        self.applications = dict()
        self.actions = dict()
        model.add_observer(self)

    def register_machine(self, machine: Machine, db_dict: dict):
        """Start tracking ``machine``; ``db_dict`` is stored alongside it."""
        try:
            entity_id = machine.entity_id
        except Exception:
            # no entity_id attribute, try machine attribute
            # (kept broad on purpose: this fallback is best-effort)
            entity_id = machine.machine
        entity = _Entity(
            entity_id=entity_id, entity_type="machine", obj=machine, db_dict=db_dict
        )
        self.machines[entity_id] = entity

    def unregister_machine(self, machine_id: str):
        """Stop tracking the machine; unknown ids are ignored."""
        self.machines.pop(machine_id, None)

    def is_machine_registered(self, machine_id: str):
        """Return True if ``machine_id`` is currently tracked."""
        return machine_id in self.machines

    def register_application(self, application: Application, db_dict: dict):
        """Start tracking ``application``; ``db_dict`` is stored alongside it."""
        entity_id = application.entity_id
        entity = _Entity(
            entity_id=entity_id,
            entity_type="application",
            obj=application,
            db_dict=db_dict,
        )
        self.applications[entity_id] = entity

    def unregister_application(self, application_id: str):
        """Stop tracking the application; unknown ids are ignored."""
        self.applications.pop(application_id, None)

    def is_application_registered(self, application_id: str):
        """Return True if ``application_id`` is currently tracked."""
        return application_id in self.applications

    def register_action(self, action: Action, db_dict: dict):
        """Start tracking ``action``; ``db_dict`` is stored alongside it."""
        entity_id = action.entity_id
        entity = _Entity(
            entity_id=entity_id, entity_type="action", obj=action, db_dict=db_dict
        )
        self.actions[entity_id] = entity

    def unregister_action(self, action_id: str):
        """Stop tracking the action; unknown ids are ignored."""
        self.actions.pop(action_id, None)

    def is_action_registered(self, action_id: str):
        """Return True if ``action_id`` is currently tracked."""
        return action_id in self.actions

    async def wait_for_machine(
        self,
        machine_id: str,
        progress_timeout: float = None,
        total_timeout: float = None,
    ) -> int:
        """Wait until the machine's ``agent_status`` is ``"started"``.

        Returns the number of waits that were needed, or ``None`` (without
        waiting) when ``machine_id`` is not registered.
        Raises N2VCTimeoutException when a timeout expires.
        """
        if not self.is_machine_registered(machine_id):
            return

        self.n2vc.debug("Waiting for machine completed: {}".format(machine_id))

        # wait for a final state
        entity = self.machines[machine_id]
        return await self._wait_for_entity(
            entity=entity,
            field_to_check="agent_status",
            final_states_list=["started"],
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
        )

    async def wait_for_application(
        self,
        application_id: str,
        progress_timeout: float = None,
        total_timeout: float = None,
    ) -> int:
        """Wait until the application's ``status`` is ``"active"`` or ``"blocked"``.

        Returns the number of waits that were needed, or ``None`` (without
        waiting) when ``application_id`` is not registered.
        Raises N2VCTimeoutException when a timeout expires.
        """
        if not self.is_application_registered(application_id):
            return

        self.n2vc.debug("Waiting for application completed: {}".format(application_id))

        # application statuses: unknown, active, waiting
        # wait for a final state
        entity = self.applications[application_id]
        return await self._wait_for_entity(
            entity=entity,
            field_to_check="status",
            final_states_list=["active", "blocked"],
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
        )

    async def wait_for_action(
        self,
        action_id: str,
        progress_timeout: float = None,
        total_timeout: float = None,
    ) -> int:
        """Wait until the action's ``status`` is completed, failed or cancelled.

        Returns the number of waits that were needed, or ``None`` (without
        waiting) when ``action_id`` is not registered.
        Raises N2VCTimeoutException when a timeout expires.
        """
        if not self.is_action_registered(action_id):
            return

        self.n2vc.debug("Waiting for action completed: {}".format(action_id))

        # action statuses: pending, running, completed, failed, cancelled
        # wait for a final state
        entity = self.actions[action_id]
        return await self._wait_for_entity(
            entity=entity,
            field_to_check="status",
            final_states_list=["completed", "failed", "cancelled"],
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
        )

    async def _wait_for_entity(
        self,
        entity: _Entity,
        field_to_check: str,
        final_states_list: list,
        progress_timeout: float = None,
        total_timeout: float = None,
    ) -> int:
        """Block until ``entity.obj.<field_to_check>`` is in ``final_states_list``.

        :param entity: registry record whose ``event`` is set by on_change()
        :param field_to_check: attribute of ``entity.obj`` holding the state
        :param final_states_list: states that end the wait
        :param progress_timeout: max seconds to wait for a single change
            notification (defaults to 3600 when None)
        :param total_timeout: max seconds for the whole wait (defaults to
            3600 when None)
        :return: how many times the coroutine had to wait for a notification
        :raises N2VCTimeoutException: with ``timeout="total"`` when the
            overall deadline is the bound that expired, ``timeout="progress"``
            when no notification arrived within ``progress_timeout``
        """
        # default values for no timeout
        if total_timeout is None:
            total_timeout = 3600
        if progress_timeout is None:
            progress_timeout = 3600

        total_message = "Total timeout {} seconds, {}: {}".format(
            total_timeout, entity.entity_type, entity.entity_id
        )

        # A non-positive total timeout fails immediately, even when the
        # entity is already in a final state.
        if total_timeout <= 0:
            raise N2VCTimeoutException(message=total_message, timeout="total")

        # Monotonic clock: the deadline must not move if the wall clock is
        # adjusted while waiting.
        total_end = time.monotonic() + total_timeout

        retries = 0

        while getattr(entity.obj, field_to_check) not in final_states_list:
            # The remaining budget is recomputed on every pass so that a
            # stream of change notifications cannot extend the wait beyond
            # total_timeout.
            remaining = total_end - time.monotonic()
            if remaining <= 0:
                self.n2vc.debug(total_message)
                raise N2VCTimeoutException(message=total_message, timeout="total")

            retries += 1

            # which is closest? progress or end timeout?
            limited_by_total = remaining <= progress_timeout
            next_timeout = remaining if limited_by_total else progress_timeout

            if await _wait_for_event_or_timeout(entity.event, next_timeout):
                # Re-arm the event and re-check the state.
                entity.event.clear()
                continue

            # No notification arrived in time: report the bound that expired.
            if limited_by_total:
                message = total_message
                timeout_kind = "total"
            else:
                message = "Progress timeout {} seconds, {}: {}".format(
                    progress_timeout, entity.entity_type, entity.entity_id
                )
                timeout_kind = "progress"
            self.n2vc.debug(message)
            raise N2VCTimeoutException(message=message, timeout=timeout_kind)

        return retries

    async def on_change(self, delta, old, new, model):
        """Handle a change notification for a machine, application, unit or action.

        Changes for entities that are not registered, and notifications whose
        ``new`` is None, are ignored. Otherwise the new status is passed to
        ``n2vc.write_app_status_to_db`` and the entity's event is set. Unit
        changes are attributed to the unit's application.
        """
        if new is None:
            return

        # In every branch the registry entry is resolved once, before
        # awaiting: an unregister_*() call that runs during the database
        # write must not make the final event.set() fail with KeyError.

        if delta.entity == "machine":

            # check registered machine
            entity = self.machines.get(new.entity_id)
            if entity is None:
                return

            # write change in database
            await self.n2vc.write_app_status_to_db(
                db_dict=entity.db_dict,
                status=juju_status_2_osm_status(delta.entity, new.agent_status),
                detailed_status=new.status_message,
                vca_status=new.status,
                entity_type="machine",
            )

            # set event for this machine
            entity.event.set()

        elif delta.entity == "application":

            # check registered application
            entity = self.applications.get(new.entity_id)
            if entity is None:
                return

            # write change in database
            await self.n2vc.write_app_status_to_db(
                db_dict=entity.db_dict,
                status=juju_status_2_osm_status(delta.entity, new.status),
                detailed_status=new.status_message,
                vca_status=new.status,
                entity_type="application",
            )

            # set event for this application
            entity.event.set()

        elif delta.entity == "unit":

            # get the application for this unit
            application_id = delta.data["application"]

            # check registered application
            entity = self.applications.get(application_id)
            if entity is None:
                return

            # write change in database (skipped for dead units)
            if not new.dead:
                await self.n2vc.write_app_status_to_db(
                    db_dict=entity.db_dict,
                    status=juju_status_2_osm_status(delta.entity, new.workload_status),
                    detailed_status=new.workload_status_message,
                    vca_status=new.workload_status,
                    entity_type="unit",
                )

            # set event for this application, dead unit or not
            entity.event.set()

        elif delta.entity == "action":

            # check registered action
            entity = self.actions.get(new.entity_id)
            if entity is None:
                return

            # write change in database
            await self.n2vc.write_app_status_to_db(
                db_dict=entity.db_dict,
                status=juju_status_2_osm_status(delta.entity, new.status),
                detailed_status=new.status,
                vca_status=new.status,
                entity_type="action",
            )

            # set event for this action
            entity.event.set()
311
312
async def _wait_for_event_or_timeout(event: asyncio.Event, timeout: float = None):
    """Wait for ``event`` to be set, giving up after ``timeout`` seconds.

    A timeout is not reported as an error; the return value is simply whether
    the event is set once the wait is over. ``timeout=None`` is passed through
    to ``asyncio.wait_for`` unchanged.
    """
    waiter = event.wait()
    try:
        await asyncio.wait_for(fut=waiter, timeout=timeout)
    except asyncio.TimeoutError:
        # Expiry is expected here; the caller inspects the flag instead.
        pass
    signalled = event.is_set()
    return signalled