2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
26 from juju
.model
import ModelObserver
, Model
27 from juju
.machine
import Machine
28 from juju
.application
import Application
29 from juju
.action
import Action
31 from n2vc
.n2vc_conn
import N2VCConnector
, juju_status_2_osm_status
32 from n2vc
.exceptions
import N2VCTimeoutException
36 def __init__(self
, entity_id
: str, entity_type
: str, obj
: object, db_dict
: dict):
37 self
.entity_id
= entity_id
38 self
.entity_type
= entity_type
40 self
.event
= asyncio
.Event()
41 self
.db_dict
= db_dict
class JujuModelObserver(ModelObserver):
    """Observe a juju model and track registered machines, applications and actions.

    Every change reported for a registered entity is written to the database
    through the N2VC connector and wakes any coroutine waiting on that entity.
    """

    def __init__(self, n2vc: N2VCConnector, model: Model):
        """Attach this observer to ``model``.

        Args:
            n2vc: connector used for debug logging and for writing status
                changes to the database.
            model: model to observe; this instance registers itself on it.
        """
        self.n2vc = n2vc
        self.model = model
        model.add_observer(self)
        # Registries of watched entities, keyed by entity id.
        self.machines = dict()
        self.applications = dict()
        self.actions = dict()

    def register_machine(self, machine: Machine, db_dict: dict):
        """Start tracking ``machine``; ``db_dict`` is kept for later status writes."""
        try:
            entity_id = machine.entity_id
        except (AttributeError, KeyError):
            # no entity_id attribute, try machine attribute
            # NOTE(review): KeyError is caught as well because some juju entity
            # classes presumably surface a missing field that way - confirm.
            entity_id = machine.machine
        self.n2vc.debug(msg='Registering machine for changes notifications: {}'.format(entity_id))
        entity = _Entity(entity_id=entity_id, entity_type='machine', obj=machine, db_dict=db_dict)
        self.machines[entity_id] = entity

    def unregister_machine(self, machine_id: str):
        """Stop tracking the machine; unknown ids are ignored."""
        self.machines.pop(machine_id, None)

    def is_machine_registered(self, machine_id: str):
        """Return True when ``machine_id`` is currently tracked."""
        return machine_id in self.machines

    def register_application(self, application: Application, db_dict: dict):
        """Start tracking ``application``; ``db_dict`` is kept for later status writes."""
        entity_id = application.entity_id
        self.n2vc.debug(msg='Registering application for changes notifications: {}'.format(entity_id))
        entity = _Entity(entity_id=entity_id, entity_type='application', obj=application, db_dict=db_dict)
        self.applications[entity_id] = entity

    def unregister_application(self, application_id: str):
        """Stop tracking the application; unknown ids are ignored."""
        self.applications.pop(application_id, None)

    def is_application_registered(self, application_id: str):
        """Return True when ``application_id`` is currently tracked."""
        return application_id in self.applications

    def register_action(self, action: Action, db_dict: dict):
        """Start tracking ``action``; ``db_dict`` is kept for later status writes."""
        entity_id = action.entity_id
        self.n2vc.debug(msg='Registering action for changes notifications: {}'.format(entity_id))
        entity = _Entity(entity_id=entity_id, entity_type='action', obj=action, db_dict=db_dict)
        self.actions[entity_id] = entity

    def unregister_action(self, action_id: str):
        """Stop tracking the action; unknown ids are ignored."""
        self.actions.pop(action_id, None)

    def is_action_registered(self, action_id: str):
        """Return True when ``action_id`` is currently tracked."""
        return action_id in self.actions

    async def wait_for_machine(
            self,
            machine_id: str,
            progress_timeout: float = None,
            total_timeout: float = None) -> int:
        """Wait until the machine's ``agent_status`` is 'started'.

        Returns:
            Number of wait iterations performed, or 0 when the machine is not
            registered.

        Raises:
            N2VCTimeoutException: when a timeout expires first.
        """
        if not self.is_machine_registered(machine_id):
            # NOTE(review): nothing to wait for; 0 keeps the declared int
            # return type - confirm no caller relies on None here.
            return 0

        # wait for a final state
        entity = self.machines[machine_id]
        return await self._wait_for_entity(
            entity=entity,
            field_to_check='agent_status',
            final_states_list=['started'],
            progress_timeout=progress_timeout,
            total_timeout=total_timeout)

    async def wait_for_application(
            self,
            application_id: str,
            progress_timeout: float = None,
            total_timeout: float = None) -> int:
        """Wait until the application's ``status`` is 'active' or 'blocked'.

        Returns:
            Number of wait iterations performed, or 0 when the application is
            not registered.

        Raises:
            N2VCTimeoutException: when a timeout expires first.
        """
        if not self.is_application_registered(application_id):
            # NOTE(review): nothing to wait for; 0 keeps the declared int
            # return type - confirm no caller relies on None here.
            return 0

        # application statuses: unknown, active, waiting
        # wait for a final state
        entity = self.applications[application_id]
        return await self._wait_for_entity(
            entity=entity,
            field_to_check='status',
            final_states_list=['active', 'blocked'],
            progress_timeout=progress_timeout,
            total_timeout=total_timeout)

    async def wait_for_action(
            self,
            action_id: str,
            progress_timeout: float = None,
            total_timeout: float = None) -> int:
        """Wait until the action's ``status`` is 'completed', 'failed' or 'cancelled'.

        Returns:
            Number of wait iterations performed, or 0 when the action is not
            registered.

        Raises:
            N2VCTimeoutException: when a timeout expires first.
        """
        if not self.is_action_registered(action_id):
            # NOTE(review): nothing to wait for; 0 keeps the declared int
            # return type - confirm no caller relies on None here.
            return 0

        # action statuses: pending, running, completed, failed, cancelled
        # wait for a final state
        entity = self.actions[action_id]
        return await self._wait_for_entity(
            entity=entity,
            field_to_check='status',
            final_states_list=['completed', 'failed', 'cancelled'],
            progress_timeout=progress_timeout,
            total_timeout=total_timeout)

    async def _wait_for_entity(
            self,
            entity: _Entity,
            field_to_check: str,
            final_states_list: list,
            progress_timeout: float = None,
            total_timeout: float = None) -> int:
        """Block until ``entity.obj.<field_to_check>`` is in ``final_states_list``.

        Args:
            entity: registered record whose event is set on every change.
            field_to_check: attribute name read from ``entity.obj``.
            final_states_list: values that end the wait.
            progress_timeout: maximum seconds between two changes.
            total_timeout: maximum seconds for the whole wait.

        Returns:
            Number of wait iterations that were needed.

        Raises:
            N2VCTimeoutException: with ``timeout='total'`` when the overall
                budget is exhausted, or ``timeout='progress'`` when no change
                arrived within ``progress_timeout``.
        """
        # default values for no timeout
        if total_timeout is None:
            total_timeout = 100000
        if progress_timeout is None:
            progress_timeout = 100000

        # Monotonic loop clock: immune to wall-clock adjustments.
        clock = asyncio.get_event_loop().time
        total_end = clock() + total_timeout

        retries = 0
        while getattr(entity.obj, field_to_check) not in final_states_list:
            now = clock()
            if now >= total_end:
                raise N2VCTimeoutException(
                    message='Total timeout {} seconds, {}: {}'.format(
                        total_timeout, entity.entity_type, entity.entity_id),
                    timeout='total')
            retries += 1

            # which is closest? progress or end timeout?
            # Recomputed on every iteration so both budgets stay enforced.
            progress_end = now + progress_timeout
            next_timeout = min(total_end, progress_end) - now

            if await _wait_for_event_or_timeout(entity.event, next_timeout):
                # A change arrived: re-arm the event and re-check the state.
                entity.event.clear()
                continue

            if progress_end >= total_end:
                # The total budget was the binding limit; the next iteration
                # re-checks the state and reports the total timeout.
                continue

            message = 'Progress timeout {} seconds, {}: {}'\
                .format(progress_timeout, entity.entity_type, entity.entity_id)
            self.n2vc.debug(message)
            raise N2VCTimeoutException(message=message, timeout='progress')

        self.n2vc.debug('End of wait. Final state: {}, retries: {}'
                        .format(getattr(entity.obj, field_to_check), retries))
        return retries

    async def _publish_change(self, entity: _Entity, kind: str, status, detailed_status, vca_status):
        """Write one status change to the database, then wake waiters on ``entity``."""
        await self.n2vc.write_app_status_to_db(
            db_dict=entity.db_dict,
            status=juju_status_2_osm_status(kind, status),
            detailed_status=detailed_status,
            vca_status=vca_status,
            entity_type=kind
        )
        entity.event.set()

    async def on_change(self, delta, old, new, model):
        """Handle a change notification from the observed model.

        Changes of unregistered entities are ignored. Unit changes are
        attributed to the application named in ``delta.data['application']``.
        """
        if new is None:
            # Nothing to report without a current state.
            return

        # log
        self.n2vc.debug('on_change(): type: {}, entity: {}, id: {}'
                        .format(delta.type, delta.entity, new.entity_id))

        kind = delta.entity

        if kind == 'machine':
            # check registered machine
            entity = self.machines.get(new.entity_id)
            if entity is not None:
                await self._publish_change(
                    entity, kind, new.agent_status, new.status_message, new.status)

        elif kind == 'application':
            # check registered application
            entity = self.applications.get(new.entity_id)
            if entity is not None:
                await self._publish_change(
                    entity, kind, new.status, new.status_message, new.status)

        elif kind == 'unit':
            # get the application for this unit
            application_id = delta.data['application']
            # check registered application
            entity = self.applications.get(application_id)
            if entity is not None:
                await self._publish_change(
                    entity, kind, new.workload_status,
                    new.workload_status_message, new.workload_status)

        elif kind == 'action':
            # check registered action
            entity = self.actions.get(new.entity_id)
            if entity is not None:
                await self._publish_change(
                    entity, kind, new.status, new.status, new.status)
285 async def _wait_for_event_or_timeout(event
: asyncio
.Event
, timeout
: float = None):
287 await asyncio
.wait_for(fut
=event
.wait(), timeout
=timeout
)
288 except asyncio
.TimeoutError
:
290 return event
.is_set()