1 # -*- coding: utf-8 -*-
3 # Copyright 2018 University of Bristol - High Performance Networks Research
7 # Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
8 # Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
10 # Licensed under the Apache License, Version 2.0 (the "License"); you may
11 # not use this file except in compliance with the License. You may obtain
12 # a copy of the License at
14 # http://www.apache.org/licenses/LICENSE-2.0
16 # Unless required by applicable law or agreed to in writing, software
17 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
18 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
19 # License for the specific language governing permissions and limitations
22 # For those usages not covered by the Apache License, Version 2.0 please
23 # contact with: <highperformance-networks@bristol.ac.uk>
25 # Neither the name of the University of Bristol nor the names of its
26 # contributors may be used to endorse or promote products derived from
27 # this software without specific prior written permission.
29 # This work has been performed in the context of DCMS UK 5G Testbeds
30 # & Trials Programme and in the framework of the Metro-Haul project -
31 # funded by the European Commission under Grant number 761727 through the
32 # Horizon 2020 and 5G-PPP programmes.
36 Thread-based interaction with WIMs. Tasks are stored in the
37 database (vim_wim_actions table) and processed sequentially
39 Please check the Action class for information about the content of each action.
44 from contextlib
import contextmanager
45 from functools
import partial
46 from itertools
import islice
, chain
, takewhile
47 from operator
import itemgetter
, attrgetter
48 from sys
import exc_info
49 from time
import time
, sleep
51 from six
import reraise
52 from six
.moves
import queue
54 from . import wan_link_actions
, wimconn_odl
# wimconn_tapi
55 from ..utils
import ensure
, partition
, pipe
56 from .actions
import IGNORE
, PENDING
, REFRESH
60 InvalidParameters
as Invalid
,
63 from .failing_connector
import FailingConnector
64 from .wimconn
import WimConnectorError
67 'instance_wim_nets': wan_link_actions
.ACTIONS
71 "odl": wimconn_odl
.OdlConnector
,
72 # "tapi": wimconn_tapi
73 # Add extra connectors here
77 class WimThread(threading
.Thread
):
78 """Specialized task queue implementation that runs in an isolated thread.
80 Objects of this class have a few methods that are intended to be used
81 outside of the thread:
88 All the other methods are used internally to manipulate/process the task
91 RETRY_SCHEDULED
= 10 # 10 seconds
92 REFRESH_BUILD
= 10 # 10 seconds
93 REFRESH_ACTIVE
= 60 # 1 minute
94 BATCH
= 10 # 10 actions per round
96 RECOVERY_TIME
= 5 # Sleep 5s to leave the system some time to recover
97 MAX_RECOVERY_TIME
= 180
98 WAITING_TIME
= 1 # Wait 1s for tasks to arrive, when there are none
100 def __init__(self
, persistence
, wim_account
, logger
=None, ovim
=None):
104 persistence: Database abstraction layer
105 wim_account: Record containing wim_account, tenant and wim
108 name
= '{}.{}.{}'.format(wim_account
['wim']['name'],
109 wim_account
['name'], wim_account
['uuid'])
110 super(WimThread
, self
).__init
__(name
=name
)
113 self
.connector
= None
114 self
.wim_account
= wim_account
116 self
.logger
= logger
or logging
.getLogger('openmano.wim.'+self
.name
)
117 self
.persist
= persistence
120 self
.task_queue
= queue
.Queue(self
.QUEUE_SIZE
)
122 self
.refresh_tasks
= []
123 """Time ordered task list for refreshing the status of WIM nets"""
125 self
.pending_tasks
= []
126 """Time ordered task list for creation, deletion of WIM nets"""
128 self
.grouped_tasks
= {}
129 """ It contains all the creation/deletion pending tasks grouped by
130 its concrete vm, net, etc
133 - <task1> # e.g. CREATE task
134 <task2> # e.g. DELETE task
137 self
._insert
_task
= {
138 PENDING
: partial(self
.schedule
, list_name
='pending'),
139 REFRESH
: partial(self
.schedule
, list_name
='refresh'),
140 IGNORE
: lambda task
, *_
, **__
: task
.save(self
.persist
)}
141 """Send the task to the right processing queue"""
144 """Run a series of procedures every time the thread (re)starts"""
145 self
.connector
= self
.get_connector()
146 self
.reload_actions()
148 def get_connector(self
):
149 """Create a WimConnector instance according to the wim.type"""
151 account_id
= self
.wim_account
['uuid']
153 account
= self
.persist
.get_wim_account_by(
154 uuid
=account_id
, hide
=None) # Credentials need to be available
156 mapping
= self
.persist
.query('wim_port_mappings',
157 WHERE
={'wim_id': wim
['uuid']},
159 return CONNECTORS
[wim
['type']](wim
, account
, {
160 'service_endpoint_mapping': mapping
or []
162 except DbBaseException
as ex
:
163 error_msg
= ('Error when retrieving WIM account ({})\n'
164 .format(account_id
)) + str(ex
)
165 self
.logger
.error(error_msg
, exc_info
=True)
166 except KeyError as ex
:
167 error_msg
= ('Unable to find the WIM connector for WIM ({})\n'
168 .format(wim
['type'])) + str(ex
)
169 self
.logger
.error(error_msg
, exc_info
=True)
170 except (WimConnectorError
, Exception) as ex
:
171 # TODO: Remove the Exception class here when the connector class is
173 error_msg
= ('Error when loading WIM connector for WIM ({})\n'
174 .format(wim
['type'])) + str(ex
)
175 self
.logger
.error(error_msg
, exc_info
=True)
177 error_msg_extra
= ('Any task targeting WIM account {} ({}) will fail.'
178 .format(account_id
, self
.wim_account
.get('name')))
179 self
.logger
.warning(error_msg_extra
)
180 return FailingConnector(error_msg
+ '\n' + error_msg_extra
)
183 def avoid_exceptions(self
):
184 """Make a real effort to keep the thread alive, by avoiding the
185 exceptions. They are instead logged as critical errors.
189 except Exception as ex
:
190 self
.logger
.critical("Unexpected exception %s", ex
, exc_info
=True)
191 sleep(self
.RECOVERY_TIME
)
193 def reload_actions(self
, group_limit
=100):
194 """Read actions from database and reload them into memory.
196 This method will clean and reload the attributes ``refresh_tasks``,
197 ``pending_tasks`` and ``grouped_tasks``
200 group_limit (int): maximum number of action groups (those that
201 refer to the same ``<item, item_id>``) to be retrieved from the
202 database in each batch.
205 # First we clean the cache to let the garbage collector work
206 self
.refresh_tasks
= []
207 self
.pending_tasks
= []
208 self
.grouped_tasks
= {}
213 # Do things in batches
214 task_groups
= self
.persist
.get_actions_in_groups(
215 self
.wim_account
['uuid'], item_types
=('instance_wim_nets',),
216 group_offset
=offset
, group_limit
=group_limit
)
217 offset
+= (group_limit
- 1) # Update for the next batch
222 pending_groups
= (g
for _
, g
in task_groups
if is_pending_group(g
))
224 for task_list
in pending_groups
:
225 with self
.avoid_exceptions():
226 self
.insert_pending_tasks(filter_pending_tasks(task_list
))
229 'Reloaded wim actions pending: %d refresh: %d',
230 len(self
.pending_tasks
), len(self
.refresh_tasks
))
def insert_pending_tasks(self, task_list):
    """Turn raw action records into Action objects and queue them.

    Arguments:
        task_list: iterable of action records; each one is converted with
            ``action_from`` (using this thread's logger).

    The work is done in two passes:

    1. every action is registered in ``self.grouped_tasks`` under its
       ``group_key``, after being given the chance to supersede the
       actions already registered for that same key;
    2. every action is handed to the handler selected from
       ``self._insert_task`` by its ``processing`` attribute.
    """
    actions = [action_from(record, self.logger) for record in task_list]

    # Pass 1: grouping. Each action can try to supersede the ones already
    # in its group, but just DELETE actions will actually do.
    for action in actions:
        siblings = self.grouped_tasks.setdefault(action.group_key, [])
        action.supersede(siblings)
        siblings.append(action)

    # Pass 2: dispatching. This is deliberately a separate loop, so each
    # action has seen all the other ones before its destination is chosen.
    for action in actions:
        dispatch = self._insert_task[action.processing]
        dispatch(action)
        self.logger.debug('Insert WIM task: %s (%s): %s %s',
                          action.id, action.status,
                          action.action, action.item)
251 def schedule(self
, task
, when
=None, list_name
='pending'):
252 """Insert a task in the correct list, respecting the schedule.
253 The refreshing list is ordered by threshold_time (task.process_at)
254 It is assumed that this is called inside this thread
257 task (Action): object representing the task.
258 This object must implement the ``process`` method and inherit
259 from the ``Action`` class
260 list_name: either 'refresh' or 'pending'
261 when (float): unix time, in seconds since the epoch, as a float number
263 processing_list
= {'refresh': self
.refresh_tasks
,
264 'pending': self
.pending_tasks
}[list_name
]
266 when
= when
or time()
267 task
.process_at
= when
269 schedule
= (t
.process_at
for t
in processing_list
)
270 index
= len(list(takewhile(lambda moment
: moment
<= when
, schedule
)))
272 processing_list
.insert(index
, task
)
274 'Schedule of %s in "%s" - waiting position: %d (%f)',
275 task
.id, list_name
, index
, task
.process_at
)
279 def process_list(self
, list_name
='pending'):
280 """Process actions in batches and reschedule them if necessary"""
281 task_list
, handler
= {
282 'refresh': (self
.refresh_tasks
, self
._refresh
_single
),
283 'pending': (self
.pending_tasks
, self
._process
_single
)}[list_name
]
286 waiting
= ((i
, task
) for i
, task
in enumerate(task_list
)
287 if task
.process_at
is None or task
.process_at
<= now
)
289 is_superseded
= pipe(itemgetter(1), attrgetter('is_superseded'))
290 superseded
, active
= partition(is_superseded
, waiting
)
291 superseded
= [(i
, t
.save(self
.persist
)) for i
, t
in superseded
]
293 batch
= islice(active
, self
.BATCH
)
294 refreshed
= [(i
, handler(t
)) for i
, t
in batch
]
296 # Since pop changes the indexes in the list, we need to do it backwards
297 remove
= sorted([i
for i
, _
in chain(refreshed
, superseded
)])
298 return len([task_list
.pop(i
) for i
in reversed(remove
)])
300 def _refresh_single(self
, task
):
301 """Refresh just a single task, and reschedule it if necessary"""
304 result
= task
.refresh(self
.connector
, self
.persist
)
305 self
.logger
.debug('Refreshing WIM task: %s (%s): %s %s => %r',
306 task
.id, task
.status
, task
.action
, task
.item
, result
)
308 interval
= self
.REFRESH_BUILD
if task
.is_build
else self
.REFRESH_ACTIVE
309 self
.schedule(task
, now
+ interval
, 'refresh')
313 def _process_single(self
, task
):
314 """Process just a single task, and reschedule it if necessary"""
317 result
= task
.process(self
.connector
, self
.persist
, self
.ovim
)
318 self
.logger
.debug('Executing WIM task: %s (%s): %s %s => %r',
319 task
.id, task
.status
, task
.action
, task
.item
, result
)
321 if task
.action
== 'DELETE':
322 del self
.grouped_tasks
[task
.group_key
]
324 self
._insert
_task
[task
.processing
](task
, now
+ self
.RETRY_SCHEDULED
)
328 def insert_task(self
, task
):
329 """Send a message to the running thread
331 This function is supposed to be called outside of the WIM Thread.
334 task (str or dict): `"exit"`, `"reload"` or dict representing a
335 task. For more information about the fields in task, please
336 check the Action class.
339 self
.task_queue
.put(task
, False)
342 ex
= QueueFull(self
.name
)
343 reraise(ex
.__class
__, ex
, exc_info()[2])
346 """Send a message to the running thread to reload itself"""
347 self
.insert_task('reload')
350 """Send a message to the running thread to kill itself"""
351 self
.insert_task('exit')
354 self
.logger
.debug('Starting: %s', self
.name
)
358 reload_thread
= False
359 self
.logger
.debug('Reloaded: %s', self
.name
)
362 with self
.avoid_exceptions():
363 while not self
.task_queue
.empty():
364 task
= self
.task_queue
.get()
365 if isinstance(task
, dict):
366 self
.insert_pending_tasks([task
])
367 elif isinstance(task
, list):
368 self
.insert_pending_tasks(task
)
369 elif isinstance(task
, str):
371 self
.logger
.debug('Finishing: %s', self
.name
)
373 elif task
== 'reload':
376 self
.task_queue
.task_done()
381 if not(self
.process_list('pending') +
382 self
.process_list('refresh')):
383 sleep(self
.WAITING_TIME
)
385 if isinstance(self
.connector
, FailingConnector
):
386 # Wait sometime to try instantiating the connector
388 # Increase the recovery time if restarting is not
389 # working (up to a limit)
390 recovery_time
= min(self
.MAX_RECOVERY_TIME
,
391 recovery_time
+ self
.RECOVERY_TIME
)
397 self
.logger
.debug("Finishing")
def is_pending_group(group):
    """Tell whether a group of action records counts as pending.

    Arguments:
        group: iterable of action records (dicts) with the keys
            ``'action'`` and ``'status'``.

    Returns:
        bool: ``False`` as soon as a ``DELETE`` record whose status is not
        ``'SCHEDULED'`` is found, ``True`` otherwise (including for an
        empty group).
    """
    for task in group:
        # ``status`` is only inspected for DELETE records
        if task['action'] == 'DELETE' and task['status'] != 'SCHEDULED':
            return False
    return True
def filter_pending_tasks(group):
    """Lazily select the records of a group that should be (re)loaded.

    Arguments:
        group: iterable of action records (dicts) with the keys
            ``'status'`` and ``'action'``.

    Returns:
        generator: the records, in their original order, whose status is
        ``'SCHEDULED'`` or whose action is ``'CREATE'`` or ``'FIND'``.
    """
    kept_actions = ('CREATE', 'FIND')

    def is_selected(record):
        # ``action`` is only inspected when the record is not SCHEDULED
        return (record['status'] == 'SCHEDULED' or
                record['action'] in kept_actions)

    return (record for record in group if is_selected(record))
def action_from(record, logger=None, mapping=ACTIONS):
    """Create an Action object from an action record (dict)

    Arguments:
        record (dict): action information. It is checked for the keys
            ``"item"`` and ``"action"`` before anything else.
        logger: forwarded to the action constructor as the ``logger``
            keyword argument.
        mapping (dict): Nested data structure that maps the relationship
            between action properties and object constructors. This data
            structure should be a dict with 2 levels of keys: item type and
            action type. Example::

                {'instance_wim_nets': {'CREATE': WanLinkCreate}}

    Returns:
        (Action.Base): Object representing the action

    Raises:
        UndefinedAction: when ``mapping`` has no constructor registered for
            the record's ``item``/``action`` pair.

    NOTE(review): ``ensure`` is presumably raising the given ``Invalid``
    exception when its first argument is false - confirm in ``..utils``.
    """
    ensure('item' in record, Invalid('`record` should contain "item"'))
    ensure('action' in record, Invalid('`record` should contain "action"'))

    # Only the lookup is guarded: a KeyError raised *inside* the constructor
    # is a different problem and must not be reported as an undefined action.
    try:
        factory = mapping[record['item']][record['action']]
    except KeyError:
        ex = UndefinedAction(record['item'], record['action'])
        # Keep the original traceback (works on both Python 2 and 3)
        reraise(ex.__class__, ex, exc_info()[2])

    return factory(record, logger=logger)