Add License headers to all code files
[osm/N2VC.git] / n2vc / juju_observer.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22
23 import asyncio
24 import time
25
26 from juju.model import ModelObserver, Model
27 from juju.machine import Machine
28 from juju.application import Application
29 from juju.action import Action
30
31 from n2vc.n2vc_conn import N2VCConnector, juju_status_2_osm_status
32 from n2vc.exceptions import N2VCTimeoutException
33
34
class _Entity:
    """Bookkeeping record for one entity tracked by the observer.

    Bundles the entity identification, the observed object, the ``db_dict``
    supplied at registration time and an ``asyncio.Event`` used to signal
    that a change for this entity has been observed.
    """

    def __init__(self, entity_id: str, entity_type: str, obj: object, db_dict: dict):
        # identification of the entity ('machine', 'application' or 'action'
        # are the types used by JujuModelObserver)
        self.entity_id, self.entity_type = entity_id, entity_type

        # the observed object itself and the database dict associated with it
        self.obj, self.db_dict = obj, db_dict

        # starts cleared; set by the observer whenever the entity changes
        self.event = asyncio.Event()
42
43
class JujuModelObserver(ModelObserver):
    """Observe a juju model and let callers wait on registered entities.

    Machines, applications and actions are registered together with a
    ``db_dict``. Every change reported through :meth:`on_change` for a
    registered entity is written with ``n2vc.write_app_status_to_db`` and
    then wakes up any coroutine blocked in one of the ``wait_for_*`` methods.
    """

    # Timeout (seconds) applied when the caller passes ``None``.
    _NO_TIMEOUT = 100000

    def __init__(self, n2vc: N2VCConnector, model: Model):
        """Create the observer and subscribe it to ``model``.

        :param n2vc: connector used for logging (``debug``) and for writing
            status changes (``write_app_status_to_db``).
        :param model: juju model to observe.
        """
        self.n2vc = n2vc
        self.model = model
        # Build the registries before subscribing, so on_change() can never
        # run against an observer that does not have them yet.
        self.machines = dict()
        self.applications = dict()
        self.actions = dict()
        model.add_observer(self)

    # ------------------------------------------------------------------
    # registration
    # ------------------------------------------------------------------

    def register_machine(self, machine: Machine, db_dict: dict):
        """Start tracking ``machine``; ``db_dict`` is used on every change."""
        entity_id = machine.entity_id
        self.machines[entity_id] = _Entity(
            entity_id=entity_id, entity_type='machine', obj=machine, db_dict=db_dict)

    def unregister_machine(self, machine_id: str):
        """Stop tracking the machine; unknown ids are ignored."""
        self.machines.pop(machine_id, None)

    def is_machine_registered(self, machine_id: str):
        """Return True if the machine id is currently tracked."""
        return machine_id in self.machines

    def register_application(self, application: Application, db_dict: dict):
        """Start tracking ``application``; ``db_dict`` is used on every change."""
        entity_id = application.entity_id
        self.applications[entity_id] = _Entity(
            entity_id=entity_id, entity_type='application', obj=application, db_dict=db_dict)

    def unregister_application(self, application_id: str):
        """Stop tracking the application; unknown ids are ignored."""
        self.applications.pop(application_id, None)

    def is_application_registered(self, application_id: str):
        """Return True if the application id is currently tracked."""
        return application_id in self.applications

    def register_action(self, action: Action, db_dict: dict):
        """Start tracking ``action``; ``db_dict`` is used on every change."""
        entity_id = action.entity_id
        self.actions[entity_id] = _Entity(
            entity_id=entity_id, entity_type='action', obj=action, db_dict=db_dict)

    def unregister_action(self, action_id: str):
        """Stop tracking the action; unknown ids are ignored."""
        self.actions.pop(action_id, None)

    def is_action_registered(self, action_id: str):
        """Return True if the action id is currently tracked."""
        return action_id in self.actions

    # ------------------------------------------------------------------
    # waiting
    # ------------------------------------------------------------------

    async def wait_for_machine(
            self,
            machine_id: str,
            progress_timeout: float = None,
            total_timeout: float = None) -> int:
        """Wait until the machine's ``agent_status`` is ``'started'``.

        :return: number of waits performed, or ``None`` if the machine is not
            registered.
        :raises N2VCTimeoutException: if a timeout expires first.
        """
        entity = self.machines.get(machine_id)
        if entity is None:
            return

        # wait for a final state
        return await self._wait_for_entity(
            entity=entity,
            field_to_check='agent_status',
            final_states_list=['started'],
            progress_timeout=progress_timeout,
            total_timeout=total_timeout)

    async def wait_for_application(
            self,
            application_id: str,
            progress_timeout: float = None,
            total_timeout: float = None) -> int:
        """Wait until the application's ``status`` is ``'active'`` or ``'blocked'``.

        :return: number of waits performed, or ``None`` if the application is
            not registered.
        :raises N2VCTimeoutException: if a timeout expires first.
        """
        entity = self.applications.get(application_id)
        if entity is None:
            return

        # application statuses: unknown, active, waiting
        # wait for a final state
        return await self._wait_for_entity(
            entity=entity,
            field_to_check='status',
            final_states_list=['active', 'blocked'],
            progress_timeout=progress_timeout,
            total_timeout=total_timeout)

    async def wait_for_action(
            self,
            action_id: str,
            progress_timeout: float = None,
            total_timeout: float = None) -> int:
        """Wait until the action's ``status`` is completed, failed or cancelled.

        :return: number of waits performed, or ``None`` if the action is not
            registered.
        :raises N2VCTimeoutException: if a timeout expires first.
        """
        entity = self.actions.get(action_id)
        if entity is None:
            return

        # action statuses: pending, running, completed, failed, cancelled
        # wait for a final state
        return await self._wait_for_entity(
            entity=entity,
            field_to_check='status',
            final_states_list=['completed', 'failed', 'cancelled'],
            progress_timeout=progress_timeout,
            total_timeout=total_timeout)

    def _timeout_exception(self, kind: str, seconds: float, entity: _Entity) -> N2VCTimeoutException:
        """Log and build the exception for a ``'total'`` or ``'progress'`` timeout."""
        label = 'Total' if kind == 'total' else 'Progress'
        message = '{} timeout {} seconds, {}: {}'.format(
            label, seconds, entity.entity_type, entity.entity_id)
        self.n2vc.debug(message)
        return N2VCTimeoutException(message=message, timeout=kind)

    async def _wait_for_entity(
            self,
            entity: _Entity,
            field_to_check: str,
            final_states_list: list,
            progress_timeout: float = None,
            total_timeout: float = None) -> int:
        """Block until ``entity.obj.<field_to_check>`` is in ``final_states_list``.

        :param entity: registered entity to watch.
        :param field_to_check: attribute of ``entity.obj`` holding the state.
        :param final_states_list: states that end the wait.
        :param progress_timeout: max seconds to wait for each change
            (``None`` means ``_NO_TIMEOUT``).
        :param total_timeout: max seconds for the whole wait
            (``None`` means ``_NO_TIMEOUT``).
        :return: number of times the method had to wait for a change.
        :raises N2VCTimeoutException: with ``timeout='progress'`` when no
            change arrives in time, or ``timeout='total'`` when the overall
            deadline is reached.
        """
        # default values for no timeout
        if total_timeout is None:
            total_timeout = self._NO_TIMEOUT
        if progress_timeout is None:
            progress_timeout = self._NO_TIMEOUT

        # max end time
        start = time.time()
        total_end = start + total_timeout

        # a non-positive total timeout fails immediately, whatever the state
        if start >= total_end:
            raise self._timeout_exception('total', total_timeout, entity)

        retries = 0

        while getattr(entity.obj, field_to_check) not in final_states_list:
            # Recompute the deadlines on every iteration so the total timeout
            # is still enforced after intermediate changes have arrived.
            remaining_total = total_end - time.time()
            if remaining_total <= 0:
                raise self._timeout_exception('total', total_timeout, entity)

            # which is closest? progress or end timeout?
            total_is_closest = remaining_total < progress_timeout
            next_timeout = remaining_total if total_is_closest else progress_timeout

            retries += 1
            if await _wait_for_event_or_timeout(entity.event, next_timeout):
                # a change was observed: re-arm the event and re-check state
                entity.event.clear()
                continue

            if total_is_closest:
                raise self._timeout_exception('total', total_timeout, entity)
            raise self._timeout_exception('progress', progress_timeout, entity)

        self.n2vc.debug('End of wait. Final state: {}, retries: {}'
                        .format(getattr(entity.obj, field_to_check), retries))
        return retries

    # ------------------------------------------------------------------
    # change notifications
    # ------------------------------------------------------------------

    async def _write_status_and_notify(
            self,
            entity: _Entity,
            entity_type: str,
            juju_status,
            detailed_status,
            vca_status):
        """Write the new status for ``entity`` and wake up its waiters.

        ``entity`` is passed in (rather than looked up again afterwards) so
        that an unregister happening during the database write cannot make
        the notification fail with ``KeyError``.
        """
        await self.n2vc.write_app_status_to_db(
            db_dict=entity.db_dict,
            status=juju_status_2_osm_status(entity_type, juju_status),
            detailed_status=detailed_status,
            vca_status=vca_status,
            entity_type=entity_type
        )
        entity.event.set()

    async def on_change(self, delta, old, new, model):
        """Handle a model change for machines, applications, units and actions.

        Changes for entities that are not registered are ignored. Unit
        changes are attributed to the registered application named in
        ``delta.data['application']``. ``old`` and ``model`` are not used.
        """
        if new is None:
            return

        # log
        self.n2vc.debug('on_change(): type: {}, entity: {}, id: {}'
                        .format(delta.type, delta.entity, new.entity_id))

        if delta.entity == 'machine':

            # check registered machine
            entity = self.machines.get(new.entity_id)
            if entity is None:
                return

            # the osm status is derived from the agent status, while the raw
            # machine status is stored as vca_status
            await self._write_status_and_notify(
                entity=entity,
                entity_type='machine',
                juju_status=new.agent_status,
                detailed_status=new.status_message,
                vca_status=new.status)

        elif delta.entity == 'application':

            # check registered application
            entity = self.applications.get(new.entity_id)
            if entity is None:
                return

            await self._write_status_and_notify(
                entity=entity,
                entity_type='application',
                juju_status=new.status,
                detailed_status=new.status_message,
                vca_status=new.status)

        elif delta.entity == 'unit':

            # get the application for this unit
            application_id = delta.data['application']

            # check registered application
            entity = self.applications.get(application_id)
            if entity is None:
                return

            # a unit change wakes up the waiters of its application
            await self._write_status_and_notify(
                entity=entity,
                entity_type='unit',
                juju_status=new.workload_status,
                detailed_status=new.workload_status_message,
                vca_status=new.workload_status)

        elif delta.entity == 'action':

            # check registered action
            entity = self.actions.get(new.entity_id)
            if entity is None:
                return

            await self._write_status_and_notify(
                entity=entity,
                entity_type='action',
                juju_status=new.status,
                detailed_status=new.status,
                vca_status=new.status)
276
277
async def _wait_for_event_or_timeout(event: asyncio.Event, timeout: float = None):
    """Wait for ``event`` for at most ``timeout`` seconds.

    :param event: event to wait on.
    :param timeout: seconds to wait; ``None`` waits without a time limit.
    :return: the state of ``event`` (``event.is_set()``) once the wait ends,
        i.e. ``False`` when the timeout expired with the event still unset.
    """
    waiter = event.wait()
    try:
        await asyncio.wait_for(fut=waiter, timeout=timeout)
    except asyncio.TimeoutError:
        # Expiry is an expected outcome here: the caller learns about it from
        # the returned flag rather than from an exception.
        pass

    signalled = event.is_set()
    return signalled