f8d52bbecd7b8fdb786b4d930900ccf6a01cd84e
1 # -*- coding: utf-8 -*-
3 # Copyright 2018 University of Bristol - High Performance Networks Research
7 # Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
8 # Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
10 # Licensed under the Apache License, Version 2.0 (the "License"); you may
11 # not use this file except in compliance with the License. You may obtain
12 # a copy of the License at
14 # http://www.apache.org/licenses/LICENSE-2.0
16 # Unless required by applicable law or agreed to in writing, software
17 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
18 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
19 # License for the specific language governing permissions and limitations
22 # For those usages not covered by the Apache License, Version 2.0 please
23 # contact with: <highperformance-networks@bristol.ac.uk>
25 # Neither the name of the University of Bristol nor the names of its
26 # contributors may be used to endorse or promote products derived from
27 # this software without specific prior written permission.
29 # This work has been performed in the context of DCMS UK 5G Testbeds
30 # & Trials Programme and in the framework of the Metro-Haul project -
31 # funded by the European Commission under Grant number 761727 through the
32 # Horizon 2020 and 5G-PPP programmes.
36 Thread-based interaction with WIMs. Tasks are stored in the
37 database (vim_wim_actions table) and processed sequentially
39 Please check the Action class for information about the content of each action.
44 from contextlib
import contextmanager
45 from functools
import partial
46 from itertools
import islice
, chain
, takewhile
47 from operator
import itemgetter
, attrgetter
48 from sys
import exc_info
49 from time
import time
, sleep
51 from six
import reraise
52 from six
.moves
import queue
54 from . import wan_link_actions
, wimconn_odl
, wimconn_dynpac
# wimconn_tapi
55 from ..utils
import ensure
, partition
, pipe
56 from .actions
import IGNORE
, PENDING
, REFRESH
60 InvalidParameters
as Invalid
,
63 from .failing_connector
import FailingConnector
64 from .wimconn
import WimConnectorError
67 'instance_wim_nets': wan_link_actions
.ACTIONS
71 "odl": wimconn_odl
.OdlConnector
,
72 # "tapi": wimconn_tapi
73 # Add extra connectors here
74 "dynpac": wimconn_dynpac
.DynpacConnector
78 class WimThread(threading
.Thread
):
79 """Specialized task queue implementation that runs in an isolated thread.
81 Objects of this class have a few methods that are intended to be used
82 outside of the thread:
89 All the other methods are used internally to manipulate/process the task
92 RETRY_SCHEDULED
= 10 # 10 seconds
93 REFRESH_BUILD
= 10 # 10 seconds
94 REFRESH_ACTIVE
= 60 # 1 minute
95 BATCH
= 10 # 10 actions per round
97 RECOVERY_TIME
= 5 # Sleep 5s to leave the system some time to recover
98 MAX_RECOVERY_TIME
= 180
99 WAITING_TIME
= 1 # Wait 1s for tasks to arrive, when there are none
101 def __init__(self
, persistence
, wim_account
, logger
=None, ovim
=None):
105 persistence: Database abstraction layer
106 wim_account: Record containing wim_account, tenant and wim
109 name
= '{}.{}.{}'.format(wim_account
['wim']['name'],
110 wim_account
['name'], wim_account
['uuid'])
111 super(WimThread
, self
).__init
__(name
=name
)
114 self
.connector
= None
115 self
.wim_account
= wim_account
117 self
.logger
= logger
or logging
.getLogger('openmano.wim.'+self
.name
)
118 self
.persist
= persistence
121 self
.task_queue
= queue
.Queue(self
.QUEUE_SIZE
)
123 self
.refresh_tasks
= []
124 """Time ordered task list for refreshing the status of WIM nets"""
126 self
.pending_tasks
= []
127 """Time ordered task list for creation, deletion of WIM nets"""
129 self
.grouped_tasks
= {}
130 """ It contains all the creation/deletion pending tasks grouped by
131 its concrete vm, net, etc
134 - <task1> # e.g. CREATE task
135 <task2> # e.g. DELETE task
138 self
._insert
_task
= {
139 PENDING
: partial(self
.schedule
, list_name
='pending'),
140 REFRESH
: partial(self
.schedule
, list_name
='refresh'),
141 IGNORE
: lambda task
, *_
, **__
: task
.save(self
.persist
)}
142 """Send the task to the right processing queue"""
145 """Run a series of procedures every time the thread (re)starts"""
146 self
.connector
= self
.get_connector()
147 self
.reload_actions()
149 def get_connector(self
):
150 """Create a WimConnector instance according to the wim.type"""
152 account_id
= self
.wim_account
['uuid']
154 account
= self
.persist
.get_wim_account_by(
155 uuid
=account_id
, hide
=None) # Credentials need to be available
157 mapping
= self
.persist
.query('wim_port_mappings',
158 WHERE
={'wim_id': wim
['uuid']},
160 return CONNECTORS
[wim
['type']](wim
, account
, {
161 'service_endpoint_mapping': mapping
or []
163 except DbBaseException
as ex
:
164 error_msg
= ('Error when retrieving WIM account ({})\n'
165 .format(account_id
)) + str(ex
)
166 self
.logger
.error(error_msg
, exc_info
=True)
167 except KeyError as ex
:
168 error_msg
= ('Unable to find the WIM connector for WIM ({})\n'
169 .format(wim
['type'])) + str(ex
)
170 self
.logger
.error(error_msg
, exc_info
=True)
171 except (WimConnectorError
, Exception) as ex
:
172 # TODO: Remove the Exception class here when the connector class is
174 error_msg
= ('Error when loading WIM connector for WIM ({})\n'
175 .format(wim
['type'])) + str(ex
)
176 self
.logger
.error(error_msg
, exc_info
=True)
178 error_msg_extra
= ('Any task targeting WIM account {} ({}) will fail.'
179 .format(account_id
, self
.wim_account
.get('name')))
180 self
.logger
.warning(error_msg_extra
)
181 return FailingConnector(error_msg
+ '\n' + error_msg_extra
)
@contextmanager
def avoid_exceptions(self):
    """Make a real effort to keep the thread alive, by avoiding exceptions.

    Meant to be used as ``with self.avoid_exceptions(): ...``. Any
    ``Exception`` raised by the managed block is not propagated: it is
    logged as a critical error (traceback included) and the thread then
    sleeps for ``RECOVERY_TIME`` seconds before the caller resumes.
    """
    try:
        yield
    except Exception as ex:
        self.logger.critical("Unexpected exception %s", ex, exc_info=True)
        # Leave the system some time to recover before carrying on
        sleep(self.RECOVERY_TIME)
194 def reload_actions(self
, group_limit
=100):
195 """Read actions from database and reload them into memory.
197 This method will clean and reload the attributes ``refresh_tasks``,
198 ``pending_tasks`` and ``grouped_tasks``
201 group_limit (int): maximum number of action groups (those that
202 refer to the same ``<item, item_id>``) to be retrieved from the
203 database in each batch.
206 # First we clean the cache to let the garbage collector work
207 self
.refresh_tasks
= []
208 self
.pending_tasks
= []
209 self
.grouped_tasks
= {}
214 # Do things in batches
215 task_groups
= self
.persist
.get_actions_in_groups(
216 self
.wim_account
['uuid'], item_types
=('instance_wim_nets',),
217 group_offset
=offset
, group_limit
=group_limit
)
218 offset
+= (group_limit
- 1) # Update for the next batch
223 pending_groups
= (g
for _
, g
in task_groups
if is_pending_group(g
))
225 for task_list
in pending_groups
:
226 with self
.avoid_exceptions():
227 self
.insert_pending_tasks(filter_pending_tasks(task_list
))
230 'Reloaded wim actions pending: %d refresh: %d',
231 len(self
.pending_tasks
), len(self
.refresh_tasks
))
def insert_pending_tasks(self, task_list):
    """Insert a batch of action records in the lists being processed.

    Every record of ``task_list`` is converted to an action object with
    ``action_from``, appended to the group it belongs to (``group_key``)
    in ``self.grouped_tasks`` and finally dispatched through
    ``self._insert_task`` according to its ``processing`` attribute.
    """
    actions = [action_from(record, self.logger) for record in task_list]

    for action in actions:
        siblings = self.grouped_tasks.setdefault(action.group_key, [])
        # Every action gets the chance to supersede the ones already in
        # its group, although only DELETE actions will actually do so
        action.supersede(siblings)
        siblings.append(action)

    # Dispatching is done in a second pass, so each action was able to
    # check all the other ones of the batch before a decision is taken
    for action in actions:
        dispatch = self._insert_task[action.processing]
        dispatch(action)
        self.logger.debug('Insert WIM task: %s (%s): %s %s',
                          action.id, action.status,
                          action.action, action.item)
252 def schedule(self
, task
, when
=None, list_name
='pending'):
253 """Insert a task in the correct list, respecting the schedule.
254 The refreshing list is ordered by threshold_time (task.process_at)
255 It is assumed that this is called inside this thread
258 task (Action): object representing the task.
259 This object must implement the ``process`` method and inherit
260 from the ``Action`` class
261 list_name: either 'refresh' or 'pending'
262 when (float): unix time, in seconds since the epoch, as a float number
264 processing_list
= {'refresh': self
.refresh_tasks
,
265 'pending': self
.pending_tasks
}[list_name
]
267 when
= when
or time()
268 task
.process_at
= when
270 schedule
= (t
.process_at
for t
in processing_list
)
271 index
= len(list(takewhile(lambda moment
: moment
<= when
, schedule
)))
273 processing_list
.insert(index
, task
)
275 'Schedule of %s in "%s" - waiting position: %d (%f)',
276 task
.id, list_name
, index
, task
.process_at
)
def process_list(self, list_name='pending'):
    """Process actions in batches and reschedule them if necessary.

    Arguments:
        list_name: either 'refresh' or 'pending'. It selects both the task
            list to be walked and the handler applied to each task
            (``_refresh_single`` or ``_process_single`` respectively).

    Return:
        (int): number of entries removed from the selected list in this
            round, i.e. the superseded tasks (which are just saved) plus
            the tasks handled in the batch (at most ``BATCH``).
    """
    task_list, handler = {
        'refresh': (self.refresh_tasks, self._refresh_single),
        'pending': (self.pending_tasks, self._process_single)}[list_name]

    # NOTE(review): ``now`` is compared against ``process_at``, which
    # ``schedule`` fills in from ``time()`` - confirm against the pristine
    # file that no other clock is intended here.
    now = time()
    # Lazy on purpose: positions are paired with the tasks while the list
    # is walked, and only due tasks (or unscheduled ones) are considered
    waiting = ((i, task) for i, task in enumerate(task_list)
               if task.process_at is None or task.process_at <= now)

    # NOTE(review): ``pipe``/``partition`` come from ``..utils``; presumably
    # they compose the two getters and split ``waiting`` into
    # (matching, non-matching) iterables - verify against that module.
    is_superseded = pipe(itemgetter(1), attrgetter('is_superseded'))
    superseded, active = partition(is_superseded, waiting)
    superseded = [(i, t.save(self.persist)) for i, t in superseded]

    batch = islice(active, self.BATCH)
    refreshed = [(i, handler(t)) for i, t in batch]

    # Since pop changes the indexes in the list, we need to do it
    # backwards: from the highest position down to the lowest one
    remove = sorted((i for i, _ in chain(refreshed, superseded)),
                    reverse=True)
    for position in remove:
        task_list.pop(position)

    return len(remove)
301 def _refresh_single(self
, task
):
302 """Refresh just a single task, and reschedule it if necessary"""
305 result
= task
.refresh(self
.connector
, self
.persist
)
306 self
.logger
.debug('Refreshing WIM task: %s (%s): %s %s => %r',
307 task
.id, task
.status
, task
.action
, task
.item
, result
)
309 interval
= self
.REFRESH_BUILD
if task
.is_build
else self
.REFRESH_ACTIVE
310 self
.schedule(task
, now
+ interval
, 'refresh')
314 def _process_single(self
, task
):
315 """Process just a single task, and reschedule it if necessary"""
318 result
= task
.process(self
.connector
, self
.persist
, self
.ovim
)
319 self
.logger
.debug('Executing WIM task: %s (%s): %s %s => %r',
320 task
.id, task
.status
, task
.action
, task
.item
, result
)
322 if task
.action
== 'DELETE':
323 del self
.grouped_tasks
[task
.group_key
]
325 self
._insert
_task
[task
.processing
](task
, now
+ self
.RETRY_SCHEDULED
)
329 def insert_task(self
, task
):
330 """Send a message to the running thread
332 This function is supposed to be called outside of the WIM Thread.
335 task (str or dict): `"exit"`, `"reload"` or dict representing a
336 task. For more information about the fields in task, please
337 check the Action class.
340 self
.task_queue
.put(task
, False)
343 ex
= QueueFull(self
.name
)
344 reraise(ex
.__class
__, ex
, exc_info()[2])
347 """Send a message to the running thread to reload itself"""
348 self
.insert_task('reload')
351 """Send a message to the running thread to kill itself"""
352 self
.insert_task('exit')
355 self
.logger
.debug('Starting: %s', self
.name
)
359 reload_thread
= False
360 self
.logger
.debug('Reloaded: %s', self
.name
)
363 with self
.avoid_exceptions():
364 while not self
.task_queue
.empty():
365 task
= self
.task_queue
.get()
366 if isinstance(task
, dict):
367 self
.insert_pending_tasks([task
])
368 elif isinstance(task
, list):
369 self
.insert_pending_tasks(task
)
370 elif isinstance(task
, str):
372 self
.logger
.debug('Finishing: %s', self
.name
)
374 elif task
== 'reload':
377 self
.task_queue
.task_done()
382 if not(self
.process_list('pending') +
383 self
.process_list('refresh')):
384 sleep(self
.WAITING_TIME
)
386 if isinstance(self
.connector
, FailingConnector
):
387 # Wait sometime to try instantiating the connector
389 # Increase the recovery time if restarting is not
390 # working (up to a limit)
391 recovery_time
= min(self
.MAX_RECOVERY_TIME
,
392 recovery_time
+ self
.RECOVERY_TIME
)
398 self
.logger
.debug("Finishing")
def is_pending_group(group):
    """Check if a group of action records qualifies as pending.

    Arguments:
        group: iterable of action records (dicts with, at least, the
            ``action`` and ``status`` keys)

    Return:
        (bool): ``True`` when every record is either not a ``DELETE``
            action or still has the ``SCHEDULED`` status. An empty group
            yields ``True``.
    """
    return all(task['action'] != 'DELETE' or
               task['status'] == 'SCHEDULED'
               for task in group)
def filter_pending_tasks(group):
    """Lazily select, from a group, the records that still need handling.

    Arguments:
        group: iterable of action records (dicts with, at least, the
            ``status`` and ``action`` keys)

    Return:
        (generator): the records whose status is ``SCHEDULED`` or whose
            action is ``CREATE`` or ``FIND``, in the original order
    """
    def needs_handling(record):
        # The status is inspected first; the action only matters for
        # records that are not SCHEDULED anymore
        if record['status'] == 'SCHEDULED':
            return True
        return record['action'] in ('CREATE', 'FIND')

    return (record for record in group if needs_handling(record))
413 def action_from(record
, logger
=None, mapping
=ACTIONS
):
414 """Create an Action object from an action record (dict)
417 mapping (dict): Nested data structure that maps the relationship
418 between action properties and object constructors. This data
419 structure should be a dict with 2 levels of keys: item type and
420 action type. Example::
422 {'CREATE': WanLinkCreate}
425 record (dict): action information
428 (Action.Base): Object representing the action
430 ensure('item' in record
, Invalid('`record` should contain "item"'))
431 ensure('action' in record
, Invalid('`record` should contain "action"'))
434 factory
= mapping
[record
['item']][record
['action']]
435 return factory(record
, logger
=logger
)
437 ex
= UndefinedAction(record
['item'], record
['action'])
438 reraise(ex
.__class
__, ex
, exc_info()[2])