2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
26 from juju
.action
import Action
27 from juju
.application
import Application
28 from juju
.machine
import Machine
29 from juju
.model
import ModelObserver
, Model
31 from n2vc
.exceptions
import N2VCTimeoutException
32 from n2vc
.n2vc_conn
import N2VCConnector
, juju_status_2_osm_status
36 def __init__(self
, entity_id
: str, entity_type
: str, obj
: object, db_dict
: dict):
37 self
.entity_id
= entity_id
38 self
.entity_type
= entity_type
40 self
.event
= asyncio
.Event()
41 self
.db_dict
= db_dict
44 class JujuModelObserver(ModelObserver
):
45 def __init__(self
, n2vc
: N2VCConnector
, model
: Model
):
48 model
.add_observer(self
)
49 self
.machines
= dict()
50 self
.applications
= dict()
53 def register_machine(self
, machine
: Machine
, db_dict
: dict):
55 entity_id
= machine
.entity_id
57 # no entity_id attribute, try machine attribute
58 entity_id
= machine
.machine
60 # msg='Registering machine for change notifications: {}'.format(entity_id))
62 entity_id
=entity_id
, entity_type
="machine", obj
=machine
, db_dict
=db_dict
64 self
.machines
[entity_id
] = entity
def unregister_machine(self, machine_id: str):
    """Stop tracking the machine stored under ``machine_id``.

    An id that is not present in ``self.machines`` is ignored, so the
    call can be repeated safely.

    :param machine_id: key of the machine entry to drop
    """
    # pop() with a default removes the entry when present and does nothing otherwise
    self.machines.pop(machine_id, None)
def is_machine_registered(self, machine_id: str) -> bool:
    """Tell whether ``machine_id`` is a key of ``self.machines``.

    :param machine_id: machine identifier to look up
    :return: True when an entry exists for the id, False otherwise
    """
    tracked_machines = self.machines
    return machine_id in tracked_machines
73 def register_application(self
, application
: Application
, db_dict
: dict):
74 entity_id
= application
.entity_id
76 # msg='Registering application for change notifications: {}'.format(entity_id))
79 entity_type
="application",
83 self
.applications
[entity_id
] = entity
def unregister_application(self, application_id: str):
    """Stop tracking the application stored under ``application_id``.

    An id that is not present in ``self.applications`` is ignored.

    :param application_id: key of the application entry to drop
    """
    try:
        del self.applications[application_id]
    except KeyError:
        # not registered: nothing to remove
        pass
def is_application_registered(self, application_id: str) -> bool:
    """Tell whether ``application_id`` is a key of ``self.applications``.

    :param application_id: application identifier to look up
    :return: True when an entry exists for the id, False otherwise
    """
    is_known = application_id in self.applications
    return is_known
92 def register_action(self
, action
: Action
, db_dict
: dict):
93 entity_id
= action
.entity_id
95 # msg='Registering action for changes notifications: {}'.format(entity_id))
97 entity_id
=entity_id
, entity_type
="action", obj
=action
, db_dict
=db_dict
99 self
.actions
[entity_id
] = entity
def unregister_action(self, action_id: str):
    """Stop tracking the action stored under ``action_id``.

    An id that is not present in ``self.actions`` is ignored.

    :param action_id: key of the action entry to drop
    """
    if action_id not in self.actions:
        # unknown action: nothing to remove
        return
    del self.actions[action_id]
def is_action_registered(self, action_id: str) -> bool:
    """Tell whether ``action_id`` is a key of ``self.actions``.

    :param action_id: action identifier to look up
    :return: True when an entry exists for the id, False otherwise
    """
    tracked_actions = self.actions
    return action_id in tracked_actions
108 async def wait_for_machine(
111 progress_timeout
: float = None,
112 total_timeout
: float = None,
115 if not self
.is_machine_registered(machine_id
):
118 self
.n2vc
.debug("Waiting for machine completed: {}".format(machine_id
))
120 # wait for a final state
121 entity
= self
.machines
[machine_id
]
122 return await self
._wait
_for
_entity
(
124 field_to_check
="agent_status",
125 final_states_list
=["started"],
126 progress_timeout
=progress_timeout
,
127 total_timeout
=total_timeout
,
130 async def wait_for_application(
133 progress_timeout
: float = None,
134 total_timeout
: float = None,
137 if not self
.is_application_registered(application_id
):
140 self
.n2vc
.debug("Waiting for application completed: {}".format(application_id
))
142 # application statuses: unknown, active, waiting
143 # wait for a final state
144 entity
= self
.applications
[application_id
]
145 return await self
._wait
_for
_entity
(
147 field_to_check
="status",
148 final_states_list
=["active", "blocked"],
149 progress_timeout
=progress_timeout
,
150 total_timeout
=total_timeout
,
153 async def wait_for_action(
156 progress_timeout
: float = None,
157 total_timeout
: float = None,
160 if not self
.is_action_registered(action_id
):
163 self
.n2vc
.debug("Waiting for action completed: {}".format(action_id
))
165 # action statuses: pending, running, completed, failed, cancelled
166 # wait for a final state
167 entity
= self
.actions
[action_id
]
168 return await self
._wait
_for
_entity
(
170 field_to_check
="status",
171 final_states_list
=["completed", "failed", "cancelled"],
172 progress_timeout
=progress_timeout
,
173 total_timeout
=total_timeout
,
176 async def _wait_for_entity(
180 final_states_list
: list,
181 progress_timeout
: float = None,
182 total_timeout
: float = None,
185 # default values for no timeout
186 if total_timeout
is None:
188 if progress_timeout
is None:
189 progress_timeout
= 3600
193 total_end
= now
+ total_timeout
196 raise N2VCTimeoutException(
197 message
="Total timeout {} seconds, {}: {}".format(
198 total_timeout
, entity
.entity_type
, entity
.entity_id
203 # update next progress timeout
204 progress_end
= now
+ progress_timeout
# type: float
206 # which is closest? progress or end timeout?
207 closest_end
= min(total_end
, progress_end
)
209 next_timeout
= closest_end
- now
213 while entity
.obj
.__getattribute
__(field_to_check
) not in final_states_list
:
215 if await _wait_for_event_or_timeout(entity
.event
, next_timeout
):
218 message
= "Progress timeout {} seconds, {}: {}".format(
219 progress_timeout
, entity
.entity_type
, entity
.entity_id
221 self
.n2vc
.debug(message
)
222 raise N2VCTimeoutException(message
=message
, timeout
="progress")
223 # self.n2vc.debug('End of wait. Final state: {}, retries: {}'
224 # .format(entity.obj.__getattribute__(field_to_check), retries))
227 async def on_change(self
, delta
, old
, new
, model
):
233 # self.n2vc.debug('on_change(): type: {}, entity: {}, id: {}'
234 # .format(delta.type, delta.entity, new.entity_id))
236 if delta
.entity
== "machine":
238 # check registered machine
239 if new
.entity_id
not in self
.machines
:
242 # write change in database
243 await self
.n2vc
.write_app_status_to_db(
244 db_dict
=self
.machines
[new
.entity_id
].db_dict
,
245 status
=juju_status_2_osm_status(delta
.entity
, new
.agent_status
),
246 detailed_status
=new
.status_message
,
247 vca_status
=new
.status
,
248 entity_type
="machine",
251 # set event for this machine
252 self
.machines
[new
.entity_id
].event
.set()
254 elif delta
.entity
== "application":
256 # check registered application
257 if new
.entity_id
not in self
.applications
:
260 # write change in database
261 await self
.n2vc
.write_app_status_to_db(
262 db_dict
=self
.applications
[new
.entity_id
].db_dict
,
263 status
=juju_status_2_osm_status(delta
.entity
, new
.status
),
264 detailed_status
=new
.status_message
,
265 vca_status
=new
.status
,
266 entity_type
="application",
269 # set event for this application
270 self
.applications
[new
.entity_id
].event
.set()
272 elif delta
.entity
== "unit":
274 # get the application for this unit
275 application_id
= delta
.data
["application"]
277 # check registered application
278 if application_id
not in self
.applications
:
281 # write change in database
283 await self
.n2vc
.write_app_status_to_db(
284 db_dict
=self
.applications
[application_id
].db_dict
,
285 status
=juju_status_2_osm_status(delta
.entity
, new
.workload_status
),
286 detailed_status
=new
.workload_status_message
,
287 vca_status
=new
.workload_status
,
291 # set event for this application
292 self
.applications
[application_id
].event
.set()
294 elif delta
.entity
== "action":
296 # check registered action
297 if new
.entity_id
not in self
.actions
:
300 # write change in database
301 await self
.n2vc
.write_app_status_to_db(
302 db_dict
=self
.actions
[new
.entity_id
].db_dict
,
303 status
=juju_status_2_osm_status(delta
.entity
, new
.status
),
304 detailed_status
=new
.status
,
305 vca_status
=new
.status
,
306 entity_type
="action",
309 # set event for this action
310 self
.actions
[new
.entity_id
].event
.set()
313 async def _wait_for_event_or_timeout(event
: asyncio
.Event
, timeout
: float = None):
315 await asyncio
.wait_for(fut
=event
.wait(), timeout
=timeout
)
316 except asyncio
.TimeoutError
:
318 return event
.is_set()