1 # -*- coding: utf-8 -*-
3 # Copyright 2018 University of Bristol - High Performance Networks Research
7 # Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
8 # Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
10 # Licensed under the Apache License, Version 2.0 (the "License"); you may
11 # not use this file except in compliance with the License. You may obtain
12 # a copy of the License at
14 # http://www.apache.org/licenses/LICENSE-2.0
16 # Unless required by applicable law or agreed to in writing, software
17 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
18 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
19 # License for the specific language governing permissions and limitations
22 # For those usages not covered by the Apache License, Version 2.0 please
23 # contact with: <highperformance-networks@bristol.ac.uk>
25 # Neither the name of the University of Bristol nor the names of its
26 # contributors may be used to endorse or promote products derived from
27 # this software without specific prior written permission.
29 # This work has been performed in the context of DCMS UK 5G Testbeds
30 # & Trials Programme and in the framework of the Metro-Haul project -
31 # funded by the European Commission under Grant number 761727 through the
32 # Horizon 2020 and 5G-PPP programmes.
35 """This module contains the domain logic, and the implementation of the
36 required steps to perform VNF management and orchestration in a WAN
39 It works as an extension/complement to the main functions contained in the
40 ``nfvo.py`` file and avoids interacting directly with the database, by relying
41 on the `persistence` module.
43 No http request handling/direct interaction with the database should be present
48 from contextlib
import contextmanager
49 from itertools
import groupby
50 from operator
import itemgetter
51 # from sys import exc_info
52 from uuid
import uuid4
54 from ..utils
import remove_none_items
55 from .actions
import Action
58 NoWimConnectedToDatacenters
,
59 UnexpectedDatabaseError
,
63 from .wim_thread
import WimThread
64 # from ..http_tools.errors import Bad_Request
65 from pkg_resources
import iter_entry_points
68 class WimEngine(object):
69 """Logic supporting the establishment of WAN links when NS spans across
70 different datacenters.
72 def __init__(self
, persistence
, plugins
, logger
=None, ovim
=None):
73 self
.persist
= persistence
74 self
.plugins
= plugins
if plugins
is not None else {}
75 self
.logger
= logger
or logging
.getLogger('openmano.wim.engine')
80 def _load_plugin(self
, name
, type="sdn"):
81 # type can be vim or sdn
82 for v
in iter_entry_points('osm_ro{}.plugins'.format(type), name
):
83 self
.plugins
[name
] = v
.load()
84 if name
and name
not in self
.plugins
:
85 raise UndefinedWimConnector(type, name
)
87 def create_wim(self
, properties
):
88 """Create a new wim record according to the properties
90 Please check the wim schema to have more information about
93 The ``config`` property might contain a ``wim_port_mapping`` dict,
94 In this case, the method ``create_wim_port_mappings`` will be
95 automatically invoked.
98 str: uuid of the newly created WIM record
100 port_mapping
= ((properties
.get('config', {}) or {})
101 .pop('wim_port_mapping', {}))
102 plugin_name
= "rosdn_" + properties
["type"]
103 if plugin_name
not in self
.plugins
:
104 self
._load
_plugin
(plugin_name
, type="sdn")
106 uuid
= self
.persist
.create_wim(properties
)
110 self
.create_wim_port_mappings(uuid
, port_mapping
)
111 except DbBaseException
as e
:
113 self
.delete_wim(uuid
)
114 ex
= UnexpectedDatabaseError('Failed to create port mappings'
115 'Rolling back wim creation')
116 self
.logger
.exception(str(ex
))
121 def get_wim(self
, uuid_or_name
, tenant_id
=None):
122 """Retrieve existing WIM record by name or id.
124 If ``tenant_id`` is specified, the query will be
125 limited to the WIM associated to the given tenant.
127 # Since it is a pure DB operation, we can delegate it directly
128 return self
.persist
.get_wim(uuid_or_name
, tenant_id
)
130 def update_wim(self
, uuid_or_name
, properties
):
131 """Edit an existing WIM record.
133 ``properties`` is a dictionary with the properties being changed,
134 if a property is not present, the old value will be preserved
136 Similarly to create_wim, the ``config`` property might contain a
137 ``wim_port_mapping`` dict, In this case, port mappings will be
138 automatically updated.
140 port_mapping
= ((properties
.get('config', {}) or {})
141 .pop('wim_port_mapping', {}))
142 orig_props
= self
.persist
.get_by_name_or_uuid('wims', uuid_or_name
)
143 uuid
= orig_props
['uuid']
145 response
= self
.persist
.update_wim(uuid
, properties
)
149 # It is very complex to diff and update individually all the
150 # port mappings. Therefore a practical approach is just delete
151 # and create it again.
152 self
.persist
.delete_wim_port_mappings(uuid
)
153 # ^ Calling from persistence avoid reloading twice the thread
154 self
.create_wim_port_mappings(uuid
, port_mapping
)
155 except DbBaseException
as e
:
157 self
.update_wim(uuid_or_name
, orig_props
)
158 ex
= UnexpectedDatabaseError('Failed to update port mappings'
159 'Rolling back wim updates\n')
160 self
.logger
.exception(str(ex
))
165 def delete_wim(self
, uuid_or_name
):
166 """Kill the corresponding wim threads and erase the WIM record"""
167 # Theoretically, we can rely on the database to drop the wim_accounts
168 # automatically, since we have configures 'ON CASCADE DELETE'.
169 # However, use use `delete_wim_accounts` to kill all the running
171 self
.delete_wim_accounts(uuid_or_name
)
172 return self
.persist
.delete_wim(uuid_or_name
)
174 def create_wim_account(self
, wim
, tenant
, properties
):
175 """Create an account that associates a tenant to a WIM.
177 As a side effect this function will spawn a new thread
180 wim (str): name or uuid of the WIM related to the account being
182 tenant (str): name or uuid of the nfvo tenant to which the account
184 properties (dict): properties of the account
185 (eg. username, password, ...)
190 uuid
= self
.persist
.create_wim_account(wim
, tenant
, properties
)
191 account
= self
.persist
.get_wim_account_by(uuid
=uuid
)
192 # ^ We need to use get_wim_account_by here, since this methods returns
193 # all the associations, and we need the wim to create the thread
194 self
._spawn
_thread
(account
)
197 def _update_single_wim_account(self
, account
, properties
):
198 """Update WIM Account, taking care to reload the corresponding thread
201 account (dict): Current account record
202 properties (dict): Properties to be updated
207 account
= self
.persist
.update_wim_account(account
['uuid'], properties
)
208 self
.threads
[account
['uuid']].reload()
211 def update_wim_accounts(self
, wim
, tenant
, properties
):
212 """Update all the accounts related to a WIM and a tenant,
213 thanking care of reloading threads.
216 wim (str): uuid or name of a WIM record
217 tenant (str): uuid or name of a NFVO tenant record
218 properties (dict): attributes with values to be updated
221 list: Records that were updated
223 accounts
= self
.persist
.get_wim_accounts_by(wim
, tenant
)
224 return [self
._update
_single
_wim
_account
(account
, properties
)
225 for account
in accounts
]
227 def _delete_single_wim_account(self
, account
):
228 """Delete WIM Account, taking care to remove the corresponding thread
229 and delete the internal WIM account, if it was automatically generated.
232 account (dict): Current account record
233 properties (dict): Properties to be updated
236 dict: current record (same as input)
238 self
.persist
.delete_wim_account(account
['uuid'])
240 if account
['uuid'] not in self
.threads
:
241 raise WimAccountNotActive(
242 'Requests send to the WIM Account %s are not currently '
243 'being processed.', account
['uuid'])
245 self
.threads
[account
['uuid']].exit()
246 del self
.threads
[account
['uuid']]
250 def delete_wim_accounts(self
, wim
, tenant
=None, **kwargs
):
251 """Delete all the accounts related to a WIM (and a tenant),
252 thanking care of threads and internal WIM accounts.
255 wim (str): uuid or name of a WIM record
256 tenant (str): uuid or name of a NFVO tenant record
259 list: Records that were deleted
261 kwargs
.setdefault('error_if_none', False)
262 accounts
= self
.persist
.get_wim_accounts_by(wim
, tenant
, **kwargs
)
263 return [self
._delete
_single
_wim
_account
(a
) for a
in accounts
]
265 def _reload_wim_threads(self
, wim_id
):
266 for thread
in self
.threads
.values():
267 if thread
.wim_account
['wim_id'] == wim_id
:
270 def create_wim_port_mappings(self
, wim
, properties
, tenant
=None):
271 """Store information about port mappings from Database"""
272 # TODO: Review tenants... WIMs can exist across different tenants,
273 # and the port_mappings are a WIM property, not a wim_account
274 # property, so the concepts are not related
275 wim
= self
.persist
.get_by_name_or_uuid('wims', wim
)
276 result
= self
.persist
.create_wim_port_mappings(wim
, properties
, tenant
)
277 self
._reload
_wim
_threads
(wim
['uuid'])
280 def get_wim_port_mappings(self
, wim
):
281 """Retrive information about port mappings from Database"""
282 return self
.persist
.get_wim_port_mappings(wim
)
284 def delete_wim_port_mappings(self
, wim
):
285 """Erase the port mapping records associated with the WIM"""
286 wim
= self
.persist
.get_by_name_or_uuid('wims', wim
)
287 message
= self
.persist
.delete_wim_port_mappings(wim
['uuid'])
288 self
._reload
_wim
_threads
(wim
['uuid'])
291 def find_common_wims(self
, datacenter_ids
, tenant
):
292 """Find WIMs that are common to all datacenters listed"""
293 mappings
= self
.persist
.get_wim_port_mappings(
294 datacenter
=datacenter_ids
, tenant
=tenant
, error_if_none
=False)
296 wim_id_of
= itemgetter('wim_id')
297 sorted_mappings
= sorted(mappings
, key
=wim_id_of
) # needed by groupby
298 grouped_mappings
= groupby(sorted_mappings
, key
=wim_id_of
)
299 mapped_datacenters
= {
300 wim_id
: [m
['datacenter_id'] for m
in mappings
]
301 for wim_id
, mappings
in grouped_mappings
306 for wim_id
, connected_datacenters
in mapped_datacenters
.items()
307 if set(connected_datacenters
) >= set(datacenter_ids
)
310 def find_common_wim(self
, datacenter_ids
, tenant
):
311 """Find a single WIM that is able to connect all the datacenters
315 NoWimConnectedToDatacenters: if no WIM connected to all datacenters
318 suitable_wim_ids
= self
.find_common_wims(datacenter_ids
, tenant
)
320 if not suitable_wim_ids
:
321 raise NoWimConnectedToDatacenters(datacenter_ids
)
323 # TODO: use a criteria to determine which WIM is going to be used,
324 # instead of always using the first one (strategy pattern can be
326 return suitable_wim_ids
[0]
328 def find_suitable_wim_account(self
, datacenter_ids
, tenant
):
329 """Find a WIM account that is able to connect all the datacenters
333 datacenter_ids (list): List of UUIDs of all the datacenters (vims),
334 that need to be connected.
335 tenant (str): UUID of the OSM tenant
338 object with the WIM account that is able to connect all the
341 wim_id
= self
.find_common_wim(datacenter_ids
, tenant
)
342 return self
.persist
.get_wim_account_by(wim_id
, tenant
)
344 def derive_wan_link(self
,
346 instance_scenario_id
, sce_net_id
,
347 networks
, tenant
, related
=None):
348 """Create a instance_wim_nets record for the given information"""
349 if sce_net_id
in wim_usage
:
350 account_id
= wim_usage
[sce_net_id
]
351 account
= self
.persist
.get_wim_account_by(uuid
=account_id
)
352 wim_id
= account
['wim_id']
354 datacenters
= [n
['datacenter_id'] for n
in networks
]
355 wim_id
= self
.find_common_wim(datacenters
, tenant
)
356 account
= self
.persist
.get_wim_account_by(wim_id
, tenant
)
359 'uuid': str(uuid4()),
360 'instance_scenario_id': instance_scenario_id
,
361 'sce_net_id': sce_net_id
,
363 'wim_account_id': account
['uuid'],
367 def derive_wan_links(self
, wim_usage
, networks
, tenant
=None):
368 """Discover and return what are the wan_links that have to be created
369 considering a set of networks (VLDs) required for a scenario instance
373 wim_usage(dict): Mapping between sce_net_id and wim_id. If wim_id is False, means not create wam_links
374 networks(list): Dicts containing the information about the networks
375 that will be instantiated to materialize a Network Service
377 Corresponding to the ``instance_net`` record.
380 list: list of WAN links to be written to the database
382 # Group networks by key=(instance_scenario_id, sce_net_id)
385 related
= networks
[0].get("related")
386 filtered
= _filter_multi_vim(networks
)
387 grouped_networks
= _group_networks(filtered
)
388 datacenters_per_group
= _count_datacenters(grouped_networks
)
389 # For each group count the number of networks. If greater then 1,
390 # we have to create a wan link connecting them.
392 for key
, counter
in datacenters_per_group
394 # Keys are tuples(instance_scenario_id, sce_net_id)
396 self
.derive_wan_link(wim_usage
,
397 key
[0], key
[1], grouped_networks
[key
], tenant
, related
)
398 for key
in wan_groups
if wim_usage
.get(key
[1]) is not False
401 def create_action(self
, wan_link
):
402 """For a single wan_link create the corresponding create action"""
405 'status': 'SCHEDULED',
406 'item': 'instance_wim_nets',
407 'item_id': wan_link
['uuid'],
408 'wim_account_id': wan_link
['wim_account_id']
411 def create_actions(self
, wan_links
):
412 """For an array of wan_links, create all the corresponding actions"""
413 return [self
.create_action(li
) for li
in wan_links
]
415 def delete_action(self
, wan_link
):
416 """For a single wan_link create the corresponding create action"""
419 'status': 'SCHEDULED',
420 'item': 'instance_wim_nets',
421 'item_id': wan_link
['uuid'],
422 'wim_account_id': wan_link
['wim_account_id'],
423 'extra': json
.dumps({'wan_link': wan_link
})
424 # We serialize and cache the wan_link here, because it can be
425 # deleted during the delete process
428 def delete_actions(self
, wan_links
=(), instance_scenario_id
=None):
429 """Given a Instance Scenario, remove all the WAN Links created in the
431 if instance_scenario_id
:
432 wan_links
= self
.persist
.get_wan_links(
433 instance_scenario_id
=instance_scenario_id
, sdn
='false')
434 return [self
.delete_action(li
) for li
in wan_links
]
436 def incorporate_actions(self
, wim_actions
, instance_action
):
437 """Make the instance action consider new WIM actions and make the WIM
438 actions aware of the instance action
440 current
= instance_action
.setdefault('number_tasks', 0)
441 for i
, action
in enumerate(wim_actions
):
442 action
['task_index'] = current
+ i
443 action
['instance_action_id'] = instance_action
['uuid']
444 instance_action
['number_tasks'] += len(wim_actions
)
446 return wim_actions
, instance_action
448 def dispatch(self
, tasks
):
449 """Enqueue a list of tasks for further processing.
451 This function is supposed to be called outside from the WIM Thread.
454 if task
['wim_account_id'] not in self
.threads
:
455 error_msg
= str(WimAccountNotActive(
456 'Requests send to the WIM Account %s are not currently '
457 'being processed.', task
['wim_account_id']))
458 Action(task
, self
.logger
).fail(self
.persist
, error_msg
)
459 self
.persist
.update_wan_link(task
['item_id'],
461 'error_msg': error_msg
})
462 self
.logger
.error('Task %s %s %s not dispatched.\n%s',
463 task
['action'], task
['item'],
464 task
['instance_account_id'], error_msg
)
466 self
.threads
[task
['wim_account_id']].insert_task(task
)
467 self
.logger
.debug('Task %s %s %s dispatched',
468 task
['action'], task
['item'],
469 task
['instance_action_id'])
471 def _spawn_thread(self
, wim_account
):
472 """Spawn a WIM thread
475 wim_account (dict): WIM information (usually persisted)
476 The `wim` field is required to be set with a valid WIM record
477 inside the `wim_account` dict
480 threading.Thread: Thread object
484 thread
= WimThread(self
.persist
, self
.plugins
, wim_account
, ovim
=self
.ovim
)
485 self
.threads
[wim_account
['uuid']] = thread
488 self
.logger
.error('Error when spawning WIM thread for %s',
489 wim_account
['uuid'], exc_info
=True)
493 def start_threads(self
):
494 """Start the threads responsible for processing WIM Actions"""
495 accounts
= self
.persist
.get_wim_accounts(error_if_none
=False)
497 for account
in accounts
:
499 plugin_name
= "rosdn_" + account
["wim"]["type"]
500 if plugin_name
not in self
.plugins
:
501 self
._load
_plugin
(plugin_name
, type="sdn")
502 thread_dict
[account
["uuid"]] = self
._spawn
_thread
(account
)
503 except UndefinedWimConnector
as e
:
505 self
.threads
= remove_none_items(thread_dict
)
507 def stop_threads(self
):
508 """Stop the threads responsible for processing WIM Actions"""
509 for uuid
, thread
in self
.threads
.items():
514 def threads_running(self
):
515 """Ensure no thread will be left running"""
516 # This method is particularly important for testing :)
524 def _filter_multi_vim(networks
):
525 """Ignore networks without sce_net_id (all VNFs go to the same VIM)"""
526 return [n
for n
in networks
if 'sce_net_id' in n
and n
['sce_net_id']]
529 def _group_networks(networks
):
530 """Group networks that correspond to the same instance_scenario_id and
531 sce_net_id (NSR and VLD).
534 networks(list): Dicts containing the information about the networks
535 that will be instantiated to materialize a Network Service
538 dict: Keys are tuples (instance_scenario_id, sce_net_id) and values
539 are list of networks.
541 criteria
= itemgetter('instance_scenario_id', 'sce_net_id')
543 networks
= sorted(networks
, key
=criteria
)
544 return {k
: list(v
) for k
, v
in groupby(networks
, key
=criteria
)}
547 def _count_datacenters(grouped_networks
):
548 """Count the number of datacenters in each group of networks
551 list of tuples: the first element is the group key, while the second
552 element is the number of datacenters in each group.
554 return ((key
, len(set(n
['datacenter_id'] for n
in group
)))
555 for key
, group
in grouped_networks
.items())