1 # -*- coding: utf-8 -*-
3 # Copyright 2018 University of Bristol - High Performance Networks Research
7 # Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
8 # Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
10 # Licensed under the Apache License, Version 2.0 (the "License"); you may
11 # not use this file except in compliance with the License. You may obtain
12 # a copy of the License at
14 # http://www.apache.org/licenses/LICENSE-2.0
16 # Unless required by applicable law or agreed to in writing, software
17 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
18 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
19 # License for the specific language governing permissions and limitations
22 # For those usages not covered by the Apache License, Version 2.0 please
23 # contact with: <highperformance-networks@bristol.ac.uk>
25 # Neither the name of the University of Bristol nor the names of its
26 # contributors may be used to endorse or promote products derived from
27 # this software without specific prior written permission.
29 # This work has been performed in the context of DCMS UK 5G Testbeds
30 # & Trials Programme and in the framework of the Metro-Haul project -
31 # funded by the European Commission under Grant number 761727 through the
32 # Horizon 2020 and 5G-PPP programmes.
35 """This module contains the domain logic, and the implementation of the
36 required steps to perform VNF management and orchestration in a WAN
39 It works as an extension/complement to the main functions contained in the
40 ``nfvo.py`` file and avoids interacting directly with the database, by relying
41 on the `persistence` module.
43 No http request handling/direct interaction with the database should be present
48 from contextlib
import contextmanager
49 from itertools
import groupby
50 from operator
import itemgetter
51 from uuid
import uuid4
53 from ..utils
import remove_none_items
54 from .actions
import Action
56 NoWimConnectedToDatacenters
,
59 from .wim_thread
import WimThread
62 class WimEngine(object):
63 """Logic supporting the establishment of WAN links when NS spans across
64 different datacenters.
66 def __init__(self
, persistence
, logger
=None, ovim
=None):
67 self
.persist
= persistence
68 self
.logger
= logger
or logging
.getLogger('openmano.wim.engine')
73 def create_wim(self
, properties
):
74 """Create a new wim record according to the properties
76 Please check the wim schema to have more information about
80 str: uuid of the newly created WIM record
82 return self
.persist
.create_wim(properties
)
84 def get_wim(self
, uuid_or_name
, tenant_id
=None):
85 """Retrieve existing WIM record by name or id.
87 If ``tenant_id`` is specified, the query will be
88 limited to the WIM associated to the given tenant.
90 # Since it is a pure DB operation, we can delegate it directly
91 return self
.persist
.get_wim(uuid_or_name
, tenant_id
)
93 def update_wim(self
, uuid_or_name
, properties
):
94 """Edit an existing WIM record.
96 ``properties`` is a dictionary with the properties being changed,
97 if a property is not present, the old value will be preserved
99 return self
.persist
.update_wim(uuid_or_name
, properties
)
101 def delete_wim(self
, uuid_or_name
):
102 """Kill the corresponding wim threads and erase the WIM record"""
103 # Theoretically, we can rely on the database to drop the wim_accounts
104 # automatically, since we have configures 'ON CASCADE DELETE'.
105 # However, use use `delete_wim_accounts` to kill all the running
107 self
.delete_wim_accounts(uuid_or_name
)
108 return self
.persist
.delete_wim(uuid_or_name
)
110 def create_wim_account(self
, wim
, tenant
, properties
):
111 """Create an account that associates a tenant to a WIM.
113 As a side effect this function will spawn a new thread
116 wim (str): name or uuid of the WIM related to the account being
118 tenant (str): name or uuid of the nfvo tenant to which the account
120 properties (dict): properties of the account
121 (eg. username, password, ...)
126 uuid
= self
.persist
.create_wim_account(wim
, tenant
, properties
)
127 account
= self
.persist
.get_wim_account_by(uuid
=uuid
)
128 # ^ We need to use get_wim_account_by here, since this methods returns
129 # all the associations, and we need the wim to create the thread
130 self
._spawn
_thread
(account
)
133 def _update_single_wim_account(self
, account
, properties
):
134 """Update WIM Account, taking care to reload the corresponding thread
137 account (dict): Current account record
138 properties (dict): Properties to be updated
143 account
= self
.persist
.update_wim_account(account
['uuid'], properties
)
144 self
.threads
[account
['uuid']].reload()
147 def update_wim_accounts(self
, wim
, tenant
, properties
):
148 """Update all the accounts related to a WIM and a tenant,
149 thanking care of reloading threads.
152 wim (str): uuid or name of a WIM record
153 tenant (str): uuid or name of a NFVO tenant record
154 properties (dict): attributes with values to be updated
157 list: Records that were updated
159 accounts
= self
.persist
.get_wim_accounts_by(wim
, tenant
)
160 return [self
._update
_single
_wim
_account
(account
, properties
)
161 for account
in accounts
]
163 def _delete_single_wim_account(self
, account
):
164 """Delete WIM Account, taking care to remove the corresponding thread
165 and delete the internal WIM account, if it was automatically generated.
168 account (dict): Current account record
169 properties (dict): Properties to be updated
172 dict: current record (same as input)
174 self
.persist
.delete_wim_account(account
['uuid'])
176 if account
['uuid'] not in self
.threads
:
177 raise WimAccountNotActive(
178 'Requests send to the WIM Account %s are not currently '
179 'being processed.', account
['uuid'])
181 self
.threads
[account
['uuid']].exit()
182 del self
.threads
[account
['uuid']]
186 def delete_wim_accounts(self
, wim
, tenant
=None, **kwargs
):
187 """Delete all the accounts related to a WIM (and a tenant),
188 thanking care of threads and internal WIM accounts.
191 wim (str): uuid or name of a WIM record
192 tenant (str): uuid or name of a NFVO tenant record
195 list: Records that were deleted
197 kwargs
.setdefault('error_if_none', False)
198 accounts
= self
.persist
.get_wim_accounts_by(wim
, tenant
, **kwargs
)
199 return [self
._delete
_single
_wim
_account
(a
) for a
in accounts
]
201 def _reload_wim_threads(self
, wim_id
):
202 for thread
in self
.threads
.values():
203 if thread
.wim_account
['wim_id'] == wim_id
:
206 def create_wim_port_mappings(self
, wim
, properties
, tenant
=None):
207 """Store information about port mappings from Database"""
208 # TODO: Review tenants... WIMs can exist across different tenants,
209 # and the port_mappings are a WIM property, not a wim_account
210 # property, so the concepts are not related
211 wim
= self
.persist
.get_by_name_or_uuid('wims', wim
)
212 result
= self
.persist
.create_wim_port_mappings(wim
, properties
, tenant
)
213 self
._reload
_wim
_threads
(wim
['uuid'])
216 def get_wim_port_mappings(self
, wim
):
217 """Retrive information about port mappings from Database"""
218 return self
.persist
.get_wim_port_mappings(wim
)
220 def delete_wim_port_mappings(self
, wim
):
221 """Erase the port mapping records associated with the WIM"""
222 wim
= self
.persist
.get_by_name_or_uuid('wims', wim
)
223 message
= self
.persist
.delete_wim_port_mappings(wim
['uuid'])
224 self
._reload
_wim
_threads
(wim
['uuid'])
227 def find_common_wims(self
, datacenter_ids
, tenant
):
228 """Find WIMs that are common to all datacenters listed"""
229 mappings
= self
.persist
.get_wim_port_mappings(
230 datacenter
=datacenter_ids
, tenant
=tenant
, error_if_none
=False)
232 wim_id_of
= itemgetter('wim_id')
233 sorted_mappings
= sorted(mappings
, key
=wim_id_of
) # needed by groupby
234 grouped_mappings
= groupby(sorted_mappings
, key
=wim_id_of
)
235 mapped_datacenters
= {
236 wim_id
: [m
['datacenter_id'] for m
in mappings
]
237 for wim_id
, mappings
in grouped_mappings
242 for wim_id
, connected_datacenters
in mapped_datacenters
.items()
243 if set(connected_datacenters
) >= set(datacenter_ids
)
246 def find_common_wim(self
, datacenter_ids
, tenant
):
247 """Find a single WIM that is able to connect all the datacenters
251 NoWimConnectedToDatacenters: if no WIM connected to all datacenters
254 suitable_wim_ids
= self
.find_common_wims(datacenter_ids
, tenant
)
256 if not suitable_wim_ids
:
257 raise NoWimConnectedToDatacenters(datacenter_ids
)
259 # TODO: use a criteria to determine which WIM is going to be used,
260 # instead of always using the first one (strategy pattern can be
262 return suitable_wim_ids
[0]
264 def find_suitable_wim_account(self
, datacenter_ids
, tenant
):
265 """Find a WIM account that is able to connect all the datacenters
269 datacenter_ids (list): List of UUIDs of all the datacenters (vims),
270 that need to be connected.
271 tenant (str): UUID of the OSM tenant
274 str: UUID of the WIM account that is able to connect all the
277 wim_id
= self
.find_common_wim(datacenter_ids
, tenant
)
278 return self
.persist
.get_wim_account_by(wim_id
, tenant
)['uuid']
280 def derive_wan_link(self
,
282 instance_scenario_id
, sce_net_id
,
284 """Create a instance_wim_nets record for the given information"""
285 if sce_net_id
in wim_usage
:
286 account_id
= wim_usage
[sce_net_id
]
287 account
= self
.persist
.get_wim_account_by(uuid
=account_id
)
288 wim_id
= account
['wim_id']
290 datacenters
= [n
['datacenter_id'] for n
in networks
]
291 wim_id
= self
.find_common_wim(datacenters
, tenant
)
292 account
= self
.persist
.get_wim_account_by(wim_id
, tenant
)
295 'uuid': str(uuid4()),
296 'instance_scenario_id': instance_scenario_id
,
297 'sce_net_id': sce_net_id
,
299 'wim_account_id': account
['uuid']
302 def derive_wan_links(self
, wim_usage
, networks
, tenant
=None):
303 """Discover and return what are the wan_links that have to be created
304 considering a set of networks (VLDs) required for a scenario instance
308 wim_usage(dict): Mapping between sce_net_id and wim_id
309 networks(list): Dicts containing the information about the networks
310 that will be instantiated to materialize a Network Service
312 Corresponding to the ``instance_net`` record.
315 list: list of WAN links to be written to the database
317 # Group networks by key=(instance_scenario_id, sce_net_id)
318 filtered
= _filter_multi_vim(networks
)
319 grouped_networks
= _group_networks(filtered
)
320 datacenters_per_group
= _count_datacenters(grouped_networks
)
321 # For each group count the number of networks. If greater then 1,
322 # we have to create a wan link connecting them.
324 for key
, counter
in datacenters_per_group
328 self
.derive_wan_link(wim_usage
,
329 key
[0], key
[1], grouped_networks
[key
], tenant
)
330 for key
in wan_groups
333 def create_action(self
, wan_link
):
334 """For a single wan_link create the corresponding create action"""
337 'status': 'SCHEDULED',
338 'item': 'instance_wim_nets',
339 'item_id': wan_link
['uuid'],
340 'wim_account_id': wan_link
['wim_account_id']
343 def create_actions(self
, wan_links
):
344 """For an array of wan_links, create all the corresponding actions"""
345 return [self
.create_action(l
) for l
in wan_links
]
347 def delete_action(self
, wan_link
):
348 """For a single wan_link create the corresponding create action"""
351 'status': 'SCHEDULED',
352 'item': 'instance_wim_nets',
353 'item_id': wan_link
['uuid'],
354 'wim_account_id': wan_link
['wim_account_id'],
355 'extra': json
.dumps({'wan_link': wan_link
})
356 # We serialize and cache the wan_link here, because it can be
357 # deleted during the delete process
360 def delete_actions(self
, wan_links
=(), instance_scenario_id
=None):
361 """Given a Instance Scenario, remove all the WAN Links created in the
363 if instance_scenario_id
:
364 wan_links
= self
.persist
.get_wan_links(
365 instance_scenario_id
=instance_scenario_id
)
366 return [self
.delete_action(l
) for l
in wan_links
]
368 def incorporate_actions(self
, wim_actions
, instance_action
):
369 """Make the instance action consider new WIM actions and make the WIM
370 actions aware of the instance action
372 current
= instance_action
.setdefault('number_tasks', 0)
373 for i
, action
in enumerate(wim_actions
):
374 action
['task_index'] = current
+ i
375 action
['instance_action_id'] = instance_action
['uuid']
376 instance_action
['number_tasks'] += len(wim_actions
)
378 return wim_actions
, instance_action
380 def dispatch(self
, tasks
):
381 """Enqueue a list of tasks for further processing.
383 This function is supposed to be called outside from the WIM Thread.
386 if task
['wim_account_id'] not in self
.threads
:
387 error_msg
= str(WimAccountNotActive(
388 'Requests send to the WIM Account %s are not currently '
389 'being processed.', task
['wim_account_id']))
390 Action(task
, self
.logger
).fail(self
.persist
, error_msg
)
391 self
.persist
.update_wan_link(task
['item_id'],
393 'error_msg': error_msg
})
394 self
.logger
.error('Task %s %s %s not dispatched.\n%s',
395 task
['action'], task
['item'],
396 task
['instance_account_id'], error_msg
)
398 self
.threads
[task
['wim_account_id']].insert_task(task
)
399 self
.logger
.debug('Task %s %s %s dispatched',
400 task
['action'], task
['item'],
401 task
['instance_action_id'])
403 def _spawn_thread(self
, wim_account
):
404 """Spawn a WIM thread
407 wim_account (dict): WIM information (usually persisted)
408 The `wim` field is required to be set with a valid WIM record
409 inside the `wim_account` dict
412 threading.Thread: Thread object
416 thread
= WimThread(self
.persist
, wim_account
, ovim
=self
.ovim
)
417 self
.threads
[wim_account
['uuid']] = thread
420 self
.logger
.error('Error when spawning WIM thread for %s',
421 wim_account
['uuid'], exc_info
=True)
425 def start_threads(self
):
426 """Start the threads responsible for processing WIM Actions"""
427 accounts
= self
.persist
.get_wim_accounts(error_if_none
=False)
428 self
.threads
= remove_none_items(
429 {a
['uuid']: self
._spawn
_thread
(a
) for a
in accounts
})
431 def stop_threads(self
):
432 """Stop the threads responsible for processing WIM Actions"""
433 for uuid
, thread
in self
.threads
.items():
435 del self
.threads
[uuid
]
438 def threads_running(self
):
439 """Ensure no thread will be left running"""
440 # This method is particularly important for testing :)
448 def _filter_multi_vim(networks
):
449 """Ignore networks without sce_net_id (all VNFs go to the same VIM)"""
450 return [n
for n
in networks
if 'sce_net_id' in n
and n
['sce_net_id']]
453 def _group_networks(networks
):
454 """Group networks that correspond to the same instance_scenario_id and
455 sce_net_id (NSR and VLD).
458 networks(list): Dicts containing the information about the networks
459 that will be instantiated to materialize a Network Service
462 dict: Keys are tuples (instance_scenario_id, sce_net_id) and values
463 are lits of networks.
465 criteria
= itemgetter('instance_scenario_id', 'sce_net_id')
467 networks
= sorted(networks
, key
=criteria
)
468 return {k
: list(v
) for k
, v
in groupby(networks
, key
=criteria
)}
471 def _count_datacenters(grouped_networks
):
472 """Count the number of datacenters in each group of networks
475 list of tuples: the first element is the group key, while the second
476 element is the number of datacenters in each group.
478 return ((key
, len(set(n
['datacenter_id'] for n
in group
)))
479 for key
, group
in grouped_networks
.items())