blob: 6ff2b4fc02a34635e570f9918df8bd1ddde0bf03 [file] [log] [blame]
Anderson Bravalheri0446cd52018-08-17 15:26:19 +01001# -*- coding: utf-8 -*-
2##
3# Copyright 2018 University of Bristol - High Performance Networks Research
4# Group
5# All Rights Reserved.
6#
7# Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
8# Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
9#
10# Licensed under the Apache License, Version 2.0 (the "License"); you may
11# not use this file except in compliance with the License. You may obtain
12# a copy of the License at
13#
14# http://www.apache.org/licenses/LICENSE-2.0
15#
16# Unless required by applicable law or agreed to in writing, software
17# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
18# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
19# License for the specific language governing permissions and limitations
20# under the License.
21#
22# For those usages not covered by the Apache License, Version 2.0 please
23# contact with: <highperformance-networks@bristol.ac.uk>
24#
25# Neither the name of the University of Bristol nor the names of its
26# contributors may be used to endorse or promote products derived from
27# this software without specific prior written permission.
28#
29# This work has been performed in the context of DCMS UK 5G Testbeds
30# & Trials Programme and in the framework of the Metro-Haul project -
31# funded by the European Commission under Grant number 761727 through the
32# Horizon 2020 and 5G-PPP programmes.
33##
34
35"""This module contains the domain logic, and the implementation of the
36required steps to perform VNF management and orchestration in a WAN
37environment.
38
39It works as an extension/complement to the main functions contained in the
40``nfvo.py`` file and avoids interacting directly with the database, by relying
41on the `persistence` module.
42
43No http request handling/direct interaction with the database should be present
44in this file.
45"""
46import json
47import logging
48from contextlib import contextmanager
49from itertools import groupby
50from operator import itemgetter
Anderson Bravalherifed47b02018-12-16 20:44:08 +000051from sys import exc_info
Anderson Bravalheri0446cd52018-08-17 15:26:19 +010052from uuid import uuid4
53
Anderson Bravalherifed47b02018-12-16 20:44:08 +000054from six import reraise
55
Anderson Bravalheri0446cd52018-08-17 15:26:19 +010056from ..utils import remove_none_items
57from .actions import Action
58from .errors import (
Anderson Bravalherifed47b02018-12-16 20:44:08 +000059 DbBaseException,
Anderson Bravalheri0446cd52018-08-17 15:26:19 +010060 NoWimConnectedToDatacenters,
Anderson Bravalherifed47b02018-12-16 20:44:08 +000061 UnexpectedDatabaseError,
Anderson Bravalheri0446cd52018-08-17 15:26:19 +010062 WimAccountNotActive
63)
64from .wim_thread import WimThread
65
66
67class WimEngine(object):
68 """Logic supporting the establishment of WAN links when NS spans across
69 different datacenters.
70 """
71 def __init__(self, persistence, logger=None, ovim=None):
72 self.persist = persistence
73 self.logger = logger or logging.getLogger('openmano.wim.engine')
74 self.threads = {}
75 self.connectors = {}
76 self.ovim = ovim
77
78 def create_wim(self, properties):
79 """Create a new wim record according to the properties
80
81 Please check the wim schema to have more information about
82 ``properties``.
83
Anderson Bravalherifed47b02018-12-16 20:44:08 +000084 The ``config`` property might contain a ``wim_port_mapping`` dict,
85 In this case, the method ``create_wim_port_mappings`` will be
86 automatically invoked.
87
Anderson Bravalheri0446cd52018-08-17 15:26:19 +010088 Returns:
89 str: uuid of the newly created WIM record
90 """
Anderson Bravalherifed47b02018-12-16 20:44:08 +000091 port_mapping = ((properties.get('config', {}) or {})
92 .pop('wim_port_mapping', {}))
93 uuid = self.persist.create_wim(properties)
94
95 if port_mapping:
96 try:
97 self.create_wim_port_mappings(uuid, port_mapping)
98 except DbBaseException:
99 # Rollback
100 self.delete_wim(uuid)
101 ex = UnexpectedDatabaseError('Failed to create port mappings'
102 'Rolling back wim creation')
103 self.logger.exception(str(ex))
104 reraise(ex.__class__, ex, exc_info()[2])
105
106 return uuid
Anderson Bravalheri0446cd52018-08-17 15:26:19 +0100107
108 def get_wim(self, uuid_or_name, tenant_id=None):
109 """Retrieve existing WIM record by name or id.
110
111 If ``tenant_id`` is specified, the query will be
112 limited to the WIM associated to the given tenant.
113 """
114 # Since it is a pure DB operation, we can delegate it directly
115 return self.persist.get_wim(uuid_or_name, tenant_id)
116
117 def update_wim(self, uuid_or_name, properties):
118 """Edit an existing WIM record.
119
120 ``properties`` is a dictionary with the properties being changed,
121 if a property is not present, the old value will be preserved
Anderson Bravalherifed47b02018-12-16 20:44:08 +0000122
123 Similarly to create_wim, the ``config`` property might contain a
124 ``wim_port_mapping`` dict, In this case, port mappings will be
125 automatically updated.
Anderson Bravalheri0446cd52018-08-17 15:26:19 +0100126 """
Anderson Bravalherifed47b02018-12-16 20:44:08 +0000127 port_mapping = ((properties.get('config', {}) or {})
128 .pop('wim_port_mapping', {}))
129 orig_props = self.persist.get_by_name_or_uuid('wims', uuid_or_name)
130 uuid = orig_props['uuid']
131
132 response = self.persist.update_wim(uuid, properties)
133
134 if port_mapping:
135 try:
136 # It is very complex to diff and update individually all the
137 # port mappings. Therefore a practical approach is just delete
138 # and create it again.
139 self.persist.delete_wim_port_mappings(uuid)
140 # ^ Calling from persistence avoid reloading twice the thread
141 self.create_wim_port_mappings(uuid, port_mapping)
142 except DbBaseException:
143 # Rollback
144 self.update_wim(uuid_or_name, orig_props)
145 ex = UnexpectedDatabaseError('Failed to update port mappings'
146 'Rolling back wim updates\n')
147 self.logger.exception(str(ex))
148 reraise(ex.__class__, ex, exc_info()[2])
149
150 return response
Anderson Bravalheri0446cd52018-08-17 15:26:19 +0100151
152 def delete_wim(self, uuid_or_name):
153 """Kill the corresponding wim threads and erase the WIM record"""
154 # Theoretically, we can rely on the database to drop the wim_accounts
155 # automatically, since we have configures 'ON CASCADE DELETE'.
156 # However, use use `delete_wim_accounts` to kill all the running
157 # threads.
158 self.delete_wim_accounts(uuid_or_name)
159 return self.persist.delete_wim(uuid_or_name)
160
161 def create_wim_account(self, wim, tenant, properties):
162 """Create an account that associates a tenant to a WIM.
163
164 As a side effect this function will spawn a new thread
165
166 Arguments:
167 wim (str): name or uuid of the WIM related to the account being
168 created
169 tenant (str): name or uuid of the nfvo tenant to which the account
170 will be created
171 properties (dict): properties of the account
172 (eg. username, password, ...)
173
174 Returns:
175 dict: Created record
176 """
177 uuid = self.persist.create_wim_account(wim, tenant, properties)
178 account = self.persist.get_wim_account_by(uuid=uuid)
179 # ^ We need to use get_wim_account_by here, since this methods returns
180 # all the associations, and we need the wim to create the thread
181 self._spawn_thread(account)
182 return account
183
184 def _update_single_wim_account(self, account, properties):
185 """Update WIM Account, taking care to reload the corresponding thread
186
187 Arguments:
188 account (dict): Current account record
189 properties (dict): Properties to be updated
190
191 Returns:
192 dict: updated record
193 """
194 account = self.persist.update_wim_account(account['uuid'], properties)
195 self.threads[account['uuid']].reload()
196 return account
197
198 def update_wim_accounts(self, wim, tenant, properties):
199 """Update all the accounts related to a WIM and a tenant,
200 thanking care of reloading threads.
201
202 Arguments:
203 wim (str): uuid or name of a WIM record
204 tenant (str): uuid or name of a NFVO tenant record
205 properties (dict): attributes with values to be updated
206
207 Returns
208 list: Records that were updated
209 """
210 accounts = self.persist.get_wim_accounts_by(wim, tenant)
211 return [self._update_single_wim_account(account, properties)
212 for account in accounts]
213
214 def _delete_single_wim_account(self, account):
215 """Delete WIM Account, taking care to remove the corresponding thread
216 and delete the internal WIM account, if it was automatically generated.
217
218 Arguments:
219 account (dict): Current account record
220 properties (dict): Properties to be updated
221
222 Returns:
223 dict: current record (same as input)
224 """
225 self.persist.delete_wim_account(account['uuid'])
226
227 if account['uuid'] not in self.threads:
228 raise WimAccountNotActive(
229 'Requests send to the WIM Account %s are not currently '
230 'being processed.', account['uuid'])
231 else:
232 self.threads[account['uuid']].exit()
233 del self.threads[account['uuid']]
234
235 return account
236
237 def delete_wim_accounts(self, wim, tenant=None, **kwargs):
238 """Delete all the accounts related to a WIM (and a tenant),
239 thanking care of threads and internal WIM accounts.
240
241 Arguments:
242 wim (str): uuid or name of a WIM record
243 tenant (str): uuid or name of a NFVO tenant record
244
245 Returns
246 list: Records that were deleted
247 """
248 kwargs.setdefault('error_if_none', False)
249 accounts = self.persist.get_wim_accounts_by(wim, tenant, **kwargs)
250 return [self._delete_single_wim_account(a) for a in accounts]
251
252 def _reload_wim_threads(self, wim_id):
253 for thread in self.threads.values():
254 if thread.wim_account['wim_id'] == wim_id:
255 thread.reload()
256
257 def create_wim_port_mappings(self, wim, properties, tenant=None):
258 """Store information about port mappings from Database"""
259 # TODO: Review tenants... WIMs can exist across different tenants,
260 # and the port_mappings are a WIM property, not a wim_account
261 # property, so the concepts are not related
262 wim = self.persist.get_by_name_or_uuid('wims', wim)
263 result = self.persist.create_wim_port_mappings(wim, properties, tenant)
264 self._reload_wim_threads(wim['uuid'])
265 return result
266
267 def get_wim_port_mappings(self, wim):
268 """Retrive information about port mappings from Database"""
269 return self.persist.get_wim_port_mappings(wim)
270
271 def delete_wim_port_mappings(self, wim):
272 """Erase the port mapping records associated with the WIM"""
273 wim = self.persist.get_by_name_or_uuid('wims', wim)
274 message = self.persist.delete_wim_port_mappings(wim['uuid'])
275 self._reload_wim_threads(wim['uuid'])
276 return message
277
278 def find_common_wims(self, datacenter_ids, tenant):
279 """Find WIMs that are common to all datacenters listed"""
280 mappings = self.persist.get_wim_port_mappings(
281 datacenter=datacenter_ids, tenant=tenant, error_if_none=False)
282
283 wim_id_of = itemgetter('wim_id')
284 sorted_mappings = sorted(mappings, key=wim_id_of) # needed by groupby
285 grouped_mappings = groupby(sorted_mappings, key=wim_id_of)
286 mapped_datacenters = {
287 wim_id: [m['datacenter_id'] for m in mappings]
288 for wim_id, mappings in grouped_mappings
289 }
290
291 return [
292 wim_id
293 for wim_id, connected_datacenters in mapped_datacenters.items()
294 if set(connected_datacenters) >= set(datacenter_ids)
295 ]
296
297 def find_common_wim(self, datacenter_ids, tenant):
298 """Find a single WIM that is able to connect all the datacenters
299 listed
300
Anderson Bravalherie2c09f32018-11-30 09:55:29 +0000301 Raises:
Anderson Bravalheri0446cd52018-08-17 15:26:19 +0100302 NoWimConnectedToDatacenters: if no WIM connected to all datacenters
303 at once is found
304 """
305 suitable_wim_ids = self.find_common_wims(datacenter_ids, tenant)
306
307 if not suitable_wim_ids:
308 raise NoWimConnectedToDatacenters(datacenter_ids)
309
310 # TODO: use a criteria to determine which WIM is going to be used,
311 # instead of always using the first one (strategy pattern can be
312 # used here)
313 return suitable_wim_ids[0]
314
Anderson Bravalherie2c09f32018-11-30 09:55:29 +0000315 def find_suitable_wim_account(self, datacenter_ids, tenant):
316 """Find a WIM account that is able to connect all the datacenters
317 listed
318
319 Arguments:
320 datacenter_ids (list): List of UUIDs of all the datacenters (vims),
321 that need to be connected.
322 tenant (str): UUID of the OSM tenant
323
324 Returns:
325 str: UUID of the WIM account that is able to connect all the
326 datacenters.
327 """
328 wim_id = self.find_common_wim(datacenter_ids, tenant)
329 return self.persist.get_wim_account_by(wim_id, tenant)['uuid']
330
Anderson Bravalheri0446cd52018-08-17 15:26:19 +0100331 def derive_wan_link(self,
Anderson Bravalherie2c09f32018-11-30 09:55:29 +0000332 wim_usage,
Anderson Bravalheri0446cd52018-08-17 15:26:19 +0100333 instance_scenario_id, sce_net_id,
334 networks, tenant):
335 """Create a instance_wim_nets record for the given information"""
Anderson Bravalherie2c09f32018-11-30 09:55:29 +0000336 if sce_net_id in wim_usage:
337 account_id = wim_usage[sce_net_id]
338 account = self.persist.get_wim_account_by(uuid=account_id)
339 wim_id = account['wim_id']
340 else:
341 datacenters = [n['datacenter_id'] for n in networks]
342 wim_id = self.find_common_wim(datacenters, tenant)
343 account = self.persist.get_wim_account_by(wim_id, tenant)
Anderson Bravalheri0446cd52018-08-17 15:26:19 +0100344
345 return {
346 'uuid': str(uuid4()),
347 'instance_scenario_id': instance_scenario_id,
348 'sce_net_id': sce_net_id,
349 'wim_id': wim_id,
350 'wim_account_id': account['uuid']
351 }
352
Anderson Bravalherie2c09f32018-11-30 09:55:29 +0000353 def derive_wan_links(self, wim_usage, networks, tenant=None):
Anderson Bravalheri0446cd52018-08-17 15:26:19 +0100354 """Discover and return what are the wan_links that have to be created
355 considering a set of networks (VLDs) required for a scenario instance
356 (NSR).
357
358 Arguments:
Anderson Bravalherie2c09f32018-11-30 09:55:29 +0000359 wim_usage(dict): Mapping between sce_net_id and wim_id
Anderson Bravalheri0446cd52018-08-17 15:26:19 +0100360 networks(list): Dicts containing the information about the networks
361 that will be instantiated to materialize a Network Service
362 (scenario) instance.
Anderson Bravalherie2c09f32018-11-30 09:55:29 +0000363 Corresponding to the ``instance_net`` record.
Anderson Bravalheri0446cd52018-08-17 15:26:19 +0100364
365 Returns:
366 list: list of WAN links to be written to the database
367 """
368 # Group networks by key=(instance_scenario_id, sce_net_id)
369 filtered = _filter_multi_vim(networks)
370 grouped_networks = _group_networks(filtered)
371 datacenters_per_group = _count_datacenters(grouped_networks)
372 # For each group count the number of networks. If greater then 1,
373 # we have to create a wan link connecting them.
374 wan_groups = [key
375 for key, counter in datacenters_per_group
376 if counter > 1]
377
378 return [
Anderson Bravalherie2c09f32018-11-30 09:55:29 +0000379 self.derive_wan_link(wim_usage,
380 key[0], key[1], grouped_networks[key], tenant)
Anderson Bravalheri0446cd52018-08-17 15:26:19 +0100381 for key in wan_groups
382 ]
383
384 def create_action(self, wan_link):
385 """For a single wan_link create the corresponding create action"""
386 return {
387 'action': 'CREATE',
388 'status': 'SCHEDULED',
389 'item': 'instance_wim_nets',
390 'item_id': wan_link['uuid'],
391 'wim_account_id': wan_link['wim_account_id']
392 }
393
394 def create_actions(self, wan_links):
395 """For an array of wan_links, create all the corresponding actions"""
396 return [self.create_action(l) for l in wan_links]
397
398 def delete_action(self, wan_link):
399 """For a single wan_link create the corresponding create action"""
400 return {
401 'action': 'DELETE',
402 'status': 'SCHEDULED',
403 'item': 'instance_wim_nets',
404 'item_id': wan_link['uuid'],
405 'wim_account_id': wan_link['wim_account_id'],
406 'extra': json.dumps({'wan_link': wan_link})
407 # We serialize and cache the wan_link here, because it can be
408 # deleted during the delete process
409 }
410
411 def delete_actions(self, wan_links=(), instance_scenario_id=None):
412 """Given a Instance Scenario, remove all the WAN Links created in the
413 past"""
414 if instance_scenario_id:
415 wan_links = self.persist.get_wan_links(
416 instance_scenario_id=instance_scenario_id)
417 return [self.delete_action(l) for l in wan_links]
418
419 def incorporate_actions(self, wim_actions, instance_action):
420 """Make the instance action consider new WIM actions and make the WIM
421 actions aware of the instance action
422 """
423 current = instance_action.setdefault('number_tasks', 0)
424 for i, action in enumerate(wim_actions):
425 action['task_index'] = current + i
426 action['instance_action_id'] = instance_action['uuid']
427 instance_action['number_tasks'] += len(wim_actions)
428
429 return wim_actions, instance_action
430
431 def dispatch(self, tasks):
432 """Enqueue a list of tasks for further processing.
433
434 This function is supposed to be called outside from the WIM Thread.
435 """
436 for task in tasks:
437 if task['wim_account_id'] not in self.threads:
438 error_msg = str(WimAccountNotActive(
439 'Requests send to the WIM Account %s are not currently '
440 'being processed.', task['wim_account_id']))
441 Action(task, self.logger).fail(self.persist, error_msg)
442 self.persist.update_wan_link(task['item_id'],
443 {'status': 'ERROR',
444 'error_msg': error_msg})
445 self.logger.error('Task %s %s %s not dispatched.\n%s',
446 task['action'], task['item'],
447 task['instance_account_id'], error_msg)
448 else:
449 self.threads[task['wim_account_id']].insert_task(task)
450 self.logger.debug('Task %s %s %s dispatched',
451 task['action'], task['item'],
452 task['instance_action_id'])
453
454 def _spawn_thread(self, wim_account):
455 """Spawn a WIM thread
456
457 Arguments:
458 wim_account (dict): WIM information (usually persisted)
459 The `wim` field is required to be set with a valid WIM record
460 inside the `wim_account` dict
461
462 Return:
463 threading.Thread: Thread object
464 """
465 thread = None
466 try:
467 thread = WimThread(self.persist, wim_account, ovim=self.ovim)
468 self.threads[wim_account['uuid']] = thread
469 thread.start()
470 except: # noqa
471 self.logger.error('Error when spawning WIM thread for %s',
472 wim_account['uuid'], exc_info=True)
473
474 return thread
475
476 def start_threads(self):
477 """Start the threads responsible for processing WIM Actions"""
478 accounts = self.persist.get_wim_accounts(error_if_none=False)
479 self.threads = remove_none_items(
480 {a['uuid']: self._spawn_thread(a) for a in accounts})
481
482 def stop_threads(self):
483 """Stop the threads responsible for processing WIM Actions"""
484 for uuid, thread in self.threads.items():
485 thread.exit()
486 del self.threads[uuid]
487
488 @contextmanager
489 def threads_running(self):
490 """Ensure no thread will be left running"""
491 # This method is particularly important for testing :)
492 try:
493 self.start_threads()
494 yield
495 finally:
496 self.stop_threads()
497
498
499def _filter_multi_vim(networks):
500 """Ignore networks without sce_net_id (all VNFs go to the same VIM)"""
501 return [n for n in networks if 'sce_net_id' in n and n['sce_net_id']]
502
503
504def _group_networks(networks):
505 """Group networks that correspond to the same instance_scenario_id and
506 sce_net_id (NSR and VLD).
507
508 Arguments:
509 networks(list): Dicts containing the information about the networks
510 that will be instantiated to materialize a Network Service
511 (scenario) instance.
512 Returns:
513 dict: Keys are tuples (instance_scenario_id, sce_net_id) and values
514 are lits of networks.
515 """
516 criteria = itemgetter('instance_scenario_id', 'sce_net_id')
517
518 networks = sorted(networks, key=criteria)
519 return {k: list(v) for k, v in groupby(networks, key=criteria)}
520
521
522def _count_datacenters(grouped_networks):
523 """Count the number of datacenters in each group of networks
524
525 Returns:
526 list of tuples: the first element is the group key, while the second
527 element is the number of datacenters in each group.
528 """
529 return ((key, len(set(n['datacenter_id'] for n in group)))
530 for key, group in grouped_networks.items())