1 # -*- coding: utf-8 -*-
3 # Copyright 2018 University of Bristol - High Performance Networks Research
7 # Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
8 # Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
10 # Licensed under the Apache License, Version 2.0 (the "License"); you may
11 # not use this file except in compliance with the License. You may obtain
12 # a copy of the License at
14 # http://www.apache.org/licenses/LICENSE-2.0
16 # Unless required by applicable law or agreed to in writing, software
17 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
18 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
19 # License for the specific language governing permissions and limitations
22 # For those usages not covered by the Apache License, Version 2.0 please
23 # contact with: <highperformance-networks@bristol.ac.uk>
25 # Neither the name of the University of Bristol nor the names of its
26 # contributors may be used to endorse or promote products derived from
27 # this software without specific prior written permission.
29 # This work has been performed in the context of DCMS UK 5G Testbeds
30 # & Trials Programme and in the framework of the Metro-Haul project -
31 # funded by the European Commission under Grant number 761727 through the
32 # Horizon 2020 and 5G-PPP programmes.
35 """This module contains the domain logic, and the implementation of the
36 required steps to perform VNF management and orchestration in a WAN
39 It works as an extension/complement to the main functions contained in the
40 ``nfvo.py`` file and avoids interacting directly with the database, by relying
41 on the `persistence` module.
43 No http request handling/direct interaction with the database should be present
48 from contextlib
import contextmanager
49 from itertools
import groupby
50 from operator
import itemgetter
51 from uuid
import uuid4
53 from ..utils
import remove_none_items
54 from .actions
import Action
56 NoWimConnectedToDatacenters
,
59 from .wim_thread
import WimThread
62 class WimEngine(object):
63 """Logic supporting the establishment of WAN links when NS spans across
64 different datacenters.
66 def __init__(self
, persistence
, logger
=None, ovim
=None):
67 self
.persist
= persistence
68 self
.logger
= logger
or logging
.getLogger('openmano.wim.engine')
73 def create_wim(self
, properties
):
74 """Create a new wim record according to the properties
76 Please check the wim schema to have more information about
80 str: uuid of the newly created WIM record
82 return self
.persist
.create_wim(properties
)
84 def get_wim(self
, uuid_or_name
, tenant_id
=None):
85 """Retrieve existing WIM record by name or id.
87 If ``tenant_id`` is specified, the query will be
88 limited to the WIM associated to the given tenant.
90 # Since it is a pure DB operation, we can delegate it directly
91 return self
.persist
.get_wim(uuid_or_name
, tenant_id
)
93 def update_wim(self
, uuid_or_name
, properties
):
94 """Edit an existing WIM record.
96 ``properties`` is a dictionary with the properties being changed,
97 if a property is not present, the old value will be preserved
99 return self
.persist
.update_wim(uuid_or_name
, properties
)
101 def delete_wim(self
, uuid_or_name
):
102 """Kill the corresponding wim threads and erase the WIM record"""
103 # Theoretically, we can rely on the database to drop the wim_accounts
104 # automatically, since we have configures 'ON CASCADE DELETE'.
105 # However, use use `delete_wim_accounts` to kill all the running
107 self
.delete_wim_accounts(uuid_or_name
)
108 return self
.persist
.delete_wim(uuid_or_name
)
110 def create_wim_account(self
, wim
, tenant
, properties
):
111 """Create an account that associates a tenant to a WIM.
113 As a side effect this function will spawn a new thread
116 wim (str): name or uuid of the WIM related to the account being
118 tenant (str): name or uuid of the nfvo tenant to which the account
120 properties (dict): properties of the account
121 (eg. username, password, ...)
126 uuid
= self
.persist
.create_wim_account(wim
, tenant
, properties
)
127 account
= self
.persist
.get_wim_account_by(uuid
=uuid
)
128 # ^ We need to use get_wim_account_by here, since this methods returns
129 # all the associations, and we need the wim to create the thread
130 self
._spawn
_thread
(account
)
133 def _update_single_wim_account(self
, account
, properties
):
134 """Update WIM Account, taking care to reload the corresponding thread
137 account (dict): Current account record
138 properties (dict): Properties to be updated
143 account
= self
.persist
.update_wim_account(account
['uuid'], properties
)
144 self
.threads
[account
['uuid']].reload()
147 def update_wim_accounts(self
, wim
, tenant
, properties
):
148 """Update all the accounts related to a WIM and a tenant,
149 thanking care of reloading threads.
152 wim (str): uuid or name of a WIM record
153 tenant (str): uuid or name of a NFVO tenant record
154 properties (dict): attributes with values to be updated
157 list: Records that were updated
159 accounts
= self
.persist
.get_wim_accounts_by(wim
, tenant
)
160 return [self
._update
_single
_wim
_account
(account
, properties
)
161 for account
in accounts
]
163 def _delete_single_wim_account(self
, account
):
164 """Delete WIM Account, taking care to remove the corresponding thread
165 and delete the internal WIM account, if it was automatically generated.
168 account (dict): Current account record
169 properties (dict): Properties to be updated
172 dict: current record (same as input)
174 self
.persist
.delete_wim_account(account
['uuid'])
176 if account
['uuid'] not in self
.threads
:
177 raise WimAccountNotActive(
178 'Requests send to the WIM Account %s are not currently '
179 'being processed.', account
['uuid'])
181 self
.threads
[account
['uuid']].exit()
182 del self
.threads
[account
['uuid']]
186 def delete_wim_accounts(self
, wim
, tenant
=None, **kwargs
):
187 """Delete all the accounts related to a WIM (and a tenant),
188 thanking care of threads and internal WIM accounts.
191 wim (str): uuid or name of a WIM record
192 tenant (str): uuid or name of a NFVO tenant record
195 list: Records that were deleted
197 kwargs
.setdefault('error_if_none', False)
198 accounts
= self
.persist
.get_wim_accounts_by(wim
, tenant
, **kwargs
)
199 return [self
._delete
_single
_wim
_account
(a
) for a
in accounts
]
201 def _reload_wim_threads(self
, wim_id
):
202 for thread
in self
.threads
.values():
203 if thread
.wim_account
['wim_id'] == wim_id
:
206 def create_wim_port_mappings(self
, wim
, properties
, tenant
=None):
207 """Store information about port mappings from Database"""
208 # TODO: Review tenants... WIMs can exist across different tenants,
209 # and the port_mappings are a WIM property, not a wim_account
210 # property, so the concepts are not related
211 wim
= self
.persist
.get_by_name_or_uuid('wims', wim
)
212 result
= self
.persist
.create_wim_port_mappings(wim
, properties
, tenant
)
213 self
._reload
_wim
_threads
(wim
['uuid'])
216 def get_wim_port_mappings(self
, wim
):
217 """Retrive information about port mappings from Database"""
218 return self
.persist
.get_wim_port_mappings(wim
)
220 def delete_wim_port_mappings(self
, wim
):
221 """Erase the port mapping records associated with the WIM"""
222 wim
= self
.persist
.get_by_name_or_uuid('wims', wim
)
223 message
= self
.persist
.delete_wim_port_mappings(wim
['uuid'])
224 self
._reload
_wim
_threads
(wim
['uuid'])
227 def find_common_wims(self
, datacenter_ids
, tenant
):
228 """Find WIMs that are common to all datacenters listed"""
229 mappings
= self
.persist
.get_wim_port_mappings(
230 datacenter
=datacenter_ids
, tenant
=tenant
, error_if_none
=False)
232 wim_id_of
= itemgetter('wim_id')
233 sorted_mappings
= sorted(mappings
, key
=wim_id_of
) # needed by groupby
234 grouped_mappings
= groupby(sorted_mappings
, key
=wim_id_of
)
235 mapped_datacenters
= {
236 wim_id
: [m
['datacenter_id'] for m
in mappings
]
237 for wim_id
, mappings
in grouped_mappings
242 for wim_id
, connected_datacenters
in mapped_datacenters
.items()
243 if set(connected_datacenters
) >= set(datacenter_ids
)
246 def find_common_wim(self
, datacenter_ids
, tenant
):
247 """Find a single WIM that is able to connect all the datacenters
251 NoWimConnectedToDatacenters: if no WIM connected to all datacenters
254 suitable_wim_ids
= self
.find_common_wims(datacenter_ids
, tenant
)
256 if not suitable_wim_ids
:
257 raise NoWimConnectedToDatacenters(datacenter_ids
)
259 # TODO: use a criteria to determine which WIM is going to be used,
260 # instead of always using the first one (strategy pattern can be
262 return suitable_wim_ids
[0]
264 def derive_wan_link(self
,
265 instance_scenario_id
, sce_net_id
,
267 """Create a instance_wim_nets record for the given information"""
268 datacenters
= [n
['datacenter_id'] for n
in networks
]
269 wim_id
= self
.find_common_wim(datacenters
, tenant
)
271 account
= self
.persist
.get_wim_account_by(wim_id
, tenant
)
274 'uuid': str(uuid4()),
275 'instance_scenario_id': instance_scenario_id
,
276 'sce_net_id': sce_net_id
,
278 'wim_account_id': account
['uuid']
281 def derive_wan_links(self
, networks
, tenant
=None):
282 """Discover and return what are the wan_links that have to be created
283 considering a set of networks (VLDs) required for a scenario instance
287 networks(list): Dicts containing the information about the networks
288 that will be instantiated to materialize a Network Service
292 list: list of WAN links to be written to the database
294 # Group networks by key=(instance_scenario_id, sce_net_id)
295 filtered
= _filter_multi_vim(networks
)
296 grouped_networks
= _group_networks(filtered
)
297 datacenters_per_group
= _count_datacenters(grouped_networks
)
298 # For each group count the number of networks. If greater then 1,
299 # we have to create a wan link connecting them.
301 for key
, counter
in datacenters_per_group
305 self
.derive_wan_link(key
[0], key
[1], grouped_networks
[key
], tenant
)
306 for key
in wan_groups
309 def create_action(self
, wan_link
):
310 """For a single wan_link create the corresponding create action"""
313 'status': 'SCHEDULED',
314 'item': 'instance_wim_nets',
315 'item_id': wan_link
['uuid'],
316 'wim_account_id': wan_link
['wim_account_id']
319 def create_actions(self
, wan_links
):
320 """For an array of wan_links, create all the corresponding actions"""
321 return [self
.create_action(l
) for l
in wan_links
]
323 def delete_action(self
, wan_link
):
324 """For a single wan_link create the corresponding create action"""
327 'status': 'SCHEDULED',
328 'item': 'instance_wim_nets',
329 'item_id': wan_link
['uuid'],
330 'wim_account_id': wan_link
['wim_account_id'],
331 'extra': json
.dumps({'wan_link': wan_link
})
332 # We serialize and cache the wan_link here, because it can be
333 # deleted during the delete process
336 def delete_actions(self
, wan_links
=(), instance_scenario_id
=None):
337 """Given a Instance Scenario, remove all the WAN Links created in the
339 if instance_scenario_id
:
340 wan_links
= self
.persist
.get_wan_links(
341 instance_scenario_id
=instance_scenario_id
)
342 return [self
.delete_action(l
) for l
in wan_links
]
344 def incorporate_actions(self
, wim_actions
, instance_action
):
345 """Make the instance action consider new WIM actions and make the WIM
346 actions aware of the instance action
348 current
= instance_action
.setdefault('number_tasks', 0)
349 for i
, action
in enumerate(wim_actions
):
350 action
['task_index'] = current
+ i
351 action
['instance_action_id'] = instance_action
['uuid']
352 instance_action
['number_tasks'] += len(wim_actions
)
354 return wim_actions
, instance_action
356 def dispatch(self
, tasks
):
357 """Enqueue a list of tasks for further processing.
359 This function is supposed to be called outside from the WIM Thread.
362 if task
['wim_account_id'] not in self
.threads
:
363 error_msg
= str(WimAccountNotActive(
364 'Requests send to the WIM Account %s are not currently '
365 'being processed.', task
['wim_account_id']))
366 Action(task
, self
.logger
).fail(self
.persist
, error_msg
)
367 self
.persist
.update_wan_link(task
['item_id'],
369 'error_msg': error_msg
})
370 self
.logger
.error('Task %s %s %s not dispatched.\n%s',
371 task
['action'], task
['item'],
372 task
['instance_account_id'], error_msg
)
374 self
.threads
[task
['wim_account_id']].insert_task(task
)
375 self
.logger
.debug('Task %s %s %s dispatched',
376 task
['action'], task
['item'],
377 task
['instance_action_id'])
379 def _spawn_thread(self
, wim_account
):
380 """Spawn a WIM thread
383 wim_account (dict): WIM information (usually persisted)
384 The `wim` field is required to be set with a valid WIM record
385 inside the `wim_account` dict
388 threading.Thread: Thread object
392 thread
= WimThread(self
.persist
, wim_account
, ovim
=self
.ovim
)
393 self
.threads
[wim_account
['uuid']] = thread
396 self
.logger
.error('Error when spawning WIM thread for %s',
397 wim_account
['uuid'], exc_info
=True)
401 def start_threads(self
):
402 """Start the threads responsible for processing WIM Actions"""
403 accounts
= self
.persist
.get_wim_accounts(error_if_none
=False)
404 self
.threads
= remove_none_items(
405 {a
['uuid']: self
._spawn
_thread
(a
) for a
in accounts
})
407 def stop_threads(self
):
408 """Stop the threads responsible for processing WIM Actions"""
409 for uuid
, thread
in self
.threads
.items():
411 del self
.threads
[uuid
]
414 def threads_running(self
):
415 """Ensure no thread will be left running"""
416 # This method is particularly important for testing :)
424 def _filter_multi_vim(networks
):
425 """Ignore networks without sce_net_id (all VNFs go to the same VIM)"""
426 return [n
for n
in networks
if 'sce_net_id' in n
and n
['sce_net_id']]
429 def _group_networks(networks
):
430 """Group networks that correspond to the same instance_scenario_id and
431 sce_net_id (NSR and VLD).
434 networks(list): Dicts containing the information about the networks
435 that will be instantiated to materialize a Network Service
438 dict: Keys are tuples (instance_scenario_id, sce_net_id) and values
439 are lits of networks.
441 criteria
= itemgetter('instance_scenario_id', 'sce_net_id')
443 networks
= sorted(networks
, key
=criteria
)
444 return {k
: list(v
) for k
, v
in groupby(networks
, key
=criteria
)}
447 def _count_datacenters(grouped_networks
):
448 """Count the number of datacenters in each group of networks
451 list of tuples: the first element is the group key, while the second
452 element is the number of datacenters in each group.
454 return ((key
, len(set(n
['datacenter_id'] for n
in group
)))
455 for key
, group
in grouped_networks
.items())