Check if cloud is built-in cloud when adding a model
[osm/N2VC.git] / n2vc / juju_observer.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22
23 import asyncio
24 import time
25
26 from juju.model import ModelObserver, Model
27 from juju.machine import Machine
28 from juju.application import Application
29 from juju.action import Action
30
31 from n2vc.n2vc_conn import N2VCConnector, juju_status_2_osm_status
32 from n2vc.exceptions import N2VCTimeoutException
33
34
35 class _Entity:
36 def __init__(self, entity_id: str, entity_type: str, obj: object, db_dict: dict):
37 self.entity_id = entity_id
38 self.entity_type = entity_type
39 self.obj = obj
40 self.event = asyncio.Event()
41 self.db_dict = db_dict
42
43
class JujuModelObserver(ModelObserver):
    """Track status changes of registered machines, applications and actions.

    The observer attaches itself to the given model on construction. Entities
    are registered by id; for every change received in ``on_change`` that
    concerns a registered entity, the new status is written through
    ``n2vc.write_app_status_to_db`` and the entity's event is set, which wakes
    up any coroutine blocked in one of the ``wait_for_*`` methods.
    """

    def __init__(self, n2vc: N2VCConnector, model: Model):
        """Store the connector and model and subscribe to model changes.

        :param n2vc: connector used for logging (``debug``) and for writing
            status changes (``write_app_status_to_db``)
        :param model: model to observe; ``self`` is added as an observer
        """
        self.n2vc = n2vc
        self.model = model
        model.add_observer(self)
        # registries of _Entity records, keyed by entity id
        self.machines = dict()
        self.applications = dict()
        self.actions = dict()

    def register_machine(self, machine: Machine, db_dict: dict):
        """Register a machine so that its changes are tracked.

        :param machine: machine object; its id is read from ``entity_id`` or,
            if that fails, from its ``machine`` attribute
        :param db_dict: dict passed as ``db_dict`` to
            ``n2vc.write_app_status_to_db`` on every change of this machine
        """
        try:
            entity_id = machine.entity_id
        except Exception:
            # no entity_id attribute, try machine attribute
            entity_id = machine.machine
        # self.n2vc.debug(msg='Registering machine for change notifications: {}'.format(entity_id))
        entity = _Entity(entity_id=entity_id, entity_type='machine', obj=machine, db_dict=db_dict)
        self.machines[entity_id] = entity

    def unregister_machine(self, machine_id: str):
        """Stop tracking a machine; unknown ids are ignored."""
        self.machines.pop(machine_id, None)

    def is_machine_registered(self, machine_id: str):
        """Return True if the machine id is currently registered."""
        return machine_id in self.machines

    def register_application(self, application: Application, db_dict: dict):
        """Register an application so that its changes are tracked.

        :param application: application object; its id is ``entity_id``
        :param db_dict: dict passed as ``db_dict`` to
            ``n2vc.write_app_status_to_db`` on every change of this application
            or of one of its units
        """
        entity_id = application.entity_id
        # self.n2vc.debug(msg='Registering application for change notifications: {}'.format(entity_id))
        entity = _Entity(entity_id=entity_id, entity_type='application', obj=application, db_dict=db_dict)
        self.applications[entity_id] = entity

    def unregister_application(self, application_id: str):
        """Stop tracking an application; unknown ids are ignored."""
        self.applications.pop(application_id, None)

    def is_application_registered(self, application_id: str):
        """Return True if the application id is currently registered."""
        return application_id in self.applications

    def register_action(self, action: Action, db_dict: dict):
        """Register an action so that its changes are tracked.

        :param action: action object; its id is ``entity_id``
        :param db_dict: dict passed as ``db_dict`` to
            ``n2vc.write_app_status_to_db`` on every change of this action
        """
        entity_id = action.entity_id
        # self.n2vc.debug(msg='Registering action for changes notifications: {}'.format(entity_id))
        entity = _Entity(entity_id=entity_id, entity_type='action', obj=action, db_dict=db_dict)
        self.actions[entity_id] = entity

    def unregister_action(self, action_id: str):
        """Stop tracking an action; unknown ids are ignored."""
        self.actions.pop(action_id, None)

    def is_action_registered(self, action_id: str):
        """Return True if the action id is currently registered."""
        return action_id in self.actions

    async def wait_for_machine(
            self,
            machine_id: str,
            progress_timeout: float = None,
            total_timeout: float = None) -> int:
        """Wait until the machine's ``agent_status`` is ``'started'``.

        :param machine_id: id of a registered machine
        :param progress_timeout: max seconds to wait for each change notification
        :param total_timeout: max seconds to wait overall
        :return: number of waits performed, or None if the machine is not
            registered
        :raises N2VCTimeoutException: if one of the timeouts expires
        """
        if not self.is_machine_registered(machine_id):
            return

        self.n2vc.debug('Waiting for machine completed: {}'.format(machine_id))

        # wait for a final state
        entity = self.machines[machine_id]
        return await self._wait_for_entity(
            entity=entity,
            field_to_check='agent_status',
            final_states_list=['started'],
            progress_timeout=progress_timeout,
            total_timeout=total_timeout)

    async def wait_for_application(
            self,
            application_id: str,
            progress_timeout: float = None,
            total_timeout: float = None) -> int:
        """Wait until the application's ``status`` is ``'active'`` or ``'blocked'``.

        :param application_id: id of a registered application
        :param progress_timeout: max seconds to wait for each change notification
        :param total_timeout: max seconds to wait overall
        :return: number of waits performed, or None if the application is not
            registered
        :raises N2VCTimeoutException: if one of the timeouts expires
        """
        if not self.is_application_registered(application_id):
            return

        self.n2vc.debug('Waiting for application completed: {}'.format(application_id))

        # application statuses: unknown, active, waiting
        # wait for a final state
        entity = self.applications[application_id]
        return await self._wait_for_entity(
            entity=entity,
            field_to_check='status',
            final_states_list=['active', 'blocked'],
            progress_timeout=progress_timeout,
            total_timeout=total_timeout)

    async def wait_for_action(
            self,
            action_id: str,
            progress_timeout: float = None,
            total_timeout: float = None) -> int:
        """Wait until the action's ``status`` is completed, failed or cancelled.

        :param action_id: id of a registered action
        :param progress_timeout: max seconds to wait for each change notification
        :param total_timeout: max seconds to wait overall
        :return: number of waits performed, or None if the action is not
            registered
        :raises N2VCTimeoutException: if one of the timeouts expires
        """
        if not self.is_action_registered(action_id):
            return

        self.n2vc.debug('Waiting for action completed: {}'.format(action_id))

        # action statuses: pending, running, completed, failed, cancelled
        # wait for a final state
        entity = self.actions[action_id]
        return await self._wait_for_entity(
            entity=entity,
            field_to_check='status',
            final_states_list=['completed', 'failed', 'cancelled'],
            progress_timeout=progress_timeout,
            total_timeout=total_timeout)

    async def _wait_for_entity(
            self,
            entity: _Entity,
            field_to_check: str,
            final_states_list: list,
            progress_timeout: float = None,
            total_timeout: float = None) -> int:
        """Block until ``entity.obj.<field_to_check>`` is in ``final_states_list``.

        Each iteration waits for the entity's event (set by ``on_change``) for
        at most ``progress_timeout`` seconds, and never beyond the overall
        deadline given by ``total_timeout``.

        :param entity: registered entity record to watch
        :param field_to_check: attribute of ``entity.obj`` holding the state
        :param final_states_list: states that end the wait
        :param progress_timeout: max seconds between two notifications
            (None means 100000 seconds)
        :param total_timeout: max seconds for the whole wait
            (None means 100000 seconds)
        :return: number of waits that were needed
        :raises N2VCTimeoutException: with ``timeout='total'`` when the overall
            deadline is reached, ``timeout='progress'`` when no notification
            arrives within ``progress_timeout``
        """
        # default values for no timeout
        if total_timeout is None:
            total_timeout = 100000
        if progress_timeout is None:
            progress_timeout = 100000

        # max end time
        now = time.time()
        total_end = now + total_timeout

        if now >= total_end:
            raise N2VCTimeoutException(
                message='Total timeout {} seconds, {}: {}'.format(total_timeout, entity.entity_type, entity.entity_id),
                timeout='total'
            )

        retries = 0

        while getattr(entity.obj, field_to_check) not in final_states_list:
            retries += 1

            # which is closest? progress or end timeout? Recomputed on every
            # iteration so that the total deadline is honoured even when
            # notifications keep arriving.
            remaining_total = total_end - time.time()
            total_is_closest = remaining_total <= progress_timeout
            next_timeout = remaining_total if total_is_closest else progress_timeout

            if remaining_total > 0 and await _wait_for_event_or_timeout(entity.event, next_timeout):
                # a change was notified: re-arm the event and check the state again
                entity.event.clear()
                continue

            if total_is_closest:
                timeout_kind = 'total'
                message = 'Total timeout {} seconds, {}: {}'\
                    .format(total_timeout, entity.entity_type, entity.entity_id)
            else:
                timeout_kind = 'progress'
                message = 'Progress timeout {} seconds, {}: {}'\
                    .format(progress_timeout, entity.entity_type, entity.entity_id)
            self.n2vc.debug(message)
            raise N2VCTimeoutException(message=message, timeout=timeout_kind)
        # self.n2vc.debug('End of wait. Final state: {}, retries: {}'
        #                 .format(getattr(entity.obj, field_to_check), retries))
        return retries

    async def on_change(self, delta, old, new, model):
        """Handle a change notification for the observed model.

        NOTE(review): presumably invoked by the juju model for every delta
        because ``self`` was added as an observer -- confirm against libjuju.

        For registered machines, applications and actions, and for units of
        registered applications, the new status is written through
        ``n2vc.write_app_status_to_db`` and the matching entity event is set.
        Changes for unregistered entities, or with ``new`` being None, are
        ignored.

        :param delta: change descriptor; ``delta.entity`` selects the branch
            and ``delta.data['application']`` is read for units
        :param old: previous entity state (unused)
        :param new: new entity state, or None
        :param model: model that emitted the change (unused)
        """
        if new is None:
            return

        # log
        # self.n2vc.debug('on_change(): type: {}, entity: {}, id: {}'
        #                 .format(delta.type, delta.entity, new.entity_id))

        if delta.entity == 'machine':

            # check registered machine
            if new.entity_id not in self.machines:
                return

            # write change in database
            await self.n2vc.write_app_status_to_db(
                db_dict=self.machines[new.entity_id].db_dict,
                status=juju_status_2_osm_status(delta.entity, new.agent_status),
                detailed_status=new.status_message,
                vca_status=new.status,
                entity_type='machine'
            )

            # set event for this machine
            self.machines[new.entity_id].event.set()

        elif delta.entity == 'application':

            # check registered application
            if new.entity_id not in self.applications:
                return

            # write change in database
            await self.n2vc.write_app_status_to_db(
                db_dict=self.applications[new.entity_id].db_dict,
                status=juju_status_2_osm_status(delta.entity, new.status),
                detailed_status=new.status_message,
                vca_status=new.status,
                entity_type='application'
            )

            # set event for this application
            self.applications[new.entity_id].event.set()

        elif delta.entity == 'unit':

            # get the application for this unit
            application_id = delta.data['application']

            # check registered application
            if application_id not in self.applications:
                return

            # write change in database (skipped for dead units)
            if not new.dead:
                await self.n2vc.write_app_status_to_db(
                    db_dict=self.applications[application_id].db_dict,
                    status=juju_status_2_osm_status(delta.entity, new.workload_status),
                    detailed_status=new.workload_status_message,
                    vca_status=new.workload_status,
                    entity_type='unit'
                )

            # set event for this application
            self.applications[application_id].event.set()

        elif delta.entity == 'action':

            # check registered action
            if new.entity_id not in self.actions:
                return

            # write change in database
            await self.n2vc.write_app_status_to_db(
                db_dict=self.actions[new.entity_id].db_dict,
                status=juju_status_2_osm_status(delta.entity, new.status),
                detailed_status=new.status,
                vca_status=new.status,
                entity_type='action'
            )

            # set event for this action
            self.actions[new.entity_id].event.set()
290
291
292 async def _wait_for_event_or_timeout(event: asyncio.Event, timeout: float = None):
293 try:
294 await asyncio.wait_for(fut=event.wait(), timeout=timeout)
295 except asyncio.TimeoutError:
296 pass
297 return event.is_set()