blob: 25c1c1f6588c7906ce25ec795a1b985ad9494bf7 [file] [log] [blame]
quilesj29114342019-10-29 09:30:44 +01001##
2# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3# This file is part of OSM
4# All Rights Reserved.
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15# implied.
16# See the License for the specific language governing permissions and
17# limitations under the License.
18#
19# For those usages not covered by the Apache License, Version 2.0 please
20# contact with: nfvlabs@tid.es
21##
22
23import asyncio
24import time
25
26from juju.model import ModelObserver, Model
27from juju.machine import Machine
28from juju.application import Application
29from juju.action import Action
30
31from n2vc.n2vc_conn import N2VCConnector, juju_status_2_osm_status
32from n2vc.exceptions import N2VCTimeoutException
33
34
class _Entity:
    """Record kept for one registered entity.

    Stores the entity identifier and type, the watched object itself, the
    ``db_dict`` supplied at registration time and an ``asyncio.Event`` that
    starts unset.
    """

    def __init__(self, entity_id: str, entity_type: str, obj: object, db_dict: dict):
        # identification of the entity
        self.entity_id, self.entity_type = entity_id, entity_type
        # the watched object, stored as received
        self.obj = obj
        # fresh (unset) event, one per entity
        self.event = asyncio.Event()
        # dictionary supplied by the caller, stored as received
        self.db_dict = db_dict
42
43
class JujuModelObserver(ModelObserver):
    """Track status changes of registered machines, applications and actions.

    Entities are registered by their identifier.  For every change received
    in ``on_change`` that concerns a registered entity, the new status is
    written through ``n2vc.write_app_status_to_db`` and the entity's event is
    set, which wakes up any coroutine blocked in a ``wait_for_*`` method.
    """

    def __init__(self, n2vc: N2VCConnector, model: Model):
        """Create the observer and attach it to ``model``.

        :param n2vc: connector used for logging and for writing status to the database
        :param model: model to observe; this instance is added as one of its observers
        """
        self.n2vc = n2vc
        self.model = model
        # registries: entity id -> _Entity
        self.machines = dict()
        self.applications = dict()
        self.actions = dict()
        # attach only once the registries exist
        model.add_observer(self)

    def _register_entity(self, registry: dict, entity_type: str, entity_id: str, obj: object, db_dict: dict):
        """Log the registration and store a new ``_Entity`` in ``registry`` under ``entity_id``."""
        self.n2vc.debug(msg='Registering {} for changes notifications: {}'.format(entity_type, entity_id))
        registry[entity_id] = _Entity(entity_id=entity_id, entity_type=entity_type, obj=obj, db_dict=db_dict)

    def register_machine(self, machine: Machine, db_dict: dict):
        """Register ``machine`` so that its changes are tracked.

        :param machine: machine object; identified by ``entity_id`` or, failing that, by ``machine``
        :param db_dict: dictionary passed to ``write_app_status_to_db`` on every change
        """
        try:
            entity_id = machine.entity_id
        except Exception:
            # no entity_id attribute, try machine attribute
            entity_id = machine.machine
        self._register_entity(self.machines, 'machine', entity_id, machine, db_dict)

    def unregister_machine(self, machine_id: str):
        """Forget the machine ``machine_id``; do nothing if it is not registered."""
        self.machines.pop(machine_id, None)

    def is_machine_registered(self, machine_id: str):
        """Return True if ``machine_id`` is currently registered."""
        return machine_id in self.machines

    def register_application(self, application: Application, db_dict: dict):
        """Register ``application`` (keyed by its ``entity_id``) so that its changes are tracked.

        :param application: application object
        :param db_dict: dictionary passed to ``write_app_status_to_db`` on every change
        """
        self._register_entity(self.applications, 'application', application.entity_id, application, db_dict)

    def unregister_application(self, application_id: str):
        """Forget the application ``application_id``; do nothing if it is not registered."""
        self.applications.pop(application_id, None)

    def is_application_registered(self, application_id: str):
        """Return True if ``application_id`` is currently registered."""
        return application_id in self.applications

    def register_action(self, action: Action, db_dict: dict):
        """Register ``action`` (keyed by its ``entity_id``) so that its changes are tracked.

        :param action: action object
        :param db_dict: dictionary passed to ``write_app_status_to_db`` on every change
        """
        self._register_entity(self.actions, 'action', action.entity_id, action, db_dict)

    def unregister_action(self, action_id: str):
        """Forget the action ``action_id``; do nothing if it is not registered."""
        self.actions.pop(action_id, None)

    def is_action_registered(self, action_id: str):
        """Return True if ``action_id`` is currently registered."""
        return action_id in self.actions

    async def wait_for_machine(
            self,
            machine_id: str,
            progress_timeout: float = None,
            total_timeout: float = None) -> int:
        """Wait until the machine's ``agent_status`` is ``'started'``.

        :param machine_id: identifier used when the machine was registered
        :param progress_timeout: maximum seconds to wait for each change notification
        :param total_timeout: maximum seconds to wait overall
        :return: number of notifications waited for, or None if the machine is not registered
        :raises N2VCTimeoutException: when one of the timeouts expires
        """
        entity = self.machines.get(machine_id)
        if entity is None:
            return None

        # wait for a final state
        return await self._wait_for_entity(
            entity=entity,
            field_to_check='agent_status',
            final_states_list=['started'],
            progress_timeout=progress_timeout,
            total_timeout=total_timeout)

    async def wait_for_application(
            self,
            application_id: str,
            progress_timeout: float = None,
            total_timeout: float = None) -> int:
        """Wait until the application's ``status`` is ``'active'`` or ``'blocked'``.

        :param application_id: identifier used when the application was registered
        :param progress_timeout: maximum seconds to wait for each change notification
        :param total_timeout: maximum seconds to wait overall
        :return: number of notifications waited for, or None if the application is not registered
        :raises N2VCTimeoutException: when one of the timeouts expires
        """
        entity = self.applications.get(application_id)
        if entity is None:
            return None

        # application statuses: unknown, active, waiting
        # wait for a final state
        return await self._wait_for_entity(
            entity=entity,
            field_to_check='status',
            final_states_list=['active', 'blocked'],
            progress_timeout=progress_timeout,
            total_timeout=total_timeout)

    async def wait_for_action(
            self,
            action_id: str,
            progress_timeout: float = None,
            total_timeout: float = None) -> int:
        """Wait until the action's ``status`` is ``'completed'``, ``'failed'`` or ``'cancelled'``.

        :param action_id: identifier used when the action was registered
        :param progress_timeout: maximum seconds to wait for each change notification
        :param total_timeout: maximum seconds to wait overall
        :return: number of notifications waited for, or None if the action is not registered
        :raises N2VCTimeoutException: when one of the timeouts expires
        """
        entity = self.actions.get(action_id)
        if entity is None:
            return None

        # action statuses: pending, running, completed, failed, cancelled
        # wait for a final state
        return await self._wait_for_entity(
            entity=entity,
            field_to_check='status',
            final_states_list=['completed', 'failed', 'cancelled'],
            progress_timeout=progress_timeout,
            total_timeout=total_timeout)

    async def _wait_for_entity(
            self,
            entity: _Entity,
            field_to_check: str,
            final_states_list: list,
            progress_timeout: float = None,
            total_timeout: float = None) -> int:
        """Block until ``entity.obj.<field_to_check>`` is one of ``final_states_list``.

        The attribute is re-read every time the entity's event is set.  Each
        wait lasts at most ``progress_timeout`` seconds and never extends past
        the overall deadline given by ``total_timeout``.

        :param entity: registered entity to wait for
        :param field_to_check: name of the attribute of ``entity.obj`` holding the state
        :param final_states_list: states that end the wait
        :param progress_timeout: maximum seconds for each wait; None means 100000
        :param total_timeout: maximum seconds overall; None means 100000
        :return: number of notifications waited for before a final state was seen
        :raises N2VCTimeoutException: ``timeout='total'`` when the overall deadline expires,
            ``timeout='progress'`` when no notification arrives within ``progress_timeout``
        """
        # default values for no timeout
        if total_timeout is None:
            total_timeout = 100000
        if progress_timeout is None:
            progress_timeout = 100000

        total_message = 'Total timeout {} seconds, {}: {}'.format(
            total_timeout, entity.entity_type, entity.entity_id)

        # max end time
        now = time.time()
        total_end = now + total_timeout

        if now >= total_end:
            raise N2VCTimeoutException(message=total_message, timeout='total')

        retries = 0

        while getattr(entity.obj, field_to_check) not in final_states_list:
            # the deadlines are re-evaluated on every iteration so that the total
            # timeout stays enforced while notifications keep arriving
            remaining = total_end - time.time()
            if remaining <= 0:
                self.n2vc.debug(total_message)
                raise N2VCTimeoutException(message=total_message, timeout='total')

            # which is closest? progress or end timeout?
            total_is_closest = remaining <= progress_timeout
            next_timeout = remaining if total_is_closest else progress_timeout

            retries += 1
            if await _wait_for_event_or_timeout(entity.event, next_timeout):
                # re-arm the event before checking the state again
                entity.event.clear()
                continue

            # nothing was notified within the window: report the deadline that expired
            if total_is_closest:
                self.n2vc.debug(total_message)
                raise N2VCTimeoutException(message=total_message, timeout='total')
            message = 'Progress timeout {} seconds, {}: {}'\
                .format(progress_timeout, entity.entity_type, entity.entity_id)
            self.n2vc.debug(message)
            raise N2VCTimeoutException(message=message, timeout='progress')

        self.n2vc.debug('End of wait. Final state: {}, retries: {}'
                        .format(getattr(entity.obj, field_to_check), retries))
        return retries

    async def _write_status_and_notify(
            self,
            entity: _Entity,
            entity_type: str,
            juju_status,
            detailed_status,
            vca_status):
        """Write the new status of ``entity`` to the database, then set its event.

        ``entity`` is passed in (not looked up again) so that an unregistration
        happening while the database write is awaited cannot raise ``KeyError``.
        """
        await self.n2vc.write_app_status_to_db(
            db_dict=entity.db_dict,
            status=juju_status_2_osm_status(entity_type, juju_status),
            detailed_status=detailed_status,
            vca_status=vca_status,
            entity_type=entity_type
        )
        entity.event.set()

    async def on_change(self, delta, old, new, model):
        """Handle one model change.

        Changes with no new state, or about entities that are not registered,
        are ignored.  Unit changes are attributed to the application named in
        ``delta.data['application']``.

        :param delta: change description; ``type``, ``entity`` and ``data`` are read
        :param old: previous state (unused)
        :param new: new state, or None
        :param model: model that produced the change (unused)
        """
        if new is None:
            return

        # log
        self.n2vc.debug('on_change(): type: {}, entity: {}, id: {}'
                        .format(delta.type, delta.entity, new.entity_id))

        if delta.entity == 'machine':

            # check registered machine
            entity = self.machines.get(new.entity_id)
            if entity is None:
                return

            await self._write_status_and_notify(
                entity=entity,
                entity_type='machine',
                juju_status=new.agent_status,
                detailed_status=new.status_message,
                vca_status=new.status)

        elif delta.entity == 'application':

            # check registered application
            entity = self.applications.get(new.entity_id)
            if entity is None:
                return

            await self._write_status_and_notify(
                entity=entity,
                entity_type='application',
                juju_status=new.status,
                detailed_status=new.status_message,
                vca_status=new.status)

        elif delta.entity == 'unit':

            # get the application for this unit and check it is registered
            entity = self.applications.get(delta.data['application'])
            if entity is None:
                return

            await self._write_status_and_notify(
                entity=entity,
                entity_type='unit',
                juju_status=new.workload_status,
                detailed_status=new.workload_status_message,
                vca_status=new.workload_status)

        elif delta.entity == 'action':

            # check registered action
            entity = self.actions.get(new.entity_id)
            if entity is None:
                return

            await self._write_status_and_notify(
                entity=entity,
                entity_type='action',
                juju_status=new.status,
                detailed_status=new.status,
                vca_status=new.status)
283
284
async def _wait_for_event_or_timeout(event: asyncio.Event, timeout: float = None):
    """Wait for ``event`` for at most ``timeout`` seconds.

    A timeout is not an error: it is swallowed and the current state of the
    event is reported instead.

    :param event: event to wait for
    :param timeout: maximum seconds to wait; None waits without limit
    :return: True if the event is set when the wait ends, False otherwise
    """
    pending_wait = event.wait()
    try:
        await asyncio.wait_for(pending_wait, timeout=timeout)
    except asyncio.TimeoutError:
        # expiry is reported through the return value below
        pass
    was_set = event.is_set()
    return was_set