2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
26 from juju
.model
import ModelObserver
, Model
27 from juju
.machine
import Machine
28 from juju
.application
import Application
29 from juju
.action
import Action
31 from n2vc
.n2vc_conn
import N2VCConnector
, juju_status_2_osm_status
32 from n2vc
.exceptions
import N2VCTimeoutException
36 def __init__(self
, entity_id
: str, entity_type
: str, obj
: object, db_dict
: dict):
37 self
.entity_id
= entity_id
38 self
.entity_type
= entity_type
40 self
.event
= asyncio
.Event()
41 self
.db_dict
= db_dict
44 class JujuModelObserver(ModelObserver
):
46 def __init__(self
, n2vc
: N2VCConnector
, model
: Model
):
49 model
.add_observer(self
)
50 self
.machines
= dict()
51 self
.applications
= dict()
def register_machine(self, machine: Machine, db_dict: dict):
    """
    Start tracking a machine so that its changes are notified.

    :param machine: machine object; its id is taken from ``entity_id``
        or, failing that, from ``machine``
    :param db_dict: database location stored with the entity
    """
    try:
        entity_id = machine.entity_id
    except Exception:
        # No entity_id attribute, try the machine attribute.
        # NOTE(review): the handler is kept broad on purpose; confirm which
        # exception the machine object raises for a missing attribute
        # before narrowing it.
        entity_id = machine.machine

    self.machines[entity_id] = _Entity(
        entity_id=entity_id,
        entity_type='machine',
        obj=machine,
        db_dict=db_dict,
    )
def unregister_machine(self, machine_id: str):
    """
    Stop tracking a machine; unknown ids are ignored.

    :param machine_id: id under which the machine was registered
    """
    # pop() with a default removes the entry when present and is a no-op
    # otherwise.
    self.machines.pop(machine_id, None)
def is_machine_registered(self, machine_id: str):
    """
    Tell whether a machine is currently tracked.

    :param machine_id: id to look up
    :return: True when the id is a key of the machine registry
    """
    tracked = self.machines
    return machine_id in tracked
def register_application(self, application: Application, db_dict: dict):
    """
    Start tracking an application so that its changes are notified.

    :param application: application object; registered under its
        ``entity_id``
    :param db_dict: database location stored with the entity
    """
    app_id = application.entity_id
    record = _Entity(
        entity_id=app_id,
        entity_type='application',
        obj=application,
        db_dict=db_dict,
    )
    self.applications[app_id] = record
def unregister_application(self, application_id: str):
    """
    Stop tracking an application; unknown ids are ignored.

    :param application_id: id under which the application was registered
    """
    # pop() with a default removes the entry when present and is a no-op
    # otherwise.
    self.applications.pop(application_id, None)
def is_application_registered(self, application_id: str):
    """
    Tell whether an application is currently tracked.

    :param application_id: id to look up
    :return: True when the id is a key of the application registry
    """
    tracked = self.applications
    return application_id in tracked
def register_action(self, action: Action, db_dict: dict):
    """
    Start tracking an action so that its changes are notified.

    :param action: action object; registered under its ``entity_id``
    :param db_dict: database location stored with the entity
    """
    action_id = action.entity_id
    record = _Entity(
        entity_id=action_id,
        entity_type='action',
        obj=action,
        db_dict=db_dict,
    )
    self.actions[action_id] = record
def unregister_action(self, action_id: str):
    """
    Stop tracking an action; unknown ids are ignored.

    :param action_id: id under which the action was registered
    """
    # pop() with a default removes the entry when present and is a no-op
    # otherwise.
    self.actions.pop(action_id, None)
def is_action_registered(self, action_id: str):
    """
    Tell whether an action is currently tracked.

    :param action_id: id to look up
    :return: True when the id is a key of the action registry
    """
    tracked = self.actions
    return action_id in tracked
async def wait_for_machine(
        self,
        machine_id: str,
        progress_timeout: float = None,
        total_timeout: float = None) -> int:
    """
    Wait until a registered machine reports agent status 'started'.

    :param machine_id: id of a machine previously registered
    :param progress_timeout: seconds allowed without any change
    :param total_timeout: seconds allowed for the whole wait
    :return: the value returned by ``_wait_for_entity``; None when the
        machine is not registered
    """
    # NOTE(review): the early exit for an unregistered machine is not
    # visible in the reviewed text; a bare return is assumed - confirm.
    if not self.is_machine_registered(machine_id):
        return None

    self.n2vc.debug('Waiting for machine completed: {}'.format(machine_id))

    # wait for a final state
    target = self.machines[machine_id]
    outcome = await self._wait_for_entity(
        entity=target,
        field_to_check='agent_status',
        final_states_list=['started'],
        progress_timeout=progress_timeout,
        total_timeout=total_timeout)
    return outcome
async def wait_for_application(
        self,
        application_id: str,
        progress_timeout: float = None,
        total_timeout: float = None) -> int:
    """
    Wait until a registered application reaches status 'active' or
    'blocked'.

    :param application_id: id of an application previously registered
    :param progress_timeout: seconds allowed without any change
    :param total_timeout: seconds allowed for the whole wait
    :return: the value returned by ``_wait_for_entity``; None when the
        application is not registered
    """
    # NOTE(review): the early exit for an unregistered application is not
    # visible in the reviewed text; a bare return is assumed - confirm.
    if not self.is_application_registered(application_id):
        return None

    self.n2vc.debug('Waiting for application completed: {}'.format(application_id))

    # application statuses: unknown, active, waiting
    # wait for a final state
    target = self.applications[application_id]
    outcome = await self._wait_for_entity(
        entity=target,
        field_to_check='status',
        final_states_list=['active', 'blocked'],
        progress_timeout=progress_timeout,
        total_timeout=total_timeout)
    return outcome
async def wait_for_action(
        self,
        action_id: str,
        progress_timeout: float = None,
        total_timeout: float = None) -> int:
    """
    Wait until a registered action reaches status 'completed', 'failed'
    or 'cancelled'.

    :param action_id: id of an action previously registered
    :param progress_timeout: seconds allowed without any change
    :param total_timeout: seconds allowed for the whole wait
    :return: the value returned by ``_wait_for_entity``; None when the
        action is not registered
    """
    # NOTE(review): the early exit for an unregistered action is not
    # visible in the reviewed text; a bare return is assumed - confirm.
    if not self.is_action_registered(action_id):
        return None

    self.n2vc.debug('Waiting for action completed: {}'.format(action_id))

    # action statuses: pending, running, completed, failed, cancelled
    # wait for a final state
    target = self.actions[action_id]
    outcome = await self._wait_for_entity(
        entity=target,
        field_to_check='status',
        final_states_list=['completed', 'failed', 'cancelled'],
        progress_timeout=progress_timeout,
        total_timeout=total_timeout)
    return outcome
async def _wait_for_entity(
        self,
        entity: "_Entity",
        field_to_check: str,
        final_states_list: list,
        progress_timeout: float = None,
        total_timeout: float = None) -> int:
    """
    Block until ``entity.obj.<field_to_check>`` holds one of the final
    states, waking up each time the entity event is set.

    :param entity: tracked entity record (``obj``, ``event``,
        ``entity_type`` and ``entity_id`` are used)
    :param field_to_check: name of the attribute of ``entity.obj`` that
        carries the state
    :param final_states_list: states that end the wait
    :param progress_timeout: seconds allowed between two consecutive
        changes; None means 100000
    :param total_timeout: seconds allowed for the whole wait; None means
        100000
    :return: number of wait rounds that were needed (0 when the entity is
        already in a final state)
    :raises N2VCTimeoutException: when either timeout expires
    """
    # Function-scope import: only this method needs the clock. A monotonic
    # clock keeps the deadlines immune to wall-clock adjustments.
    import time

    # default values for no timeout
    if total_timeout is None:
        total_timeout = 100000
    if progress_timeout is None:
        progress_timeout = 100000

    # NOTE(review): the ``timeout='total'`` argument is not visible in the
    # reviewed text; it mirrors the visible ``timeout='progress'`` - confirm.
    total_message = 'Total timeout {} seconds, {}: {}'.format(
        total_timeout, entity.entity_type, entity.entity_id)

    # max end time
    total_end = time.monotonic() + total_timeout
    retries = 0

    while getattr(entity.obj, field_to_check) not in final_states_list:
        # Deadlines are refreshed on every round; otherwise each new event
        # would restart the same wait and the total timeout would never be
        # enforced.
        now = time.monotonic()
        if now >= total_end:
            raise N2VCTimeoutException(message=total_message, timeout='total')

        # update next progress timeout
        progress_end = now + progress_timeout

        # which is closest? progress or end timeout?
        next_timeout = min(total_end, progress_end) - now

        retries += 1
        if await _wait_for_event_or_timeout(entity.event, next_timeout):
            # a change arrived: re-arm the event and check the state again
            entity.event.clear()
            continue

        # The wait expired without any change; report whichever deadline
        # was the binding one.
        if total_end <= progress_end:
            raise N2VCTimeoutException(message=total_message, timeout='total')

        # Braces are balanced here: a stray '}' makes str.format raise
        # ValueError instead of producing the message.
        message = 'Progress timeout {} seconds, {}: {}'\
            .format(progress_timeout, entity.entity_type, entity.entity_id)
        self.n2vc.debug(message)
        raise N2VCTimeoutException(message=message, timeout='progress')

    return retries
async def on_change(self, delta, old, new, model):
    """
    Model observer callback: publish the new status of a tracked entity
    and wake up whoever is waiting for it.

    Changes for entities that are not registered, and changes of kinds
    other than machine, application, unit or action, are ignored.

    :param delta: change description; ``delta.entity`` gives the kind and,
        for units, ``delta.data['application']`` the owning application
    :param old: previous state of the entity (unused)
    :param new: new state of the entity
    :param model: model the change belongs to (unused)
    """
    # NOTE(review): presumably deltas for removed entities carry no new
    # state - confirm against libjuju. Without this guard the attribute
    # reads below would fail.
    if new is None:
        return

    # In every branch the tracked record is fetched once, before the
    # await: the entity may be unregistered while the database write is
    # in progress, and a second registry lookup would then fail.

    if delta.entity == 'machine':

        # check registered machine
        entity = self.machines.get(new.entity_id)
        if entity is None:
            return

        # write change in database
        await self.n2vc.write_app_status_to_db(
            db_dict=entity.db_dict,
            status=juju_status_2_osm_status(delta.entity, new.agent_status),
            detailed_status=new.status_message,
            vca_status=new.status,
            entity_type='machine'
        )

        # set event for this machine
        entity.event.set()

    elif delta.entity == 'application':

        # check registered application
        entity = self.applications.get(new.entity_id)
        if entity is None:
            return

        # write change in database
        await self.n2vc.write_app_status_to_db(
            db_dict=entity.db_dict,
            status=juju_status_2_osm_status(delta.entity, new.status),
            detailed_status=new.status_message,
            vca_status=new.status,
            entity_type='application'
        )

        # set event for this application
        entity.event.set()

    elif delta.entity == 'unit':

        # get the application for this unit
        application_id = delta.data['application']

        # check registered application
        entity = self.applications.get(application_id)
        if entity is None:
            return

        # write change in database
        # NOTE(review): the entity_type argument of this call is not
        # visible in the reviewed text; 'unit' follows the pattern of the
        # machine and application branches - confirm.
        await self.n2vc.write_app_status_to_db(
            db_dict=entity.db_dict,
            status=juju_status_2_osm_status(delta.entity, new.workload_status),
            detailed_status=new.workload_status_message,
            vca_status=new.workload_status,
            entity_type='unit'
        )

        # set event for this application
        entity.event.set()

    elif delta.entity == 'action':

        # check registered action
        entity = self.actions.get(new.entity_id)
        if entity is None:
            return

        # write change in database
        # NOTE(review): the entity_type argument of this call is not
        # visible in the reviewed text; 'action' follows the pattern of
        # the machine and application branches - confirm.
        await self.n2vc.write_app_status_to_db(
            db_dict=entity.db_dict,
            status=juju_status_2_osm_status(delta.entity, new.status),
            detailed_status=new.status,
            vca_status=new.status,
            entity_type='action'
        )

        # set event for this action
        entity.event.set()
292 async def _wait_for_event_or_timeout(event
: asyncio
.Event
, timeout
: float = None):
294 await asyncio
.wait_for(fut
=event
.wait(), timeout
=timeout
)
295 except asyncio
.TimeoutError
:
297 return event
.is_set()