blob: 10efa98b7f74e2169960d43cd91cf8ec222b383a [file] [log] [blame]
##
# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
# This file is part of OSM
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: nfvlabs@tid.es
##
22
23import asyncio
24import time
25
26from juju.model import ModelObserver, Model
27from juju.machine import Machine
28from juju.application import Application
29from juju.action import Action
30
31from n2vc.n2vc_conn import N2VCConnector, juju_status_2_osm_status
32from n2vc.exceptions import N2VCTimeoutException
33
34
class _Entity:
    """Bookkeeping record for one observed juju entity.

    Holds the entity identifier and type, the observed object itself, the
    database descriptor passed at registration time, and an asyncio event
    (created unset) that is used to signal changes on the entity.
    """

    def __init__(self, entity_id: str, entity_type: str, obj: object, db_dict: dict):
        # signalling primitive; starts in the "not set" state
        self.event = asyncio.Event()

        # identification of the entity
        self.entity_type = entity_type
        self.entity_id = entity_id

        # observed object and the database descriptor that goes with it
        self.obj = obj
        self.db_dict = db_dict
42
43
class JujuModelObserver(ModelObserver):
    """Model observer that tracks registered machines, applications and actions.

    On construction the observer attaches itself to the given model. Only
    entities that have been registered are tracked: for those, every change
    received in ``on_change`` is written to the database through
    ``n2vc.write_app_status_to_db`` and the entity event is set, which wakes
    up any coroutine blocked in one of the ``wait_for_*`` methods.
    """

    def __init__(self, n2vc: N2VCConnector, model: Model):
        """Attach the observer to ``model`` and create empty registries.

        :param n2vc: connector used for logging and for writing status to db
        :param model: model to observe
        """
        self.n2vc = n2vc
        self.model = model
        model.add_observer(self)
        # registries of _Entity objects, indexed by entity id
        self.machines = dict()
        self.applications = dict()
        self.actions = dict()

    def register_machine(self, machine: Machine, db_dict: dict):
        """Register a machine so that its changes are tracked.

        :param machine: machine object; its id is read from ``entity_id`` or,
            if that fails, from its ``machine`` attribute
        :param db_dict: database descriptor passed to
            ``n2vc.write_app_status_to_db`` on every change
        """
        try:
            entity_id = machine.entity_id
        except Exception:
            # no entity_id attribute, try machine attribute.
            # The broad catch is kept on purpose: the kind of error raised by
            # a missing attribute depends on the object that was passed in.
            entity_id = machine.machine
        self.machines[entity_id] = _Entity(
            entity_id=entity_id, entity_type='machine', obj=machine, db_dict=db_dict)

    def unregister_machine(self, machine_id: str):
        """Stop tracking a machine; unknown ids are ignored."""
        self.machines.pop(machine_id, None)

    def is_machine_registered(self, machine_id: str):
        """Return True if ``machine_id`` is currently registered."""
        return machine_id in self.machines

    def register_application(self, application: Application, db_dict: dict):
        """Register an application so that its changes are tracked.

        :param application: application object, identified by ``entity_id``
        :param db_dict: database descriptor passed to
            ``n2vc.write_app_status_to_db`` on every change
        """
        entity_id = application.entity_id
        self.applications[entity_id] = _Entity(
            entity_id=entity_id, entity_type='application', obj=application, db_dict=db_dict)

    def unregister_application(self, application_id: str):
        """Stop tracking an application; unknown ids are ignored."""
        self.applications.pop(application_id, None)

    def is_application_registered(self, application_id: str):
        """Return True if ``application_id`` is currently registered."""
        return application_id in self.applications

    def register_action(self, action: Action, db_dict: dict):
        """Register an action so that its changes are tracked.

        :param action: action object, identified by ``entity_id``
        :param db_dict: database descriptor passed to
            ``n2vc.write_app_status_to_db`` on every change
        """
        entity_id = action.entity_id
        self.actions[entity_id] = _Entity(
            entity_id=entity_id, entity_type='action', obj=action, db_dict=db_dict)

    def unregister_action(self, action_id: str):
        """Stop tracking an action; unknown ids are ignored."""
        self.actions.pop(action_id, None)

    def is_action_registered(self, action_id: str):
        """Return True if ``action_id`` is currently registered."""
        return action_id in self.actions

    async def wait_for_machine(
            self,
            machine_id: str,
            progress_timeout: float = None,
            total_timeout: float = None) -> int:
        """Wait until a registered machine reports ``agent_status`` 'started'.

        :param machine_id: id of a previously registered machine
        :param progress_timeout: max seconds to wait for each change
        :param total_timeout: max seconds to wait overall
        :return: number of waits performed, or None (immediately) if the
            machine is not registered
        :raises N2VCTimeoutException: if one of the timeouts expires
        """
        if not self.is_machine_registered(machine_id):
            return

        self.n2vc.debug('Waiting for machine completed: {}'.format(machine_id))

        # wait for a final state
        return await self._wait_for_entity(
            entity=self.machines[machine_id],
            field_to_check='agent_status',
            final_states_list=['started'],
            progress_timeout=progress_timeout,
            total_timeout=total_timeout)

    async def wait_for_application(
            self,
            application_id: str,
            progress_timeout: float = None,
            total_timeout: float = None) -> int:
        """Wait until a registered application is 'active' or 'blocked'.

        :param application_id: id of a previously registered application
        :param progress_timeout: max seconds to wait for each change
        :param total_timeout: max seconds to wait overall
        :return: number of waits performed, or None (immediately) if the
            application is not registered
        :raises N2VCTimeoutException: if one of the timeouts expires
        """
        if not self.is_application_registered(application_id):
            return

        self.n2vc.debug('Waiting for application completed: {}'.format(application_id))

        # application statuses: unknown, active, waiting
        # wait for a final state
        return await self._wait_for_entity(
            entity=self.applications[application_id],
            field_to_check='status',
            final_states_list=['active', 'blocked'],
            progress_timeout=progress_timeout,
            total_timeout=total_timeout)

    async def wait_for_action(
            self,
            action_id: str,
            progress_timeout: float = None,
            total_timeout: float = None) -> int:
        """Wait until a registered action is completed, failed or cancelled.

        :param action_id: id of a previously registered action
        :param progress_timeout: max seconds to wait for each change
        :param total_timeout: max seconds to wait overall
        :return: number of waits performed, or None (immediately) if the
            action is not registered
        :raises N2VCTimeoutException: if one of the timeouts expires
        """
        if not self.is_action_registered(action_id):
            return

        self.n2vc.debug('Waiting for action completed: {}'.format(action_id))

        # action statuses: pending, running, completed, failed, cancelled
        # wait for a final state
        return await self._wait_for_entity(
            entity=self.actions[action_id],
            field_to_check='status',
            final_states_list=['completed', 'failed', 'cancelled'],
            progress_timeout=progress_timeout,
            total_timeout=total_timeout)

    async def _wait_for_entity(
            self,
            entity: _Entity,
            field_to_check: str,
            final_states_list: list,
            progress_timeout: float = None,
            total_timeout: float = None) -> int:
        """Block until ``entity.obj.<field_to_check>`` reaches a final state.

        The field is re-read every time the entity event is set (which
        ``on_change`` does for each change of the entity).

        :param entity: registered entity to wait for
        :param field_to_check: name of the attribute of ``entity.obj`` to read
        :param final_states_list: values of the attribute that end the wait
        :param progress_timeout: max seconds to wait for each single change;
            None means 100000 seconds
        :param total_timeout: max seconds for the whole wait; None means
            100000 seconds
        :return: number of waits that were needed (0 if the entity already
            was in a final state)
        :raises N2VCTimeoutException: with ``timeout='total'`` when the
            overall deadline expires, with ``timeout='progress'`` when no
            change arrives within ``progress_timeout``
        """
        # default values for no timeout
        if total_timeout is None:
            total_timeout = 100000
        if progress_timeout is None:
            progress_timeout = 100000

        total_message = 'Total timeout {} seconds, {}: {}'\
            .format(total_timeout, entity.entity_type, entity.entity_id)

        # max end time
        now = time.time()
        total_end = now + total_timeout

        if now >= total_end:
            raise N2VCTimeoutException(message=total_message, timeout='total')

        retries = 0

        while entity.obj.__getattribute__(field_to_check) not in final_states_list:
            # deadlines are re-evaluated on every iteration so that the
            # progress window restarts after each change while the total
            # deadline stays fixed
            now = time.time()
            remaining = total_end - now
            if remaining <= 0:
                self.n2vc.debug(total_message)
                raise N2VCTimeoutException(message=total_message, timeout='total')

            # which is closest? progress or end timeout?
            total_is_closest = remaining < progress_timeout
            next_timeout = remaining if total_is_closest else progress_timeout

            retries += 1
            if await _wait_for_event_or_timeout(entity.event, next_timeout):
                # a change arrived: re-arm the event and check the field again
                entity.event.clear()
                continue

            # no change arrived in time: report the deadline that expired
            if total_is_closest:
                self.n2vc.debug(total_message)
                raise N2VCTimeoutException(message=total_message, timeout='total')
            message = 'Progress timeout {} seconds, {}: {}'\
                .format(progress_timeout, entity.entity_type, entity.entity_id)
            self.n2vc.debug(message)
            raise N2VCTimeoutException(message=message, timeout='progress')

        return retries

    async def on_change(self, delta, old, new, model):
        """Handle a change notification for the observed model.

        Changes of entities that are not registered are ignored. For
        registered machines, applications and actions (and for units whose
        application is registered) the new status is written to the database
        and the event of the entity is set.

        :param delta: change description; ``delta.entity`` selects the
            handling and, for units, ``delta.data['application']`` gives the
            owning application
        :param old: previous state of the entity (not used)
        :param new: new state of the entity; nothing is done when it is None
        :param model: model that produced the change (not used)
        """
        if new is None:
            return

        # The registered entity is looked up once, before awaiting the
        # database write: it may be unregistered while the write is in
        # progress, and a second lookup would then fail.

        if delta.entity == 'machine':

            # check registered machine
            entity = self.machines.get(new.entity_id)
            if entity is None:
                return

            # write change in database
            await self.n2vc.write_app_status_to_db(
                db_dict=entity.db_dict,
                status=juju_status_2_osm_status(delta.entity, new.agent_status),
                detailed_status=new.status_message,
                vca_status=new.status,
                entity_type='machine'
            )

            # set event for this machine
            entity.event.set()

        elif delta.entity == 'application':

            # check registered application
            entity = self.applications.get(new.entity_id)
            if entity is None:
                return

            # write change in database
            await self.n2vc.write_app_status_to_db(
                db_dict=entity.db_dict,
                status=juju_status_2_osm_status(delta.entity, new.status),
                detailed_status=new.status_message,
                vca_status=new.status,
                entity_type='application'
            )

            # set event for this application
            entity.event.set()

        elif delta.entity == 'unit':

            # get the application for this unit
            application_id = delta.data['application']

            # check registered application
            entity = self.applications.get(application_id)
            if entity is None:
                return

            # write change in database
            await self.n2vc.write_app_status_to_db(
                db_dict=entity.db_dict,
                status=juju_status_2_osm_status(delta.entity, new.workload_status),
                detailed_status=new.workload_status_message,
                vca_status=new.workload_status,
                entity_type='unit'
            )

            # set event for the application owning this unit
            entity.event.set()

        elif delta.entity == 'action':

            # check registered action
            entity = self.actions.get(new.entity_id)
            if entity is None:
                return

            # write change in database
            await self.n2vc.write_app_status_to_db(
                db_dict=entity.db_dict,
                status=juju_status_2_osm_status(delta.entity, new.status),
                detailed_status=new.status,
                vca_status=new.status,
                entity_type='action'
            )

            # set event for this action
            entity.event.set()
289
290
async def _wait_for_event_or_timeout(event: asyncio.Event, timeout: float = None):
    """Wait for ``event`` for at most ``timeout`` seconds.

    :param event: event to wait on
    :param timeout: maximum number of seconds to wait, handed to
        ``asyncio.wait_for`` unchanged
    :return: whether the event is set once the wait has finished
    """
    pending_wait = event.wait()
    try:
        await asyncio.wait_for(pending_wait, timeout)
    except asyncio.TimeoutError:
        # running out of time is not an error here: the state of the event,
        # returned below, tells the caller what happened
        pass
    return event.is_set()