13502b991b195468f77687e082136a1361dbadd7
[osm/RO.git] / RO / osm_ro / wim / wim_thread.py
1 # -*- coding: utf-8 -*-
2 ##
3 # Copyright 2018 University of Bristol - High Performance Networks Research
4 # Group
5 # All Rights Reserved.
6 #
7 # Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
8 # Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
9 #
10 # Licensed under the Apache License, Version 2.0 (the "License"); you may
11 # not use this file except in compliance with the License. You may obtain
12 # a copy of the License at
13 #
14 # http://www.apache.org/licenses/LICENSE-2.0
15 #
16 # Unless required by applicable law or agreed to in writing, software
17 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
18 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
19 # License for the specific language governing permissions and limitations
20 # under the License.
21 #
22 # For those usages not covered by the Apache License, Version 2.0 please
23 # contact with: <highperformance-networks@bristol.ac.uk>
24 #
25 # Neither the name of the University of Bristol nor the names of its
26 # contributors may be used to endorse or promote products derived from
27 # this software without specific prior written permission.
28 #
29 # This work has been performed in the context of DCMS UK 5G Testbeds
30 # & Trials Programme and in the framework of the Metro-Haul project -
31 # funded by the European Commission under Grant number 761727 through the
32 # Horizon 2020 and 5G-PPP programmes.
33 ##
34
35 """
36 Thread-based interaction with WIMs. Tasks are stored in the
37 database (vim_wim_actions table) and processed sequentially
38
39 Please check the Action class for information about the content of each action.
40 """
41
42 import logging
43 import threading
44 from contextlib import contextmanager
45 from functools import partial
46 from itertools import islice, chain, takewhile
47 from operator import itemgetter, attrgetter
48 from sys import exc_info
49 from time import time, sleep
50
51 import queue
52
53 from . import wan_link_actions
54 from ..utils import ensure, partition, pipe
55 from .actions import IGNORE, PENDING, REFRESH
56 from .errors import (
57 DbBaseException,
58 QueueFull,
59 InvalidParameters as Invalid,
60 UndefinedAction,
61 )
62 from .failing_connector import FailingConnector
63 from .wimconn import WimConnectorError
64 from .wimconn_dynpac import DynpacConnector
65 from .wimconn_fake import FakeConnector
66 from .wimconn_ietfl2vpn import WimconnectorIETFL2VPN
67
# Two-level lookup used by ``action_from``: item type -> action name ->
# constructor of the object that represents the action.
ACTIONS = dict(instance_wim_nets=wan_link_actions.ACTIONS)

# Connector class to instantiate for each supported value of ``wim['type']``.
CONNECTORS = dict(
    # "odl": wimconn_odl.OdlConnector,
    dynpac=DynpacConnector,
    fake=FakeConnector,
    tapi=WimconnectorIETFL2VPN,
    # Add extra connectors here
)
79
80
class WimThread(threading.Thread):
    """Specialized task queue implementation that runs in an isolated thread.

    Objects of this class have a few methods that are intended to be used
    outside of the thread:

    - start
    - insert_task
    - reload
    - exit

    All the other methods are used internally to manipulate/process the task
    queue.
    """
    RETRY_SCHEDULED = 10  # 10 seconds
    REFRESH_BUILD = 10    # 10 seconds
    REFRESH_ACTIVE = 60   # 1 minute
    BATCH = 10            # 10 actions per round
    QUEUE_SIZE = 2000
    RECOVERY_TIME = 5     # Sleep 5s to leave the system some time to recover
    MAX_RECOVERY_TIME = 180
    WAITING_TIME = 1      # Wait 1s for tasks to arrive, when there are none

    def __init__(self, persistence, wim_account, logger=None, ovim=None):
        """Init a thread.

        Arguments:
            persistence: Database abstraction layer
            wim_account: Record containing wim_account, tenant and wim
                information.
            logger: Optional logger; when omitted a logger named
                ``openmano.wim.<thread name>`` is used.
            ovim: Object forwarded as last argument to ``task.process``.
        """
        name = '{}.{}.{}'.format(wim_account['wim']['name'],
                                 wim_account['name'], wim_account['uuid'])
        super(WimThread, self).__init__(name=name)

        self.name = name
        self.connector = None
        self.wim_account = wim_account

        self.logger = logger or logging.getLogger('openmano.wim.'+self.name)
        self.persist = persistence
        self.ovim = ovim

        # Messages sent from outside the thread (see ``insert_task``)
        self.task_queue = queue.Queue(self.QUEUE_SIZE)

        # Time ordered task list for refreshing the status of WIM nets
        self.refresh_tasks = []

        # Time ordered task list for creation, deletion of WIM nets
        self.pending_tasks = []

        # It contains all the creation/deletion pending tasks grouped by
        # its concrete vm, net, etc
        #
        #     <item><item_id>:
        #         - <task1>  # e.g. CREATE task
        #           <task2>  # e.g. DELETE task
        self.grouped_tasks = {}

        # Send the task to the right processing queue
        self._insert_task = {
            PENDING: partial(self.schedule, list_name='pending'),
            REFRESH: partial(self.schedule, list_name='refresh'),
            IGNORE: lambda task, *_, **__: task.save(self.persist)}

    def on_start(self):
        """Run a series of procedures every time the thread (re)starts"""
        self.connector = self.get_connector()
        self.reload_actions()

    def get_connector(self):
        """Create a WimConnector instance according to the wim.type

        Returns:
            The connector built from ``CONNECTORS`` for the WIM type of this
            account, or a ``FailingConnector`` carrying the error description
            when the account cannot be read or the connector cannot be built.
        """
        error_msg = ''
        account_id = self.wim_account['uuid']
        # Pre-bound so the error handlers below can always build their
        # message, even if the failure happens before the WIM type is known.
        wim_type = None
        try:
            account = self.persist.get_wim_account_by(
                uuid=account_id, hide=None)  # Credentials need to be available
            wim = account['wim']
            mapping = self.persist.query('wim_port_mappings',
                                         WHERE={'wim_id': wim['uuid']},
                                         error_if_none=False)
            wim_type = wim['type']
            return CONNECTORS[wim_type](wim, account, {
                'service_endpoint_mapping': mapping or []
            })
        except DbBaseException as ex:
            error_msg = ('Error when retrieving WIM account ({})\n'
                         .format(account_id)) + str(ex)
            self.logger.error(error_msg, exc_info=True)
        except KeyError as ex:
            error_msg = ('Unable to find the WIM connector for WIM ({})\n'
                         .format(wim_type)) + str(ex)
            self.logger.error(error_msg, exc_info=True)
        except (WimConnectorError, Exception) as ex:
            # TODO: Remove the Exception class here when the connector class is
            # ready
            error_msg = ('Error when loading WIM connector for WIM ({})\n'
                         .format(wim_type)) + str(ex)
            self.logger.error(error_msg, exc_info=True)

        error_msg_extra = ('Any task targeting WIM account {} ({}) will fail.'
                           .format(account_id, self.wim_account.get('name')))
        self.logger.warning(error_msg_extra)
        return FailingConnector(error_msg + '\n' + error_msg_extra)

    @contextmanager
    def avoid_exceptions(self):
        """Make a real effort to keep the thread alive, by avoiding the
        exceptions. They are instead logged as critical errors, and the
        thread sleeps ``RECOVERY_TIME`` seconds before continuing.
        """
        try:
            yield
        except Exception as ex:
            self.logger.critical("Unexpected exception %s", ex, exc_info=True)
            sleep(self.RECOVERY_TIME)

    def reload_actions(self, group_limit=100):
        """Read actions from database and reload them at memory.

        This method will clean and reload the attributes ``refresh_tasks``,
        ``pending_tasks`` and ``grouped_tasks``

        Arguments:
            group_limit (int): maximum number of action groups (those that
                refer to the same ``<item, item_id>``) to be retrieved from the
                database in each batch.
        """

        # First we clean the cache to let the garbage collector work
        self.refresh_tasks = []
        self.pending_tasks = []
        self.grouped_tasks = {}

        offset = 0

        while True:
            # Do things in batches
            task_groups = self.persist.get_actions_in_groups(
                self.wim_account['uuid'], item_types=('instance_wim_nets',),
                group_offset=offset, group_limit=group_limit)
            # Advance by a full page: a smaller step would make consecutive
            # batches overlap (loading the same group twice) and would never
            # advance at all when ``group_limit`` is 1.
            offset += group_limit

            if not task_groups:
                break

            pending_groups = (g for _, g in task_groups if is_pending_group(g))

            for task_list in pending_groups:
                # A broken group should not prevent the others from loading
                with self.avoid_exceptions():
                    self.insert_pending_tasks(filter_pending_tasks(task_list))

        self.logger.debug(
            'Reloaded wim actions pending: %d refresh: %d',
            len(self.pending_tasks), len(self.refresh_tasks))

    def insert_pending_tasks(self, task_list):
        """Insert task in the list of actions being processed

        Arguments:
            task_list: iterable of action records (dicts), converted to
                action objects via ``action_from``.
        """
        task_list = [action_from(task, self.logger) for task in task_list]

        for task in task_list:
            group = task.group_key
            self.grouped_tasks.setdefault(group, [])
            # Each task can try to supersede the other ones,
            # but just DELETE actions will actually do
            task.supersede(self.grouped_tasks[group])
            self.grouped_tasks[group].append(task)

        # We need a separate loop so each task can check all the other
        # ones before deciding
        for task in task_list:
            self._insert_task[task.processing](task)
            self.logger.debug('Insert WIM task: %s (%s): %s %s',
                              task.id, task.status, task.action, task.item)

    def schedule(self, task, when=None, list_name='pending'):
        """Insert a task in the correct list, respecting the schedule.
        The refreshing list is ordered by threshold_time (task.process_at)
        It is assumed that this is called inside this thread

        Arguments:
            task (Action): object representing the task.
                This object must implement the ``process`` method and inherit
                from the ``Action`` class
            list_name: either 'refresh' or 'pending'
            when (float): unix time in seconds since as a float number.
                A falsy value means "now".

        Returns:
            The same ``task`` object, with ``process_at`` updated.
        """
        processing_list = {'refresh': self.refresh_tasks,
                           'pending': self.pending_tasks}[list_name]

        when = when or time()
        task.process_at = when

        # The new task goes right after the leading tasks that are due at
        # the same time or earlier.
        schedule = (t.process_at for t in processing_list)
        index = len(list(takewhile(lambda moment: moment <= when, schedule)))

        processing_list.insert(index, task)
        self.logger.debug(
            'Schedule of %s in "%s" - waiting position: %d (%f)',
            task.id, list_name, index, task.process_at)

        return task

    def process_list(self, list_name='pending'):
        """Process actions in batches and reschedule them if necessary

        Arguments:
            list_name: either 'refresh' or 'pending'

        Returns:
            Number of entries removed from the list in this round
            (processed tasks plus superseded ones).
        """
        task_list, handler = {
            'refresh': (self.refresh_tasks, self._refresh_single),
            'pending': (self.pending_tasks, self._process_single)}[list_name]

        now = time()
        waiting = ((i, task) for i, task in enumerate(task_list)
                   if task.process_at is None or task.process_at <= now)

        is_superseded = pipe(itemgetter(1), attrgetter('is_superseded'))
        superseded, active = partition(is_superseded, waiting)
        # NOTE(review): keep this statement before the handlers run. If
        # ``partition`` is lazy, this is what walks ``task_list`` before the
        # handlers start inserting rescheduled tasks into it - confirm.
        superseded = [(i, t.save(self.persist)) for i, t in superseded]

        batch = islice(active, self.BATCH)
        refreshed = [(i, handler(t)) for i, t in batch]

        # Since pop changes the indexes in the list, we need to do it backwards
        remove = sorted([i for i, _ in chain(refreshed, superseded)])
        return len([task_list.pop(i) for i in reversed(remove)])

    def _refresh_single(self, task):
        """Refresh just a single task, and reschedule it if necessary

        Returns:
            Whatever ``task.refresh`` returns.
        """
        now = time()

        result = task.refresh(self.connector, self.persist)
        self.logger.debug('Refreshing WIM task: %s (%s): %s %s => %r',
                          task.id, task.status, task.action, task.item, result)

        interval = self.REFRESH_BUILD if task.is_build else self.REFRESH_ACTIVE
        self.schedule(task, now + interval, 'refresh')

        return result

    def _process_single(self, task):
        """Process just a single task, and reschedule it if necessary

        Returns:
            Whatever ``task.process`` returns.
        """
        now = time()

        result = task.process(self.connector, self.persist, self.ovim)
        self.logger.debug('Executing WIM task: %s (%s): %s %s => %r',
                          task.id, task.status, task.action, task.item, result)

        if task.action == 'DELETE':
            # The group may already be gone (e.g. a DELETE that is being
            # retried); a missing key must not abort the processing round.
            self.grouped_tasks.pop(task.group_key, None)

        self._insert_task[task.processing](task, now + self.RETRY_SCHEDULED)

        return result

    def insert_task(self, task):
        """Send a message to the running thread

        This function is supposed to be called outside of the WIM Thread.

        Arguments:
            task (str or dict): `"exit"`, `"reload"` or dict representing a
                task. For more information about the fields in task, please
                check the Action class.

        Raises:
            QueueFull: when the internal queue has no free slot.
        """
        try:
            self.task_queue.put(task, False)
            return None
        except queue.Full as e:
            ex = QueueFull(self.name)
            raise ex from e

    def reload(self):
        """Send a message to the running thread to reload itself"""
        self.insert_task('reload')

    def exit(self):
        """Send a message to the running thread to kill itself"""
        self.insert_task('exit')

    def run(self):
        """Main loop of the thread.

        The outer loop (re)starts the worker: it rebuilds the connector and
        reloads the actions. The inner loop drains the message queue and then
        processes one round of the 'pending' and 'refresh' lists. It is left
        when a 'reload' message arrives or when the connector is a
        ``FailingConnector``; an 'exit' message finishes the thread.
        """
        self.logger.debug('Starting: %s', self.name)
        recovery_time = 0
        while True:
            self.on_start()
            reload_thread = False
            self.logger.debug('Reloaded: %s', self.name)

            while True:
                with self.avoid_exceptions():
                    while not self.task_queue.empty():
                        task = self.task_queue.get()
                        if isinstance(task, dict):
                            self.insert_pending_tasks([task])
                        elif isinstance(task, list):
                            self.insert_pending_tasks(task)
                        elif isinstance(task, str):
                            if task == 'exit':
                                self.logger.debug('Finishing: %s', self.name)
                                return 0
                            elif task == 'reload':
                                reload_thread = True
                                break
                        self.task_queue.task_done()

                    if reload_thread:
                        break

                    # Nothing was processed in this round: wait for new tasks
                    if not(self.process_list('pending') +
                           self.process_list('refresh')):
                        sleep(self.WAITING_TIME)

                    if isinstance(self.connector, FailingConnector):
                        # Wait sometime to try instantiating the connector
                        # again and restart
                        # Increase the recovery time if restarting is not
                        # working (up to a limit)
                        recovery_time = min(self.MAX_RECOVERY_TIME,
                                            recovery_time + self.RECOVERY_TIME)
                        sleep(recovery_time)
                        break
                    else:
                        recovery_time = 0

        self.logger.debug("Finishing")
402
403
def is_pending_group(group):
    """Tell whether a group of action records still has to be processed.

    A group is discarded as soon as it contains a DELETE action whose status
    is anything other than 'SCHEDULED'.

    Arguments:
        group: iterable of action records (dicts with 'action' and
            'status' keys)

    Return:
        (bool): ``True`` if no record rules the group out (also for an
        empty group), ``False`` otherwise
    """
    for record in group:
        if record['action'] == 'DELETE' and record['status'] != 'SCHEDULED':
            return False
    return True
408
409
def filter_pending_tasks(group):
    """Lazily select the action records of a group that should be loaded.

    A record is kept when its status is 'SCHEDULED' or when its action is
    either 'CREATE' or 'FIND'.

    Arguments:
        group: iterable of action records (dicts with 'action' and
            'status' keys)

    Return:
        Iterator over the selected records, in their original order
    """
    def should_load(record):
        if record['status'] == 'SCHEDULED':
            return True
        return record['action'] in ('CREATE', 'FIND')

    return filter(should_load, group)
414
415
def action_from(record, logger=None, mapping=ACTIONS):
    """Build the Action object that corresponds to an action record (dict)

    Arguments:
        record (dict): action information. It should contain at least the
            keys "item" and "action".
        logger: forwarded to the constructor as the ``logger`` keyword
        mapping (dict): Nested data structure that maps the relationship
            between action properties and object constructors. This data
            structure should be a dict with 2 levels of keys: item type and
            action type. Example::

                {'wan_link':
                    {'CREATE': WanLinkCreate,
                     ...},
                 ...}

    Return:
        (Action.Base): Object representing the action

    Raises:
        UndefinedAction: when a ``KeyError`` happens while looking up or
            calling the constructor for the record's item/action pair
    """
    ensure('item' in record, Invalid('`record` should contain "item"'))
    ensure('action' in record, Invalid('`record` should contain "action"'))

    try:
        item_type, action_type = record['item'], record['action']
        constructor = mapping[item_type][action_type]
        return constructor(record, logger=logger)
    except KeyError as cause:
        raise UndefinedAction(record['item'], record['action']) from cause