Merge remote-tracking branch 'upstream/master' into gerrit-submission
[osm/RO.git] / osm_ro / wim / wim_thread.py
1 # -*- coding: utf-8 -*-
2 ##
3 # Copyright 2018 University of Bristol - High Performance Networks Research
4 # Group
5 # All Rights Reserved.
6 #
7 # Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
8 # Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
9 #
10 # Licensed under the Apache License, Version 2.0 (the "License"); you may
11 # not use this file except in compliance with the License. You may obtain
12 # a copy of the License at
13 #
14 # http://www.apache.org/licenses/LICENSE-2.0
15 #
16 # Unless required by applicable law or agreed to in writing, software
17 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
18 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
19 # License for the specific language governing permissions and limitations
20 # under the License.
21 #
22 # For those usages not covered by the Apache License, Version 2.0 please
23 # contact with: <highperformance-networks@bristol.ac.uk>
24 #
25 # Neither the name of the University of Bristol nor the names of its
26 # contributors may be used to endorse or promote products derived from
27 # this software without specific prior written permission.
28 #
29 # This work has been performed in the context of DCMS UK 5G Testbeds
30 # & Trials Programme and in the framework of the Metro-Haul project -
31 # funded by the European Commission under Grant number 761727 through the
32 # Horizon 2020 and 5G-PPP programmes.
33 ##
34
35 """
36 Thread-based interaction with WIMs. Tasks are stored in the
37 database (vim_wim_actions table) and processed sequentially
38
39 Please check the Action class for information about the content of each action.
40 """
41
42 import logging
43 import threading
44 from contextlib import contextmanager
45 from functools import partial
46 from itertools import islice, chain, takewhile
47 from operator import itemgetter, attrgetter
48 from sys import exc_info
49 from time import time, sleep
50
51 from six import reraise
52 from six.moves import queue
53
54 from . import wan_link_actions, wimconn_odl # wimconn_tapi
55 from ..utils import ensure, partition, pipe
56 from .actions import IGNORE, PENDING, REFRESH
57 from .errors import (
58 DbBaseException,
59 QueueFull,
60 InvalidParameters as Invalid,
61 UndefinedAction,
62 )
63 from .failing_connector import FailingConnector
64 from .wimconn import WimConnectorError
65
# Two-level registry used as the default ``mapping`` of ``action_from``:
# first key is the item type (``record['item']``), second key is the action
# name (``record['action']``); the value is the constructor of the action.
ACTIONS = {'instance_wim_nets': wan_link_actions.ACTIONS}

# Registry of WIM connector classes, indexed by the WIM ``type`` field.
# Add extra connectors here (e.g. ``"tapi": wimconn_tapi``, currently
# disabled).
CONNECTORS = {"odl": wimconn_odl.OdlConnector}
75
76
class WimThread(threading.Thread):
    """Specialized task queue implementation that runs in an isolated thread.

    Objects of this class have a few methods that are intended to be used
    outside of the thread:

    - start
    - insert_task
    - reload
    - exit

    All the other methods are used internally to manipulate/process the task
    queue.
    """
    RETRY_SCHEDULED = 10  # 10 seconds
    REFRESH_BUILD = 10    # 10 seconds
    REFRESH_ACTIVE = 60   # 1 minute
    BATCH = 10            # 10 actions per round
    QUEUE_SIZE = 2000
    RECOVERY_TIME = 5     # Sleep 5s to leave the system some time to recover
    MAX_RECOVERY_TIME = 180
    WAITING_TIME = 1      # Wait 1s for tasks to arrive, when there are none

    def __init__(self, persistence, wim_account, logger=None, ovim=None):
        """Init a thread.

        Arguments:
            persistence: Database abstraction layer
            wim_account: Record containing wim_account, tenant and wim
                information.
            logger: optional logger; when omitted, a logger named
                ``openmano.wim.<thread name>`` is used.
            ovim: optional object forwarded as the last argument of
                ``task.process`` (not used directly by this class).
        """
        name = '{}.{}.{}'.format(wim_account['wim']['name'],
                                 wim_account['name'], wim_account['uuid'])
        super(WimThread, self).__init__(name=name)

        self.name = name
        self.connector = None
        self.wim_account = wim_account

        self.logger = logger or logging.getLogger('openmano.wim.'+self.name)
        self.persist = persistence
        self.ovim = ovim

        # Messages sent from other threads (see ``insert_task``)
        self.task_queue = queue.Queue(self.QUEUE_SIZE)

        # Time ordered task list for refreshing the status of WIM nets
        self.refresh_tasks = []

        # Time ordered task list for creation, deletion of WIM nets
        self.pending_tasks = []

        # It contains all the creation/deletion pending tasks grouped by
        # its concrete vm, net, etc
        #
        #     <item><item_id>:
        #         - <task1>  # e.g. CREATE task
        #           <task2>  # e.g. DELETE task
        self.grouped_tasks = {}

        # Send the task to the right processing queue, according to the
        # value of ``task.processing``. Every entry accepts the task and an
        # optional ``when`` (ignored for IGNORE, where the task is just
        # saved).
        self._insert_task = {
            PENDING: partial(self.schedule, list_name='pending'),
            REFRESH: partial(self.schedule, list_name='refresh'),
            IGNORE: lambda task, *_, **__: task.save(self.persist)}

    def on_start(self):
        """Run a series of procedures every time the thread (re)starts"""
        self.connector = self.get_connector()
        self.reload_actions()

    def get_connector(self):
        """Create a WimConnector instance according to the wim.type

        Returns:
            An instance of the class registered in ``CONNECTORS`` for the
            WIM type, or a ``FailingConnector`` carrying the error message
            when the account/connector cannot be loaded. This method is not
            expected to raise for those failures, since it runs outside of
            ``avoid_exceptions``.
        """
        error_msg = ''
        account_id = self.wim_account['uuid']
        # Kept outside of the ``try`` so the error handlers below can always
        # build their messages, even when the failure happens before the WIM
        # record (or its type) could be read.
        wim_type = None
        try:
            account = self.persist.get_wim_account_by(
                uuid=account_id, hide=None)  # Credentials need to be available
            wim = account['wim']
            wim_type = wim['type']
            mapping = self.persist.query('wim_port_mappings',
                                         WHERE={'wim_id': wim['uuid']},
                                         error_if_none=False)
            return CONNECTORS[wim_type](wim, account, {
                'service_endpoint_mapping': mapping or []
            })
        except DbBaseException as ex:
            error_msg = ('Error when retrieving WIM account ({})\n'
                         .format(account_id)) + str(ex)
            self.logger.error(error_msg, exc_info=True)
        except KeyError as ex:
            error_msg = ('Unable to find the WIM connector for WIM ({})\n'
                         .format(wim_type)) + str(ex)
            self.logger.error(error_msg, exc_info=True)
        except (WimConnectorError, Exception) as ex:
            # TODO: Remove the Exception class here when the connector class is
            # ready
            error_msg = ('Error when loading WIM connector for WIM ({})\n'
                         .format(wim_type)) + str(ex)
            self.logger.error(error_msg, exc_info=True)

        error_msg_extra = ('Any task targeting WIM account {} ({}) will fail.'
                           .format(account_id, self.wim_account.get('name')))
        self.logger.warning(error_msg_extra)
        return FailingConnector(error_msg + '\n' + error_msg_extra)

    @contextmanager
    def avoid_exceptions(self):
        """Make a real effort to keep the thread alive, by avoiding the
        exceptions. They are instead logged as critical errors, and the
        thread sleeps ``RECOVERY_TIME`` seconds before continuing.
        """
        try:
            yield
        except Exception as ex:
            self.logger.critical("Unexpected exception %s", ex, exc_info=True)
            sleep(self.RECOVERY_TIME)

    def reload_actions(self, group_limit=100):
        """Read actions from database and reload them at memory.

        This method will clean and reload the attributes ``refresh_tasks``,
        ``pending_tasks`` and ``grouped_tasks``

        Arguments:
            group_limit (int): maximum number of action groups (those that
                refer to the same ``<item, item_id>``) to be retrieved from the
                database in each batch.
        """

        # First we clean the cache to let the garbage collector work
        self.refresh_tasks = []
        self.pending_tasks = []
        self.grouped_tasks = {}

        offset = 0

        while True:
            # Do things in batches
            task_groups = self.persist.get_actions_in_groups(
                self.wim_account['uuid'], item_types=('instance_wim_nets',),
                group_offset=offset, group_limit=group_limit)
            # Advance by a full page: advancing by ``group_limit - 1`` would
            # make consecutive pages overlap by one group (loading its tasks
            # twice) and would never progress when ``group_limit`` is 1.
            # NOTE(review): assumes ``group_offset`` is the number of groups
            # to skip - confirm against the persistence layer.
            offset += group_limit

            if not task_groups:
                break

            # Each entry is a pair whose second element is the list of
            # action records of the group
            pending_groups = (g for _, g in task_groups if is_pending_group(g))

            for task_list in pending_groups:
                with self.avoid_exceptions():
                    self.insert_pending_tasks(filter_pending_tasks(task_list))

        self.logger.debug(
            'Reloaded wim actions pending: %d refresh: %d',
            len(self.pending_tasks), len(self.refresh_tasks))

    def insert_pending_tasks(self, task_list):
        """Insert task in the list of actions being processed

        Arguments:
            task_list: iterable of action records (dicts), converted to
                action objects using ``action_from``.
        """
        task_list = [action_from(task, self.logger) for task in task_list]

        for task in task_list:
            group = task.group_key
            self.grouped_tasks.setdefault(group, [])
            # Each task can try to supersede the other ones,
            # but just DELETE actions will actually do
            task.supersede(self.grouped_tasks[group])
            self.grouped_tasks[group].append(task)

        # We need a separate loop so each task can check all the other
        # ones before deciding
        for task in task_list:
            self._insert_task[task.processing](task)
            self.logger.debug('Insert WIM task: %s (%s): %s %s',
                              task.id, task.status, task.action, task.item)

    def schedule(self, task, when=None, list_name='pending'):
        """Insert a task in the correct list, respecting the schedule.
        The refreshing list is ordered by threshold_time (task.process_at)
        It is assumed that this is called inside this thread

        Arguments:
            task (Action): object representing the task.
                This object must implement the ``process`` method and inherit
                from the ``Action`` class
            list_name: either 'refresh' or 'pending'
            when (float): unix time in seconds since as a float number
                (defaults to the current time)

        Returns:
            The same ``task`` object, with ``process_at`` updated.
        """
        processing_list = {'refresh': self.refresh_tasks,
                           'pending': self.pending_tasks}[list_name]

        when = when or time()
        task.process_at = when

        # The new task goes right after the leading run of tasks that are
        # scheduled for the same moment or earlier
        schedule = (t.process_at for t in processing_list)
        index = sum(1 for _ in takewhile(lambda moment: moment <= when,
                                         schedule))

        processing_list.insert(index, task)
        self.logger.debug(
            'Schedule of %s in "%s" - waiting position: %d (%f)',
            task.id, list_name, index, task.process_at)

        return task

    def process_list(self, list_name='pending'):
        """Process actions in batches and reschedule them if necessary

        Arguments:
            list_name: either 'refresh' or 'pending'

        Returns:
            int: number of entries removed from the list in this round
            (tasks handled in the batch plus superseded tasks, that are just
            saved).
        """
        task_list, handler = {
            'refresh': (self.refresh_tasks, self._refresh_single),
            'pending': (self.pending_tasks, self._process_single)}[list_name]

        now = time()
        waiting = ((i, task) for i, task in enumerate(task_list)
                   if task.process_at is None or task.process_at <= now)

        # NOTE(review): ``pipe`` and ``partition`` come from ``..utils``;
        # presumably the first composes the two getters and the second splits
        # the pairs according to the predicate - confirm.
        is_superseded = pipe(itemgetter(1), attrgetter('is_superseded'))
        superseded, active = partition(is_superseded, waiting)
        superseded = [(i, t.save(self.persist)) for i, t in superseded]

        batch = islice(active, self.BATCH)
        refreshed = [(i, handler(t)) for i, t in batch]

        # Since pop changes the indexes in the list, we need to do it backwards
        remove = sorted([i for i, _ in chain(refreshed, superseded)])
        return len([task_list.pop(i) for i in reversed(remove)])

    def _refresh_single(self, task):
        """Refresh just a single task, and reschedule it if necessary

        The next refresh happens after ``REFRESH_BUILD`` seconds when
        ``task.is_build`` is true, or ``REFRESH_ACTIVE`` seconds otherwise.

        Returns:
            Whatever ``task.refresh`` returns.
        """
        now = time()

        result = task.refresh(self.connector, self.persist)
        self.logger.debug('Refreshing WIM task: %s (%s): %s %s => %r',
                          task.id, task.status, task.action, task.item, result)

        interval = self.REFRESH_BUILD if task.is_build else self.REFRESH_ACTIVE
        self.schedule(task, now + interval, 'refresh')

        return result

    def _process_single(self, task):
        """Process just a single task, and reschedule it if necessary

        Returns:
            Whatever ``task.process`` returns.
        """
        now = time()

        result = task.process(self.connector, self.persist, self.ovim)
        self.logger.debug('Executing WIM task: %s (%s): %s %s => %r',
                          task.id, task.status, task.action, task.item, result)

        if task.action == 'DELETE':
            # The group might be already gone (e.g. a DELETE that is being
            # retried, or a second DELETE for the same item), so a missing
            # key is not an error.
            self.grouped_tasks.pop(task.group_key, None)

        # Depending on ``task.processing`` the task is retried later, moved
        # to the refresh list or just saved.
        self._insert_task[task.processing](task, now + self.RETRY_SCHEDULED)

        return result

    def insert_task(self, task):
        """Send a message to the running thread

        This function is supposed to be called outside of the WIM Thread.

        Arguments:
            task (str or dict): `"exit"`, `"reload"` or dict representing a
                task. For more information about the fields in task, please
                check the Action class.

        Raises:
            QueueFull: when the queue already holds ``QUEUE_SIZE`` messages
                (the message is not enqueued; this call never blocks).
        """
        try:
            self.task_queue.put(task, False)
            return None
        except queue.Full:
            ex = QueueFull(self.name)
            reraise(ex.__class__, ex, exc_info()[2])

    def reload(self):
        """Send a message to the running thread to reload itself"""
        self.insert_task('reload')

    def exit(self):
        """Send a message to the running thread to kill itself"""
        self.insert_task('exit')

    def run(self):
        """Main loop of the thread.

        Each iteration of the outer loop (re)creates the connector and
        reloads the actions from the database. The inner loop drains the
        message queue, then processes one batch of pending and one batch of
        refresh tasks. The thread finishes (returning 0) when the ``'exit'``
        message is received.
        """
        self.logger.debug('Starting: %s', self.name)
        recovery_time = 0
        while True:
            self.on_start()
            reload_thread = False
            self.logger.debug('Reloaded: %s', self.name)

            while True:
                with self.avoid_exceptions():
                    while not self.task_queue.empty():
                        task = self.task_queue.get()
                        try:
                            if isinstance(task, dict):
                                self.insert_pending_tasks([task])
                            elif isinstance(task, list):
                                self.insert_pending_tasks(task)
                            elif isinstance(task, str):
                                if task == 'exit':
                                    self.logger.debug(
                                        'Finishing: %s', self.name)
                                    return 0
                                elif task == 'reload':
                                    reload_thread = True
                                    break
                        finally:
                            # Every ``get`` needs a matching ``task_done``,
                            # also for control messages and failed inserts
                            self.task_queue.task_done()

                    if reload_thread:
                        break

                    if not (self.process_list('pending') +
                            self.process_list('refresh')):
                        sleep(self.WAITING_TIME)

                    if isinstance(self.connector, FailingConnector):
                        # Wait sometime to try instantiating the connector
                        # again and restart
                        # Increase the recovery time if restarting is not
                        # working (up to a limit)
                        recovery_time = min(self.MAX_RECOVERY_TIME,
                                            recovery_time + self.RECOVERY_TIME)
                        sleep(recovery_time)
                        break
                    else:
                        recovery_time = 0

        self.logger.debug("Finishing")
398
399
def is_pending_group(group):
    """Tell if a group of action records still needs to be loaded.

    A group is rejected as soon as it contains a ``DELETE`` action whose
    status is something other than ``SCHEDULED``; any other group (including
    an empty one) is accepted.

    Arguments:
        group: iterable of action records (dicts with ``action`` and, for
            ``DELETE`` actions, ``status`` keys)

    Return:
        (bool): ``True`` when no non-scheduled ``DELETE`` is found
    """
    return not any(
        record['action'] == 'DELETE' and record['status'] != 'SCHEDULED'
        for record in group
    )
404
405
def filter_pending_tasks(group):
    """Select, lazily, the action records of a group that should be loaded.

    A record is kept when its status is ``SCHEDULED`` or when its action is
    either ``CREATE`` or ``FIND`` (whatever the status is).

    Arguments:
        group: iterable of action records (dicts)

    Return:
        (generator): the selected records, in the original order
    """
    always_kept = ('CREATE', 'FIND')

    def should_keep(record):
        if record['status'] == 'SCHEDULED':
            return True
        return record['action'] in always_kept

    return (record for record in group if should_keep(record))
410
411
def action_from(record, logger=None, mapping=ACTIONS):
    """Create an Action object from a action record (dict)

    Arguments:
        mapping (dict): Nested data structure that maps the relationship
            between action properties and object constructors. This data
            structure should be a dict with 2 levels of keys: item type and
            action type. Example::
                {'wan_link':
                    {'CREATE': WanLinkCreate}
                    ...}
                ...}
        record (dict): action information
        logger: optional logger, forwarded to the action constructor

    Return:
        (Action.Base): Object representing the action

    Raises:
        UndefinedAction: when ``mapping`` has no constructor registered for
            the ``item``/``action`` pair of the record.
    """
    # NOTE(review): ``ensure`` comes from ``..utils``; presumably it raises
    # the given exception when the condition is false - confirm.
    ensure('item' in record, Invalid('`record` should contain "item"'))
    ensure('action' in record, Invalid('`record` should contain "action"'))

    # Only the lookup is guarded: a KeyError raised inside the constructor
    # itself is a different problem and should not be reported as an
    # undefined action.
    try:
        factory = mapping[record['item']][record['action']]
    except KeyError:
        ex = UndefinedAction(record['item'], record['action'])
        reraise(ex.__class__, ex, exc_info()[2])

    return factory(record, logger=logger)