1 # -*- coding: utf-8 -*-
3 # Copyright 2018 University of Bristol - High Performance Networks Research
7 # Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
8 # Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
10 # Licensed under the Apache License, Version 2.0 (the "License"); you may
11 # not use this file except in compliance with the License. You may obtain
12 # a copy of the License at
14 # http://www.apache.org/licenses/LICENSE-2.0
16 # Unless required by applicable law or agreed to in writing, software
17 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
18 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
19 # License for the specific language governing permissions and limitations
22 # For those usages not covered by the Apache License, Version 2.0 please
23 # contact with: <highperformance-networks@bristol.ac.uk>
25 # Neither the name of the University of Bristol nor the names of its
26 # contributors may be used to endorse or promote products derived from
27 # this software without specific prior written permission.
29 # This work has been performed in the context of DCMS UK 5G Testbeds
30 # & Trials Programme and in the framework of the Metro-Haul project -
31 # funded by the European Commission under Grant number 761727 through the
32 # Horizon 2020 and 5G-PPP programmes.
"""This module contains the domain logic, and the implementation of the
required steps to perform VNF management and orchestration in a WAN
environment.

It works as an extension/complement to the main functions contained in the
``nfvo.py`` file and avoids interacting directly with the database, by relying
on the `persistence` module.

No http request handling/direct interaction with the database should be present
in this file.
"""
48 from contextlib
import contextmanager
49 from itertools
import groupby
50 from operator
import itemgetter
51 from sys
import exc_info
52 from uuid
import uuid4
54 from six
import reraise
56 from ..utils
import remove_none_items
57 from .actions
import Action
60 NoWimConnectedToDatacenters
,
61 UnexpectedDatabaseError
,
64 from .wim_thread
import WimThread
class WimEngine(object):
    """Logic supporting the establishment of WAN links when NS spans across
    different datacenters.

    The engine delegates every database operation to the ``persistence``
    object received in the constructor and keeps one worker thread per WIM
    account in ``self.threads`` (indexed by the account uuid).
    """

    def __init__(self, persistence, logger=None, ovim=None):
        """Store the collaborators and start with no running threads.

        Arguments:
            persistence: object used for all the database operations
            logger: optional logger, defaults to ``openmano.wim.engine``
            ovim: optional object forwarded to each spawned ``WimThread``
        """
        self.persist = persistence
        self.logger = logger or logging.getLogger('openmano.wim.engine')
        self.threads = {}
        # NOTE(review): ``connectors`` is not read anywhere inside this class;
        # it is kept in case external code relies on the attribute - confirm.
        self.connectors = {}
        self.ovim = ovim

    def create_wim(self, properties):
        """Create a new wim record according to the properties

        Please check the wim schema to have more information about
        ``properties``.

        The ``config`` property might contain a ``wim_port_mapping`` dict,
        In this case, the method ``create_wim_port_mappings`` will be
        automatically invoked. Note that ``wim_port_mapping`` is removed
        (popped) from the given ``config`` before the record is stored.

        Returns:
            str: uuid of the newly created WIM record

        Raises:
            UnexpectedDatabaseError: if the port mappings cannot be stored
                (the WIM record created by this call is deleted first).
        """
        config = properties.get('config') or {}
        port_mapping = config.pop('wim_port_mapping', {})
        uuid = self.persist.create_wim(properties)

        if port_mapping:
            try:
                self.create_wim_port_mappings(uuid, port_mapping)
            except DbBaseException:
                # Rollback
                self.delete_wim(uuid)
                ex = UnexpectedDatabaseError(
                    'Failed to create port mappings. '
                    'Rolling back wim creation')
                self.logger.exception(str(ex))
                # Re-raise as the domain error, keeping the original traceback
                reraise(ex.__class__, ex, exc_info()[2])

        return uuid

    def get_wim(self, uuid_or_name, tenant_id=None):
        """Retrieve existing WIM record by name or id.

        If ``tenant_id`` is specified, the query will be
        limited to the WIM associated to the given tenant.
        """
        # Since it is a pure DB operation, we can delegate it directly
        return self.persist.get_wim(uuid_or_name, tenant_id)

    def update_wim(self, uuid_or_name, properties):
        """Edit an existing WIM record.

        ``properties`` is a dictionary with the properties being changed,
        if a property is not present, the old value will be preserved

        Similarly to create_wim, the ``config`` property might contain a
        ``wim_port_mapping`` dict, In this case, port mappings will be
        automatically updated.

        Returns:
            Whatever ``persistence.update_wim`` returns for the update.

        Raises:
            UnexpectedDatabaseError: if the port mappings cannot be replaced
                (the previous properties are written back first).
        """
        config = properties.get('config') or {}
        port_mapping = config.pop('wim_port_mapping', {})
        orig_props = self.persist.get_by_name_or_uuid('wims', uuid_or_name)
        uuid = orig_props['uuid']

        response = self.persist.update_wim(uuid, properties)

        if port_mapping:
            try:
                # It is very complex to diff and update individually all the
                # port mappings. Therefore a practical approach is just delete
                # and create it again.
                self.persist.delete_wim_port_mappings(uuid)
                # ^ Calling from persistence avoids reloading the threads
                #   twice (create_wim_port_mappings already reloads them)
                self.create_wim_port_mappings(uuid, port_mapping)
            except DbBaseException:
                # Rollback
                self.update_wim(uuid_or_name, orig_props)
                ex = UnexpectedDatabaseError(
                    'Failed to update port mappings. '
                    'Rolling back wim updates\n')
                self.logger.exception(str(ex))
                # Re-raise as the domain error, keeping the original traceback
                reraise(ex.__class__, ex, exc_info()[2])

        return response

    def delete_wim(self, uuid_or_name):
        """Kill the corresponding wim threads and erase the WIM record"""
        # Theoretically, we can rely on the database to drop the wim_accounts
        # automatically, since we have configured 'ON CASCADE DELETE'.
        # However, we use `delete_wim_accounts` to kill all the running
        # threads.
        self.delete_wim_accounts(uuid_or_name)
        return self.persist.delete_wim(uuid_or_name)

    def create_wim_account(self, wim, tenant, properties):
        """Create an account that associates a tenant to a WIM.

        As a side effect this function will spawn a new thread

        Arguments:
            wim (str): name or uuid of the WIM related to the account being
                created
            tenant (str): name or uuid of the nfvo tenant to which the account
                will be created
            properties (dict): properties of the account
                (eg. username, password, ...)

        Returns:
            dict: Created record, as read back from the persistence layer
        """
        uuid = self.persist.create_wim_account(wim, tenant, properties)
        account = self.persist.get_wim_account_by(uuid=uuid)
        # ^ We need to use get_wim_account_by here, since this method returns
        #   all the associations, and we need the wim to create the thread
        self._spawn_thread(account)
        return account

    def _update_single_wim_account(self, account, properties):
        """Update WIM Account, taking care to reload the corresponding thread

        Arguments:
            account (dict): Current account record
            properties (dict): Properties to be updated

        Returns:
            dict: updated record
        """
        account = self.persist.update_wim_account(account['uuid'], properties)
        self.threads[account['uuid']].reload()
        return account

    def update_wim_accounts(self, wim, tenant, properties):
        """Update all the accounts related to a WIM and a tenant,
        taking care of reloading threads.

        Arguments:
            wim (str): uuid or name of a WIM record
            tenant (str): uuid or name of a NFVO tenant record
            properties (dict): attributes with values to be updated

        Returns
            list: Records that were updated
        """
        accounts = self.persist.get_wim_accounts_by(wim, tenant)
        return [self._update_single_wim_account(account, properties)
                for account in accounts]

    def _delete_single_wim_account(self, account):
        """Delete WIM Account, taking care to remove the corresponding thread
        and delete the internal WIM account, if it was automatically generated.

        The database record is removed first; only then the thread is looked
        up, so the record is already gone when ``WimAccountNotActive`` is
        raised.

        Arguments:
            account (dict): Current account record

        Returns:
            dict: current record (same as input)

        Raises:
            WimAccountNotActive: if there is no thread for the account
        """
        account_id = account['uuid']
        self.persist.delete_wim_account(account_id)

        if account_id not in self.threads:
            raise WimAccountNotActive(
                'Requests send to the WIM Account %s are not currently '
                'being processed.', account_id)

        self.threads[account_id].exit()
        del self.threads[account_id]

        return account

    def delete_wim_accounts(self, wim, tenant=None, **kwargs):
        """Delete all the accounts related to a WIM (and a tenant),
        taking care of threads and internal WIM accounts.

        Arguments:
            wim (str): uuid or name of a WIM record
            tenant (str): uuid or name of a NFVO tenant record
            **kwargs: forwarded to ``persistence.get_wim_accounts_by``;
                ``error_if_none`` defaults to ``False`` here.

        Returns
            list: Records that were deleted
        """
        kwargs.setdefault('error_if_none', False)
        accounts = self.persist.get_wim_accounts_by(wim, tenant, **kwargs)
        return [self._delete_single_wim_account(a) for a in accounts]

    def _reload_wim_threads(self, wim_id):
        """Reload every running thread whose account belongs to ``wim_id``"""
        for thread in self.threads.values():
            if thread.wim_account['wim_id'] == wim_id:
                thread.reload()

    def create_wim_port_mappings(self, wim, properties, tenant=None):
        """Store information about port mappings in the Database and reload
        the threads of the affected WIM.
        """
        # TODO: Review tenants... WIMs can exist across different tenants,
        #       and the port_mappings are a WIM property, not a wim_account
        #       property, so the concepts are not related
        wim = self.persist.get_by_name_or_uuid('wims', wim)
        result = self.persist.create_wim_port_mappings(wim, properties, tenant)
        self._reload_wim_threads(wim['uuid'])
        return result

    def get_wim_port_mappings(self, wim):
        """Retrieve information about port mappings from Database"""
        return self.persist.get_wim_port_mappings(wim)

    def delete_wim_port_mappings(self, wim):
        """Erase the port mapping records associated with the WIM"""
        wim = self.persist.get_by_name_or_uuid('wims', wim)
        message = self.persist.delete_wim_port_mappings(wim['uuid'])
        self._reload_wim_threads(wim['uuid'])
        return message

    def find_common_wims(self, datacenter_ids, tenant):
        """Find WIMs that are common to all datacenters listed

        Returns:
            list: ids of the WIMs whose port mappings cover every datacenter
                in ``datacenter_ids``
        """
        mappings = self.persist.get_wim_port_mappings(
            datacenter=datacenter_ids, tenant=tenant, error_if_none=False)

        wim_id_of = itemgetter('wim_id')
        sorted_mappings = sorted(mappings, key=wim_id_of)  # needed by groupby
        # Built once: it does not change between candidate WIMs
        required = set(datacenter_ids)

        return [
            wim_id
            for wim_id, group in groupby(sorted_mappings, key=wim_id_of)
            if required <= set(m['datacenter_id'] for m in group)
        ]

    def find_common_wim(self, datacenter_ids, tenant):
        """Find a single WIM that is able to connect all the datacenters
        listed

        Raises:
            NoWimConnectedToDatacenters: if no WIM connected to all datacenters
                at once is found
        """
        suitable_wim_ids = self.find_common_wims(datacenter_ids, tenant)

        if not suitable_wim_ids:
            raise NoWimConnectedToDatacenters(datacenter_ids)

        # TODO: use a criteria to determine which WIM is going to be used,
        #       instead of always using the first one (strategy pattern can be
        #       used here)
        return suitable_wim_ids[0]

    def find_suitable_wim_account(self, datacenter_ids, tenant):
        """Find a WIM account that is able to connect all the datacenters
        listed

        Arguments:
            datacenter_ids (list): List of UUIDs of all the datacenters (vims),
                that need to be connected.
            tenant (str): UUID of the OSM tenant

        Returns:
            object with the WIM account that is able to connect all the
                 datacenters.
        """
        wim_id = self.find_common_wim(datacenter_ids, tenant)
        return self.persist.get_wim_account_by(wim_id, tenant)

    def derive_wan_link(self,
                        wim_usage,
                        instance_scenario_id, sce_net_id,
                        networks, tenant):
        """Create a instance_wim_nets record for the given information

        When ``sce_net_id`` is listed in ``wim_usage`` the account referenced
        there is used, otherwise a WIM common to the datacenters of
        ``networks`` is searched.
        """
        if sce_net_id in wim_usage:
            account_id = wim_usage[sce_net_id]
            account = self.persist.get_wim_account_by(uuid=account_id)
            wim_id = account['wim_id']
        else:
            datacenters = [n['datacenter_id'] for n in networks]
            wim_id = self.find_common_wim(datacenters, tenant)
            account = self.persist.get_wim_account_by(wim_id, tenant)

        return {
            'uuid': str(uuid4()),
            'instance_scenario_id': instance_scenario_id,
            'sce_net_id': sce_net_id,
            'wim_id': wim_id,
            'wim_account_id': account['uuid']
        }

    def derive_wan_links(self, wim_usage, networks, tenant=None):
        """Discover and return what are the wan_links that have to be created
        considering a set of networks (VLDs) required for a scenario instance
        (NSR).

        Arguments:
            wim_usage(dict): Mapping between sce_net_id and wim_id. If wim_id
                is False, means not create wan_links
            networks(list): Dicts containing the information about the networks
                that will be instantiated to materialize a Network Service
                (scenario) instance.
                Corresponding to the ``instance_net`` record.

        Returns:
            list: list of WAN links to be written to the database
        """
        # Group networks by key=(instance_scenario_id, sce_net_id)
        filtered = _filter_multi_vim(networks)
        grouped_networks = _group_networks(filtered)
        datacenters_per_group = _count_datacenters(grouped_networks)
        # For each group count the number of networks. If greater then 1,
        # we have to create a wan link connecting them.
        wan_groups = [key
                      for key, counter in datacenters_per_group
                      if counter > 1]

        # Keys are tuples(instance_scenario_id, sce_net_id)
        return [
            self.derive_wan_link(wim_usage,
                                 key[0], key[1], grouped_networks[key], tenant)
            for key in wan_groups if wim_usage.get(key[1]) is not False
        ]

    def create_action(self, wan_link):
        """For a single wan_link create the corresponding create action"""
        return {
            # NOTE(review): the 'action' value is reconstructed from the
            # method name and from `dispatch` reading task['action'] - confirm
            'action': 'CREATE',
            'status': 'SCHEDULED',
            'item': 'instance_wim_nets',
            'item_id': wan_link['uuid'],
            'wim_account_id': wan_link['wim_account_id']
        }

    def create_actions(self, wan_links):
        """For an array of wan_links, create all the corresponding actions"""
        return [self.create_action(link) for link in wan_links]

    def delete_action(self, wan_link):
        """For a single wan_link create the corresponding delete action"""
        return {
            # NOTE(review): the 'action' value is reconstructed from the
            # method name and from `dispatch` reading task['action'] - confirm
            'action': 'DELETE',
            'status': 'SCHEDULED',
            'item': 'instance_wim_nets',
            'item_id': wan_link['uuid'],
            'wim_account_id': wan_link['wim_account_id'],
            'extra': json.dumps({'wan_link': wan_link})
            # We serialize and cache the wan_link here, because it can be
            # deleted during the delete process
        }

    def delete_actions(self, wan_links=(), instance_scenario_id=None):
        """Given a Instance Scenario, remove all the WAN Links created in the
        past.

        When ``instance_scenario_id`` is given, the links are read from the
        database and the ``wan_links`` argument is ignored.
        """
        if instance_scenario_id:
            wan_links = self.persist.get_wan_links(
                instance_scenario_id=instance_scenario_id)
        return [self.delete_action(link) for link in wan_links]

    def incorporate_actions(self, wim_actions, instance_action):
        """Make the instance action consider new WIM actions and make the WIM
        actions aware of the instance action

        Both arguments are modified in place: each WIM action receives
        ``task_index`` and ``instance_action_id``, and the ``number_tasks``
        counter of the instance action is increased.

        Returns:
            tuple: ``(wim_actions, instance_action)``
        """
        current = instance_action.setdefault('number_tasks', 0)
        for i, action in enumerate(wim_actions):
            action['task_index'] = current + i
            action['instance_action_id'] = instance_action['uuid']
        instance_action['number_tasks'] += len(wim_actions)

        return wim_actions, instance_action

    def dispatch(self, tasks):
        """Enqueue a list of tasks for further processing.

        This function is supposed to be called outside from the WIM Thread.

        Tasks addressed to an account without a running thread are not
        enqueued: the action is marked as failed, the WAN link is updated
        with the error message and the problem is logged.
        """
        for task in tasks:
            account_id = task['wim_account_id']
            if account_id not in self.threads:
                error_msg = str(WimAccountNotActive(
                    'Requests send to the WIM Account %s are not currently '
                    'being processed.', account_id))
                Action(task, self.logger).fail(self.persist, error_msg)
                # NOTE(review): the 'status' value is reconstructed - confirm
                self.persist.update_wan_link(task['item_id'],
                                             {'status': 'ERROR',
                                              'error_msg': error_msg})
                # `instance_action_id` is the key set by incorporate_actions;
                # the former `instance_account_id` lookup raised KeyError
                self.logger.error('Task %s %s %s not dispatched.\n%s',
                                  task['action'], task['item'],
                                  task['instance_action_id'], error_msg)
            else:
                self.threads[account_id].insert_task(task)
                self.logger.debug('Task %s %s %s dispatched',
                                  task['action'], task['item'],
                                  task['instance_action_id'])

    def _spawn_thread(self, wim_account):
        """Spawn a WIM thread

        Arguments:
            wim_account (dict): WIM information (usually persisted)
                The `wim` field is required to be set with a valid WIM record
                inside the `wim_account` dict

        Return:
            threading.Thread: Thread object, or ``None`` when the thread
                could not be created/started (the error is only logged).
        """
        thread = None
        try:
            thread = WimThread(self.persist, wim_account, ovim=self.ovim)
            self.threads[wim_account['uuid']] = thread
            thread.start()
        except Exception:
            # Best effort: a failure for one account is logged, not raised
            self.logger.error('Error when spawning WIM thread for %s',
                              wim_account['uuid'], exc_info=True)

        return thread

    def start_threads(self):
        """Start the threads responsible for processing WIM Actions"""
        accounts = self.persist.get_wim_accounts(error_if_none=False)
        # Accounts whose thread failed to spawn map to None and are dropped
        self.threads = remove_none_items(
            {a['uuid']: self._spawn_thread(a) for a in accounts})

    def stop_threads(self):
        """Stop the threads responsible for processing WIM Actions"""
        # Iterate over a snapshot: removing entries while iterating the live
        # dict view raises RuntimeError on Python 3
        for uuid, thread in list(self.threads.items()):
            thread.exit()
            del self.threads[uuid]

    @contextmanager
    def threads_running(self):
        """Ensure no thread will be left running"""
        # This method is particularly important for testing :)
        # NOTE(review): body reconstructed from the docstring and from the
        # `contextmanager` import - confirm against the original file
        try:
            self.start_threads()
            yield
        finally:
            self.stop_threads()
499 def _filter_multi_vim(networks
):
500 """Ignore networks without sce_net_id (all VNFs go to the same VIM)"""
501 return [n
for n
in networks
if 'sce_net_id' in n
and n
['sce_net_id']]
504 def _group_networks(networks
):
505 """Group networks that correspond to the same instance_scenario_id and
506 sce_net_id (NSR and VLD).
509 networks(list): Dicts containing the information about the networks
510 that will be instantiated to materialize a Network Service
513 dict: Keys are tuples (instance_scenario_id, sce_net_id) and values
514 are list of networks.
516 criteria
= itemgetter('instance_scenario_id', 'sce_net_id')
518 networks
= sorted(networks
, key
=criteria
)
519 return {k
: list(v
) for k
, v
in groupby(networks
, key
=criteria
)}
522 def _count_datacenters(grouped_networks
):
523 """Count the number of datacenters in each group of networks
526 list of tuples: the first element is the group key, while the second
527 element is the number of datacenters in each group.
529 return ((key
, len(set(n
['datacenter_id'] for n
in group
)))
530 for key
, group
in grouped_networks
.items())