2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
26 from juju
.model
import ModelObserver
, Model
27 from juju
.machine
import Machine
28 from juju
.application
import Application
29 from juju
.action
import Action
31 from n2vc
.n2vc_conn
import N2VCConnector
, juju_status_2_osm_status
32 from n2vc
.exceptions
import N2VCTimeoutException
class _Entity:
    """Bookkeeping record for one juju entity tracked by the observer.

    Instances are created by the ``register_*`` methods of the observer and
    stored in its per-type registries, keyed by ``entity_id``.
    """

    def __init__(self, entity_id: str, entity_type: str, obj: object, db_dict: dict):
        """Store the entity data and create its change-notification event.

        :param entity_id: juju id of the entity (registry key)
        :param entity_type: 'machine', 'application' or 'action'
        :param obj: the juju object itself
        :param db_dict: handed to write_app_status_to_db() when the entity changes
        """
        self.entity_id = entity_id
        self.entity_type = entity_type
        # The observed object must be kept: _wait_for_entity() polls its
        # status attribute through ``entity.obj``.
        self.obj = obj
        # Set by on_change() every time juju reports a change for this entity.
        self.event = asyncio.Event()
        self.db_dict = db_dict
44 class JujuModelObserver(ModelObserver
):
def __init__(self, n2vc: N2VCConnector, model: Model):
    """Attach this observer to ``model`` and start with empty registries.

    :param n2vc: connector used by the other methods for debug logging and
        for writing status changes to the database
    :param model: juju model to observe; this instance is added to it as
        an observer
    """
    # on_change() and _wait_for_entity() log and persist through self.n2vc.
    self.n2vc = n2vc
    # NOTE(review): nothing in the reviewed text reads self.model; the
    # reference is kept in case external callers rely on it -- confirm.
    self.model = model
    model.add_observer(self)

    # Registries of tracked entities: entity id -> _Entity.
    self.machines = {}
    self.applications = {}
    # register_action(), unregister_action(), wait_for_action() and
    # on_change() all index self.actions, so it must exist from the start.
    self.actions = {}
def register_machine(self, machine: Machine, db_dict: dict):
    """Start tracking ``machine`` under its own ``entity_id``.

    :param machine: juju machine to observe
    :param db_dict: kept with the entity; on_change() passes it to
        write_app_status_to_db() when the machine changes
    """
    machine_id = machine.entity_id
    self.machines[machine_id] = _Entity(
        entity_id=machine_id,
        entity_type='machine',
        obj=machine,
        db_dict=db_dict,
    )
def unregister_machine(self, machine_id: str):
    """Stop tracking the machine ``machine_id``; unknown ids are ignored.

    :param machine_id: id the machine was registered under
    """
    # pop() with a default removes the entry when present and is a no-op
    # otherwise, which is exactly the "delete if registered" contract.
    self.machines.pop(machine_id, None)
def is_machine_registered(self, machine_id: str):
    """Tell whether ``machine_id`` is currently tracked by this observer.

    :param machine_id: id to look up in the machine registry
    :return: True when the id is a key of ``self.machines``
    """
    tracked = self.machines
    return machine_id in tracked
def register_application(self, application: Application, db_dict: dict):
    """Start tracking ``application`` under its own ``entity_id``.

    :param application: juju application to observe
    :param db_dict: kept with the entity; on_change() passes it to
        write_app_status_to_db() when the application or one of its units
        changes
    """
    app_id = application.entity_id
    self.applications[app_id] = _Entity(
        entity_id=app_id,
        entity_type='application',
        obj=application,
        db_dict=db_dict,
    )
def unregister_application(self, application_id: str):
    """Stop tracking the application ``application_id``; unknown ids are ignored.

    :param application_id: id the application was registered under
    """
    # pop() with a default is a no-op for ids that are not registered.
    self.applications.pop(application_id, None)
def is_application_registered(self, application_id: str):
    """Tell whether ``application_id`` is currently tracked by this observer.

    :param application_id: id to look up in the application registry
    :return: True when the id is a key of ``self.applications``
    """
    tracked = self.applications
    return application_id in tracked
def register_action(self, action: Action, db_dict: dict):
    """Start tracking ``action`` under its own ``entity_id``.

    :param action: juju action to observe
    :param db_dict: kept with the entity; on_change() passes it to
        write_app_status_to_db() when the action changes
    """
    action_id = action.entity_id
    self.actions[action_id] = _Entity(
        entity_id=action_id,
        entity_type='action',
        obj=action,
        db_dict=db_dict,
    )
def unregister_action(self, action_id: str):
    """Stop tracking the action ``action_id``; unknown ids are ignored.

    :param action_id: id the action was registered under
    """
    # pop() with a default is a no-op for ids that are not registered.
    self.actions.pop(action_id, None)
def is_action_registered(self, action_id: str):
    """Tell whether ``action_id`` is currently tracked by this observer.

    :param action_id: id to look up in the action registry
    :return: True when the id is a key of ``self.actions``
    """
    tracked = self.actions
    return action_id in tracked
async def wait_for_machine(
        self,
        machine_id: str,
        progress_timeout: float = None,
        total_timeout: float = None) -> int:
    """Wait until a registered machine reports ``agent_status`` 'started'.

    :param machine_id: id the machine was registered under
    :param progress_timeout: forwarded unchanged to _wait_for_entity()
    :param total_timeout: forwarded unchanged to _wait_for_entity()
    :return: whatever _wait_for_entity() returns for this machine
    """
    if not self.is_machine_registered(machine_id):
        # An early exit is required here: the registry lookup below would
        # raise KeyError for an unknown id.
        # NOTE(review): the statement under this guard is not visible in
        # the reviewed text; a bare return (None) is assumed -- confirm.
        return

    # wait for a final state
    entity = self.machines[machine_id]
    return await self._wait_for_entity(
        entity=entity,
        field_to_check='agent_status',
        final_states_list=['started'],
        progress_timeout=progress_timeout,
        total_timeout=total_timeout)
async def wait_for_application(
        self,
        application_id: str,
        progress_timeout: float = None,
        total_timeout: float = None) -> int:
    """Wait until a registered application reports status 'active' or 'blocked'.

    :param application_id: id the application was registered under
    :param progress_timeout: forwarded unchanged to _wait_for_entity()
    :param total_timeout: forwarded unchanged to _wait_for_entity()
    :return: whatever _wait_for_entity() returns for this application
    """
    if not self.is_application_registered(application_id):
        # An early exit is required here: the registry lookup below would
        # raise KeyError for an unknown id.
        # NOTE(review): the statement under this guard is not visible in
        # the reviewed text; a bare return (None) is assumed -- confirm.
        return

    # application statuses: unknown, active, waiting
    # wait for a final state
    entity = self.applications[application_id]
    return await self._wait_for_entity(
        entity=entity,
        field_to_check='status',
        final_states_list=['active', 'blocked'],
        progress_timeout=progress_timeout,
        total_timeout=total_timeout)
async def wait_for_action(
        self,
        action_id: str,
        progress_timeout: float = None,
        total_timeout: float = None) -> int:
    """Wait until a registered action is 'completed', 'failed' or 'cancelled'.

    :param action_id: id the action was registered under
    :param progress_timeout: forwarded unchanged to _wait_for_entity()
    :param total_timeout: forwarded unchanged to _wait_for_entity()
    :return: whatever _wait_for_entity() returns for this action
    """
    if not self.is_action_registered(action_id):
        # An early exit is required here: the registry lookup below would
        # raise KeyError for an unknown id.
        # NOTE(review): the statement under this guard is not visible in
        # the reviewed text; a bare return (None) is assumed -- confirm.
        return

    # action statuses: pending, running, completed, failed, cancelled
    # wait for a final state
    entity = self.actions[action_id]
    return await self._wait_for_entity(
        entity=entity,
        field_to_check='status',
        final_states_list=['completed', 'failed', 'cancelled'],
        progress_timeout=progress_timeout,
        total_timeout=total_timeout)
146 async def _wait_for_entity(
150 final_states_list
: list,
151 progress_timeout
: float = None,
152 total_timeout
: float = None) -> int:
154 # default values for no timeout
155 if total_timeout
is None:
156 total_timeout
= 100000
157 if progress_timeout
is None:
158 progress_timeout
= 100000
162 total_end
= now
+ total_timeout
165 raise N2VCTimeoutException(
166 message
='Total timeout {} seconds, {}: {}'.format(total_timeout
, entity
.entity_type
, entity
.entity_id
),
170 # update next progress timeout
171 progress_end
= now
+ progress_timeout
# type: float
173 # which is closest? progress or end timeout?
174 closest_end
= min(total_end
, progress_end
)
176 next_timeout
= closest_end
- now
180 while entity
.obj
.__getattribute
__(field_to_check
) not in final_states_list
:
182 if await _wait_for_event_or_timeout(entity
.event
, next_timeout
):
# Progress timeout: the entity did not change within the allowed window.
# The template previously read '{}}: {}'; the unbalanced '}' makes
# str.format() raise ValueError, which would mask the timeout exception
# raised below.
message = 'Progress timeout {} seconds, {}: {}'\
    .format(progress_timeout, entity.entity_type, entity.entity_id)
self.n2vc.debug(message)
raise N2VCTimeoutException(message=message, timeout='progress')
189 self
.n2vc
.debug('End of wait. Final state: {}, retries: {}'
190 .format(entity
.obj
.__getattribute
__(field_to_check
), retries
))
async def on_change(self, delta, old, new, model):
    """Persist the new status of a registered entity and wake its waiters.

    For a machine, application, unit or action that belongs to a registered
    entity, the status is written through ``n2vc.write_app_status_to_db``
    and the entity's event is set so that a pending _wait_for_entity() can
    re-check the state. Changes for unregistered entities and for other
    entity kinds are ignored.

    :param delta: change description; ``type``, ``entity`` and (for units)
        ``data['application']`` are read
    :param old: previous entity state (not used)
    :param new: current entity state; nothing is done when it is None
    :param model: model the change belongs to (not used)
    """
    # Every branch below dereferences ``new``; without it there is nothing
    # to report and new.entity_id would raise AttributeError.
    if new is None:
        return

    # log
    self.n2vc.debug('on_change(): type: {}, entity: {}, id: {}'
                    .format(delta.type, delta.entity, new.entity_id))

    if delta.entity == 'machine':

        # check registered machine
        if new.entity_id not in self.machines:
            return

        # write change in database
        await self.n2vc.write_app_status_to_db(
            db_dict=self.machines[new.entity_id].db_dict,
            status=juju_status_2_osm_status(delta.entity, new.agent_status),
            detailed_status=new.status_message,
            vca_status=new.status,
            entity_type='machine'
        )

        # set event for this machine
        self.machines[new.entity_id].event.set()

    elif delta.entity == 'application':

        # check registered application
        if new.entity_id not in self.applications:
            return

        # write change in database
        await self.n2vc.write_app_status_to_db(
            db_dict=self.applications[new.entity_id].db_dict,
            status=juju_status_2_osm_status(delta.entity, new.status),
            detailed_status=new.status_message,
            vca_status=new.status,
            entity_type='application'
        )

        # set event for this application
        self.applications[new.entity_id].event.set()

    elif delta.entity == 'unit':

        # get the application for this unit
        application_id = delta.data['application']

        # check registered application
        if application_id not in self.applications:
            return

        # write change in database
        # NOTE(review): the entity_type argument of this call is not
        # visible in the reviewed text; 'unit' is assumed by analogy with
        # the machine and application branches -- confirm.
        await self.n2vc.write_app_status_to_db(
            db_dict=self.applications[application_id].db_dict,
            status=juju_status_2_osm_status(delta.entity, new.workload_status),
            detailed_status=new.workload_status_message,
            vca_status=new.workload_status,
            entity_type='unit'
        )

        # a unit change wakes whoever waits for the owning application
        self.applications[application_id].event.set()

    elif delta.entity == 'action':

        # check registered action
        if new.entity_id not in self.actions:
            return

        # write change in database
        # NOTE(review): the entity_type argument of this call is not
        # visible in the reviewed text; 'action' is assumed by analogy
        # with the machine and application branches -- confirm.
        await self.n2vc.write_app_status_to_db(
            db_dict=self.actions[new.entity_id].db_dict,
            status=juju_status_2_osm_status(delta.entity, new.status),
            detailed_status=new.status,
            vca_status=new.status,
            entity_type='action'
        )

        # set event for this action
        self.actions[new.entity_id].event.set()
async def _wait_for_event_or_timeout(event: asyncio.Event, timeout: float = None):
    """Wait for ``event`` to be set, giving up after ``timeout`` seconds.

    :param event: event to wait on
    :param timeout: maximum number of seconds to wait; None waits until the
        event is set
    :return: True if the event is set when the wait ends, False if the
        timeout expired first
    """
    try:
        await asyncio.wait_for(event.wait(), timeout=timeout)
    except asyncio.TimeoutError:
        # A timeout is an expected outcome: it is reported through the
        # return value instead of being propagated to the caller.
        pass
    return event.is_set()