# -*- coding: utf-8 -*-
##
# Copyright 2018 University of Bristol - High Performance Networks Research
# Group
# All Rights Reserved.
#
# Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
# Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: <highperformance-networks@bristol.ac.uk>
#
# Neither the name of the University of Bristol nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# This work has been performed in the context of DCMS UK 5G Testbeds
# & Trials Programme and in the framework of the Metro-Haul project -
# funded by the European Commission under Grant number 761727 through the
# Horizon 2020 and 5G-PPP programmes.
##
34
"""This module contains the domain logic, and the implementation of the
required steps to perform VNF management and orchestration in a WAN
environment.

It works as an extension/complement to the main functions contained in the
``nfvo.py`` file and avoids interacting directly with the database, by relying
on the `persistence` module.

No http request handling/direct interaction with the database should be present
in this file.
"""
46import json
47import logging
48from contextlib import contextmanager
49from itertools import groupby
50from operator import itemgetter
Anderson Bravalherifed47b02018-12-16 20:44:08 +000051from sys import exc_info
Anderson Bravalheri0446cd52018-08-17 15:26:19 +010052from uuid import uuid4
53
Anderson Bravalherifed47b02018-12-16 20:44:08 +000054from six import reraise
55
Anderson Bravalheri0446cd52018-08-17 15:26:19 +010056from ..utils import remove_none_items
57from .actions import Action
58from .errors import (
Anderson Bravalherifed47b02018-12-16 20:44:08 +000059 DbBaseException,
Anderson Bravalheri0446cd52018-08-17 15:26:19 +010060 NoWimConnectedToDatacenters,
Anderson Bravalherifed47b02018-12-16 20:44:08 +000061 UnexpectedDatabaseError,
Anderson Bravalheri0446cd52018-08-17 15:26:19 +010062 WimAccountNotActive
63)
64from .wim_thread import WimThread
65
66
class WimEngine(object):
    """Logic supporting the establishment of WAN links when NS spans across
    different datacenters.

    Attributes:
        persist: persistence layer, every database access goes through it
        logger: logger in use (``openmano.wim.engine`` unless one is given)
        threads (dict): WIM threads indexed by the uuid of the WIM account
            they serve
        connectors (dict): initialised empty, not otherwise referenced by
            the methods of this class
        ovim: object forwarded as ``ovim`` to every ``WimThread`` spawned
    """
    def __init__(self, persistence, logger=None, ovim=None):
        self.persist = persistence
        self.logger = logger or logging.getLogger('openmano.wim.engine')
        self.threads = {}
        self.connectors = {}
        self.ovim = ovim

    def create_wim(self, properties):
        """Create a new wim record according to the properties

        Please check the wim schema to have more information about
        ``properties``.

        The ``config`` property might contain a ``wim_port_mapping`` dict,
        In this case, the method ``create_wim_port_mappings`` will be
        automatically invoked. Note that ``wim_port_mapping`` is removed
        (popped) from the ``config`` dict received as argument.

        Returns:
            str: uuid of the newly created WIM record

        Raises:
            UnexpectedDatabaseError: if the port mappings cannot be stored.
                In this case the WIM that was just created is deleted.
        """
        port_mapping = ((properties.get('config', {}) or {})
                        .pop('wim_port_mapping', {}))
        uuid = self.persist.create_wim(properties)

        if port_mapping:
            try:
                self.create_wim_port_mappings(uuid, port_mapping)
            except DbBaseException:
                # Rollback
                self.delete_wim(uuid)
                ex = UnexpectedDatabaseError(
                    'Failed to create port mappings. '
                    'Rolling back wim creation')
                self.logger.exception(str(ex))
                # Raise the new error, keeping the original traceback
                reraise(ex.__class__, ex, exc_info()[2])

        return uuid

    def get_wim(self, uuid_or_name, tenant_id=None):
        """Retrieve existing WIM record by name or id.

        If ``tenant_id`` is specified, the query will be
        limited to the WIM associated to the given tenant.
        """
        # Since it is a pure DB operation, we can delegate it directly
        return self.persist.get_wim(uuid_or_name, tenant_id)

    def update_wim(self, uuid_or_name, properties):
        """Edit an existing WIM record.

        ``properties`` is a dictionary with the properties being changed,
        if a property is not present, the old value will be preserved

        Similarly to create_wim, the ``config`` property might contain a
        ``wim_port_mapping`` dict, In this case, port mappings will be
        automatically updated.

        Returns:
            Whatever ``persist.update_wim`` returns for the update.

        Raises:
            UnexpectedDatabaseError: if the port mappings cannot be replaced.
                In this case the WIM record is updated again with its
                original properties.
        """
        port_mapping = ((properties.get('config', {}) or {})
                        .pop('wim_port_mapping', {}))
        orig_props = self.persist.get_by_name_or_uuid('wims', uuid_or_name)
        uuid = orig_props['uuid']

        response = self.persist.update_wim(uuid, properties)

        if port_mapping:
            try:
                # It is very complex to diff and update individually all the
                # port mappings. Therefore a practical approach is just delete
                # and create it again.
                self.persist.delete_wim_port_mappings(uuid)
                # ^  Calling from persistence avoid reloading twice the thread
                self.create_wim_port_mappings(uuid, port_mapping)
            except DbBaseException:
                # Rollback. The uuid is used for the lookup (instead of
                # ``uuid_or_name``) because the update being reverted may
                # have changed the name of the WIM.
                self.update_wim(uuid, orig_props)
                ex = UnexpectedDatabaseError(
                    'Failed to update port mappings. '
                    'Rolling back wim updates\n')
                self.logger.exception(str(ex))
                # Raise the new error, keeping the original traceback
                reraise(ex.__class__, ex, exc_info()[2])

        return response

    def delete_wim(self, uuid_or_name):
        """Kill the corresponding wim threads and erase the WIM record"""
        # Theoretically, we can rely on the database to drop the wim_accounts
        # automatically, since we have configured 'ON CASCADE DELETE'.
        # However, we use `delete_wim_accounts` to kill all the running
        # threads.
        self.delete_wim_accounts(uuid_or_name)
        return self.persist.delete_wim(uuid_or_name)

    def create_wim_account(self, wim, tenant, properties):
        """Create an account that associates a tenant to a WIM.

        As a side effect this function will spawn a new thread

        Arguments:
            wim (str): name or uuid of the WIM related to the account being
                created
            tenant (str): name or uuid of the nfvo tenant to which the account
                will be created
            properties (dict): properties of the account
                (eg. username, password, ...)

        Returns:
            dict: Created record
        """
        uuid = self.persist.create_wim_account(wim, tenant, properties)
        account = self.persist.get_wim_account_by(uuid=uuid)
        # ^  We need to use get_wim_account_by here, since this methods returns
        #    all the associations, and we need the wim to create the thread
        self._spawn_thread(account)
        return account

    def _update_single_wim_account(self, account, properties):
        """Update WIM Account, taking care to reload the corresponding thread

        Arguments:
            account (dict): Current account record
            properties (dict): Properties to be updated

        Returns:
            dict: updated record
        """
        account = self.persist.update_wim_account(account['uuid'], properties)
        self.threads[account['uuid']].reload()
        return account

    def update_wim_accounts(self, wim, tenant, properties):
        """Update all the accounts related to a WIM and a tenant,
        taking care of reloading threads.

        Arguments:
            wim (str): uuid or name of a WIM record
            tenant (str): uuid or name of a NFVO tenant record
            properties (dict): attributes with values to be updated

        Returns
            list: Records that were updated
        """
        accounts = self.persist.get_wim_accounts_by(wim, tenant)
        return [self._update_single_wim_account(account, properties)
                for account in accounts]

    def _delete_single_wim_account(self, account):
        """Delete WIM Account, taking care to remove the corresponding thread.

        The record is erased from the database before the thread is looked
        up, so it is gone even when ``WimAccountNotActive`` is raised.

        Arguments:
            account (dict): Current account record

        Returns:
            dict: current record (same as input)

        Raises:
            WimAccountNotActive: if there is no thread registered for the
                account
        """
        self.persist.delete_wim_account(account['uuid'])

        if account['uuid'] not in self.threads:
            # The uuid is interpolated here: the exception receives a
            # single, ready-to-display message.
            raise WimAccountNotActive(
                'Requests send to the WIM Account %s are not currently '
                'being processed.' % account['uuid'])
        else:
            self.threads[account['uuid']].exit()
            del self.threads[account['uuid']]

        return account

    def delete_wim_accounts(self, wim, tenant=None, **kwargs):
        """Delete all the accounts related to a WIM (and a tenant),
        taking care of threads and internal WIM accounts.

        Arguments:
            wim (str): uuid or name of a WIM record
            tenant (str): uuid or name of a NFVO tenant record
            **kwargs: forwarded to ``persist.get_wim_accounts_by``
                (``error_if_none`` defaults to ``False`` here)

        Returns
            list: Records that were deleted
        """
        kwargs.setdefault('error_if_none', False)
        accounts = self.persist.get_wim_accounts_by(wim, tenant, **kwargs)
        return [self._delete_single_wim_account(a) for a in accounts]

    def _reload_wim_threads(self, wim_id):
        """Reload every registered thread whose account belongs to the WIM"""
        for thread in self.threads.values():
            if thread.wim_account['wim_id'] == wim_id:
                thread.reload()

    def create_wim_port_mappings(self, wim, properties, tenant=None):
        """Store information about port mappings in the Database and reload
        the threads of the corresponding WIM
        """
        # TODO: Review tenants... WIMs can exist across different tenants,
        #       and the port_mappings are a WIM property, not a wim_account
        #       property, so the concepts are not related
        wim = self.persist.get_by_name_or_uuid('wims', wim)
        result = self.persist.create_wim_port_mappings(wim, properties, tenant)
        self._reload_wim_threads(wim['uuid'])
        return result

    def get_wim_port_mappings(self, wim):
        """Retrieve information about port mappings from Database"""
        return self.persist.get_wim_port_mappings(wim)

    def delete_wim_port_mappings(self, wim):
        """Erase the port mapping records associated with the WIM and reload
        the threads of the corresponding WIM
        """
        wim = self.persist.get_by_name_or_uuid('wims', wim)
        message = self.persist.delete_wim_port_mappings(wim['uuid'])
        self._reload_wim_threads(wim['uuid'])
        return message

    def find_common_wims(self, datacenter_ids, tenant):
        """Find WIMs that are common to all datacenters listed

        Returns:
            list: ids of the WIMs with port mappings for every datacenter in
                ``datacenter_ids``
        """
        mappings = self.persist.get_wim_port_mappings(
            datacenter=datacenter_ids, tenant=tenant, error_if_none=False)

        wim_id_of = itemgetter('wim_id')
        sorted_mappings = sorted(mappings, key=wim_id_of)  # needed by groupby
        grouped_mappings = groupby(sorted_mappings, key=wim_id_of)
        mapped_datacenters = {
            wim_id: [m['datacenter_id'] for m in group]
            for wim_id, group in grouped_mappings
        }

        # A WIM is suitable when the datacenters it is mapped to form a
        # superset of the requested ones
        return [
            wim_id
            for wim_id, connected_datacenters in mapped_datacenters.items()
            if set(connected_datacenters) >= set(datacenter_ids)
        ]

    def find_common_wim(self, datacenter_ids, tenant):
        """Find a single WIM that is able to connect all the datacenters
        listed

        Raises:
            NoWimConnectedToDatacenters: if no WIM connected to all datacenters
                at once is found
        """
        suitable_wim_ids = self.find_common_wims(datacenter_ids, tenant)

        if not suitable_wim_ids:
            raise NoWimConnectedToDatacenters(datacenter_ids)

        # TODO: use a criteria to determine which WIM is going to be used,
        #       instead of always using the first one (strategy pattern can be
        #       used here)
        return suitable_wim_ids[0]

    def find_suitable_wim_account(self, datacenter_ids, tenant):
        """Find a WIM account that is able to connect all the datacenters
        listed

        Arguments:
            datacenter_ids (list): List of UUIDs of all the datacenters (vims),
                that need to be connected.
            tenant (str): UUID of the OSM tenant

        Returns:
            object with the WIM account that is able to connect all the
                 datacenters.

        Raises:
            NoWimConnectedToDatacenters: if no WIM connected to all datacenters
                at once is found
        """
        wim_id = self.find_common_wim(datacenter_ids, tenant)
        return self.persist.get_wim_account_by(wim_id, tenant)

    def derive_wan_link(self,
                        wim_usage,
                        instance_scenario_id, sce_net_id,
                        networks, tenant, related=None):
        """Create a instance_wim_nets record for the given information

        If ``sce_net_id`` is present in ``wim_usage``, the associated value is
        used as the uuid of the WIM account. Otherwise a WIM common to the
        datacenters of all the ``networks`` is searched.

        Returns:
            dict: record with a newly generated ``uuid``
        """
        if sce_net_id in wim_usage:
            account_id = wim_usage[sce_net_id]
            account = self.persist.get_wim_account_by(uuid=account_id)
            wim_id = account['wim_id']
        else:
            datacenters = [n['datacenter_id'] for n in networks]
            wim_id = self.find_common_wim(datacenters, tenant)
            account = self.persist.get_wim_account_by(wim_id, tenant)

        return {
            'uuid': str(uuid4()),
            'instance_scenario_id': instance_scenario_id,
            'sce_net_id': sce_net_id,
            'wim_id': wim_id,
            'wim_account_id': account['uuid'],
            'related': related
        }

    def derive_wan_links(self, wim_usage, networks, tenant=None):
        """Discover and return what are the wan_links that have to be created
        considering a set of networks (VLDs) required for a scenario instance
        (NSR).

        Arguments:
            wim_usage(dict): Mapping between sce_net_id and the WIM account to
                be used. If the value is False, no wan_link is created for
                that sce_net_id
            networks(list): Dicts containing the information about the networks
                that will be instantiated to materialize a Network Service
                (scenario) instance.
                Corresponding to the ``instance_net`` record.

        Returns:
            list: list of WAN links to be written to the database
        """
        # The ``related`` value of the first network is used for all the links
        related = None
        if networks:
            related = networks[0].get("related")
        # Group networks by key=(instance_scenario_id, sce_net_id)
        filtered = _filter_multi_vim(networks)
        grouped_networks = _group_networks(filtered)
        datacenters_per_group = _count_datacenters(grouped_networks)
        # For each group count the number of distinct datacenters. If greater
        # than 1, we have to create a wan link connecting them.
        wan_groups = [key
                      for key, counter in datacenters_per_group
                      if counter > 1]
        # Keys are tuples(instance_scenario_id, sce_net_id)
        return [
            self.derive_wan_link(wim_usage,
                                 key[0], key[1], grouped_networks[key],
                                 tenant, related)
            for key in wan_groups if wim_usage.get(key[1]) is not False
        ]

    def create_action(self, wan_link):
        """For a single wan_link create the corresponding create action"""
        return {
            'action': 'CREATE',
            'status': 'SCHEDULED',
            'item': 'instance_wim_nets',
            'item_id': wan_link['uuid'],
            'wim_account_id': wan_link['wim_account_id']
        }

    def create_actions(self, wan_links):
        """For an array of wan_links, create all the corresponding actions"""
        return [self.create_action(link) for link in wan_links]

    def delete_action(self, wan_link):
        """For a single wan_link create the corresponding delete action"""
        return {
            'action': 'DELETE',
            'status': 'SCHEDULED',
            'item': 'instance_wim_nets',
            'item_id': wan_link['uuid'],
            'wim_account_id': wan_link['wim_account_id'],
            'extra': json.dumps({'wan_link': wan_link})
            # We serialize and cache the wan_link here, because it can be
            # deleted during the delete process
        }

    def delete_actions(self, wan_links=(), instance_scenario_id=None):
        """Given a Instance Scenario, remove all the WAN Links created in the
        past

        When ``instance_scenario_id`` is given, the ``wan_links`` argument is
        ignored and the links are retrieved from the database.
        """
        if instance_scenario_id:
            wan_links = self.persist.get_wan_links(
                instance_scenario_id=instance_scenario_id)
        return [self.delete_action(link) for link in wan_links]

    def incorporate_actions(self, wim_actions, instance_action):
        """Make the instance action consider new WIM actions and make the WIM
        actions aware of the instance action

        Both arguments are modified in place and also returned.
        """
        current = instance_action.setdefault('number_tasks', 0)
        for i, action in enumerate(wim_actions):
            action['task_index'] = current + i
            action['instance_action_id'] = instance_action['uuid']
        instance_action['number_tasks'] += len(wim_actions)

        return wim_actions, instance_action

    def dispatch(self, tasks):
        """Enqueue a list of tasks for further processing.

        Tasks whose WIM account has no registered thread are marked as
        failed, and the corresponding wan_link is updated with an error.

        This function is supposed to be called outside from the WIM Thread.
        """
        for task in tasks:
            if task['wim_account_id'] not in self.threads:
                error_msg = str(WimAccountNotActive(
                    'Requests send to the WIM Account %s are not currently '
                    'being processed.' % task['wim_account_id']))
                Action(task, self.logger).fail(self.persist, error_msg)
                self.persist.update_wan_link(task['item_id'],
                                             {'status': 'ERROR',
                                              'error_msg': error_msg})
                # `.get` is used so reporting the failure can never raise
                self.logger.error('Task %s %s %s not dispatched.\n%s',
                                  task['action'], task['item'],
                                  task.get('instance_action_id'), error_msg)
            else:
                self.threads[task['wim_account_id']].insert_task(task)
                self.logger.debug('Task %s %s %s dispatched',
                                  task['action'], task['item'],
                                  task['instance_action_id'])

    def _spawn_thread(self, wim_account):
        """Spawn a WIM thread

        Errors are logged instead of propagated.

        Arguments:
            wim_account (dict): WIM information (usually persisted)
                The `wim` field is required to be set with a valid WIM record
                inside the `wim_account` dict

        Return:
            threading.Thread: Thread object (``None`` if it was not possible
                to create it)
        """
        thread = None
        try:
            thread = WimThread(self.persist, wim_account, ovim=self.ovim)
            self.threads[wim_account['uuid']] = thread
            thread.start()
        except:  # noqa
            self.logger.error('Error when spawning WIM thread for %s',
                              wim_account['uuid'], exc_info=True)

        return thread

    def start_threads(self):
        """Start the threads responsible for processing WIM Actions"""
        accounts = self.persist.get_wim_accounts(error_if_none=False)
        self.threads = remove_none_items(
            {a['uuid']: self._spawn_thread(a) for a in accounts})

    def stop_threads(self):
        """Stop the threads responsible for processing WIM Actions"""
        # Iterate over a snapshot: entries are removed inside the loop, and
        # changing the size of a dict while iterating over it raises
        # RuntimeError in Python 3.
        for uuid, thread in list(self.threads.items()):
            thread.exit()
            del self.threads[uuid]

    @contextmanager
    def threads_running(self):
        """Ensure no thread will be left running"""
        # This method is particularly important for testing :)
        try:
            self.start_threads()
            yield
        finally:
            self.stop_threads()
501
502
503def _filter_multi_vim(networks):
504 """Ignore networks without sce_net_id (all VNFs go to the same VIM)"""
505 return [n for n in networks if 'sce_net_id' in n and n['sce_net_id']]
506
507
508def _group_networks(networks):
509 """Group networks that correspond to the same instance_scenario_id and
510 sce_net_id (NSR and VLD).
511
512 Arguments:
513 networks(list): Dicts containing the information about the networks
514 that will be instantiated to materialize a Network Service
515 (scenario) instance.
516 Returns:
517 dict: Keys are tuples (instance_scenario_id, sce_net_id) and values
tiernofc7cfbf2019-03-20 17:23:45 +0000518 are list of networks.
Anderson Bravalheri0446cd52018-08-17 15:26:19 +0100519 """
520 criteria = itemgetter('instance_scenario_id', 'sce_net_id')
521
522 networks = sorted(networks, key=criteria)
523 return {k: list(v) for k, v in groupby(networks, key=criteria)}
524
525
526def _count_datacenters(grouped_networks):
527 """Count the number of datacenters in each group of networks
528
529 Returns:
530 list of tuples: the first element is the group key, while the second
531 element is the number of datacenters in each group.
532 """
533 return ((key, len(set(n['datacenter_id'] for n in group)))
534 for key, group in grouped_networks.items())