VCD feature 7193-provider_nerwork
[osm/RO.git] / osm_ro / wim / actions.py
1 # -*- coding: utf-8 -*-
2 ##
3 # Copyright 2018 University of Bristol - High Performance Networks Research
4 # Group
5 # All Rights Reserved.
6 #
7 # Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
8 # Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
9 #
10 # Licensed under the Apache License, Version 2.0 (the "License"); you may
11 # not use this file except in compliance with the License. You may obtain
12 # a copy of the License at
13 #
14 # http://www.apache.org/licenses/LICENSE-2.0
15 #
16 # Unless required by applicable law or agreed to in writing, software
17 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
18 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
19 # License for the specific language governing permissions and limitations
20 # under the License.
21 #
22 # For those usages not covered by the Apache License, Version 2.0 please
23 # contact with: <highperformance-networks@bristol.ac.uk>
24 #
25 # Neither the name of the University of Bristol nor the names of its
26 # contributors may be used to endorse or promote products derived from
27 # this software without specific prior written permission.
28 #
29 # This work has been performed in the context of DCMS UK 5G Testbeds
30 # & Trials Programme and in the framework of the Metro-Haul project -
31 # funded by the European Commission under Grant number 761727 through the
32 # Horizon 2020 and 5G-PPP programmes.
33 ##
34 # pylint: disable=E1101,E0203,W0201
35
36 """Common logic for task management"""
37 import logging
38 from time import time
39 from types import StringTypes
40
41 from six.moves import range
42
43 import yaml
44
45 from ..utils import (
46 filter_dict_keys,
47 filter_out_dict_keys,
48 merge_dicts,
49 remove_none_items,
50 truncate
51 )
52
53 PENDING, REFRESH, IGNORE = range(3)
54
55 TIMEOUT = 1 * 60 * 60 # 1 hour
56 MIN_ATTEMPTS = 10
57
58
59 class Action(object):
60 """Create a basic object representing the action record.
61
62 Arguments:
63 record (dict): record as returned by the database
64 **kwargs: extra keyword arguments to overwrite the fields in record
65 """
66
67 PROPERTIES = [
68 'task_index', # MD - Index number of the task.
69 # This together with the instance_action_id
70 # forms a unique key identifier
71 'action', # MD - CREATE, DELETE, FIND
72 'item', # MD - table name, eg. instance_wim_nets
73 'item_id', # MD - uuid of the referenced entry in the
74 # previous table
75 'instance_action_id', # MD - reference to a cohesive group of actions
76 # related to the same instance-scenario
77 'wim_account_id', # MD - reference to the WIM account used
78 # by the thread/connector
79 'wim_internal_id', # MD - internal ID used by the WIM to refer to
80 # the item
81 'datacenter_vim_id', # MD - reference to the VIM account used
82 # by the thread/connector
83 'vim_id', # MD - internal ID used by the VIM to refer to
84 # the item
85 'status', # MD - SCHEDULED,BUILD,DONE,FAILED,SUPERSEDED
86 'extra', # MD - text with yaml format at database,
87 # dict at memory with:
88 # `- params: list with the params to be sent to the VIM for CREATE
89 # or FIND. For DELETE the vim_id is taken from other
90 # related tasks
91 # `- find: (only for CREATE tasks) if present it should FIND
92 # before creating and use if existing.
93 # Contains the FIND params
94 # `- depends_on: list with the 'task_index'es of tasks that must be
95 # completed before. e.g. a vm creation depends on a net
96 # creation
97 # `- sdn_net_id: used for net.
98 # `- tries
99 # `- created_items:
100 # dictionary with extra elements created that need
101 # to be deleted. e.g. ports,
102 # `- volumes,...
103 # `- created: False if the VIM element is not created by
104 # other actions, and it should not be deleted
105 # `- wim_status: WIM status of the element. Stored also at database
106 # in the item table
107 'params', # M - similar to extra[params]
108 'depends_on', # M - similar to extra[depends_on]
109 'depends', # M - dict with task_index(from depends_on) to
110 # task class
111 'error_msg', # MD - descriptive text upon an error
112 'created_at', # MD - task DB creation time
113 'modified_at', # MD - last DB update time
114 'process_at', # M - unix epoch when to process the task
115 ]
116
117 __slots__ = PROPERTIES + [
118 'logger',
119 ]
120
121 def __init__(self, record, logger=None, **kwargs):
122 self.logger = logger or logging.getLogger('openmano.wim.action')
123 attrs = merge_dicts(dict.fromkeys(self.PROPERTIES), record, kwargs)
124 self.update(_expand_extra(attrs))
125
126 def __repr__(self):
127 return super(Action, self).__repr__() + repr(self.as_dict())
128
129 def as_dict(self, *fields):
130 """Representation of the object as a dict"""
131 attrs = (set(self.PROPERTIES) & set(fields)
132 if fields else self.PROPERTIES)
133 return {k: getattr(self, k) for k in attrs}
134
135 def as_record(self):
136 """Returns a dict that can be send to the persistence layer"""
137 special = ['params', 'depends_on', 'depends']
138 record = self.as_dict()
139 record['extra'].update(self.as_dict(*special))
140 non_fields = special + ['process_at']
141
142 return remove_none_items(filter_out_dict_keys(record, non_fields))
143
144 def update(self, values=None, **kwargs):
145 """Update the in-memory representation of the task (works similarly to
146 dict.update). The update is NOT automatically persisted.
147 """
148 # "white-listed mass assignment"
149 updates = merge_dicts(values, kwargs)
150 for attr in set(self.PROPERTIES) & set(updates.keys()):
151 setattr(self, attr, updates[attr])
152
153 def save(self, persistence, **kwargs):
154 """Persist current state of the object to the database.
155
156 Arguments:
157 persistence: object encapsulating the database
158 **kwargs: extra properties to be updated before saving
159
160 Note:
161 If any key word argument is passed, the object itself will be
162 changed as an extra side-effect.
163 """
164 action_id = self.instance_action_id
165 index = self.task_index
166 if kwargs:
167 self.update(kwargs)
168 properties = self.as_record()
169
170 return persistence.update_action(action_id, index, properties)
171
172 def fail(self, persistence, reason, status='FAILED'):
173 """Mark action as FAILED, updating tables accordingly"""
174 persistence.update_instance_action_counters(
175 self.instance_action_id,
176 failed=1,
177 done=(-1 if self.status == 'DONE' else 0))
178
179 self.status = status
180 self.error_msg = truncate(reason)
181 self.logger.error('%s %s: %s', self.id, status, reason)
182 return self.save(persistence)
183
184 def succeed(self, persistence, status='DONE'):
185 """Mark action as DONE, updating tables accordingly"""
186 persistence.update_instance_action_counters(
187 self.instance_action_id, done=1)
188 self.status = status
189 self.logger.debug('%s %s', self.id, status)
190 return self.save(persistence)
191
192 def defer(self, persistence, reason,
193 timeout=TIMEOUT, min_attempts=MIN_ATTEMPTS):
194 """Postpone the task processing, taking care to not timeout.
195
196 Arguments:
197 persistence: object encapsulating the database
198 reason (str): explanation for the delay
199 timeout (int): maximum delay tolerated since the first attempt.
200 Note that this number is a time delta, in seconds
201 min_attempts (int): Number of attempts to try before giving up.
202 """
203 now = time()
204 last_attempt = self.extra.get('last_attempted_at') or time()
205 attempts = self.extra.get('attempts') or 0
206
207 if last_attempt - now > timeout and attempts > min_attempts:
208 self.fail(persistence,
209 'Timeout reached. {} attempts in the last {:d} min'
210 .format(attempts, last_attempt / 60))
211
212 self.extra['last_attempted_at'] = time()
213 self.extra['attempts'] = attempts + 1
214 self.logger.info('%s DEFERRED: %s', self.id, reason)
215 return self.save(persistence)
216
217 @property
218 def group_key(self):
219 """Key defining the group to which this tasks belongs"""
220 return (self.item, self.item_id)
221
222 @property
223 def processing(self):
224 """Processing status for the task (PENDING, REFRESH, IGNORE)"""
225 if self.status == 'SCHEDULED':
226 return PENDING
227
228 return IGNORE
229
230 @property
231 def id(self):
232 """Unique identifier of this particular action"""
233 return '{}[{}]'.format(self.instance_action_id, self.task_index)
234
235 @property
236 def is_scheduled(self):
237 return self.status == 'SCHEDULED'
238
239 @property
240 def is_build(self):
241 return self.status == 'BUILD'
242
243 @property
244 def is_done(self):
245 return self.status == 'DONE'
246
247 @property
248 def is_failed(self):
249 return self.status == 'FAILED'
250
251 @property
252 def is_superseded(self):
253 return self.status == 'SUPERSEDED'
254
255 def refresh(self, connector, persistence):
256 """Use the connector/persistence to refresh the status of the item.
257
258 After the item status is refreshed any change in the task should be
259 persisted to the database.
260
261 Arguments:
262 connector: object containing the classes to access the WIM or VIM
263 persistence: object containing the methods necessary to query the
264 database and to persist the updates
265 """
266 self.logger.debug(
267 'Action `%s` has no refresh to be done',
268 self.__class__.__name__)
269
270 def expand_dependency_links(self, task_group):
271 """Expand task indexes into actual IDs"""
272 if not self.depends_on or (
273 isinstance(self.depends, dict) and self.depends):
274 return
275
276 num_tasks = len(task_group)
277 references = {
278 "TASK-{}".format(i): task_group[i]
279 for i in self.depends_on
280 if i < num_tasks and task_group[i].task_index == i and
281 task_group[i].instance_action_id == self.instance_action_id
282 }
283 self.depends = references
284
285 def become_superseded(self, superseding):
286 """When another action tries to supersede this one,
287 we need to change both of them, so the surviving actions will be
288 logic consistent.
289
290 This method should do the required internal changes, and also
291 suggest changes for the other, superseding, action.
292
293 Arguments:
294 superseding: other task superseding this one
295
296 Returns:
297 dict: changes suggested to the action superseding this one.
298 A special key ``superseding_needed`` is used to
299 suggest if the superseding is actually required or not.
300 If not present, ``superseding_needed`` is assumed to
301 be False.
302 """
303 self.status = 'SUPERSEDED'
304 self.logger.debug(
305 'Action `%s` was superseded by `%s`',
306 self.__class__.__name__, superseding.__class__.__name__)
307 return {}
308
309 def supersede(self, others):
310 """Supersede other tasks, if necessary
311
312 Arguments:
313 others (list): action objects being superseded
314
315 When the task decide to supersede others, this method should call
316 ``become_superseded`` on the other actions, collect the suggested
317 updates and perform the necessary changes
318 """
319 # By default actions don't supersede others
320 self.logger.debug(
321 'Action `%s` does not supersede other actions',
322 self.__class__.__name__)
323
324 def process(self, connector, persistence, ovim):
325 """Abstract method, that needs to be implemented.
326 Process the current task.
327
328 Arguments:
329 connector: object with API for accessing the WAN
330 Infrastructure Manager system
331 persistence: abstraction layer for the database
332 ovim: instance of openvim, abstraction layer that enable
333 SDN-related operations
334 """
335 raise NotImplementedError
336
337
338 class FindAction(Action):
339 """Abstract class that should be inherited for FIND actions, depending on
340 the item type.
341 """
342 @property
343 def processing(self):
344 if self.status in ('DONE', 'BUILD'):
345 return REFRESH
346
347 return super(FindAction, self).processing
348
349 def become_superseded(self, superseding):
350 super(FindAction, self).become_superseded(superseding)
351 info = ('vim_id', 'wim_internal_id')
352 return remove_none_items({f: getattr(self, f) for f in info})
353
354
355 class CreateAction(Action):
356 """Abstract class that should be inherited for CREATE actions, depending on
357 the item type.
358 """
359 @property
360 def processing(self):
361 if self.status in ('DONE', 'BUILD'):
362 return REFRESH
363
364 return super(CreateAction, self).processing
365
366 def become_superseded(self, superseding):
367 super(CreateAction, self).become_superseded(superseding)
368
369 created = self.extra.get('created', True)
370 sdn_net_id = self.extra.get('sdn_net_id')
371 pending_info = self.wim_internal_id or self.vim_id or sdn_net_id
372 if not(created and pending_info):
373 return {}
374
375 extra_fields = ('sdn_net_id', 'interfaces', 'created_items')
376 extra_info = filter_dict_keys(self.extra or {}, extra_fields)
377
378 return {'superseding_needed': True,
379 'wim_internal_id': self.wim_internal_id,
380 'vim_id': self.vim_id,
381 'extra': remove_none_items(extra_info)}
382
383
384 class DeleteAction(Action):
385 """Abstract class that should be inherited for DELETE actions, depending on
386 the item type.
387 """
388 def supersede(self, others):
389 self.logger.debug('%s %s %s %s might supersede other actions',
390 self.id, self.action, self.item, self.item_id)
391 # First collect all the changes from the superseded tasks
392 changes = [other.become_superseded(self) for other in others]
393 needed = any(change.pop('superseding_needed', False)
394 for change in changes)
395
396 # Deal with the nested ones first
397 extras = [change.pop('extra', None) or {} for change in changes]
398 items = [extra.pop('created_items', None) or {} for extra in extras]
399 items = merge_dicts(self.extra.get('created_items', {}), *items)
400 self.extra = merge_dicts(self.extra, {'created_items': items}, *extras)
401
402 # Accept the other ones
403 change = ((key, value) for key, value in merge_dicts(*changes).items()
404 if key in self.PROPERTIES)
405 for attr, value in change:
406 setattr(self, attr, value)
407
408 # Reevaluate if the action itself is needed
409 if not needed:
410 self.status = 'SUPERSEDED'
411
412
413 def _expand_extra(record):
414 extra = record.pop('extra', None) or {}
415 if isinstance(extra, StringTypes):
416 extra = yaml.safe_load(extra)
417
418 record['params'] = extra.get('params')
419 record['depends_on'] = extra.get('depends_on', [])
420 record['depends'] = extra.get('depends', None)
421 record['extra'] = extra
422
423 return record