fa64fbba67ba0ae4937d83c84a8a101f68239ae0
[osm/RO.git] / osm_ro / wim / wim_thread.py
1 # -*- coding: utf-8 -*-
2 ##
3 # Copyright 2018 University of Bristol - High Performance Networks Research
4 # Group
5 # All Rights Reserved.
6 #
7 # Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
8 # Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
9 #
10 # Licensed under the Apache License, Version 2.0 (the "License"); you may
11 # not use this file except in compliance with the License. You may obtain
12 # a copy of the License at
13 #
14 # http://www.apache.org/licenses/LICENSE-2.0
15 #
16 # Unless required by applicable law or agreed to in writing, software
17 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
18 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
19 # License for the specific language governing permissions and limitations
20 # under the License.
21 #
22 # For those usages not covered by the Apache License, Version 2.0 please
23 # contact with: <highperformance-networks@bristol.ac.uk>
24 #
25 # Neither the name of the University of Bristol nor the names of its
26 # contributors may be used to endorse or promote products derived from
27 # this software without specific prior written permission.
28 #
29 # This work has been performed in the context of DCMS UK 5G Testbeds
30 # & Trials Programme and in the framework of the Metro-Haul project -
31 # funded by the European Commission under Grant number 761727 through the
32 # Horizon 2020 and 5G-PPP programmes.
33 ##
34
35 """
36 Thread-based interaction with WIMs. Tasks are stored in the
37 database (vim_wim_actions table) and processed sequentially
38
39 Please check the Action class for information about the content of each action.
40 """
41
42 import logging
43 import threading
44 from contextlib import contextmanager
45 from functools import partial
46 from itertools import islice, chain, takewhile
47 from operator import itemgetter, attrgetter
48 from sys import exc_info
49 from time import time, sleep
50
51 from six import reraise
52 from six.moves import queue
53
54 from . import wan_link_actions, wimconn_odl, wimconn_dynpac # wimconn_tapi
55 from ..utils import ensure, partition, pipe
56 from .actions import IGNORE, PENDING, REFRESH
57 from .errors import (
58 DbBaseException,
59 QueueFull,
60 InvalidParameters as Invalid,
61 UndefinedAction,
62 )
63 from .failing_connector import FailingConnector
64 from .wimconn import WimConnectorError
65
# Action constructors available for each supported item type.
ACTIONS = {'instance_wim_nets': wan_link_actions.ACTIONS}

# Connector classes, indexed by the value stored in the ``type`` field of the
# WIM record.
CONNECTORS = {
    "odl": wimconn_odl.OdlConnector,
    "dynpac": wimconn_dynpac.DynpacConnector,
    # "tapi": wimconn_tapi
    # Add extra connectors here
}
76
77
class WimThread(threading.Thread):
    """Specialized task queue implementation that runs in an isolated thread.

    Objects of this class have a few methods that are intended to be used
    outside of the thread:

    - start
    - insert_task
    - reload
    - exit

    All the other methods are used internally to manipulate/process the task
    queue.
    """
    RETRY_SCHEDULED = 10  # 10 seconds
    REFRESH_BUILD = 10  # 10 seconds
    REFRESH_ACTIVE = 60  # 1 minute
    BATCH = 10  # 10 actions per round
    QUEUE_SIZE = 2000
    RECOVERY_TIME = 5  # Sleep 5s to leave the system some time to recover
    MAX_RECOVERY_TIME = 180
    WAITING_TIME = 1  # Wait 1s for tasks to arrive, when there are none

    def __init__(self, persistence, wim_account, logger=None, ovim=None):
        """Init a thread.

        Arguments:
            persistence: Database abstraction layer
            wim_account: Record containing wim_account, tenant and wim
                information.
            logger: Optional logger. When omitted, a logger named
                ``openmano.wim.<thread name>`` is used.
            ovim: Object handed over to ``task.process`` for every pending
                task.
        """
        name = '{}.{}.{}'.format(wim_account['wim']['name'],
                                 wim_account['name'], wim_account['uuid'])
        super(WimThread, self).__init__(name=name)

        self.name = name
        self.connector = None
        self.wim_account = wim_account

        self.logger = logger or logging.getLogger('openmano.wim.'+self.name)
        self.persist = persistence
        self.ovim = ovim

        self.task_queue = queue.Queue(self.QUEUE_SIZE)

        # Time ordered task list for refreshing the status of WIM nets
        self.refresh_tasks = []

        # Time ordered task list for creation, deletion of WIM nets
        self.pending_tasks = []

        # It contains all the creation/deletion pending tasks grouped by
        # its concrete vm, net, etc
        #
        #   <item><item_id>:
        #     - <task1>  # e.g. CREATE task
        #       <task2>  # e.g. DELETE task
        self.grouped_tasks = {}

        # Send the task to the right processing queue
        self._insert_task = {
            PENDING: partial(self.schedule, list_name='pending'),
            REFRESH: partial(self.schedule, list_name='refresh'),
            IGNORE: lambda task, *_, **__: task.save(self.persist)}

    def on_start(self):
        """Run a series of procedures every time the thread (re)starts"""
        self.connector = self.get_connector()
        self.reload_actions()

    def get_connector(self):
        """Create an WimConnector instance according to the wim.type

        Returns:
            The connector built by the class registered in ``CONNECTORS``
            for the WIM type or, when anything goes wrong, a
            ``FailingConnector`` carrying the error message.
        """
        error_msg = ''
        account_id = self.wim_account['uuid']
        # Defined before the ``try`` so the handlers below can always build
        # their message, even if the failure happens before the WIM record
        # is available.
        wim_type = None
        try:
            account = self.persist.get_wim_account_by(
                uuid=account_id, hide=None)  # Credentials need to be available
            wim = account['wim']
            wim_type = wim['type']
            mapping = self.persist.query('wim_port_mappings',
                                         WHERE={'wim_id': wim['uuid']},
                                         error_if_none=False)
            return CONNECTORS[wim_type](wim, account, {
                'service_endpoint_mapping': mapping or []
            })
        except DbBaseException as ex:
            error_msg = ('Error when retrieving WIM account ({})\n'
                         .format(account_id)) + str(ex)
            self.logger.error(error_msg, exc_info=True)
        except KeyError as ex:
            error_msg = ('Unable to find the WIM connector for WIM ({})\n'
                         .format(wim_type)) + str(ex)
            self.logger.error(error_msg, exc_info=True)
        except (WimConnectorError, Exception) as ex:
            # TODO: Remove the Exception class here when the connector class is
            # ready
            error_msg = ('Error when loading WIM connector for WIM ({})\n'
                         .format(wim_type)) + str(ex)
            self.logger.error(error_msg, exc_info=True)

        error_msg_extra = ('Any task targeting WIM account {} ({}) will fail.'
                           .format(account_id, self.wim_account.get('name')))
        self.logger.warning(error_msg_extra)
        return FailingConnector(error_msg + '\n' + error_msg_extra)

    @contextmanager
    def avoid_exceptions(self):
        """Make a real effort to keep the thread alive, by avoiding the
        exceptions. They are instead logged as a critical errors.

        After logging, the thread sleeps for ``RECOVERY_TIME`` seconds.
        """
        try:
            yield
        except Exception as ex:
            self.logger.critical("Unexpected exception %s", ex, exc_info=True)
            sleep(self.RECOVERY_TIME)

    def reload_actions(self, group_limit=100):
        """Read actions from database and reload them at memory.

        This method will clean and reload the attributes ``refresh_tasks``,
        ``pending_tasks`` and ``grouped_tasks``

        Attributes:
            group_limit (int): maximum number of action groups (those that
                refer to the same ``<item, item_id>``) to be retrieved from the
                database in each batch.
        """

        # First we clean the cache to let the garbage collector work
        self.refresh_tasks = []
        self.pending_tasks = []
        self.grouped_tasks = {}

        offset = 0

        while True:
            # Do things in batches
            task_groups = self.persist.get_actions_in_groups(
                self.wim_account['uuid'], item_types=('instance_wim_nets',),
                group_offset=offset, group_limit=group_limit)
            # Advance by a full batch. Stepping by ``group_limit - 1`` never
            # progresses when ``group_limit`` is 1 and, for bigger limits,
            # reads the last group of each batch twice.
            # NOTE(review): assumes group_offset/group_limit follow the usual
            # offset/limit pagination - confirm against the persistence layer.
            offset += group_limit

            if not task_groups:
                break

            pending_groups = (g for _, g in task_groups if is_pending_group(g))

            for task_list in pending_groups:
                with self.avoid_exceptions():
                    self.insert_pending_tasks(filter_pending_tasks(task_list))

        self.logger.debug(
            'Reloaded wim actions pending: %d refresh: %d',
            len(self.pending_tasks), len(self.refresh_tasks))

    def insert_pending_tasks(self, task_list):
        """Insert task in the list of actions being processed

        Arguments:
            task_list: iterable of action records (dicts), converted to
                action objects via ``action_from``.
        """
        task_list = [action_from(task, self.logger) for task in task_list]

        for task in task_list:
            group = task.group_key
            self.grouped_tasks.setdefault(group, [])
            # Each task can try to supersede the other ones,
            # but just DELETE actions will actually do
            task.supersede(self.grouped_tasks[group])
            self.grouped_tasks[group].append(task)

        # We need a separate loop so each task can check all the other
        # ones before deciding
        for task in task_list:
            self._insert_task[task.processing](task)
            self.logger.debug('Insert WIM task: %s (%s): %s %s',
                              task.id, task.status, task.action, task.item)

    def schedule(self, task, when=None, list_name='pending'):
        """Insert a task in the correct list, respecting the schedule.
        The refreshing list is ordered by threshold_time (task.process_at)
        It is assumed that this is called inside this thread

        Arguments:
            task (Action): object representing the task.
                This object must implement the ``process`` method and inherit
                from the ``Action`` class
            list_name: either 'refresh' or 'pending'
            when (float): unix time in seconds since as a float number.
                A falsy value means "now".

        Returns:
            The same ``task`` object, with ``process_at`` updated.
        """
        processing_list = {'refresh': self.refresh_tasks,
                           'pending': self.pending_tasks}[list_name]

        when = when or time()
        task.process_at = when

        # The new task goes right after the leading run of tasks that are due
        # at the same moment or earlier.
        schedule = (t.process_at for t in processing_list)
        index = sum(1 for _ in takewhile(lambda moment: moment <= when,
                                         schedule))

        processing_list.insert(index, task)
        self.logger.debug(
            'Schedule of %s in "%s" - waiting position: %d (%f)',
            task.id, list_name, index, task.process_at)

        return task

    def process_list(self, list_name='pending'):
        """Process actions in batches and reschedule them if necessary

        Arguments:
            list_name: either 'refresh' or 'pending'

        Returns:
            Number of entries removed from the list in this round (handled
            tasks plus superseded ones).
        """
        task_list, handler = {
            'refresh': (self.refresh_tasks, self._refresh_single),
            'pending': (self.pending_tasks, self._process_single)}[list_name]

        now = time()
        waiting = ((i, task) for i, task in enumerate(task_list)
                   if task.process_at is None or task.process_at <= now)

        is_superseded = pipe(itemgetter(1), attrgetter('is_superseded'))
        superseded, active = partition(is_superseded, waiting)
        # Superseded tasks are just saved and dropped from the list
        superseded = [(i, t.save(self.persist)) for i, t in superseded]

        batch = islice(active, self.BATCH)
        refreshed = [(i, handler(t)) for i, t in batch]

        # Since pop changes the indexes in the list, we need to do it backwards
        remove = sorted([i for i, _ in chain(refreshed, superseded)])
        return len([task_list.pop(i) for i in reversed(remove)])

    def _refresh_single(self, task):
        """Refresh just a single task, and reschedule it if necessary"""
        now = time()

        result = task.refresh(self.connector, self.persist)
        self.logger.debug('Refreshing WIM task: %s (%s): %s %s => %r',
                          task.id, task.status, task.action, task.item, result)

        interval = self.REFRESH_BUILD if task.is_build else self.REFRESH_ACTIVE
        self.schedule(task, now + interval, 'refresh')

        return result

    def _process_single(self, task):
        """Process just a single task, and reschedule it if necessary"""
        now = time()

        result = task.process(self.connector, self.persist, self.ovim)
        self.logger.debug('Executing WIM task: %s (%s): %s %s => %r',
                          task.id, task.status, task.action, task.item, result)

        if task.action == 'DELETE':
            # The group may already be gone (e.g. a DELETE being retried).
            # A missing key must not blow up, otherwise the task would never
            # leave the pending list.
            self.grouped_tasks.pop(task.group_key, None)

        self._insert_task[task.processing](task, now + self.RETRY_SCHEDULED)

        return result

    def insert_task(self, task):
        """Send a message to the running thread

        This function is supposed to be called outside of the WIM Thread.

        Arguments:
            task (str or dict): `"exit"`, `"reload"` or dict representing a
                task. For more information about the fields in task, please
                check the Action class.

        Raises:
            QueueFull: when the internal queue cannot accept more messages.
        """
        try:
            self.task_queue.put(task, False)
            return None
        except queue.Full:
            ex = QueueFull(self.name)
            reraise(ex.__class__, ex, exc_info()[2])

    def reload(self):
        """Send a message to the running thread to reload itself"""
        self.insert_task('reload')

    def exit(self):
        """Send a message to the running thread to kill itself"""
        self.insert_task('exit')

    def run(self):
        """Main loop: consume queued messages and process the task lists.

        The loop only ends when an ``'exit'`` message is received. A
        ``'reload'`` message, or a connector that could not be instantiated,
        makes the thread go through ``on_start`` again.
        """
        self.logger.debug('Starting: %s', self.name)
        recovery_time = 0
        while True:
            self.on_start()
            reload_thread = False
            self.logger.debug('Reloaded: %s', self.name)

            while True:
                with self.avoid_exceptions():
                    while not self.task_queue.empty():
                        task = self.task_queue.get()
                        try:
                            if isinstance(task, dict):
                                self.insert_pending_tasks([task])
                            elif isinstance(task, list):
                                self.insert_pending_tasks(task)
                            elif isinstance(task, str):
                                if task == 'exit':
                                    self.logger.debug('Finishing: %s',
                                                      self.name)
                                    return 0
                                elif task == 'reload':
                                    reload_thread = True
                                    break
                        finally:
                            # Every ``get`` is matched by a ``task_done``,
                            # including on exit, reload and failures.
                            self.task_queue.task_done()

                    if reload_thread:
                        break

                    if not(self.process_list('pending') +
                           self.process_list('refresh')):
                        sleep(self.WAITING_TIME)

                    if isinstance(self.connector, FailingConnector):
                        # Wait sometime to try instantiating the connector
                        # again and restart
                        # Increase the recovery time if restarting is not
                        # working (up to a limit)
                        recovery_time = min(self.MAX_RECOVERY_TIME,
                                            recovery_time + self.RECOVERY_TIME)
                        sleep(recovery_time)
                        break
                    else:
                        recovery_time = 0
399
400
def is_pending_group(group):
    """Tell whether a group of action records still has to be processed.

    A group is pending unless it contains a DELETE action whose status is
    something other than SCHEDULED.
    """
    return not any(record['action'] == 'DELETE' and
                   record['status'] != 'SCHEDULED'
                   for record in group)
405
406
def filter_pending_tasks(group):
    """Lazily select the records of a group that should be (re)loaded.

    A record is kept when its status is SCHEDULED or when its action is
    either CREATE or FIND.
    """
    def _should_load(record):
        if record['status'] == 'SCHEDULED':
            return True
        return record['action'] in ('CREATE', 'FIND')

    return (record for record in group if _should_load(record))
411
412
def action_from(record, logger=None, mapping=ACTIONS):
    """Build the Action object that corresponds to an action record (dict)

    Arguments:
        record (dict): action information. It should contain both the
            ``item`` and ``action`` keys.
        logger: forwarded to the action constructor as the ``logger``
            keyword argument.
        mapping (dict): Nested data structure that maps the relationship
            between action properties and object constructors. This data
            structure should be a dict with 2 levels of keys: item type and
            action type. Example::
                {'wan_link':
                    {'CREATE': WanLinkCreate}
                    ...}
                ...}

    Return:
        (Action.Base): Object representing the action

    Raises:
        UndefinedAction: when a ``KeyError`` happens while looking up or
            calling the constructor for this ``item``/``action`` pair.
    """
    ensure('item' in record, Invalid('`record` should contain "item"'))
    ensure('action' in record, Invalid('`record` should contain "action"'))

    try:
        return mapping[record['item']][record['action']](record, logger=logger)
    except KeyError:
        # Keep the original traceback, but expose a domain-specific error
        error = UndefinedAction(record['item'], record['action'])
        reraise(type(error), error, exc_info()[2])