blob: ac40f34dcb122b43ed0de5580fdd7daefb25ee24 [file] [log] [blame]
quilesj29114342019-10-29 09:30:44 +01001##
2# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3# This file is part of OSM
4# All Rights Reserved.
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15# implied.
16# See the License for the specific language governing permissions and
17# limitations under the License.
18#
19# For those usages not covered by the Apache License, Version 2.0 please
20# contact with: nfvlabs@tid.es
21##
22
23import asyncio
24import time
25
26from juju.model import ModelObserver, Model
27from juju.machine import Machine
28from juju.application import Application
29from juju.action import Action
30
31from n2vc.n2vc_conn import N2VCConnector, juju_status_2_osm_status
32from n2vc.exceptions import N2VCTimeoutException
33
34
35class _Entity:
36 def __init__(self, entity_id: str, entity_type: str, obj: object, db_dict: dict):
37 self.entity_id = entity_id
38 self.entity_type = entity_type
39 self.obj = obj
40 self.event = asyncio.Event()
41 self.db_dict = db_dict
42
43
44class JujuModelObserver(ModelObserver):
45
46 def __init__(self, n2vc: N2VCConnector, model: Model):
47 self.n2vc = n2vc
48 self.model = model
49 model.add_observer(self)
50 self.machines = dict()
51 self.applications = dict()
52 self.actions = dict()
53
54 def register_machine(self, machine: Machine, db_dict: dict):
55 entity_id = machine.entity_id
56 entity = _Entity(entity_id=entity_id, entity_type='machine', obj=machine, db_dict=db_dict)
57 self.machines[entity_id] = entity
58
59 def unregister_machine(self, machine_id: str):
60 if machine_id in self.machines:
61 del self.machines[machine_id]
62
63 def is_machine_registered(self, machine_id: str):
64 return machine_id in self.machines
65
66 def register_application(self, application: Application, db_dict: dict):
67 entity_id = application.entity_id
68 entity = _Entity(entity_id=entity_id, entity_type='application', obj=application, db_dict=db_dict)
69 self.applications[entity_id] = entity
70
71 def unregister_application(self, application_id: str):
72 if application_id in self.applications:
73 del self.applications[application_id]
74
75 def is_application_registered(self, application_id: str):
76 return application_id in self.applications
77
78 def register_action(self, action: Action, db_dict: dict):
79 entity_id = action.entity_id
80 entity = _Entity(entity_id=entity_id, entity_type='action', obj=action, db_dict=db_dict)
81 self.actions[entity_id] = entity
82
83 def unregister_action(self, action_id: str):
84 if action_id in self.actions:
85 del self.actions[action_id]
86
87 def is_action_registered(self, action_id: str):
88 return action_id in self.actions
89
90 async def wait_for_machine(
91 self,
92 machine_id: str,
93 progress_timeout: float = None,
94 total_timeout: float = None) -> int:
95
96 if not self.is_machine_registered(machine_id):
97 return
98
99 # wait for a final state
100 entity = self.machines[machine_id]
101 return await self._wait_for_entity(
102 entity=entity,
103 field_to_check='agent_status',
104 final_states_list=['started'],
105 progress_timeout=progress_timeout,
106 total_timeout=total_timeout)
107
108 async def wait_for_application(
109 self,
110 application_id: str,
111 progress_timeout: float = None,
112 total_timeout: float = None) -> int:
113
114 if not self.is_application_registered(application_id):
115 return
116
117 # application statuses: unknown, active, waiting
118 # wait for a final state
119 entity = self.applications[application_id]
120 return await self._wait_for_entity(
121 entity=entity,
122 field_to_check='status',
123 final_states_list=['active', 'blocked'],
124 progress_timeout=progress_timeout,
125 total_timeout=total_timeout)
126
127 async def wait_for_action(
128 self,
129 action_id: str,
130 progress_timeout: float = None,
131 total_timeout: float = None) -> int:
132
133 if not self.is_action_registered(action_id):
134 return
135
136 # action statuses: pending, running, completed, failed, cancelled
137 # wait for a final state
138 entity = self.actions[action_id]
139 return await self._wait_for_entity(
140 entity=entity,
141 field_to_check='status',
142 final_states_list=['completed', 'failed', 'cancelled'],
143 progress_timeout=progress_timeout,
144 total_timeout=total_timeout)
145
146 async def _wait_for_entity(
147 self,
148 entity: _Entity,
149 field_to_check: str,
150 final_states_list: list,
151 progress_timeout: float = None,
152 total_timeout: float = None) -> int:
153
154 # default values for no timeout
155 if total_timeout is None:
156 total_timeout = 100000
157 if progress_timeout is None:
158 progress_timeout = 100000
159
160 # max end time
161 now = time.time()
162 total_end = now + total_timeout
163
164 if now >= total_end:
165 raise N2VCTimeoutException(
166 message='Total timeout {} seconds, {}: {}'.format(total_timeout, entity.entity_type, entity.entity_id),
167 timeout='total'
168 )
169
170 # update next progress timeout
171 progress_end = now + progress_timeout # type: float
172
173 # which is closest? progress or end timeout?
174 closest_end = min(total_end, progress_end)
175
176 next_timeout = closest_end - now
177
178 retries = 0
179
180 while entity.obj.__getattribute__(field_to_check) not in final_states_list:
181 retries += 1
182 if await _wait_for_event_or_timeout(entity.event, next_timeout):
183 entity.event.clear()
184 else:
185 message = 'Progress timeout {} seconds, {}}: {}'\
186 .format(progress_timeout, entity.entity_type, entity.entity_id)
187 self.n2vc.debug(message)
188 raise N2VCTimeoutException(message=message, timeout='progress')
189 self.n2vc.debug('End of wait. Final state: {}, retries: {}'
190 .format(entity.obj.__getattribute__(field_to_check), retries))
191 return retries
192
193 async def on_change(self, delta, old, new, model):
194
195 if new is None:
196 return
197
198 # log
199 self.n2vc.debug('on_change(): type: {}, entity: {}, id: {}'
200 .format(delta.type, delta.entity, new.entity_id))
201
202 if delta.entity == 'machine':
203
204 # check registered machine
205 if new.entity_id not in self.machines:
206 return
207
208 # write change in database
209 await self.n2vc.write_app_status_to_db(
210 db_dict=self.machines[new.entity_id].db_dict,
211 status=juju_status_2_osm_status(delta.entity, new.agent_status),
212 detailed_status=new.status_message,
213 vca_status=new.status,
214 entity_type='machine'
215 )
216
217 # set event for this machine
218 self.machines[new.entity_id].event.set()
219
220 elif delta.entity == 'application':
221
222 # check registered application
223 if new.entity_id not in self.applications:
224 return
225
226 # write change in database
227 await self.n2vc.write_app_status_to_db(
228 db_dict=self.applications[new.entity_id].db_dict,
229 status=juju_status_2_osm_status(delta.entity, new.status),
230 detailed_status=new.status_message,
231 vca_status=new.status,
232 entity_type='application'
233 )
234
235 # set event for this application
236 self.applications[new.entity_id].event.set()
237
238 elif delta.entity == 'unit':
239
240 # get the application for this unit
241 application_id = delta.data['application']
242
243 # check registered application
244 if application_id not in self.applications:
245 return
246
247 # write change in database
248 await self.n2vc.write_app_status_to_db(
249 db_dict=self.applications[application_id].db_dict,
250 status=juju_status_2_osm_status(delta.entity, new.workload_status),
251 detailed_status=new.workload_status_message,
252 vca_status=new.workload_status,
253 entity_type='unit'
254 )
255
256 # set event for this application
257 self.applications[application_id].event.set()
258
259 elif delta.entity == 'action':
260
261 # check registered action
262 if new.entity_id not in self.actions:
263 return
264
265 # write change in database
266 await self.n2vc.write_app_status_to_db(
267 db_dict=self.actions[new.entity_id].db_dict,
268 status=juju_status_2_osm_status(delta.entity, new.status),
269 detailed_status=new.status,
270 vca_status=new.status,
271 entity_type='action'
272 )
273
274 # set event for this application
275 self.actions[new.entity_id].event.set()
276
277
278async def _wait_for_event_or_timeout(event: asyncio.Event, timeout: float = None):
279 try:
280 await asyncio.wait_for(fut=event.wait(), timeout=timeout)
281 except asyncio.TimeoutError:
282 pass
283 return event.is_set()