Merge "Add missing argument in notify_callback"
[osm/N2VC.git] / n2vc / juju_observer.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22
23 import asyncio
24 import time
25
26 from juju.model import ModelObserver, Model
27 from juju.machine import Machine
28 from juju.application import Application
29 from juju.action import Action
30
31 from n2vc.n2vc_conn import N2VCConnector, juju_status_2_osm_status
32 from n2vc.exceptions import N2VCTimeoutException
33
34
class _Entity:
    """Bookkeeping record for a single observed juju entity.

    Groups the juju object together with the database descriptor that is
    used when its status is persisted, and an ``asyncio.Event`` used to
    signal that a change was reported for the entity.
    """

    def __init__(self, entity_id: str, entity_type: str, obj: object, db_dict: dict):
        """Store the entity data and create its (initially unset) event.

        :param entity_id: identifier of the entity
        :param entity_type: kind of entity, e.g. 'machine', 'application' or 'action'
        :param obj: the underlying juju object
        :param db_dict: database descriptor associated with the entity
        """
        # signalling primitive first; it starts in the "not set" state
        self.event = asyncio.Event()
        # identity of the entity
        self.entity_id, self.entity_type = entity_id, entity_type
        # payload: juju object and where its status is written
        self.obj, self.db_dict = obj, db_dict
42
43
class JujuModelObserver(ModelObserver):
    """Observe a juju model and let callers wait for entities to settle.

    The observer adds itself to the given model. Machines, applications and
    actions registered through the ``register_*`` methods have every reported
    change written to the database through ``n2vc.write_app_status_to_db``,
    and any coroutine blocked in a ``wait_for_*`` method is woken up so it
    can re-check whether the entity has reached a final state.
    """

    def __init__(self, n2vc: N2VCConnector, model: Model):
        """Create the registries and attach the observer to ``model``.

        :param n2vc: connector used for logging and for database writes
        :param model: juju model to be observed
        """
        self.n2vc = n2vc
        self.model = model
        # registries of observed entities, indexed by entity id; created
        # before attaching so the observer is fully built when it is added
        self.machines = dict()
        self.applications = dict()
        self.actions = dict()
        model.add_observer(self)

    def register_machine(self, machine: Machine, db_dict: dict):
        """Start tracking changes of ``machine``.

        :param machine: juju machine; its id is read from ``entity_id`` or,
            failing that, from its ``machine`` attribute
        :param db_dict: database descriptor used when writing its status
        """
        try:
            entity_id = machine.entity_id
        except Exception:
            # no entity_id attribute, try machine attribute
            # (narrowed from a bare except so that KeyboardInterrupt,
            # SystemExit and task cancellation are not swallowed)
            entity_id = machine.machine
        self.n2vc.debug(msg='Registering machine for changes notifications: {}'.format(entity_id))
        entity = _Entity(entity_id=entity_id, entity_type='machine', obj=machine, db_dict=db_dict)
        self.machines[entity_id] = entity

    def unregister_machine(self, machine_id: str):
        """Stop tracking the machine; unknown ids are ignored."""
        if machine_id in self.machines:
            del self.machines[machine_id]

    def is_machine_registered(self, machine_id: str):
        """Return True if the machine id is currently tracked."""
        return machine_id in self.machines

    def register_application(self, application: Application, db_dict: dict):
        """Start tracking changes of ``application``.

        :param application: juju application, identified by its ``entity_id``
        :param db_dict: database descriptor used when writing its status
        """
        entity_id = application.entity_id
        self.n2vc.debug(msg='Registering application for changes notifications: {}'.format(entity_id))
        entity = _Entity(entity_id=entity_id, entity_type='application', obj=application, db_dict=db_dict)
        self.applications[entity_id] = entity

    def unregister_application(self, application_id: str):
        """Stop tracking the application; unknown ids are ignored."""
        if application_id in self.applications:
            del self.applications[application_id]

    def is_application_registered(self, application_id: str):
        """Return True if the application id is currently tracked."""
        return application_id in self.applications

    def register_action(self, action: Action, db_dict: dict):
        """Start tracking changes of ``action``.

        :param action: juju action, identified by its ``entity_id``
        :param db_dict: database descriptor used when writing its status
        """
        entity_id = action.entity_id
        self.n2vc.debug(msg='Registering action for changes notifications: {}'.format(entity_id))
        entity = _Entity(entity_id=entity_id, entity_type='action', obj=action, db_dict=db_dict)
        self.actions[entity_id] = entity

    def unregister_action(self, action_id: str):
        """Stop tracking the action; unknown ids are ignored."""
        if action_id in self.actions:
            del self.actions[action_id]

    def is_action_registered(self, action_id: str):
        """Return True if the action id is currently tracked."""
        return action_id in self.actions

    async def wait_for_machine(
            self,
            machine_id: str,
            progress_timeout: float = None,
            total_timeout: float = None) -> int:
        """Wait until the machine's ``agent_status`` is 'started'.

        :param machine_id: id of a registered machine
        :param progress_timeout: max seconds to wait for each change
        :param total_timeout: max seconds for the whole wait
        :return: number of waits performed, or None if the machine is not
            registered
        :raises N2VCTimeoutException: when a timeout expires
        """
        if not self.is_machine_registered(machine_id):
            return

        # wait for a final state
        entity = self.machines[machine_id]
        return await self._wait_for_entity(
            entity=entity,
            field_to_check='agent_status',
            final_states_list=['started'],
            progress_timeout=progress_timeout,
            total_timeout=total_timeout)

    async def wait_for_application(
            self,
            application_id: str,
            progress_timeout: float = None,
            total_timeout: float = None) -> int:
        """Wait until the application's ``status`` is 'active' or 'blocked'.

        :param application_id: id of a registered application
        :param progress_timeout: max seconds to wait for each change
        :param total_timeout: max seconds for the whole wait
        :return: number of waits performed, or None if the application is
            not registered
        :raises N2VCTimeoutException: when a timeout expires
        """
        if not self.is_application_registered(application_id):
            return

        # application statuses: unknown, active, waiting
        # wait for a final state
        entity = self.applications[application_id]
        return await self._wait_for_entity(
            entity=entity,
            field_to_check='status',
            final_states_list=['active', 'blocked'],
            progress_timeout=progress_timeout,
            total_timeout=total_timeout)

    async def wait_for_action(
            self,
            action_id: str,
            progress_timeout: float = None,
            total_timeout: float = None) -> int:
        """Wait until the action's ``status`` is completed, failed or cancelled.

        :param action_id: id of a registered action
        :param progress_timeout: max seconds to wait for each change
        :param total_timeout: max seconds for the whole wait
        :return: number of waits performed, or None if the action is not
            registered
        :raises N2VCTimeoutException: when a timeout expires
        """
        if not self.is_action_registered(action_id):
            return

        # action statuses: pending, running, completed, failed, cancelled
        # wait for a final state
        entity = self.actions[action_id]
        return await self._wait_for_entity(
            entity=entity,
            field_to_check='status',
            final_states_list=['completed', 'failed', 'cancelled'],
            progress_timeout=progress_timeout,
            total_timeout=total_timeout)

    async def _wait_for_entity(
            self,
            entity: _Entity,
            field_to_check: str,
            final_states_list: list,
            progress_timeout: float = None,
            total_timeout: float = None) -> int:
        """Block until ``entity.obj.<field_to_check>`` is a final state.

        Each wait is bounded by ``progress_timeout`` and by the time left
        until the overall ``total_timeout`` deadline, whichever is shorter.

        :param entity: registered entity to wait for
        :param field_to_check: attribute of ``entity.obj`` holding the state
        :param final_states_list: states that end the wait
        :param progress_timeout: max seconds between two changes
            (None means 100000 seconds)
        :param total_timeout: max seconds for the whole wait
            (None means 100000 seconds)
        :return: number of waits that were performed
        :raises N2VCTimeoutException: with ``timeout='progress'`` when no
            change arrived in time, or ``timeout='total'`` when the overall
            deadline expired
        """
        # default values for no timeout
        if total_timeout is None:
            total_timeout = 100000
        if progress_timeout is None:
            progress_timeout = 100000

        # max end time
        now = time.time()
        total_end = now + total_timeout

        if now >= total_end:
            raise N2VCTimeoutException(
                message='Total timeout {} seconds, {}: {}'.format(total_timeout, entity.entity_type, entity.entity_id),
                timeout='total'
            )

        retries = 0

        while getattr(entity.obj, field_to_check) not in final_states_list:
            # the deadlines are re-evaluated on every iteration so that the
            # total timeout is honoured even when changes keep arriving
            remaining_total = total_end - time.time()
            total_is_closest = remaining_total < progress_timeout
            next_timeout = remaining_total if total_is_closest else progress_timeout

            if next_timeout > 0:
                retries += 1
                if await _wait_for_event_or_timeout(entity.event, next_timeout):
                    # something changed: re-arm the event and check again
                    entity.event.clear()
                    continue

            # nothing happened in time: report the timeout that expired
            if total_is_closest:
                message = 'Total timeout {} seconds, {}: {}'\
                    .format(total_timeout, entity.entity_type, entity.entity_id)
                kind = 'total'
            else:
                message = 'Progress timeout {} seconds, {}: {}'\
                    .format(progress_timeout, entity.entity_type, entity.entity_id)
                kind = 'progress'
            self.n2vc.debug(message)
            raise N2VCTimeoutException(message=message, timeout=kind)

        self.n2vc.debug('End of wait. Final state: {}, retries: {}'
                        .format(getattr(entity.obj, field_to_check), retries))
        return retries

    async def _publish_change(
            self,
            entity: _Entity,
            entity_type: str,
            juju_status,
            detailed_status,
            vca_status):
        """Write one status change to the database and wake up waiters.

        The entity is passed in (not looked up again after the write) so the
        event is still set if the entity is unregistered while the database
        write is in progress.
        """
        # write change in database
        await self.n2vc.write_app_status_to_db(
            db_dict=entity.db_dict,
            status=juju_status_2_osm_status(entity_type, juju_status),
            detailed_status=detailed_status,
            vca_status=vca_status,
            entity_type=entity_type
        )

        # set event for this entity
        entity.event.set()

    async def on_change(self, delta, old, new, model):
        """Handle a change reported by the observed model.

        Changes of entities that are not registered are ignored. Unit
        changes are attributed to the application the unit belongs to.

        :param delta: change descriptor; ``entity``, ``type`` and ``data``
            are read from it
        :param old: previous state of the entity (unused)
        :param new: new state of the entity; nothing is done when it is None
        :param model: model that reported the change (unused)
        """
        if new is None:
            return

        # log
        self.n2vc.debug('on_change(): type: {}, entity: {}, id: {}'
                        .format(delta.type, delta.entity, new.entity_id))

        if delta.entity == 'machine':

            # check registered machine
            entity = self.machines.get(new.entity_id)
            if entity is None:
                return

            await self._publish_change(
                entity=entity,
                entity_type=delta.entity,
                juju_status=new.agent_status,
                detailed_status=new.status_message,
                vca_status=new.status
            )

        elif delta.entity == 'application':

            # check registered application
            entity = self.applications.get(new.entity_id)
            if entity is None:
                return

            await self._publish_change(
                entity=entity,
                entity_type=delta.entity,
                juju_status=new.status,
                detailed_status=new.status_message,
                vca_status=new.status
            )

        elif delta.entity == 'unit':

            # get the application for this unit
            application_id = delta.data['application']

            # check registered application
            entity = self.applications.get(application_id)
            if entity is None:
                return

            await self._publish_change(
                entity=entity,
                entity_type=delta.entity,
                juju_status=new.workload_status,
                detailed_status=new.workload_status_message,
                vca_status=new.workload_status
            )

        elif delta.entity == 'action':

            # check registered action
            entity = self.actions.get(new.entity_id)
            if entity is None:
                return

            await self._publish_change(
                entity=entity,
                entity_type=delta.entity,
                juju_status=new.status,
                detailed_status=new.status,
                vca_status=new.status
            )
283
284
async def _wait_for_event_or_timeout(event: asyncio.Event, timeout: float = None):
    """Wait for ``event`` for at most ``timeout`` seconds.

    :param event: event to wait on
    :param timeout: maximum number of seconds to wait; None waits
        without a time limit
    :return: True if the event is set when the wait ends, False otherwise
    """
    pending_wait = event.wait()
    try:
        await asyncio.wait_for(pending_wait, timeout=timeout)
    except asyncio.TimeoutError:
        # running out of time is an expected outcome, not an error; the
        # caller learns about it through the returned flag
        pass
    event_fired = event.is_set()
    return event_fired