1 # -*- coding: utf-8 -*-
3 # Copyright 2018 University of Bristol - High Performance Networks Research
7 # Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
8 # Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
10 # Licensed under the Apache License, Version 2.0 (the "License"); you may
11 # not use this file except in compliance with the License. You may obtain
12 # a copy of the License at
14 # http://www.apache.org/licenses/LICENSE-2.0
16 # Unless required by applicable law or agreed to in writing, software
17 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
18 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
19 # License for the specific language governing permissions and limitations
22 # For those usages not covered by the Apache License, Version 2.0 please
23 # contact with: <highperformance-networks@bristol.ac.uk>
25 # Neither the name of the University of Bristol nor the names of its
26 # contributors may be used to endorse or promote products derived from
27 # this software without specific prior written permission.
29 # This work has been performed in the context of DCMS UK 5G Testbeds
30 # & Trials Programme and in the framework of the Metro-Haul project -
31 # funded by the European Commission under Grant number 761727 through the
32 # Horizon 2020 and 5G-PPP programmes.
34 # pylint: disable=E1101,E0203,W0201
36 """Common logic for task management"""
39 from types
import StringTypes
41 from six
.moves
import range
53 PENDING
, REFRESH
, IGNORE
= range(3)
55 TIMEOUT
= 1 * 60 * 60 # 1 hour
60 """Create a basic object representing the action record.
63 record (dict): record as returned by the database
64 **kwargs: extra keyword arguments to overwrite the fields in record
68 'task_index', # MD - Index number of the task.
69 # This together with the instance_action_id
70 # forms a unique key identifier
71 'action', # MD - CREATE, DELETE, FIND
72 'item', # MD - table name, eg. instance_wim_nets
73 'item_id', # MD - uuid of the referenced entry in the
75 'instance_action_id', # MD - reference to a cohesive group of actions
76 # related to the same instance-scenario
77 'wim_account_id', # MD - reference to the WIM account used
78 # by the thread/connector
79 'wim_internal_id', # MD - internal ID used by the WIM to refer to
81 'datacenter_vim_id', # MD - reference to the VIM account used
82 # by the thread/connector
83 'vim_id', # MD - internal ID used by the VIM to refer to
85 'status', # MD - SCHEDULED,BUILD,DONE,FAILED,SUPERSEDED
86 'extra', # MD - text with yaml format at database,
87 # dict at memory with:
88 # `- params: list with the params to be sent to the VIM for CREATE
89 # or FIND. For DELETE the vim_id is taken from other
91 # `- find: (only for CREATE tasks) if present it should FIND
92 # before creating and use if existing.
93 # Contains the FIND params
94 # `- depends_on: list with the 'task_index'es of tasks that must be
95 # completed before. e.g. a vm creation depends on a net
97 # `- sdn_net_id: used for net.
100 # dictionary with extra elements created that need
101 # to be deleted. e.g. ports,
103 # `- created: False if the VIM element is not created by
104 # other actions, and it should not be deleted
105 # `- wim_status: WIM status of the element. Stored also at database
107 'params', # M - similar to extra[params]
108 'depends_on', # M - similar to extra[depends_on]
109 'depends', # M - dict with task_index(from depends_on) to
111 'error_msg', # MD - descriptive text upon an error
112 'created_at', # MD - task DB creation time
113 'modified_at', # MD - last DB update time
114 'process_at', # M - unix epoch when to process the task
117 __slots__
= PROPERTIES
+ [
121 def __init__(self
, record
, logger
=None, **kwargs
):
122 self
.logger
= logger
or logging
.getLogger('openmano.wim.action')
123 attrs
= merge_dicts(dict.fromkeys(self
.PROPERTIES
), record
, kwargs
)
124 self
.update(_expand_extra(attrs
))
127 return super(Action
, self
).__repr
__() + repr(self
.as_dict())
129 def as_dict(self
, *fields
):
130 """Representation of the object as a dict"""
131 attrs
= (set(self
.PROPERTIES
) & set(fields
)
132 if fields
else self
.PROPERTIES
)
133 return {k
: getattr(self
, k
) for k
in attrs
}
136 """Returns a dict that can be send to the persistence layer"""
137 special
= ['params', 'depends_on', 'depends']
138 record
= self
.as_dict()
139 record
['extra'].update(self
.as_dict(*special
))
140 non_fields
= special
+ ['process_at']
142 return remove_none_items(filter_out_dict_keys(record
, non_fields
))
144 def update(self
, values
=None, **kwargs
):
145 """Update the in-memory representation of the task (works similarly to
146 dict.update). The update is NOT automatically persisted.
148 # "white-listed mass assignment"
149 updates
= merge_dicts(values
, kwargs
)
150 for attr
in set(self
.PROPERTIES
) & set(updates
.keys()):
151 setattr(self
, attr
, updates
[attr
])
153 def save(self
, persistence
, **kwargs
):
154 """Persist current state of the object to the database.
157 persistence: object encapsulating the database
158 **kwargs: extra properties to be updated before saving
161 If any key word argument is passed, the object itself will be
162 changed as an extra side-effect.
164 action_id
= self
.instance_action_id
165 index
= self
.task_index
168 properties
= self
.as_record()
170 return persistence
.update_action(action_id
, index
, properties
)
172 def fail(self
, persistence
, reason
, status
='FAILED'):
173 """Mark action as FAILED, updating tables accordingly"""
174 persistence
.update_instance_action_counters(
175 self
.instance_action_id
,
177 done
=(-1 if self
.status
== 'DONE' else 0))
180 self
.error_msg
= truncate(reason
)
181 self
.logger
.error('%s %s: %s', self
.id, status
, reason
)
182 return self
.save(persistence
)
184 def succeed(self
, persistence
, status
='DONE'):
185 """Mark action as DONE, updating tables accordingly"""
186 persistence
.update_instance_action_counters(
187 self
.instance_action_id
, done
=1)
189 self
.logger
.debug('%s %s', self
.id, status
)
190 return self
.save(persistence
)
192 def defer(self
, persistence
, reason
,
193 timeout
=TIMEOUT
, min_attempts
=MIN_ATTEMPTS
):
194 """Postpone the task processing, taking care to not timeout.
197 persistence: object encapsulating the database
198 reason (str): explanation for the delay
199 timeout (int): maximum delay tolerated since the first attempt.
200 Note that this number is a time delta, in seconds
201 min_attempts (int): Number of attempts to try before giving up.
204 last_attempt
= self
.extra
.get('last_attempted_at') or time()
205 attempts
= self
.extra
.get('attempts') or 0
207 if last_attempt
- now
> timeout
and attempts
> min_attempts
:
208 self
.fail(persistence
,
209 'Timeout reached. {} attempts in the last {:d} min'
210 .format(attempts
, last_attempt
/ 60))
212 self
.extra
['last_attempted_at'] = time()
213 self
.extra
['attempts'] = attempts
+ 1
214 self
.logger
.info('%s DEFERRED: %s', self
.id, reason
)
215 return self
.save(persistence
)
219 """Key defining the group to which this tasks belongs"""
220 return (self
.item
, self
.item_id
)
223 def processing(self
):
224 """Processing status for the task (PENDING, REFRESH, IGNORE)"""
225 if self
.status
== 'SCHEDULED':
232 """Unique identifier of this particular action"""
233 return '{}[{}]'.format(self
.instance_action_id
, self
.task_index
)
236 def is_scheduled(self
):
237 return self
.status
== 'SCHEDULED'
241 return self
.status
== 'BUILD'
245 return self
.status
== 'DONE'
249 return self
.status
== 'FAILED'
252 def is_superseded(self
):
253 return self
.status
== 'SUPERSEDED'
255 def refresh(self
, connector
, persistence
):
256 """Use the connector/persistence to refresh the status of the item.
258 After the item status is refreshed any change in the task should be
259 persisted to the database.
262 connector: object containing the classes to access the WIM or VIM
263 persistence: object containing the methods necessary to query the
264 database and to persist the updates
267 'Action `%s` has no refresh to be done',
268 self
.__class
__.__name
__)
270 def expand_dependency_links(self
, task_group
):
271 """Expand task indexes into actual IDs"""
272 if not self
.depends_on
or (
273 isinstance(self
.depends
, dict) and self
.depends
):
276 num_tasks
= len(task_group
)
278 "TASK-{}".format(i
): task_group
[i
]
279 for i
in self
.depends_on
280 if i
< num_tasks
and task_group
[i
].task_index
== i
and
281 task_group
[i
].instance_action_id
== self
.instance_action_id
283 self
.depends
= references
285 def become_superseded(self
, superseding
):
286 """When another action tries to supersede this one,
287 we need to change both of them, so the surviving actions will be
290 This method should do the required internal changes, and also
291 suggest changes for the other, superseding, action.
294 superseding: other task superseding this one
297 dict: changes suggested to the action superseding this one.
298 A special key ``superseding_needed`` is used to
299 suggest if the superseding is actually required or not.
300 If not present, ``superseding_needed`` is assumed to
303 self
.status
= 'SUPERSEDED'
305 'Action `%s` was superseded by `%s`',
306 self
.__class
__.__name
__, superseding
.__class
__.__name
__)
309 def supersede(self
, others
):
310 """Supersede other tasks, if necessary
313 others (list): action objects being superseded
315 When the task decide to supersede others, this method should call
316 ``become_superseded`` on the other actions, collect the suggested
317 updates and perform the necessary changes
319 # By default actions don't supersede others
321 'Action `%s` does not supersede other actions',
322 self
.__class
__.__name
__)
324 def process(self
, connector
, persistence
, ovim
):
325 """Abstract method, that needs to be implemented.
326 Process the current task.
329 connector: object with API for accessing the WAN
330 Infrastructure Manager system
331 persistence: abstraction layer for the database
332 ovim: instance of openvim, abstraction layer that enable
333 SDN-related operations
335 raise NotImplementedError
338 class FindAction(Action
):
339 """Abstract class that should be inherited for FIND actions, depending on
343 def processing(self
):
344 if self
.status
in ('DONE', 'BUILD'):
347 return super(FindAction
, self
).processing
349 def become_superseded(self
, superseding
):
350 super(FindAction
, self
).become_superseded(superseding
)
351 info
= ('vim_id', 'wim_internal_id')
352 return remove_none_items({f
: getattr(self
, f
) for f
in info
})
355 class CreateAction(Action
):
356 """Abstract class that should be inherited for CREATE actions, depending on
360 def processing(self
):
361 if self
.status
in ('DONE', 'BUILD'):
364 return super(CreateAction
, self
).processing
366 def become_superseded(self
, superseding
):
367 super(CreateAction
, self
).become_superseded(superseding
)
369 created
= self
.extra
.get('created', True)
370 sdn_net_id
= self
.extra
.get('sdn_net_id')
371 pending_info
= self
.wim_internal_id
or self
.vim_id
or sdn_net_id
372 if not(created
and pending_info
):
375 extra_fields
= ('sdn_net_id', 'interfaces', 'created_items')
376 extra_info
= filter_dict_keys(self
.extra
or {}, extra_fields
)
378 return {'superseding_needed': True,
379 'wim_internal_id': self
.wim_internal_id
,
380 'vim_id': self
.vim_id
,
381 'extra': remove_none_items(extra_info
)}
384 class DeleteAction(Action
):
385 """Abstract class that should be inherited for DELETE actions, depending on
388 def supersede(self
, others
):
389 self
.logger
.debug('%s %s %s %s might supersede other actions',
390 self
.id, self
.action
, self
.item
, self
.item_id
)
391 # First collect all the changes from the superseded tasks
392 changes
= [other
.become_superseded(self
) for other
in others
]
393 needed
= any(change
.pop('superseding_needed', False)
394 for change
in changes
)
396 # Deal with the nested ones first
397 extras
= [change
.pop('extra', None) or {} for change
in changes
]
398 items
= [extra
.pop('created_items', None) or {} for extra
in extras
]
399 items
= merge_dicts(self
.extra
.get('created_items', {}), *items
)
400 self
.extra
= merge_dicts(self
.extra
, {'created_items': items
}, *extras
)
402 # Accept the other ones
403 change
= ((key
, value
) for key
, value
in merge_dicts(*changes
).items()
404 if key
in self
.PROPERTIES
)
405 for attr
, value
in change
:
406 setattr(self
, attr
, value
)
408 # Reevaluate if the action itself is needed
410 self
.status
= 'SUPERSEDED'
413 def _expand_extra(record
):
414 extra
= record
.pop('extra', None) or {}
415 if isinstance(extra
, StringTypes
):
416 extra
= yaml
.safe_load(extra
)
418 record
['params'] = extra
.get('params')
419 record
['depends_on'] = extra
.get('depends_on', [])
420 record
['depends'] = extra
.get('depends', None)
421 record
['extra'] = extra