3e23a48d861700490da9b307d0c68e7cab1dea2a
[osm/RO.git] / osm_ro / wim / wim_thread.py
1 # -*- coding: utf-8 -*-
2 ##
3 # Copyright 2018 University of Bristol - High Performance Networks Research
4 # Group
5 # All Rights Reserved.
6 #
7 # Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
8 # Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
9 #
10 # Licensed under the Apache License, Version 2.0 (the "License"); you may
11 # not use this file except in compliance with the License. You may obtain
12 # a copy of the License at
13 #
14 # http://www.apache.org/licenses/LICENSE-2.0
15 #
16 # Unless required by applicable law or agreed to in writing, software
17 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
18 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
19 # License for the specific language governing permissions and limitations
20 # under the License.
21 #
22 # For those usages not covered by the Apache License, Version 2.0 please
23 # contact with: <highperformance-networks@bristol.ac.uk>
24 #
25 # Neither the name of the University of Bristol nor the names of its
26 # contributors may be used to endorse or promote products derived from
27 # this software without specific prior written permission.
28 #
29 # This work has been performed in the context of DCMS UK 5G Testbeds
30 # & Trials Programme and in the framework of the Metro-Haul project -
31 # funded by the European Commission under Grant number 761727 through the
32 # Horizon 2020 and 5G-PPP programmes.
33 ##
34
35 """
36 Thread-based interaction with WIMs. Tasks are stored in the
37 database (vim_wim_actions table) and processed sequentially
38
39 Please check the Action class for information about the content of each action.
40 """
41
42 import logging
43 import threading
44 from contextlib import contextmanager
45 from functools import partial
46 from itertools import islice, chain, takewhile
47 from operator import itemgetter, attrgetter
48 from sys import exc_info
49 from time import time, sleep
50
51 from six import reraise
52 from six.moves import queue
53
54 from . import wan_link_actions
55 from ..utils import ensure, partition, pipe
56 from .actions import IGNORE, PENDING, REFRESH
57 from .errors import (
58 DbBaseException,
59 QueueFull,
60 InvalidParameters as Invalid,
61 UndefinedAction,
62 )
63 from .failing_connector import FailingConnector
64 from .wimconn import WimConnectorError
65 from .wimconn_dynpac import DynpacConnector
66 from .wimconn_fake import FakeConnector
67 from .wimconn_ietfl2vpn import WimconnectorIETFL2VPN
68
# Two-level lookup table used by ``action_from``: the first key is the item
# type stored in the action record (``record['item']``), the second level
# (provided by ``wan_link_actions.ACTIONS``) is indexed by ``record['action']``
# and yields the callable that builds the corresponding Action object.
ACTIONS = {
    'instance_wim_nets': wan_link_actions.ACTIONS
}
72
# Registry of the available WIM connectors, indexed by the value of the
# ``type`` field of the WIM record. ``WimThread.get_connector`` looks the
# class up here and instantiates it with ``(wim, account, config)``.
CONNECTORS = {
    # "odl": wimconn_odl.OdlConnector,
    "dynpac": DynpacConnector,
    "fake": FakeConnector,
    "ietfl2vpn": WimconnectorIETFL2VPN,
    # Add extra connectors here
}
80
81
class WimThread(threading.Thread):
    """Specialized task queue implementation that runs in an isolated thread.

    Objects of this class have a few methods that are intended to be used
    outside of the thread:

    - start
    - insert_task
    - reload
    - exit

    All the other methods are used internally to manipulate/process the task
    queue.
    """
    RETRY_SCHEDULED = 10  # 10 seconds
    REFRESH_BUILD = 10  # 10 seconds
    REFRESH_ACTIVE = 60  # 1 minute
    BATCH = 10  # 10 actions per round
    QUEUE_SIZE = 2000
    RECOVERY_TIME = 5  # Sleep 5s to leave the system some time to recover
    MAX_RECOVERY_TIME = 180
    WAITING_TIME = 1  # Wait 1s for tasks to arrive, when there are none

    def __init__(self, persistence, wim_account, logger=None, ovim=None):
        """Init a thread.

        Arguments:
            persistence: Database abstraction layer
            wim_account: Record containing wim_account, tenant and wim
                information.
            logger: Optional logger. When omitted, a logger named
                ``openmano.wim.<thread name>`` is used.
            ovim: Optional object that is stored and handed over to
                ``task.process`` together with the connector and the
                persistence layer.
        """
        name = '{}.{}.{}'.format(wim_account['wim']['name'],
                                 wim_account['name'], wim_account['uuid'])
        super(WimThread, self).__init__(name=name)

        self.name = name
        self.connector = None
        self.wim_account = wim_account

        self.logger = logger or logging.getLogger('openmano.wim.'+self.name)
        self.persist = persistence
        self.ovim = ovim

        # Inbox for messages sent from other threads (see ``insert_task``)
        self.task_queue = queue.Queue(self.QUEUE_SIZE)

        # Time ordered task list for refreshing the status of WIM nets
        self.refresh_tasks = []

        # Time ordered task list for creation, deletion of WIM nets
        self.pending_tasks = []

        # It contains all the creation/deletion pending tasks grouped by
        # its concrete vm, net, etc
        #
        #     <item><item_id>:
        #         - <task1>  # e.g. CREATE task
        #           <task2>  # e.g. DELETE task
        self.grouped_tasks = {}

        # Send the task to the right processing queue, according to the
        # value of ``task.processing``
        self._insert_task = {
            PENDING: partial(self.schedule, list_name='pending'),
            REFRESH: partial(self.schedule, list_name='refresh'),
            IGNORE: lambda task, *_, **__: task.save(self.persist)}

    def on_start(self):
        """Run a series of procedures every time the thread (re)starts"""
        self.connector = self.get_connector()
        self.reload_actions()

    def get_connector(self):
        """Create a WimConnector instance according to the wim.type

        The account is re-read from the database (with ``hide=None`` so the
        credentials are available) together with the port mappings of the
        WIM, and both are handed to the connector class registered in
        ``CONNECTORS`` for the WIM type.

        Returns:
            The connector instance. If anything goes wrong the error is
            logged and a ``FailingConnector`` carrying the error message is
            returned instead, so this method does not propagate exceptions
            raised while loading the connector.
        """
        error_msg = ''
        account_id = self.wim_account['uuid']
        # The WIM type is only known after the account is loaded. Keeping it
        # in a separate variable lets the error handlers below build their
        # messages without touching ``wim`` again (which may be unbound, or
        # may lack the 'type' key, when the failure happened early).
        wim_type = None
        try:
            account = self.persist.get_wim_account_by(
                uuid=account_id, hide=None)  # Credentials need to be available
            wim = account['wim']
            wim_type = wim.get('type')
            mapping = self.persist.query('wim_port_mappings',
                                         WHERE={'wim_id': wim['uuid']},
                                         error_if_none=False)
            return CONNECTORS[wim['type']](wim, account, {
                'service_endpoint_mapping': mapping or []
            })
        except DbBaseException as ex:
            error_msg = ('Error when retrieving WIM account ({})\n'
                         .format(account_id)) + str(ex)
            self.logger.error(error_msg, exc_info=True)
        except KeyError as ex:
            error_msg = ('Unable to find the WIM connector for WIM ({})\n'
                         .format(wim_type)) + str(ex)
            self.logger.error(error_msg, exc_info=True)
        except (WimConnectorError, Exception) as ex:
            # TODO: Remove the Exception class here when the connector class is
            # ready
            error_msg = ('Error when loading WIM connector for WIM ({})\n'
                         .format(wim_type)) + str(ex)
            self.logger.error(error_msg, exc_info=True)

        error_msg_extra = ('Any task targeting WIM account {} ({}) will fail.'
                           .format(account_id, self.wim_account.get('name')))
        self.logger.warning(error_msg_extra)
        return FailingConnector(error_msg + '\n' + error_msg_extra)

    @contextmanager
    def avoid_exceptions(self):
        """Make a real effort to keep the thread alive, by avoiding the
        exceptions. They are instead logged as a critical errors.

        After logging, the thread sleeps for ``RECOVERY_TIME`` seconds
        before the code following the ``with`` block resumes.
        """
        try:
            yield
        except Exception as ex:
            self.logger.critical("Unexpected exception %s", ex, exc_info=True)
            sleep(self.RECOVERY_TIME)

    def reload_actions(self, group_limit=100):
        """Read actions from database and reload them at memory.

        This method will clean and reload the attributes ``refresh_tasks``,
        ``pending_tasks`` and ``grouped_tasks``

        Attributes:
            group_limit (int): maximum number of action groups (those that
                refer to the same ``<item, item_id>``) to be retrieved from the
                database in each batch.
        """

        # First we clean the cache to let the garbage collector work
        self.refresh_tasks = []
        self.pending_tasks = []
        self.grouped_tasks = {}

        offset = 0

        while True:
            # Do things in batches
            task_groups = self.persist.get_actions_in_groups(
                self.wim_account['uuid'], item_types=('instance_wim_nets',),
                group_offset=offset, group_limit=group_limit)
            # NOTE(review): the offset advances by ``group_limit - 1`` and
            # not by ``group_limit``. If ``group_offset`` simply means "skip
            # the first N groups", the last group of each batch is retrieved
            # again by the next one - confirm against the implementation of
            # ``get_actions_in_groups`` before changing it.
            offset += (group_limit - 1)  # Update for the next batch

            if not task_groups:
                break

            # Each element of ``task_groups`` is a pair whose second item is
            # the list of action records of the group
            pending_groups = (g for _, g in task_groups if is_pending_group(g))

            for task_list in pending_groups:
                # A broken group should not prevent the other ones from
                # being loaded
                with self.avoid_exceptions():
                    self.insert_pending_tasks(filter_pending_tasks(task_list))

        self.logger.debug(
            'Reloaded wim actions pending: %d refresh: %d',
            len(self.pending_tasks), len(self.refresh_tasks))

    def insert_pending_tasks(self, task_list):
        """Insert task in the list of actions being processed

        Arguments:
            task_list: iterable of action records (dicts). Each record is
                converted to an Action object using ``action_from``.
        """
        task_list = [action_from(task, self.logger) for task in task_list]

        for task in task_list:
            group = task.group_key
            self.grouped_tasks.setdefault(group, [])
            # Each task can try to supersede the other ones,
            # but just DELETE actions will actually do
            task.supersede(self.grouped_tasks[group])
            self.grouped_tasks[group].append(task)

        # We need a separate loop so each task can check all the other
        # ones before deciding
        for task in task_list:
            self._insert_task[task.processing](task)
            self.logger.debug('Insert WIM task: %s (%s): %s %s',
                              task.id, task.status, task.action, task.item)

    def schedule(self, task, when=None, list_name='pending'):
        """Insert a task in the correct list, respecting the schedule.
        The refreshing list is ordered by threshold_time (task.process_at)
        It is assumed that this is called inside this thread

        Arguments:
            task (Action): object representing the task.
                This object must implement the ``process`` method and inherit
                from the ``Action`` class
            list_name: either 'refresh' or 'pending'
            when (float): unix time in seconds since as a float number.
                A falsy value (``None`` or ``0``) means "now".

        Returns:
            The same ``task`` object, with ``process_at`` updated.
        """
        processing_list = {'refresh': self.refresh_tasks,
                           'pending': self.pending_tasks}[list_name]

        when = when or time()
        task.process_at = when

        # The task goes right after the leading run of tasks that are due
        # at the same time or earlier
        schedule = (t.process_at for t in processing_list)
        index = len(list(takewhile(lambda moment: moment <= when, schedule)))

        processing_list.insert(index, task)
        self.logger.debug(
            'Schedule of %s in "%s" - waiting position: %d (%f)',
            task.id, list_name, index, task.process_at)

        return task

    def process_list(self, list_name='pending'):
        """Process actions in batches and reschedule them if necessary

        Arguments:
            list_name: either 'refresh' or 'pending'

        Returns:
            Number of tasks removed from the list in this round (the ones
            handled in the batch plus the superseded ones).
        """
        task_list, handler = {
            'refresh': (self.refresh_tasks, self._refresh_single),
            'pending': (self.pending_tasks, self._process_single)}[list_name]

        now = time()
        # Lazy sequence of (index, task) for the tasks that are already due
        waiting = ((i, task) for i, task in enumerate(task_list)
                   if task.process_at is None or task.process_at <= now)

        # ``pipe`` and ``partition`` come from ``..utils``; presumably the
        # first composes the functions left to right and the second splits
        # the sequence in (matching, not matching) - verify in utils.
        is_superseded = pipe(itemgetter(1), attrgetter('is_superseded'))
        superseded, active = partition(is_superseded, waiting)
        # Superseded tasks are not executed, just saved
        superseded = [(i, t.save(self.persist)) for i, t in superseded]

        # At most BATCH tasks are handled per round
        batch = islice(active, self.BATCH)
        refreshed = [(i, handler(t)) for i, t in batch]

        # Since pop changes the indexes in the list, we need to do it backwards
        remove = sorted([i for i, _ in chain(refreshed, superseded)])
        for i in reversed(remove):
            task_list.pop(i)
        return len(remove)

    def _refresh_single(self, task):
        """Refresh just a single task, and reschedule it if necessary

        The task is always inserted again in the refresh list: after
        ``REFRESH_BUILD`` seconds when ``task.is_build`` is true, otherwise
        after ``REFRESH_ACTIVE`` seconds.

        Returns:
            Whatever ``task.refresh`` returns.
        """
        now = time()

        result = task.refresh(self.connector, self.persist)
        self.logger.debug('Refreshing WIM task: %s (%s): %s %s => %r',
                          task.id, task.status, task.action, task.item, result)

        interval = self.REFRESH_BUILD if task.is_build else self.REFRESH_ACTIVE
        self.schedule(task, now + interval, 'refresh')

        return result

    def _process_single(self, task):
        """Process just a single task, and reschedule it if necessary

        After processing, the task is dispatched again according to
        ``task.processing`` (pending, refresh or ignore), using
        ``RETRY_SCHEDULED`` seconds from now as the scheduled time.

        Returns:
            Whatever ``task.process`` returns.
        """
        now = time()

        result = task.process(self.connector, self.persist, self.ovim)
        self.logger.debug('Executing WIM task: %s (%s): %s %s => %r',
                          task.id, task.status, task.action, task.item, result)

        if task.action == 'DELETE':
            # The group may already be gone (e.g. a DELETE that was
            # rescheduled and is being retried), so a missing key is not an
            # error here.
            self.grouped_tasks.pop(task.group_key, None)

        self._insert_task[task.processing](task, now + self.RETRY_SCHEDULED)

        return result

    def insert_task(self, task):
        """Send a message to the running thread

        This function is supposed to be called outside of the WIM Thread.

        Arguments:
            task (str or dict): `"exit"`, `"reload"` or dict representing a
                task. For more information about the fields in task, please
                check the Action class.

        Raises:
            QueueFull: if the queue already holds ``QUEUE_SIZE`` messages
                (the message is not enqueued, this call never blocks).
        """
        try:
            self.task_queue.put(task, False)
            return None
        except queue.Full:
            ex = QueueFull(self.name)
            reraise(ex.__class__, ex, exc_info()[2])

    def reload(self):
        """Send a message to the running thread to reload itself"""
        self.insert_task('reload')

    def exit(self):
        """Send a message to the running thread to kill itself"""
        self.insert_task('exit')

    def run(self):
        """Main loop of the thread.

        The outer loop (re)starts the thread: it creates the connector and
        reloads the actions from the database. The inner loop consumes the
        messages sent via ``insert_task`` and then processes the pending and
        refresh lists. The inner loop is abandoned (causing a restart) when
        a ``'reload'`` message arrives or when the connector could not be
        instantiated. The method only returns (``0``) when an ``'exit'``
        message is received.
        """
        self.logger.debug('Starting: %s', self.name)
        recovery_time = 0
        while True:
            self.on_start()
            reload_thread = False
            self.logger.debug('Reloaded: %s', self.name)

            while True:
                with self.avoid_exceptions():
                    while not self.task_queue.empty():
                        task = self.task_queue.get()
                        try:
                            if isinstance(task, dict):
                                self.insert_pending_tasks([task])
                            elif isinstance(task, list):
                                self.insert_pending_tasks(task)
                            elif isinstance(task, str):
                                if task == 'exit':
                                    self.logger.debug('Finishing: %s',
                                                      self.name)
                                    return 0
                                elif task == 'reload':
                                    reload_thread = True
                                    break
                        finally:
                            # Every ``get`` is matched by a ``task_done``,
                            # including for the control messages and when
                            # the insertion fails, so the queue's count of
                            # unfinished tasks can return to zero.
                            self.task_queue.task_done()

                    if reload_thread:
                        break

                    # Rest a little bit if there was nothing to do
                    if not(self.process_list('pending') +
                           self.process_list('refresh')):
                        sleep(self.WAITING_TIME)

                    if isinstance(self.connector, FailingConnector):
                        # Wait sometime to try instantiating the connector
                        # again and restart
                        # Increase the recovery time if restarting is not
                        # working (up to a limit)
                        recovery_time = min(self.MAX_RECOVERY_TIME,
                                            recovery_time + self.RECOVERY_TIME)
                        sleep(recovery_time)
                        break
                    else:
                        recovery_time = 0

        # NOTE: not reachable at the moment, the outer loop is only left via
        # the ``return`` that handles the 'exit' message.
        self.logger.debug("Finishing")
403
404
def is_pending_group(group):
    """Tell if a group of action records should be (re)loaded as pending.

    Arguments:
        group: iterable of action records (dicts with the ``action`` and
            ``status`` keys).

    Return:
        ``False`` as soon as a ``DELETE`` action whose status is not
        ``SCHEDULED`` is found, ``True`` otherwise (also for empty groups).
    """
    for record in group:
        if record['action'] == 'DELETE' and record['status'] != 'SCHEDULED':
            return False
    return True
409
410
def filter_pending_tasks(group):
    """Select the action records of a group that still have to be handled.

    Arguments:
        group: iterable of action records (dicts with the ``action`` and
            ``status`` keys).

    Return:
        Generator with the records that are either ``SCHEDULED`` or whose
        action is ``CREATE`` or ``FIND``, in the original order.
    """
    def _is_selected(record):
        if record['status'] == 'SCHEDULED':
            return True
        return record['action'] in ('CREATE', 'FIND')

    return (record for record in group if _is_selected(record))
415
416
def action_from(record, logger=None, mapping=ACTIONS):
    """Create an Action object from a action record (dict)

    Arguments:
        record (dict): action information. The keys ``item`` and ``action``
            are mandatory, they are used to select the constructor.
        logger: optional logger, forwarded to the constructor.
        mapping (dict): Nested data structure that maps the relationship
            between action properties and object constructors. This data
            structure should be a dict with 2 levels of keys: item type and
            action type. Example::

                {'wan_link': {
                    'CREATE': WanLinkCreate,
                    ...
                }}

    Return:
        (Action.Base): Object representing the action

    Raises:
        InvalidParameters: presumably raised by ``ensure`` when ``record``
            has no ``item`` or ``action`` key (``ensure`` is defined in
            ``..utils`` - verify there).
        UndefinedAction: if ``mapping`` has no constructor for the given
            combination of ``item`` and ``action``.
    """
    ensure('item' in record, Invalid('`record` should contain "item"'))
    ensure('action' in record, Invalid('`record` should contain "action"'))

    # Only the lookup is guarded: a KeyError raised inside the constructor
    # is a different problem and must not be reported as an undefined action
    try:
        factory = mapping[record['item']][record['action']]
    except KeyError:
        ex = UndefinedAction(record['item'], record['action'])
        reraise(ex.__class__, ex, exc_info()[2])

    return factory(record, logger=logger)