1 # -*- coding: utf-8 -*-
3 # Copyright 2018 University of Bristol - High Performance Networks Research
7 # Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
8 # Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
10 # Licensed under the Apache License, Version 2.0 (the "License"); you may
11 # not use this file except in compliance with the License. You may obtain
12 # a copy of the License at
14 # http://www.apache.org/licenses/LICENSE-2.0
16 # Unless required by applicable law or agreed to in writing, software
17 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
18 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
19 # License for the specific language governing permissions and limitations
22 # For those usages not covered by the Apache License, Version 2.0 please
23 # contact with: <highperformance-networks@bristol.ac.uk>
25 # Neither the name of the University of Bristol nor the names of its
26 # contributors may be used to endorse or promote products derived from
27 # this software without specific prior written permission.
29 # This work has been performed in the context of DCMS UK 5G Testbeds
30 # & Trials Programme and in the framework of the Metro-Haul project -
31 # funded by the European Commission under Grant number 761727 through the
32 # Horizon 2020 and 5G-PPP programmes.
34 # pylint: disable=E1101,E0203,W0201
36 """Common logic for task management"""
# Values reported by ``processing`` for a task
PENDING, REFRESH, IGNORE = 0, 1, 2

# Default ``timeout`` accepted by ``defer`` (a time delta, in seconds)
TIMEOUT = 3600  # 60 s * 60 min = 1 hour
57 """Create a basic object representing the action record.
60 record (dict): record as returned by the database
61 **kwargs: extra keyword arguments to overwrite the fields in record
65 'task_index', # MD - Index number of the task.
66 # This together with the instance_action_id
67 # forms a unique key identifier
68 'action', # MD - CREATE, DELETE, FIND
69 'item', # MD - table name, eg. instance_wim_nets
70 'item_id', # MD - uuid of the referenced entry in the
72 'instance_action_id', # MD - reference to a cohesive group of actions
73 # related to the same instance-scenario
74 'wim_account_id', # MD - reference to the WIM account used
75 # by the thread/connector
76 'wim_internal_id', # MD - internal ID used by the WIM to refer to
78 'datacenter_vim_id', # MD - reference to the VIM account used
79 # by the thread/connector
80 'vim_id', # MD - internal ID used by the VIM to refer to
82 'status', # MD - SCHEDULED,BUILD,DONE,FAILED,SUPERSEDED
83 'extra', # MD - text with yaml format at database,
84 # dict at memory with:
85 # `- params: list with the params to be sent to the VIM for CREATE
86 # or FIND. For DELETE the vim_id is taken from other
88 # `- find: (only for CREATE tasks) if present it should FIND
89 # before creating and use if existing.
90 # Contains the FIND params
91 # `- depends_on: list with the 'task_index'es of tasks that must be
92 # completed before. e.g. a vm creation depends on a net
94 # `- sdn_net_id: used for net.
97 # dictionary with extra elements created that need
98 # to be deleted. e.g. ports,
100 # `- created: False if the VIM element is not created by
101 # other actions, and it should not be deleted
102 # `- wim_status: WIM status of the element. Stored also at database
104 'params', # M - similar to extra[params]
105 'depends_on', # M - similar to extra[depends_on]
106 'depends', # M - dict with task_index(from depends_on) to
108 'error_msg', # MD - descriptive text upon an error
109 'created_at', # MD - task DB creation time
110 'modified_at', # MD - last DB update time
111 'process_at', # M - unix epoch when to process the task
114 __slots__
= PROPERTIES
+ [
def __init__(self, record, logger=None, **kwargs):
    """Build the in-memory action from a database record.

    Arguments:
        record (dict): record as returned by the database
        logger: logger to be used by the action. When omitted (or falsy)
            the ``openmano.wim.action`` logger is used.
        **kwargs: extra keyword arguments to overwrite the fields in record
    """
    if not logger:
        logger = logging.getLogger('openmano.wim.action')
    self.logger = logger

    # Start from a dict with every known property set to None, so all the
    # attributes end up assigned even if absent from ``record``
    blank = dict.fromkeys(self.PROPERTIES)
    merged = merge_dicts(blank, record, kwargs)
    expanded = _expand_extra(merged)
    self.update(expanded)
124 return super(Action
, self
).__repr
__() + repr(self
.as_dict())
def as_dict(self, *fields):
    """Representation of the object as a dict.

    Arguments:
        *fields: names of the properties to include. Names that are not
            listed in ``PROPERTIES`` are silently dropped. When no name is
            given, every property in ``PROPERTIES`` is included.

    Returns:
        dict: property name -> current attribute value
    """
    if not fields:
        selected = self.PROPERTIES
    else:
        selected = set(self.PROPERTIES).intersection(set(fields))

    return dict((name, getattr(self, name)) for name in selected)
133 """Returns a dict that can be send to the persistence layer"""
134 special
= ['params', 'depends_on', 'depends']
135 record
= self
.as_dict()
136 record
['extra'].update(self
.as_dict(*special
))
137 non_fields
= special
+ ['process_at']
139 return remove_none_items(filter_out_dict_keys(record
, non_fields
))
def update(self, values=None, **kwargs):
    """Update the in-memory representation of the task (works similarly to
    dict.update). The update is NOT automatically persisted.

    Arguments:
        values (dict): optional mapping from property name to new value
        **kwargs: additional property values, merged together with
            ``values`` by ``merge_dicts``

    Only keys listed in ``PROPERTIES`` are assigned, anything else is
    ignored.
    """
    # ``values`` is optional: replace the ``None`` default with an empty
    # dict, so a keyword-only call never hands ``None`` to ``merge_dicts``
    updates = merge_dicts(values or {}, kwargs)

    # "white-listed mass assignment"
    for attr in set(self.PROPERTIES).intersection(updates):
        setattr(self, attr, updates[attr])
150 def save(self
, persistence
, **kwargs
):
151 """Persist current state of the object to the database.
154 persistence: object encapsulating the database
155 **kwargs: extra properties to be updated before saving
158 If any key word argument is passed, the object itself will be
159 changed as an extra side-effect.
161 action_id
= self
.instance_action_id
162 index
= self
.task_index
165 properties
= self
.as_record()
167 return persistence
.update_action(action_id
, index
, properties
)
169 def fail(self
, persistence
, reason
, status
='FAILED'):
170 """Mark action as FAILED, updating tables accordingly"""
171 persistence
.update_instance_action_counters(
172 self
.instance_action_id
,
174 done
=(-1 if self
.status
== 'DONE' else 0))
177 self
.error_msg
= truncate(reason
)
178 self
.logger
.error('%s %s: %s', self
.id, status
, reason
)
179 return self
.save(persistence
)
181 def succeed(self
, persistence
, status
='DONE'):
182 """Mark action as DONE, updating tables accordingly"""
183 persistence
.update_instance_action_counters(
184 self
.instance_action_id
, done
=1)
186 self
.logger
.debug('%s %s', self
.id, status
)
187 return self
.save(persistence
)
189 def defer(self
, persistence
, reason
,
190 timeout
=TIMEOUT
, min_attempts
=MIN_ATTEMPTS
):
191 """Postpone the task processing, taking care to not timeout.
194 persistence: object encapsulating the database
195 reason (str): explanation for the delay
196 timeout (int): maximum delay tolerated since the first attempt.
197 Note that this number is a time delta, in seconds
198 min_attempts (int): Number of attempts to try before giving up.
201 last_attempt
= self
.extra
.get('last_attempted_at') or time()
202 attempts
= self
.extra
.get('attempts') or 0
204 if last_attempt
- now
> timeout
and attempts
> min_attempts
:
205 self
.fail(persistence
,
206 'Timeout reached. {} attempts in the last {:d} min'
207 .format(attempts
, last_attempt
/ 60))
209 self
.extra
['last_attempted_at'] = time()
210 self
.extra
['attempts'] = attempts
+ 1
211 self
.logger
.info('%s DEFERRED: %s', self
.id, reason
)
212 return self
.save(persistence
)
216 """Key defining the group to which this tasks belongs"""
217 return (self
.item
, self
.item_id
)
220 def processing(self
):
221 """Processing status for the task (PENDING, REFRESH, IGNORE)"""
222 if self
.status
== 'SCHEDULED':
229 """Unique identifier of this particular action"""
230 return '{}[{}]'.format(self
.instance_action_id
, self
.task_index
)
def is_scheduled(self):
    """True when the task status is exactly ``'SCHEDULED'``"""
    current_status = self.status
    return current_status == 'SCHEDULED'
238 return self
.status
== 'BUILD'
242 return self
.status
== 'DONE'
246 return self
.status
== 'FAILED'
def is_superseded(self):
    """True when the task status is exactly ``'SUPERSEDED'``"""
    current_status = self.status
    return current_status == 'SUPERSEDED'
252 def refresh(self
, connector
, persistence
):
253 """Use the connector/persistence to refresh the status of the item.
255 After the item status is refreshed any change in the task should be
256 persisted to the database.
259 connector: object containing the classes to access the WIM or VIM
260 persistence: object containing the methods necessary to query the
261 database and to persist the updates
264 'Action `%s` has no refresh to be done',
265 self
.__class
__.__name
__)
267 def expand_dependency_links(self
, task_group
):
268 """Expand task indexes into actual IDs"""
269 if not self
.depends_on
or (
270 isinstance(self
.depends
, dict) and self
.depends
):
273 num_tasks
= len(task_group
)
275 "TASK-{}".format(i
): task_group
[i
]
276 for i
in self
.depends_on
277 if i
< num_tasks
and task_group
[i
].task_index
== i
and
278 task_group
[i
].instance_action_id
== self
.instance_action_id
280 self
.depends
= references
282 def become_superseded(self
, superseding
):
283 """When another action tries to supersede this one,
284 we need to change both of them, so the surviving actions will be
287 This method should do the required internal changes, and also
288 suggest changes for the other, superseding, action.
291 superseding: other task superseding this one
294 dict: changes suggested to the action superseding this one.
295 A special key ``superseding_needed`` is used to
296 suggest if the superseding is actually required or not.
297 If not present, ``superseding_needed`` is assumed to
300 self
.status
= 'SUPERSEDED'
302 'Action `%s` was superseded by `%s`',
303 self
.__class
__.__name
__, superseding
.__class
__.__name
__)
306 def supersede(self
, others
):
307 """Supersede other tasks, if necessary
310 others (list): action objects being superseded
312 When the task decide to supersede others, this method should call
313 ``become_superseded`` on the other actions, collect the suggested
314 updates and perform the necessary changes
316 # By default actions don't supersede others
318 'Action `%s` does not supersede other actions',
319 self
.__class
__.__name
__)
def process(self, connector, persistence, ovim):
    """Process the current task.

    Abstract method: this implementation only raises, the actual work has
    to be provided by an overriding implementation.

    Arguments:
        connector: object with API for accessing the WAN
            Infrastructure Manager system
        persistence: abstraction layer for the database
        ovim: instance of openvim, abstraction layer that enable
            SDN-related operations

    Raises:
        NotImplementedError: always
    """
    raise NotImplementedError()
335 class FindAction(Action
):
336 """Abstract class that should be inherited for FIND actions, depending on
340 def processing(self
):
341 if self
.status
in ('DONE', 'BUILD'):
344 return super(FindAction
, self
).processing
def become_superseded(self, superseding):
    """Run the parent ``become_superseded`` and then suggest changes to the
    action superseding this one.

    Arguments:
        superseding: other task superseding this one

    Returns:
        dict: current ``vim_id`` and ``wim_internal_id`` of this action,
            after being filtered by ``remove_none_items``
    """
    super(FindAction, self).become_superseded(superseding)

    suggestions = {}
    for field in ('vim_id', 'wim_internal_id'):
        suggestions[field] = getattr(self, field)

    return remove_none_items(suggestions)
352 class CreateAction(Action
):
353 """Abstract class that should be inherited for CREATE actions, depending on
357 def processing(self
):
358 if self
.status
in ('DONE', 'BUILD'):
361 return super(CreateAction
, self
).processing
363 def become_superseded(self
, superseding
):
364 super(CreateAction
, self
).become_superseded(superseding
)
366 created
= self
.extra
.get('created', True)
367 sdn_net_id
= self
.extra
.get('sdn_net_id')
368 pending_info
= self
.wim_internal_id
or self
.vim_id
or sdn_net_id
369 if not(created
and pending_info
):
372 extra_fields
= ('sdn_net_id', 'interfaces', 'created_items')
373 extra_info
= filter_dict_keys(self
.extra
or {}, extra_fields
)
375 return {'superseding_needed': True,
376 'wim_internal_id': self
.wim_internal_id
,
377 'vim_id': self
.vim_id
,
378 'extra': remove_none_items(extra_info
)}
381 class DeleteAction(Action
):
382 """Abstract class that should be inherited for DELETE actions, depending on
385 def supersede(self
, others
):
386 self
.logger
.debug('%s %s %s %s might supersede other actions',
387 self
.id, self
.action
, self
.item
, self
.item_id
)
388 # First collect all the changes from the superseded tasks
389 changes
= [other
.become_superseded(self
) for other
in others
]
390 needed
= any(change
.pop('superseding_needed', False)
391 for change
in changes
)
393 # Deal with the nested ones first
394 extras
= [change
.pop('extra', None) or {} for change
in changes
]
395 items
= [extra
.pop('created_items', None) or {} for extra
in extras
]
396 items
= merge_dicts(self
.extra
.get('created_items', {}), *items
)
397 self
.extra
= merge_dicts(self
.extra
, {'created_items': items
}, *extras
)
399 # Accept the other ones
400 change
= ((key
, value
) for key
, value
in merge_dicts(*changes
).items()
401 if key
in self
.PROPERTIES
)
402 for attr
, value
in change
:
403 setattr(self
, attr
, value
)
405 # Reevaluate if the action itself is needed
407 self
.status
= 'SUPERSEDED'
410 def _expand_extra(record
):
411 extra
= record
.pop('extra', None) or {}
412 if isinstance(extra
, str):
413 extra
= yaml
.safe_load(extra
)
415 record
['params'] = extra
.get('params')
416 record
['depends_on'] = extra
.get('depends_on', [])
417 record
['depends'] = extra
.get('depends', None)
418 record
['extra'] = extra