feature 8029 change RO to python3. Using vim plugins
[osm/RO.git] / RO / osm_ro / wim / actions.py
1 # -*- coding: utf-8 -*-
2 ##
3 # Copyright 2018 University of Bristol - High Performance Networks Research
4 # Group
5 # All Rights Reserved.
6 #
7 # Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
8 # Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
9 #
10 # Licensed under the Apache License, Version 2.0 (the "License"); you may
11 # not use this file except in compliance with the License. You may obtain
12 # a copy of the License at
13 #
14 # http://www.apache.org/licenses/LICENSE-2.0
15 #
16 # Unless required by applicable law or agreed to in writing, software
17 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
18 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
19 # License for the specific language governing permissions and limitations
20 # under the License.
21 #
22 # For those usages not covered by the Apache License, Version 2.0 please
23 # contact with: <highperformance-networks@bristol.ac.uk>
24 #
25 # Neither the name of the University of Bristol nor the names of its
26 # contributors may be used to endorse or promote products derived from
27 # this software without specific prior written permission.
28 #
29 # This work has been performed in the context of DCMS UK 5G Testbeds
30 # & Trials Programme and in the framework of the Metro-Haul project -
31 # funded by the European Commission under Grant number 761727 through the
32 # Horizon 2020 and 5G-PPP programmes.
33 ##
34 # pylint: disable=E1101,E0203,W0201
35
36 """Common logic for task management"""
37 import logging
38 from time import time
39
40 import yaml
41
42 from ..utils import (
43 filter_dict_keys,
44 filter_out_dict_keys,
45 merge_dicts,
46 remove_none_items,
47 truncate
48 )
49
# Values returned by the ``processing`` property of the actions
PENDING = 0
REFRESH = 1
IGNORE = 2

# Default for the ``timeout`` argument of ``Action.defer`` (seconds)
TIMEOUT = 60 * 60  # 1 hour
# Default for the ``min_attempts`` argument of ``Action.defer``
MIN_ATTEMPTS = 10
54
55
class Action(object):
    """Create a basic object representing the action record.

    Arguments:
        record (dict): record as returned by the database
        logger: logger to be used by the object. When omitted, the
            ``openmano.wim.action`` logger is used.
        **kwargs: extra keyword arguments to overwrite the fields in record
    """

    PROPERTIES = [
        'task_index',          # MD - Index number of the task.
                               #      This together with the instance_action_id
                               #      forms a unique key identifier
        'action',              # MD - CREATE, DELETE, FIND
        'item',                # MD - table name, eg. instance_wim_nets
        'item_id',             # MD - uuid of the referenced entry in the
                               #      previous table
        'instance_action_id',  # MD - reference to a cohesive group of actions
                               #      related to the same instance-scenario
        'wim_account_id',      # MD - reference to the WIM account used
                               #      by the thread/connector
        'wim_internal_id',     # MD - internal ID used by the WIM to refer to
                               #      the item
        'datacenter_vim_id',   # MD - reference to the VIM account used
                               #      by the thread/connector
        'vim_id',              # MD - internal ID used by the VIM to refer to
                               #      the item
        'status',              # MD - SCHEDULED,BUILD,DONE,FAILED,SUPERSEDED
        'extra',               # MD - text with yaml format at database,
        #                             dict at memory with:
        # `- params: list with the params to be sent to the VIM for CREATE
        #            or FIND. For DELETE the vim_id is taken from other
        #            related tasks
        # `- find: (only for CREATE tasks) if present it should FIND
        #          before creating and use if existing.
        #          Contains the FIND params
        # `- depends_on: list with the 'task_index'es of tasks that must be
        #                completed before. e.g. a vm creation depends on a net
        #                creation
        # `- sdn_net_id: used for net.
        # `- tries
        # `- created_items:
        #      dictionary with extra elements created that need
        #      to be deleted. e.g. ports, volumes,...
        # `- created: False if the VIM element is not created by
        #             other actions, and it should not be deleted
        # `- wim_status: WIM status of the element. Stored also at database
        #                in the item table
        'params',              # M  - similar to extra[params]
        'depends_on',          # M  - similar to extra[depends_on]
        'depends',             # M  - dict with task_index(from depends_on) to
                               #      task class
        'error_msg',           # MD - descriptive text upon an error
        'created_at',          # MD - task DB creation time
        'modified_at',         # MD - last DB update time
        'process_at',          # M  - unix epoch when to process the task
    ]

    __slots__ = PROPERTIES + [
        'logger',
    ]

    def __init__(self, record, logger=None, **kwargs):
        self.logger = logger or logging.getLogger('openmano.wim.action')
        # Start from a dict with every property set to None, so all the
        # slots end up assigned even if ``record`` is incomplete
        attrs = merge_dicts(dict.fromkeys(self.PROPERTIES), record, kwargs)
        self.update(_expand_extra(attrs))

    def __repr__(self):
        return super(Action, self).__repr__() + repr(self.as_dict())

    def as_dict(self, *fields):
        """Representation of the object as a dict

        Arguments:
            *fields: names of the properties to be included. Names that are
                not listed in ``PROPERTIES`` are ignored. When no name is
                given, all the properties are included.

        Returns:
            dict: property name => current value of the attribute.
        """
        attrs = (set(self.PROPERTIES) & set(fields)
                 if fields else self.PROPERTIES)
        return {k: getattr(self, k) for k in attrs}

    def as_record(self):
        """Returns a dict that can be sent to the persistence layer

        The in-memory only properties (``params``, ``depends_on``,
        ``depends`` and ``process_at``) are removed from the top level of
        the record, and so are the items whose value is ``None``.
        """
        special = ['params', 'depends_on', 'depends']
        record = self.as_dict()
        # NOTE: ``record['extra']`` is the same dict object as ``self.extra``,
        # so the special properties are also copied into ``self.extra``
        record['extra'].update(self.as_dict(*special))
        non_fields = special + ['process_at']

        return remove_none_items(filter_out_dict_keys(record, non_fields))

    def update(self, values=None, **kwargs):
        """Update the in-memory representation of the task (works similarly to
        dict.update). The update is NOT automatically persisted.

        Arguments:
            values (dict): new values for the properties
            **kwargs: new values for the properties, given as keywords

        Keys that are not listed in ``PROPERTIES`` are silently ignored.
        """
        # "white-listed mass assignment"
        updates = merge_dicts(values, kwargs)
        for attr in set(self.PROPERTIES) & set(updates.keys()):
            setattr(self, attr, updates[attr])

    def save(self, persistence, **kwargs):
        """Persist current state of the object to the database.

        Arguments:
            persistence: object encapsulating the database
            **kwargs: extra properties to be updated before saving

        Returns:
            Whatever ``persistence.update_action`` returns.

        Note:
            If any key word argument is passed, the object itself will be
            changed as an extra side-effect.
        """
        # The key of the record is read BEFORE applying the updates, so the
        # entry that is currently stored is the one being updated
        action_id = self.instance_action_id
        index = self.task_index
        if kwargs:
            self.update(kwargs)
        properties = self.as_record()

        return persistence.update_action(action_id, index, properties)

    def fail(self, persistence, reason, status='FAILED'):
        """Mark action as FAILED, updating tables accordingly

        Arguments:
            persistence: object encapsulating the database
            reason (str): explanation for the failure. It is logged in full
                and stored in ``error_msg`` after being passed to
                ``truncate``.
            status (str): status to be assigned to the action

        Returns:
            The result of ``save``.
        """
        # An action previously counted as done is moved to the failed counter
        persistence.update_instance_action_counters(
            self.instance_action_id,
            failed=1,
            done=(-1 if self.status == 'DONE' else 0))

        self.status = status
        self.error_msg = truncate(reason)
        self.logger.error('%s %s: %s', self.id, status, reason)
        return self.save(persistence)

    def succeed(self, persistence, status='DONE'):
        """Mark action as DONE, updating tables accordingly

        Arguments:
            persistence: object encapsulating the database
            status (str): status to be assigned to the action

        Returns:
            The result of ``save``.
        """
        persistence.update_instance_action_counters(
            self.instance_action_id, done=1)
        self.status = status
        self.logger.debug('%s %s', self.id, status)
        return self.save(persistence)

    def defer(self, persistence, reason,
              timeout=TIMEOUT, min_attempts=MIN_ATTEMPTS):
        """Postpone the task processing, taking care to not timeout.

        The bookkeeping is stored in ``extra`` using the keys
        ``first_attempted_at``, ``last_attempted_at`` (unix epochs) and
        ``attempts``.

        The action is marked as failed (instead of being deferred again)
        when more than ``timeout`` seconds have passed since the first
        attempt AND more than ``min_attempts`` attempts were already done.

        Arguments:
            persistence: object encapsulating the database
            reason (str): explanation for the delay
            timeout (int): maximum delay tolerated since the first attempt.
                Note that this number is a time delta, in seconds
            min_attempts (int): Number of attempts to try before giving up.

        Returns:
            The result of ``fail`` when the timeout is reached, the result
            of ``save`` otherwise.
        """
        now = time()
        attempts = self.extra.get('attempts') or 0
        # Records deferred before ``first_attempted_at`` was tracked only
        # carry ``last_attempted_at``: use it as the best known start point
        first_attempt = (self.extra.get('first_attempted_at') or
                         self.extra.get('last_attempted_at') or now)
        elapsed = now - first_attempt

        if elapsed > timeout and attempts > min_attempts:
            # ``{:d}`` only accepts integers, hence the explicit conversion.
            # A failed task is not deferred again, so return right away.
            return self.fail(
                persistence,
                'Timeout reached. {} attempts in the last {:d} min'
                .format(attempts, int(elapsed // 60)))

        self.extra['first_attempted_at'] = first_attempt
        self.extra['last_attempted_at'] = now
        self.extra['attempts'] = attempts + 1
        self.logger.info('%s DEFERRED: %s', self.id, reason)
        return self.save(persistence)

    @property
    def group_key(self):
        """Key defining the group to which this tasks belongs"""
        return (self.item, self.item_id)

    @property
    def processing(self):
        """Processing status for the task (PENDING, REFRESH, IGNORE)"""
        if self.status == 'SCHEDULED':
            return PENDING

        return IGNORE

    @property
    def id(self):
        """Unique identifier of this particular action"""
        return '{}[{}]'.format(self.instance_action_id, self.task_index)

    @property
    def is_scheduled(self):
        """True if the status of the action is SCHEDULED"""
        return self.status == 'SCHEDULED'

    @property
    def is_build(self):
        """True if the status of the action is BUILD"""
        return self.status == 'BUILD'

    @property
    def is_done(self):
        """True if the status of the action is DONE"""
        return self.status == 'DONE'

    @property
    def is_failed(self):
        """True if the status of the action is FAILED"""
        return self.status == 'FAILED'

    @property
    def is_superseded(self):
        """True if the status of the action is SUPERSEDED"""
        return self.status == 'SUPERSEDED'

    def refresh(self, connector, persistence):
        """Use the connector/persistence to refresh the status of the item.

        After the item status is refreshed any change in the task should be
        persisted to the database.

        This base implementation only writes a debug message.

        Arguments:
            connector: object containing the classes to access the WIM or VIM
            persistence: object containing the methods necessary to query the
                database and to persist the updates
        """
        self.logger.debug(
            'Action `%s` has no refresh to be done',
            self.__class__.__name__)

    def expand_dependency_links(self, task_group):
        """Replace the task indexes in ``depends_on`` with the actual tasks

        ``depends`` is set to a dict mapping ``"TASK-<index>"`` to the task
        found at position ``<index>`` of ``task_group``. An index is only
        considered when it is inside the bounds of ``task_group`` and the
        task found there has that same ``task_index`` and the same
        ``instance_action_id`` as this action.

        Nothing is done if ``depends_on`` is empty or if ``depends`` is
        already a non-empty dict.

        Arguments:
            task_group (list): tasks, indexed by position
        """
        if not self.depends_on or (
                isinstance(self.depends, dict) and self.depends):
            return

        num_tasks = len(task_group)
        references = {
            "TASK-{}".format(i): task_group[i]
            for i in self.depends_on
            if i < num_tasks and task_group[i].task_index == i and
            task_group[i].instance_action_id == self.instance_action_id
        }
        self.depends = references

    def become_superseded(self, superseding):
        """When another action tries to supersede this one,
        we need to change both of them, so the surviving actions will be
        logically consistent.

        This method should do the required internal changes, and also
        suggest changes for the other, superseding, action.

        Arguments:
            superseding: other task superseding this one

        Returns:
            dict: changes suggested to the action superseding this one.
                A special key ``superseding_needed`` is used to
                suggest if the superseding is actually required or not.
                If not present, ``superseding_needed`` is assumed to
                be False.
        """
        self.status = 'SUPERSEDED'
        self.logger.debug(
            'Action `%s` was superseded by `%s`',
            self.__class__.__name__, superseding.__class__.__name__)
        return {}

    def supersede(self, others):
        """Supersede other tasks, if necessary

        Arguments:
            others (list): action objects being superseded

        When the task decide to supersede others, this method should call
        ``become_superseded`` on the other actions, collect the suggested
        updates and perform the necessary changes
        """
        # By default actions don't supersede others
        self.logger.debug(
            'Action `%s` does not supersede other actions',
            self.__class__.__name__)

    def process(self, connector, persistence, ovim):
        """Abstract method, that needs to be implemented.
        Process the current task.

        Arguments:
            connector: object with API for accessing the WAN
                Infrastructure Manager system
            persistence: abstraction layer for the database
            ovim: instance of openvim, abstraction layer that enable
                SDN-related operations

        Raises:
            NotImplementedError: always, in this base class
        """
        raise NotImplementedError
333
334
class FindAction(Action):
    """Base class for the FIND actions. Concrete classes, one for each type
    of item, are expected to inherit from it.
    """

    @property
    def processing(self):
        """Processing status: REFRESH when the status of the action is
        DONE or BUILD, otherwise the same as in the parent class.
        """
        refreshable = ('DONE', 'BUILD')
        if self.status not in refreshable:
            return super(FindAction, self).processing

        return REFRESH

    def become_superseded(self, superseding):
        """Mark the action as superseded (parent class behaviour) and
        suggest the identifiers already known to the superseding action.

        Arguments:
            superseding: other task superseding this one

        Returns:
            dict: ``vim_id`` and ``wim_internal_id`` of this action, after
                being passed through ``remove_none_items``.
        """
        super(FindAction, self).become_superseded(superseding)

        identifiers = {}
        for field in ('vim_id', 'wim_internal_id'):
            identifiers[field] = getattr(self, field)

        return remove_none_items(identifiers)
350
351
class CreateAction(Action):
    """Base class for the CREATE actions. Concrete classes, one for each type
    of item, are expected to inherit from it.
    """

    @property
    def processing(self):
        """Processing status: REFRESH when the status of the action is
        DONE or BUILD, otherwise the same as in the parent class.
        """
        refreshable = ('DONE', 'BUILD')
        if self.status not in refreshable:
            return super(CreateAction, self).processing

        return REFRESH

    def become_superseded(self, superseding):
        """Mark the action as superseded (parent class behaviour) and
        suggest to the superseding action the information about what this
        action might have already created.

        Arguments:
            superseding: other task superseding this one

        Returns:
            dict: empty if ``extra['created']`` is falsy (it defaults to
                True) or if none of ``wim_internal_id``, ``vim_id`` and
                ``extra['sdn_net_id']`` is set. Otherwise a dict with
                ``superseding_needed`` set to True, the two identifiers
                and the relevant subset of ``extra``.
        """
        super(CreateAction, self).become_superseded(superseding)

        was_created = self.extra.get('created', True)
        net_id = self.extra.get('sdn_net_id')
        leftovers = self.wim_internal_id or self.vim_id or net_id
        if not was_created or not leftovers:
            return {}

        # Only these keys of ``extra`` are handed over
        relevant = filter_dict_keys(
            self.extra or {},
            ('sdn_net_id', 'interfaces', 'created_items'))

        suggestion = {
            'superseding_needed': True,
            'wim_internal_id': self.wim_internal_id,
            'vim_id': self.vim_id,
            'extra': remove_none_items(relevant),
        }
        return suggestion
379
380
class DeleteAction(Action):
    """Base class for the DELETE actions. Concrete classes, one for each type
    of item, are expected to inherit from it.
    """

    def supersede(self, others):
        """Supersede the given actions, absorbing the changes they suggest.

        Each action in ``others`` has ``become_superseded`` called. The
        suggested ``extra`` dicts (and the ``created_items`` nested inside
        them) are merged into ``self.extra``; the remaining suggestions
        are assigned to the attributes listed in ``PROPERTIES``. If no
        action reports ``superseding_needed``, this action is marked
        as SUPERSEDED as well.

        Arguments:
            others (list): action objects being superseded
        """
        self.logger.debug('%s %s %s %s might supersede other actions',
                          self.id, self.action, self.item, self.item_id)

        # Step 1: ask every superseded task for its suggestions
        suggestions = [task.become_superseded(self) for task in others]
        still_needed = any(
            suggestion.pop('superseding_needed', False)
            for suggestion in suggestions)

        # Step 2: the nested structures are merged instead of overwritten
        nested = [suggestion.pop('extra', None) or {}
                  for suggestion in suggestions]
        created = [extra.pop('created_items', None) or {}
                   for extra in nested]
        created = merge_dicts(self.extra.get('created_items', {}), *created)
        self.extra = merge_dicts(
            self.extra, {'created_items': created}, *nested)

        # Step 3: whatever is left is assigned directly, if it is a property
        for attr, value in merge_dicts(*suggestions).items():
            if attr in self.PROPERTIES:
                setattr(self, attr, value)

        # Step 4: without anything to delete, this action is useless too
        if not still_needed:
            self.status = 'SUPERSEDED'
408
409
410 def _expand_extra(record):
411 extra = record.pop('extra', None) or {}
412 if isinstance(extra, str):
413 extra = yaml.safe_load(extra)
414
415 record['params'] = extra.get('params')
416 record['depends_on'] = extra.get('depends_on', [])
417 record['depends'] = extra.get('depends', None)
418 record['extra'] = extra
419
420 return record