1 # -*- coding: utf-8 -*-
3 # Copyright 2018 University of Bristol - High Performance Networks Research
7 # Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
8 # Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
10 # Licensed under the Apache License, Version 2.0 (the "License"); you may
11 # not use this file except in compliance with the License. You may obtain
12 # a copy of the License at
14 # http://www.apache.org/licenses/LICENSE-2.0
16 # Unless required by applicable law or agreed to in writing, software
17 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
18 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
19 # License for the specific language governing permissions and limitations
22 # For those usages not covered by the Apache License, Version 2.0 please
23 # contact with: <highperformance-networks@bristol.ac.uk>
25 # Neither the name of the University of Bristol nor the names of its
26 # contributors may be used to endorse or promote products derived from
27 # this software without specific prior written permission.
29 # This work has been performed in the context of DCMS UK 5G Testbeds
30 # & Trials Programme and in the framework of the Metro-Haul project -
31 # funded by the European Commission under Grant number 761727 through the
32 # Horizon 2020 and 5G-PPP programmes.
"""This module contains the domain logic, and the implementation of the
required steps to perform VNF management and orchestration in a WAN
environment.

It works as an extension/complement to the main functions contained in the
``nfvo.py`` file and avoids interacting directly with the database, by relying
on the `persistence` module.

No http request handling/direct interaction with the database should be present
in this file.
"""
48 from contextlib
import contextmanager
49 from itertools
import groupby
50 from operator
import itemgetter
51 from sys
import exc_info
52 from uuid
import uuid4
54 from ..utils
import remove_none_items
55 from .actions
import Action
58 NoWimConnectedToDatacenters
,
59 UnexpectedDatabaseError
,
62 from .wim_thread
import WimThread
65 class WimEngine(object):
66 """Logic supporting the establishment of WAN links when NS spans across
67 different datacenters.
69 def __init__(self
, persistence
, logger
=None, ovim
=None):
70 self
.persist
= persistence
71 self
.logger
= logger
or logging
.getLogger('openmano.wim.engine')
76 def create_wim(self
, properties
):
77 """Create a new wim record according to the properties
79 Please check the wim schema to have more information about
82 The ``config`` property might contain a ``wim_port_mapping`` dict,
83 In this case, the method ``create_wim_port_mappings`` will be
84 automatically invoked.
87 str: uuid of the newly created WIM record
89 port_mapping
= ((properties
.get('config', {}) or {})
90 .pop('wim_port_mapping', {}))
91 uuid
= self
.persist
.create_wim(properties
)
95 self
.create_wim_port_mappings(uuid
, port_mapping
)
96 except DbBaseException
as e
:
99 ex
= UnexpectedDatabaseError('Failed to create port mappings'
100 'Rolling back wim creation')
101 self
.logger
.exception(str(ex
))
def get_wim(self, uuid_or_name, tenant_id=None):
    """Look up an existing WIM record, identified by name or by uuid.

    When ``tenant_id`` is given, it is handed to the persistence layer
    together with the identifier, so the query is limited to the WIM
    associated to that tenant.
    """
    # Pure DB operation: no domain logic needed, just delegate.
    store = self.persist
    return store.get_wim(uuid_or_name, tenant_id)
115 def update_wim(self
, uuid_or_name
, properties
):
116 """Edit an existing WIM record.
118 ``properties`` is a dictionary with the properties being changed,
119 if a property is not present, the old value will be preserved
121 Similarly to create_wim, the ``config`` property might contain a
122 ``wim_port_mapping`` dict, In this case, port mappings will be
123 automatically updated.
125 port_mapping
= ((properties
.get('config', {}) or {})
126 .pop('wim_port_mapping', {}))
127 orig_props
= self
.persist
.get_by_name_or_uuid('wims', uuid_or_name
)
128 uuid
= orig_props
['uuid']
130 response
= self
.persist
.update_wim(uuid
, properties
)
134 # It is very complex to diff and update individually all the
135 # port mappings. Therefore a practical approach is just delete
136 # and create it again.
137 self
.persist
.delete_wim_port_mappings(uuid
)
138 # ^ Calling from persistence avoid reloading twice the thread
139 self
.create_wim_port_mappings(uuid
, port_mapping
)
140 except DbBaseException
as e
:
142 self
.update_wim(uuid_or_name
, orig_props
)
143 ex
= UnexpectedDatabaseError('Failed to update port mappings'
144 'Rolling back wim updates\n')
145 self
.logger
.exception(str(ex
))
def delete_wim(self, uuid_or_name):
    """Remove the WIM accounts (and their threads), then erase the WIM record.

    Returns whatever ``persist.delete_wim`` returns for ``uuid_or_name``.
    """
    # In theory the database could drop the wim_accounts rows by itself,
    # since 'ON CASCADE DELETE' is configured. Going through
    # ``delete_wim_accounts`` first is what kills the running threads.
    self.delete_wim_accounts(uuid_or_name)
    outcome = self.persist.delete_wim(uuid_or_name)
    return outcome
159 def create_wim_account(self
, wim
, tenant
, properties
):
160 """Create an account that associates a tenant to a WIM.
162 As a side effect this function will spawn a new thread
165 wim (str): name or uuid of the WIM related to the account being
167 tenant (str): name or uuid of the nfvo tenant to which the account
169 properties (dict): properties of the account
170 (eg. username, password, ...)
175 uuid
= self
.persist
.create_wim_account(wim
, tenant
, properties
)
176 account
= self
.persist
.get_wim_account_by(uuid
=uuid
)
177 # ^ We need to use get_wim_account_by here, since this methods returns
178 # all the associations, and we need the wim to create the thread
179 self
._spawn
_thread
(account
)
182 def _update_single_wim_account(self
, account
, properties
):
183 """Update WIM Account, taking care to reload the corresponding thread
186 account (dict): Current account record
187 properties (dict): Properties to be updated
192 account
= self
.persist
.update_wim_account(account
['uuid'], properties
)
193 self
.threads
[account
['uuid']].reload()
def update_wim_accounts(self, wim, tenant, properties):
    """Update all the accounts related to a WIM and a tenant.

    Every matching account goes through ``_update_single_wim_account``,
    the helper that takes care of reloading the corresponding thread.

    Arguments:
        wim (str): uuid or name of a WIM record
        tenant (str): uuid or name of a NFVO tenant record
        properties (dict): attributes with values to be updated

    Returns:
        list: Records that were updated
    """
    updated = []
    for record in self.persist.get_wim_accounts_by(wim, tenant):
        updated.append(self._update_single_wim_account(record, properties))
    return updated
212 def _delete_single_wim_account(self
, account
):
213 """Delete WIM Account, taking care to remove the corresponding thread
214 and delete the internal WIM account, if it was automatically generated.
217 account (dict): Current account record
218 properties (dict): Properties to be updated
221 dict: current record (same as input)
223 self
.persist
.delete_wim_account(account
['uuid'])
225 if account
['uuid'] not in self
.threads
:
226 raise WimAccountNotActive(
227 'Requests send to the WIM Account %s are not currently '
228 'being processed.', account
['uuid'])
230 self
.threads
[account
['uuid']].exit()
231 del self
.threads
[account
['uuid']]
def delete_wim_accounts(self, wim, tenant=None, **kwargs):
    """Delete all the accounts related to a WIM (and a tenant).

    Each account is removed through ``_delete_single_wim_account``, so
    threads and internal WIM accounts are taken care of there.

    Arguments:
        wim (str): uuid or name of a WIM record
        tenant (str): uuid or name of a NFVO tenant record
        **kwargs: extra options forwarded to
            ``persist.get_wim_accounts_by``. ``error_if_none`` is set to
            ``False`` unless the caller provides it explicitly.

    Returns:
        list: Records that were deleted
    """
    if 'error_if_none' not in kwargs:
        kwargs['error_if_none'] = False

    matching = self.persist.get_wim_accounts_by(wim, tenant, **kwargs)

    removed = []
    for record in matching:
        removed.append(self._delete_single_wim_account(record))
    return removed
250 def _reload_wim_threads(self
, wim_id
):
251 for thread
in self
.threads
.values():
252 if thread
.wim_account
['wim_id'] == wim_id
:
255 def create_wim_port_mappings(self
, wim
, properties
, tenant
=None):
256 """Store information about port mappings from Database"""
257 # TODO: Review tenants... WIMs can exist across different tenants,
258 # and the port_mappings are a WIM property, not a wim_account
259 # property, so the concepts are not related
260 wim
= self
.persist
.get_by_name_or_uuid('wims', wim
)
261 result
= self
.persist
.create_wim_port_mappings(wim
, properties
, tenant
)
262 self
._reload
_wim
_threads
(wim
['uuid'])
def get_wim_port_mappings(self, wim):
    """Retrieve the port mapping information stored in the database.

    ``wim`` is passed unchanged to ``persist.get_wim_port_mappings`` and
    its result is returned as-is.
    """
    mappings = self.persist.get_wim_port_mappings(wim)
    return mappings
269 def delete_wim_port_mappings(self
, wim
):
270 """Erase the port mapping records associated with the WIM"""
271 wim
= self
.persist
.get_by_name_or_uuid('wims', wim
)
272 message
= self
.persist
.delete_wim_port_mappings(wim
['uuid'])
273 self
._reload
_wim
_threads
(wim
['uuid'])
276 def find_common_wims(self
, datacenter_ids
, tenant
):
277 """Find WIMs that are common to all datacenters listed"""
278 mappings
= self
.persist
.get_wim_port_mappings(
279 datacenter
=datacenter_ids
, tenant
=tenant
, error_if_none
=False)
281 wim_id_of
= itemgetter('wim_id')
282 sorted_mappings
= sorted(mappings
, key
=wim_id_of
) # needed by groupby
283 grouped_mappings
= groupby(sorted_mappings
, key
=wim_id_of
)
284 mapped_datacenters
= {
285 wim_id
: [m
['datacenter_id'] for m
in mappings
]
286 for wim_id
, mappings
in grouped_mappings
291 for wim_id
, connected_datacenters
in mapped_datacenters
.items()
292 if set(connected_datacenters
) >= set(datacenter_ids
)
def find_common_wim(self, datacenter_ids, tenant):
    """Find a single WIM that is able to connect all the datacenters.

    The candidates come from ``find_common_wims``; the first one is used.

    Raises:
        NoWimConnectedToDatacenters: if ``find_common_wims`` reports no WIM
            connected to all the datacenters
    """
    candidates = self.find_common_wims(datacenter_ids, tenant)

    if candidates:
        # TODO: use a criteria to determine which WIM is going to be used,
        #       instead of always using the first one (a strategy pattern
        #       could be used here)
        return candidates[0]

    raise NoWimConnectedToDatacenters(datacenter_ids)
def find_suitable_wim_account(self, datacenter_ids, tenant):
    """Find a WIM account that is able to connect all the datacenters.

    Arguments:
        datacenter_ids (list): List of UUIDs of all the datacenters (vims),
            that need to be connected.
        tenant (str): UUID of the OSM tenant

    Returns:
        The WIM account returned by ``persist.get_wim_account_by`` for the
        WIM chosen by ``find_common_wim`` and the given tenant.
    """
    chosen_wim = self.find_common_wim(datacenter_ids, tenant)
    store = self.persist
    return store.get_wim_account_by(chosen_wim, tenant)
329 def derive_wan_link(self
,
331 instance_scenario_id
, sce_net_id
,
332 networks
, tenant
, related
=None):
333 """Create a instance_wim_nets record for the given information"""
334 if sce_net_id
in wim_usage
:
335 account_id
= wim_usage
[sce_net_id
]
336 account
= self
.persist
.get_wim_account_by(uuid
=account_id
)
337 wim_id
= account
['wim_id']
339 datacenters
= [n
['datacenter_id'] for n
in networks
]
340 wim_id
= self
.find_common_wim(datacenters
, tenant
)
341 account
= self
.persist
.get_wim_account_by(wim_id
, tenant
)
344 'uuid': str(uuid4()),
345 'instance_scenario_id': instance_scenario_id
,
346 'sce_net_id': sce_net_id
,
348 'wim_account_id': account
['uuid'],
352 def derive_wan_links(self
, wim_usage
, networks
, tenant
=None):
353 """Discover and return what are the wan_links that have to be created
354 considering a set of networks (VLDs) required for a scenario instance
358 wim_usage(dict): Mapping between sce_net_id and wim_id. If wim_id is False, means not create wam_links
359 networks(list): Dicts containing the information about the networks
360 that will be instantiated to materialize a Network Service
362 Corresponding to the ``instance_net`` record.
365 list: list of WAN links to be written to the database
367 # Group networks by key=(instance_scenario_id, sce_net_id)
370 related
= networks
[0].get("related")
371 filtered
= _filter_multi_vim(networks
)
372 grouped_networks
= _group_networks(filtered
)
373 datacenters_per_group
= _count_datacenters(grouped_networks
)
374 # For each group count the number of networks. If greater then 1,
375 # we have to create a wan link connecting them.
377 for key
, counter
in datacenters_per_group
379 # Keys are tuples(instance_scenario_id, sce_net_id)
381 self
.derive_wan_link(wim_usage
,
382 key
[0], key
[1], grouped_networks
[key
], tenant
, related
)
383 for key
in wan_groups
if wim_usage
.get(key
[1]) is not False
386 def create_action(self
, wan_link
):
387 """For a single wan_link create the corresponding create action"""
390 'status': 'SCHEDULED',
391 'item': 'instance_wim_nets',
392 'item_id': wan_link
['uuid'],
393 'wim_account_id': wan_link
['wim_account_id']
def create_actions(self, wan_links):
    """Build the create action of every wan_link, keeping the input order"""
    actions = []
    for link in wan_links:
        actions.append(self.create_action(link))
    return actions
400 def delete_action(self
, wan_link
):
401 """For a single wan_link create the corresponding create action"""
404 'status': 'SCHEDULED',
405 'item': 'instance_wim_nets',
406 'item_id': wan_link
['uuid'],
407 'wim_account_id': wan_link
['wim_account_id'],
408 'extra': json
.dumps({'wan_link': wan_link
})
409 # We serialize and cache the wan_link here, because it can be
410 # deleted during the delete process
def delete_actions(self, wan_links=(), instance_scenario_id=None):
    """Build the delete actions for a set of WAN links.

    When ``instance_scenario_id`` is truthy, the WAN links are loaded from
    the persistence layer for that instance scenario and the ``wan_links``
    argument is ignored. Otherwise the given ``wan_links`` are used.

    Returns:
        list: one delete action (see ``delete_action``) per WAN link
    """
    targets = wan_links
    if instance_scenario_id:
        targets = self.persist.get_wan_links(
            instance_scenario_id=instance_scenario_id)

    actions = []
    for link in targets:
        actions.append(self.delete_action(link))
    return actions
def incorporate_actions(self, wim_actions, instance_action):
    """Link a batch of WIM actions to an instance action, and vice versa.

    Each WIM action receives a ``task_index`` (continuing from the current
    ``number_tasks`` of the instance action, which starts at 0 when absent)
    and the ``instance_action_id``. The instance action gets its
    ``number_tasks`` increased by the amount of WIM actions.

    Both arguments are modified in place.

    Returns:
        tuple: the same ``wim_actions`` and ``instance_action`` objects
    """
    first_index = instance_action.setdefault('number_tasks', 0)

    for offset, wim_action in enumerate(wim_actions):
        wim_action['task_index'] = first_index + offset
        wim_action['instance_action_id'] = instance_action['uuid']

    instance_action['number_tasks'] += len(wim_actions)

    return wim_actions, instance_action
433 def dispatch(self
, tasks
):
434 """Enqueue a list of tasks for further processing.
436 This function is supposed to be called outside from the WIM Thread.
439 if task
['wim_account_id'] not in self
.threads
:
440 error_msg
= str(WimAccountNotActive(
441 'Requests send to the WIM Account %s are not currently '
442 'being processed.', task
['wim_account_id']))
443 Action(task
, self
.logger
).fail(self
.persist
, error_msg
)
444 self
.persist
.update_wan_link(task
['item_id'],
446 'error_msg': error_msg
})
447 self
.logger
.error('Task %s %s %s not dispatched.\n%s',
448 task
['action'], task
['item'],
449 task
['instance_account_id'], error_msg
)
451 self
.threads
[task
['wim_account_id']].insert_task(task
)
452 self
.logger
.debug('Task %s %s %s dispatched',
453 task
['action'], task
['item'],
454 task
['instance_action_id'])
456 def _spawn_thread(self
, wim_account
):
457 """Spawn a WIM thread
460 wim_account (dict): WIM information (usually persisted)
461 The `wim` field is required to be set with a valid WIM record
462 inside the `wim_account` dict
465 threading.Thread: Thread object
469 thread
= WimThread(self
.persist
, wim_account
, ovim
=self
.ovim
)
470 self
.threads
[wim_account
['uuid']] = thread
473 self
.logger
.error('Error when spawning WIM thread for %s',
474 wim_account
['uuid'], exc_info
=True)
def start_threads(self):
    """Start the threads responsible for processing WIM Actions.

    One thread is spawned (via ``_spawn_thread``) per WIM account known to
    the persistence layer, and the resulting mapping, keyed by account
    uuid, replaces ``self.threads``.
    """
    spawned = {}
    for account in self.persist.get_wim_accounts(error_if_none=False):
        account_id = account['uuid']
        spawned[account_id] = self._spawn_thread(account)

    # NOTE(review): presumably drops the accounts for which no thread could
    # be spawned (``None`` values) -- confirm against utils.remove_none_items
    self.threads = remove_none_items(spawned)
484 def stop_threads(self
):
485 """Stop the threads responsible for processing WIM Actions"""
486 for uuid
, thread
in self
.threads
.items():
491 def threads_running(self
):
492 """Ensure no thread will be left running"""
493 # This method is particularly important for testing :)
501 def _filter_multi_vim(networks
):
502 """Ignore networks without sce_net_id (all VNFs go to the same VIM)"""
503 return [n
for n
in networks
if 'sce_net_id' in n
and n
['sce_net_id']]
506 def _group_networks(networks
):
507 """Group networks that correspond to the same instance_scenario_id and
508 sce_net_id (NSR and VLD).
511 networks(list): Dicts containing the information about the networks
512 that will be instantiated to materialize a Network Service
515 dict: Keys are tuples (instance_scenario_id, sce_net_id) and values
516 are list of networks.
518 criteria
= itemgetter('instance_scenario_id', 'sce_net_id')
520 networks
= sorted(networks
, key
=criteria
)
521 return {k
: list(v
) for k
, v
in groupby(networks
, key
=criteria
)}
524 def _count_datacenters(grouped_networks
):
525 """Count the number of datacenters in each group of networks
528 list of tuples: the first element is the group key, while the second
529 element is the number of datacenters in each group.
531 return ((key
, len(set(n
['datacenter_id'] for n
in group
)))
532 for key
, group
in grouped_networks
.items())