3466193528eb1a9ecbb916faeac5f9df7aa47a62
[osm/RO.git] / RO / osm_ro / wim / wim_thread.py
1 # -*- coding: utf-8 -*-
2 ##
3 # Copyright 2018 University of Bristol - High Performance Networks Research
4 # Group
5 # All Rights Reserved.
6 #
7 # Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
8 # Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
9 #
10 # Licensed under the Apache License, Version 2.0 (the "License"); you may
11 # not use this file except in compliance with the License. You may obtain
12 # a copy of the License at
13 #
14 # http://www.apache.org/licenses/LICENSE-2.0
15 #
16 # Unless required by applicable law or agreed to in writing, software
17 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
18 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
19 # License for the specific language governing permissions and limitations
20 # under the License.
21 #
22 # For those usages not covered by the Apache License, Version 2.0 please
23 # contact with: <highperformance-networks@bristol.ac.uk>
24 #
25 # Neither the name of the University of Bristol nor the names of its
26 # contributors may be used to endorse or promote products derived from
27 # this software without specific prior written permission.
28 #
29 # This work has been performed in the context of DCMS UK 5G Testbeds
30 # & Trials Programme and in the framework of the Metro-Haul project -
31 # funded by the European Commission under Grant number 761727 through the
32 # Horizon 2020 and 5G-PPP programmes.
33 ##
34
35 """
36 Thread-based interaction with WIMs. Tasks are stored in the
37 database (vim_wim_actions table) and processed sequentially
38
39 Please check the Action class for information about the content of each action.
40 """
41
42 import logging
43 import threading
44 from contextlib import contextmanager
45 from functools import partial
46 from itertools import islice, chain, takewhile
47 from operator import itemgetter, attrgetter
48 # from sys import exc_info
49 from time import time, sleep
50
51 import queue
52
53 from . import wan_link_actions
54 from ..utils import ensure, partition, pipe
55 from .actions import IGNORE, PENDING, REFRESH
56 from .errors import (
57 DbBaseException,
58 QueueFull,
59 InvalidParameters as Invalid,
60 UndefinedAction,
61 )
62 from .failing_connector import FailingConnector
63 from .sdnconn import SdnConnectorError
64 from .wimconn_fake import FakeConnector
65
# Action constructors, indexed first by item type (the ``item`` field of an
# action record) and then by action name.
ACTIONS = {'instance_wim_nets': wan_link_actions.ACTIONS}

# Connectors bundled with this package, indexed by WIM type. Any WIM type
# that is not listed here is looked up among the plugins handed to the thread.
CONNECTORS = {
    # "odl": wimconn_odl.OdlConnector,
    'fake': FakeConnector,
    # Add extra connectors here not managed via plugins
}
75
76
class WimThread(threading.Thread):
    """Specialized task queue implementation that runs in an isolated thread.

    Objects of this class have a few methods that are intended to be used
    outside of the thread:

    - start
    - insert_task
    - reload
    - exit

    All the other methods are used internally to manipulate/process the task
    queue.
    """
    RETRY_SCHEDULED = 10  # 10 seconds before a processed task is retried
    REFRESH_BUILD = 10  # 10 seconds between refreshes while still building
    REFRESH_ACTIVE = 60  # 1 minute between refreshes otherwise
    BATCH = 10  # 10 actions per round
    QUEUE_SIZE = 2000
    RECOVERY_TIME = 5  # Sleep 5s to leave the system some time to recover
    MAX_RECOVERY_TIME = 180  # Upper bound for the connector retry back-off
    WAITING_TIME = 1  # Wait 1s for tasks to arrive, when there are none

    def __init__(self, persistence, plugins, wim_account, logger=None,
                 ovim=None):
        """Init a thread.

        Arguments:
            persistence: Database abstraction layer
            plugins: dictionary with the vim/sdn plugins. A ``rosdn_fake``
                entry is added to it when missing.
            wim_account: Record containing wim_account, tenant and wim
                information.
            logger: optional logger; by default one is derived from the
                thread name.
            ovim: object handed over to every task when it is processed.
        """
        name = '{}.{}.{}'.format(wim_account['wim']['name'],
                                 wim_account['name'], wim_account['uuid'])
        super(WimThread, self).__init__(name=name)
        self.plugins = plugins
        if "rosdn_fake" not in self.plugins:
            self.plugins["rosdn_fake"] = FakeConnector

        self.name = name
        self.connector = None
        self.wim_account = wim_account

        self.logger = logger or logging.getLogger('openmano.wim.'+self.name)
        self.persist = persistence
        self.ovim = ovim

        self.task_queue = queue.Queue(self.QUEUE_SIZE)

        # Time ordered task list for refreshing the status of WIM nets
        self.refresh_tasks = []

        # Time ordered task list for creation, deletion of WIM nets
        self.pending_tasks = []

        # It contains all the creation/deletion pending tasks grouped by
        # its concrete vm, net, etc
        #
        #     <item><item_id>:
        #         - <task1>  # e.g. CREATE task
        #           <task2>  # e.g. DELETE task
        self.grouped_tasks = {}

        # Send the task to the right processing queue
        self._insert_task = {
            PENDING: partial(self.schedule, list_name='pending'),
            REFRESH: partial(self.schedule, list_name='refresh'),
            IGNORE: lambda task, *_, **__: task.save(self.persist)}

    def on_start(self):
        """Run a series of procedures every time the thread (re)starts"""
        self.connector = self.get_connector()
        self.reload_actions()

    def get_connector(self):
        """Create a WimConnector instance according to the wim.type

        Return:
            The connector built from ``CONNECTORS`` or from the plugins, or
            a ``FailingConnector`` carrying the error message when the
            connector cannot be created.
        """
        error_msg = ''
        account_id = self.wim_account['uuid']
        # The error handlers below need a WIM type to report, even when the
        # failure happens before the database record is available (or when
        # the record itself has no type). Start from the cached record.
        wim_type = (self.wim_account.get('wim') or {}).get('type')
        try:
            account = self.persist.get_wim_account_by(
                uuid=account_id, hide=None)  # Credentials need to be available
            wim = account['wim']
            mapping = self.persist.query('wim_port_mappings',
                                         WHERE={'wim_id': wim['uuid']},
                                         error_if_none=False)
            config = {'service_endpoint_mapping': mapping or []}
            wim_type = wim['type']
            if wim_type in CONNECTORS:
                factory = CONNECTORS[wim_type]
            else:  # load a plugin
                factory = self.plugins["rosdn_" + wim_type]
            return factory(wim, account, config)
        except DbBaseException as ex:
            error_msg = ('Error when retrieving WIM account ({})\n'
                         .format(account_id)) + str(ex)
            self.logger.error(error_msg, exc_info=True)
        except KeyError as ex:
            error_msg = ('Unable to find the WIM connector for WIM ({})\n'
                         .format(wim_type)) + str(ex)
            self.logger.error(error_msg)
        except (SdnConnectorError, Exception) as ex:
            # TODO: Remove the Exception class here when the connector class is
            # ready
            error_msg = ('Error when loading WIM connector for WIM ({})\n'
                         .format(wim_type)) + str(ex)
            self.logger.error(error_msg, exc_info=True)

        error_msg_extra = ('Any task targeting WIM account {} ({}) will fail.'
                           .format(account_id, self.wim_account.get('name')))
        self.logger.warning(error_msg_extra)
        return FailingConnector(error_msg + '\n' + error_msg_extra)

    @contextmanager
    def avoid_exceptions(self):
        """Make a real effort to keep the thread alive, by avoiding the
        exceptions. They are instead logged as critical errors, and the
        thread sleeps for ``RECOVERY_TIME`` seconds before continuing.
        """
        try:
            yield
        except Exception as ex:
            self.logger.critical("Unexpected exception %s", ex, exc_info=True)
            sleep(self.RECOVERY_TIME)

    def reload_actions(self, group_limit=100):
        """Read actions from database and reload them at memory.

        This method will clean and reload the attributes ``refresh_tasks``,
        ``pending_tasks`` and ``grouped_tasks``

        Arguments:
            group_limit (int): maximum number of action groups (those that
                refer to the same ``<item, item_id>``) to be retrieved from the
                database in each batch.
        """

        # First we clean the cache to let the garbage collector work
        self.refresh_tasks = []
        self.pending_tasks = []
        self.grouped_tasks = {}

        offset = 0

        while True:
            # Do things in batches
            task_groups = self.persist.get_actions_in_groups(
                self.wim_account['uuid'], item_types=('instance_wim_nets',),
                group_offset=offset, group_limit=group_limit)

            if not task_groups:
                break

            # Move forward by a whole page. Advancing by ``group_limit - 1``
            # would fetch the last group of each full page a second time
            # (inserting its tasks twice) and would never advance at all for
            # ``group_limit == 1``.
            offset += group_limit

            pending_groups = (g for _, g in task_groups if is_pending_group(g))

            for task_list in pending_groups:
                # A broken group must not prevent the others from loading
                with self.avoid_exceptions():
                    self.insert_pending_tasks(filter_pending_tasks(task_list))

        self.logger.debug(
            'Reloaded wim actions pending: %d refresh: %d',
            len(self.pending_tasks), len(self.refresh_tasks))

    def insert_pending_tasks(self, task_list):
        """Insert task in the list of actions being processed

        Arguments:
            task_list: iterable of action records (dicts), each one is
                converted to an Action object via ``action_from``.
        """
        task_list = [action_from(task, self.logger) for task in task_list]

        for task in task_list:
            group = self.grouped_tasks.setdefault(task.group_key, [])
            # Each task can try to supersede the other ones,
            # but just DELETE actions will actually do
            task.supersede(group)
            group.append(task)

        # We need a separate loop so each task can check all the other
        # ones before deciding
        for task in task_list:
            self._insert_task[task.processing](task)
            self.logger.debug('Insert WIM task: %s (%s): %s %s',
                              task.id, task.status, task.action, task.item)

    def schedule(self, task, when=None, list_name='pending'):
        """Insert a task in the correct list, respecting the schedule.
        The refreshing list is ordered by threshold_time (task.process_at)
        It is assumed that this is called inside this thread

        Arguments:
            task (Action): object representing the task.
                This object must implement the ``process`` method and inherit
                from the ``Action`` class
            list_name: either 'refresh' or 'pending'
            when (float): unix time in seconds since as a float number.
                A falsy value means "now".

        Return:
            The same task object, with ``process_at`` updated.
        """
        processing_list = {'refresh': self.refresh_tasks,
                           'pending': self.pending_tasks}[list_name]

        when = when or time()
        task.process_at = when

        # The new position is right after every task due at or before `when`
        schedule = (t.process_at for t in processing_list)
        index = len(list(takewhile(lambda moment: moment <= when, schedule)))

        processing_list.insert(index, task)
        self.logger.debug(
            'Schedule of %s in "%s" - waiting position: %d (%f)',
            task.id, list_name, index, task.process_at)

        return task

    def process_list(self, list_name='pending'):
        """Process actions in batches and reschedule them if necessary

        Arguments:
            list_name: either 'refresh' or 'pending'

        Return:
            (int) number of entries removed from the list in this round
            (superseded tasks plus up to ``BATCH`` handled tasks).
        """
        task_list, handler = {
            'refresh': (self.refresh_tasks, self._refresh_single),
            'pending': (self.pending_tasks, self._process_single)}[list_name]

        now = time()
        # Positions are recorded together with the tasks, so the entries can
        # be removed from the list once they have been handled
        waiting = ((i, task) for i, task in enumerate(task_list)
                   if task.process_at is None or task.process_at <= now)

        is_superseded = pipe(itemgetter(1), attrgetter('is_superseded'))
        # NOTE(review): ``partition`` comes from ``..utils``; presumably the
        # first result holds the entries that satisfy the predicate - confirm
        superseded, active = partition(is_superseded, waiting)
        superseded = [(i, t.save(self.persist)) for i, t in superseded]

        batch = islice(active, self.BATCH)
        refreshed = [(i, handler(t)) for i, t in batch]

        # Since pop changes the indexes in the list, we need to do it backwards
        remove = sorted([i for i, _ in chain(refreshed, superseded)])
        for index in reversed(remove):
            task_list.pop(index)
        return len(remove)

    def _refresh_single(self, task):
        """Refresh just a single task, and reschedule it if necessary

        Return:
            Whatever ``task.refresh`` returns.
        """
        now = time()

        result = task.refresh(self.connector, self.persist)
        self.logger.debug('Refreshing WIM task: %s (%s): %s %s => %r',
                          task.id, task.status, task.action, task.item, result)

        interval = self.REFRESH_BUILD if task.is_build else self.REFRESH_ACTIVE
        self.schedule(task, now + interval, 'refresh')

        return result

    def _process_single(self, task):
        """Process just a single task, and reschedule it if necessary

        Return:
            Whatever ``task.process`` returns.
        """
        now = time()

        result = task.process(self.connector, self.persist, self.ovim)
        self.logger.debug('Executing WIM task: %s (%s): %s %s => %r',
                          task.id, task.status, task.action, task.item, result)

        if task.action == 'DELETE':
            # The group may already be gone: a DELETE that is rescheduled
            # below reaches this point again on every retry.
            self.grouped_tasks.pop(task.group_key, None)

        self._insert_task[task.processing](task, now + self.RETRY_SCHEDULED)

        return result

    def insert_task(self, task):
        """Send a message to the running thread

        This function is supposed to be called outside of the WIM Thread.

        Arguments:
            task (str or dict): `"exit"`, `"reload"` or dict representing a
                task. For more information about the fields in task, please
                check the Action class.

        Raises:
            QueueFull: when the internal queue cannot accept more messages.
        """
        try:
            self.task_queue.put(task, False)
            return None
        except queue.Full as e:
            ex = QueueFull(self.name)
            raise ex from e

    def reload(self):
        """Send a message to the running thread to reload itself"""
        self.insert_task('reload')

    def exit(self):
        """Send a message to the running thread to kill itself"""
        self.insert_task('exit')

    def run(self):
        """Thread main loop.

        Each (re)start creates the connector and reloads the actions from
        the database. After that, the incoming messages are consumed and the
        pending/refresh lists are processed, until an ``'exit'`` message
        arrives (the method returns 0) or a restart is needed (``'reload'``
        message or failing connector).
        """
        self.logger.debug('Starting: %s', self.name)
        recovery_time = 0
        while True:
            self.on_start()
            reload_thread = False
            self.logger.debug('Reloaded: %s', self.name)

            while True:
                with self.avoid_exceptions():
                    while not self.task_queue.empty():
                        task = self.task_queue.get()
                        try:
                            if isinstance(task, dict):
                                self.insert_pending_tasks([task])
                            elif isinstance(task, list):
                                self.insert_pending_tasks(task)
                            elif isinstance(task, str):
                                if task == 'exit':
                                    self.logger.debug(
                                        'Finishing: %s', self.name)
                                    return 0
                                elif task == 'reload':
                                    reload_thread = True
                                    break
                        finally:
                            # Every ``get`` is matched by a ``task_done``,
                            # including control messages and failing tasks
                            self.task_queue.task_done()

                    if reload_thread:
                        break

                    if not(self.process_list('pending') +
                           self.process_list('refresh')):
                        sleep(self.WAITING_TIME)

                    if isinstance(self.connector, FailingConnector):
                        # Wait sometime to try instantiating the connector
                        # again and restart
                        # Increase the recovery time if restarting is not
                        # working (up to a limit)
                        recovery_time = min(self.MAX_RECOVERY_TIME,
                                            recovery_time + self.RECOVERY_TIME)
                        sleep(recovery_time)
                        break
                    else:
                        recovery_time = 0

        # Only reached if the outer loop is ever left without returning
        self.logger.debug("Finishing")
404
405
def is_pending_group(group):
    """Tell whether a group of action records still has to be processed.

    A group is discarded as soon as it contains a DELETE record whose status
    is anything other than SCHEDULED.

    Arguments:
        group: iterable of action records (dicts with "action" and "status")

    Return:
        (bool) ``True`` when no such DELETE record exists (an empty group
        counts as pending).
    """
    return not any(
        record['action'] == 'DELETE' and record['status'] != 'SCHEDULED'
        for record in group)
410
411
def filter_pending_tasks(group):
    """Select the action records of a group that should be loaded.

    A record is kept when its status is SCHEDULED, or when its action is
    CREATE or FIND (whatever the status).

    Arguments:
        group: iterable of action records (dicts with "action" and "status")

    Return:
        Generator over the selected records, in the original order.
    """
    always_kept = ('CREATE', 'FIND')
    return (
        record for record in group
        if record['status'] == 'SCHEDULED' or record['action'] in always_kept)
416
417
def action_from(record, logger=None, mapping=ACTIONS):
    """Build the Action object that corresponds to an action record (dict)

    Arguments:
        record (dict): action information, it should contain at least the
            "item" and "action" keys
        logger: passed along to the action constructor
        mapping (dict): Nested data structure that maps the relationship
            between action properties and object constructors. This data
            structure should be a dict with 2 levels of keys: item type and
            action type. Example::
                {'wan_link':
                    {'CREATE': WanLinkCreate}
                    ...}
                ...}

    Return:
        (Action.Base): Object representing the action

    Raises:
        UndefinedAction: when a ``KeyError`` happens while looking up the
            constructor or while running it.
    """
    ensure('item' in record, Invalid('`record` should contain "item"'))
    ensure('action' in record, Invalid('`record` should contain "action"'))

    item_type, action_name = record['item'], record['action']

    try:
        build = mapping[item_type][action_name]
        return build(record, logger=logger)
    except KeyError as error:
        raise UndefinedAction(item_type, action_name) from error