eecf10fd7f58b628a6488b0776769ef7056478ae
[osm/RO.git] / osm_ro / wim / engine.py
1 # -*- coding: utf-8 -*-
2 ##
3 # Copyright 2018 University of Bristol - High Performance Networks Research
4 # Group
5 # All Rights Reserved.
6 #
7 # Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
8 # Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
9 #
10 # Licensed under the Apache License, Version 2.0 (the "License"); you may
11 # not use this file except in compliance with the License. You may obtain
12 # a copy of the License at
13 #
14 # http://www.apache.org/licenses/LICENSE-2.0
15 #
16 # Unless required by applicable law or agreed to in writing, software
17 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
18 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
19 # License for the specific language governing permissions and limitations
20 # under the License.
21 #
22 # For those usages not covered by the Apache License, Version 2.0 please
23 # contact with: <highperformance-networks@bristol.ac.uk>
24 #
25 # Neither the name of the University of Bristol nor the names of its
26 # contributors may be used to endorse or promote products derived from
27 # this software without specific prior written permission.
28 #
29 # This work has been performed in the context of DCMS UK 5G Testbeds
30 # & Trials Programme and in the framework of the Metro-Haul project -
31 # funded by the European Commission under Grant number 761727 through the
32 # Horizon 2020 and 5G-PPP programmes.
33 ##
34
35 """This module contains the domain logic, and the implementation of the
36 required steps to perform VNF management and orchestration in a WAN
37 environment.
38
39 It works as an extension/complement to the main functions contained in the
40 ``nfvo.py`` file and avoids interacting directly with the database, by relying
41 on the `persistence` module.
42
43 No http request handling/direct interaction with the database should be present
44 in this file.
45 """
46 import json
47 import logging
48 from contextlib import contextmanager
49 from itertools import groupby
50 from operator import itemgetter
51 from sys import exc_info
52 from uuid import uuid4
53
54 from six import reraise
55
56 from ..utils import remove_none_items
57 from .actions import Action
58 from .errors import (
59 DbBaseException,
60 NoWimConnectedToDatacenters,
61 UnexpectedDatabaseError,
62 WimAccountNotActive
63 )
64 from .wim_thread import WimThread
65
66
67 class WimEngine(object):
68 """Logic supporting the establishment of WAN links when NS spans across
69 different datacenters.
70 """
71 def __init__(self, persistence, logger=None, ovim=None):
72 self.persist = persistence
73 self.logger = logger or logging.getLogger('openmano.wim.engine')
74 self.threads = {}
75 self.connectors = {}
76 self.ovim = ovim
77
78 def create_wim(self, properties):
79 """Create a new wim record according to the properties
80
81 Please check the wim schema to have more information about
82 ``properties``.
83
84 The ``config`` property might contain a ``wim_port_mapping`` dict,
85 In this case, the method ``create_wim_port_mappings`` will be
86 automatically invoked.
87
88 Returns:
89 str: uuid of the newly created WIM record
90 """
91 port_mapping = ((properties.get('config', {}) or {})
92 .pop('wim_port_mapping', {}))
93 uuid = self.persist.create_wim(properties)
94
95 if port_mapping:
96 try:
97 self.create_wim_port_mappings(uuid, port_mapping)
98 except DbBaseException:
99 # Rollback
100 self.delete_wim(uuid)
101 ex = UnexpectedDatabaseError('Failed to create port mappings'
102 'Rolling back wim creation')
103 self.logger.exception(str(ex))
104 reraise(ex.__class__, ex, exc_info()[2])
105
106 return uuid
107
108 def get_wim(self, uuid_or_name, tenant_id=None):
109 """Retrieve existing WIM record by name or id.
110
111 If ``tenant_id`` is specified, the query will be
112 limited to the WIM associated to the given tenant.
113 """
114 # Since it is a pure DB operation, we can delegate it directly
115 return self.persist.get_wim(uuid_or_name, tenant_id)
116
117 def update_wim(self, uuid_or_name, properties):
118 """Edit an existing WIM record.
119
120 ``properties`` is a dictionary with the properties being changed,
121 if a property is not present, the old value will be preserved
122
123 Similarly to create_wim, the ``config`` property might contain a
124 ``wim_port_mapping`` dict, In this case, port mappings will be
125 automatically updated.
126 """
127 port_mapping = ((properties.get('config', {}) or {})
128 .pop('wim_port_mapping', {}))
129 orig_props = self.persist.get_by_name_or_uuid('wims', uuid_or_name)
130 uuid = orig_props['uuid']
131
132 response = self.persist.update_wim(uuid, properties)
133
134 if port_mapping:
135 try:
136 # It is very complex to diff and update individually all the
137 # port mappings. Therefore a practical approach is just delete
138 # and create it again.
139 self.persist.delete_wim_port_mappings(uuid)
140 # ^ Calling from persistence avoid reloading twice the thread
141 self.create_wim_port_mappings(uuid, port_mapping)
142 except DbBaseException:
143 # Rollback
144 self.update_wim(uuid_or_name, orig_props)
145 ex = UnexpectedDatabaseError('Failed to update port mappings'
146 'Rolling back wim updates\n')
147 self.logger.exception(str(ex))
148 reraise(ex.__class__, ex, exc_info()[2])
149
150 return response
151
152 def delete_wim(self, uuid_or_name):
153 """Kill the corresponding wim threads and erase the WIM record"""
154 # Theoretically, we can rely on the database to drop the wim_accounts
155 # automatically, since we have configures 'ON CASCADE DELETE'.
156 # However, use use `delete_wim_accounts` to kill all the running
157 # threads.
158 self.delete_wim_accounts(uuid_or_name)
159 return self.persist.delete_wim(uuid_or_name)
160
161 def create_wim_account(self, wim, tenant, properties):
162 """Create an account that associates a tenant to a WIM.
163
164 As a side effect this function will spawn a new thread
165
166 Arguments:
167 wim (str): name or uuid of the WIM related to the account being
168 created
169 tenant (str): name or uuid of the nfvo tenant to which the account
170 will be created
171 properties (dict): properties of the account
172 (eg. username, password, ...)
173
174 Returns:
175 dict: Created record
176 """
177 uuid = self.persist.create_wim_account(wim, tenant, properties)
178 account = self.persist.get_wim_account_by(uuid=uuid)
179 # ^ We need to use get_wim_account_by here, since this methods returns
180 # all the associations, and we need the wim to create the thread
181 self._spawn_thread(account)
182 return account
183
184 def _update_single_wim_account(self, account, properties):
185 """Update WIM Account, taking care to reload the corresponding thread
186
187 Arguments:
188 account (dict): Current account record
189 properties (dict): Properties to be updated
190
191 Returns:
192 dict: updated record
193 """
194 account = self.persist.update_wim_account(account['uuid'], properties)
195 self.threads[account['uuid']].reload()
196 return account
197
198 def update_wim_accounts(self, wim, tenant, properties):
199 """Update all the accounts related to a WIM and a tenant,
200 thanking care of reloading threads.
201
202 Arguments:
203 wim (str): uuid or name of a WIM record
204 tenant (str): uuid or name of a NFVO tenant record
205 properties (dict): attributes with values to be updated
206
207 Returns
208 list: Records that were updated
209 """
210 accounts = self.persist.get_wim_accounts_by(wim, tenant)
211 return [self._update_single_wim_account(account, properties)
212 for account in accounts]
213
214 def _delete_single_wim_account(self, account):
215 """Delete WIM Account, taking care to remove the corresponding thread
216 and delete the internal WIM account, if it was automatically generated.
217
218 Arguments:
219 account (dict): Current account record
220 properties (dict): Properties to be updated
221
222 Returns:
223 dict: current record (same as input)
224 """
225 self.persist.delete_wim_account(account['uuid'])
226
227 if account['uuid'] not in self.threads:
228 raise WimAccountNotActive(
229 'Requests send to the WIM Account %s are not currently '
230 'being processed.', account['uuid'])
231 else:
232 self.threads[account['uuid']].exit()
233 del self.threads[account['uuid']]
234
235 return account
236
237 def delete_wim_accounts(self, wim, tenant=None, **kwargs):
238 """Delete all the accounts related to a WIM (and a tenant),
239 thanking care of threads and internal WIM accounts.
240
241 Arguments:
242 wim (str): uuid or name of a WIM record
243 tenant (str): uuid or name of a NFVO tenant record
244
245 Returns
246 list: Records that were deleted
247 """
248 kwargs.setdefault('error_if_none', False)
249 accounts = self.persist.get_wim_accounts_by(wim, tenant, **kwargs)
250 return [self._delete_single_wim_account(a) for a in accounts]
251
252 def _reload_wim_threads(self, wim_id):
253 for thread in self.threads.values():
254 if thread.wim_account['wim_id'] == wim_id:
255 thread.reload()
256
257 def create_wim_port_mappings(self, wim, properties, tenant=None):
258 """Store information about port mappings from Database"""
259 # TODO: Review tenants... WIMs can exist across different tenants,
260 # and the port_mappings are a WIM property, not a wim_account
261 # property, so the concepts are not related
262 wim = self.persist.get_by_name_or_uuid('wims', wim)
263 result = self.persist.create_wim_port_mappings(wim, properties, tenant)
264 self._reload_wim_threads(wim['uuid'])
265 return result
266
267 def get_wim_port_mappings(self, wim):
268 """Retrive information about port mappings from Database"""
269 return self.persist.get_wim_port_mappings(wim)
270
271 def delete_wim_port_mappings(self, wim):
272 """Erase the port mapping records associated with the WIM"""
273 wim = self.persist.get_by_name_or_uuid('wims', wim)
274 message = self.persist.delete_wim_port_mappings(wim['uuid'])
275 self._reload_wim_threads(wim['uuid'])
276 return message
277
278 def find_common_wims(self, datacenter_ids, tenant):
279 """Find WIMs that are common to all datacenters listed"""
280 mappings = self.persist.get_wim_port_mappings(
281 datacenter=datacenter_ids, tenant=tenant, error_if_none=False)
282
283 wim_id_of = itemgetter('wim_id')
284 sorted_mappings = sorted(mappings, key=wim_id_of) # needed by groupby
285 grouped_mappings = groupby(sorted_mappings, key=wim_id_of)
286 mapped_datacenters = {
287 wim_id: [m['datacenter_id'] for m in mappings]
288 for wim_id, mappings in grouped_mappings
289 }
290
291 return [
292 wim_id
293 for wim_id, connected_datacenters in mapped_datacenters.items()
294 if set(connected_datacenters) >= set(datacenter_ids)
295 ]
296
297 def find_common_wim(self, datacenter_ids, tenant):
298 """Find a single WIM that is able to connect all the datacenters
299 listed
300
301 Raises:
302 NoWimConnectedToDatacenters: if no WIM connected to all datacenters
303 at once is found
304 """
305 suitable_wim_ids = self.find_common_wims(datacenter_ids, tenant)
306
307 if not suitable_wim_ids:
308 raise NoWimConnectedToDatacenters(datacenter_ids)
309
310 # TODO: use a criteria to determine which WIM is going to be used,
311 # instead of always using the first one (strategy pattern can be
312 # used here)
313 return suitable_wim_ids[0]
314
315 def find_suitable_wim_account(self, datacenter_ids, tenant):
316 """Find a WIM account that is able to connect all the datacenters
317 listed
318
319 Arguments:
320 datacenter_ids (list): List of UUIDs of all the datacenters (vims),
321 that need to be connected.
322 tenant (str): UUID of the OSM tenant
323
324 Returns:
325 object with the WIM account that is able to connect all the
326 datacenters.
327 """
328 wim_id = self.find_common_wim(datacenter_ids, tenant)
329 return self.persist.get_wim_account_by(wim_id, tenant)
330
331 def derive_wan_link(self,
332 wim_usage,
333 instance_scenario_id, sce_net_id,
334 networks, tenant, related=None):
335 """Create a instance_wim_nets record for the given information"""
336 if sce_net_id in wim_usage:
337 account_id = wim_usage[sce_net_id]
338 account = self.persist.get_wim_account_by(uuid=account_id)
339 wim_id = account['wim_id']
340 else:
341 datacenters = [n['datacenter_id'] for n in networks]
342 wim_id = self.find_common_wim(datacenters, tenant)
343 account = self.persist.get_wim_account_by(wim_id, tenant)
344
345 return {
346 'uuid': str(uuid4()),
347 'instance_scenario_id': instance_scenario_id,
348 'sce_net_id': sce_net_id,
349 'wim_id': wim_id,
350 'wim_account_id': account['uuid'],
351 related: related
352 }
353
354 def derive_wan_links(self, wim_usage, networks, tenant=None):
355 """Discover and return what are the wan_links that have to be created
356 considering a set of networks (VLDs) required for a scenario instance
357 (NSR).
358
359 Arguments:
360 wim_usage(dict): Mapping between sce_net_id and wim_id. If wim_id is False, means not create wam_links
361 networks(list): Dicts containing the information about the networks
362 that will be instantiated to materialize a Network Service
363 (scenario) instance.
364 Corresponding to the ``instance_net`` record.
365
366 Returns:
367 list: list of WAN links to be written to the database
368 """
369 # Group networks by key=(instance_scenario_id, sce_net_id)
370 related = None
371 if networks:
372 related = networks[0].get("related")
373 filtered = _filter_multi_vim(networks)
374 grouped_networks = _group_networks(filtered)
375 datacenters_per_group = _count_datacenters(grouped_networks)
376 # For each group count the number of networks. If greater then 1,
377 # we have to create a wan link connecting them.
378 wan_groups = [key
379 for key, counter in datacenters_per_group
380 if counter > 1]
381 # Keys are tuples(instance_scenario_id, sce_net_id)
382 return [
383 self.derive_wan_link(wim_usage,
384 key[0], key[1], grouped_networks[key], tenant, related)
385 for key in wan_groups if wim_usage.get(key[1]) is not False
386 ]
387
388 def create_action(self, wan_link):
389 """For a single wan_link create the corresponding create action"""
390 return {
391 'action': 'CREATE',
392 'status': 'SCHEDULED',
393 'item': 'instance_wim_nets',
394 'item_id': wan_link['uuid'],
395 'wim_account_id': wan_link['wim_account_id']
396 }
397
398 def create_actions(self, wan_links):
399 """For an array of wan_links, create all the corresponding actions"""
400 return [self.create_action(l) for l in wan_links]
401
402 def delete_action(self, wan_link):
403 """For a single wan_link create the corresponding create action"""
404 return {
405 'action': 'DELETE',
406 'status': 'SCHEDULED',
407 'item': 'instance_wim_nets',
408 'item_id': wan_link['uuid'],
409 'wim_account_id': wan_link['wim_account_id'],
410 'extra': json.dumps({'wan_link': wan_link})
411 # We serialize and cache the wan_link here, because it can be
412 # deleted during the delete process
413 }
414
415 def delete_actions(self, wan_links=(), instance_scenario_id=None):
416 """Given a Instance Scenario, remove all the WAN Links created in the
417 past"""
418 if instance_scenario_id:
419 wan_links = self.persist.get_wan_links(
420 instance_scenario_id=instance_scenario_id)
421 return [self.delete_action(l) for l in wan_links]
422
423 def incorporate_actions(self, wim_actions, instance_action):
424 """Make the instance action consider new WIM actions and make the WIM
425 actions aware of the instance action
426 """
427 current = instance_action.setdefault('number_tasks', 0)
428 for i, action in enumerate(wim_actions):
429 action['task_index'] = current + i
430 action['instance_action_id'] = instance_action['uuid']
431 instance_action['number_tasks'] += len(wim_actions)
432
433 return wim_actions, instance_action
434
435 def dispatch(self, tasks):
436 """Enqueue a list of tasks for further processing.
437
438 This function is supposed to be called outside from the WIM Thread.
439 """
440 for task in tasks:
441 if task['wim_account_id'] not in self.threads:
442 error_msg = str(WimAccountNotActive(
443 'Requests send to the WIM Account %s are not currently '
444 'being processed.', task['wim_account_id']))
445 Action(task, self.logger).fail(self.persist, error_msg)
446 self.persist.update_wan_link(task['item_id'],
447 {'status': 'ERROR',
448 'error_msg': error_msg})
449 self.logger.error('Task %s %s %s not dispatched.\n%s',
450 task['action'], task['item'],
451 task['instance_account_id'], error_msg)
452 else:
453 self.threads[task['wim_account_id']].insert_task(task)
454 self.logger.debug('Task %s %s %s dispatched',
455 task['action'], task['item'],
456 task['instance_action_id'])
457
458 def _spawn_thread(self, wim_account):
459 """Spawn a WIM thread
460
461 Arguments:
462 wim_account (dict): WIM information (usually persisted)
463 The `wim` field is required to be set with a valid WIM record
464 inside the `wim_account` dict
465
466 Return:
467 threading.Thread: Thread object
468 """
469 thread = None
470 try:
471 thread = WimThread(self.persist, wim_account, ovim=self.ovim)
472 self.threads[wim_account['uuid']] = thread
473 thread.start()
474 except: # noqa
475 self.logger.error('Error when spawning WIM thread for %s',
476 wim_account['uuid'], exc_info=True)
477
478 return thread
479
480 def start_threads(self):
481 """Start the threads responsible for processing WIM Actions"""
482 accounts = self.persist.get_wim_accounts(error_if_none=False)
483 self.threads = remove_none_items(
484 {a['uuid']: self._spawn_thread(a) for a in accounts})
485
486 def stop_threads(self):
487 """Stop the threads responsible for processing WIM Actions"""
488 for uuid, thread in self.threads.items():
489 thread.exit()
490 del self.threads[uuid]
491
492 @contextmanager
493 def threads_running(self):
494 """Ensure no thread will be left running"""
495 # This method is particularly important for testing :)
496 try:
497 self.start_threads()
498 yield
499 finally:
500 self.stop_threads()
501
502
503 def _filter_multi_vim(networks):
504 """Ignore networks without sce_net_id (all VNFs go to the same VIM)"""
505 return [n for n in networks if 'sce_net_id' in n and n['sce_net_id']]
506
507
508 def _group_networks(networks):
509 """Group networks that correspond to the same instance_scenario_id and
510 sce_net_id (NSR and VLD).
511
512 Arguments:
513 networks(list): Dicts containing the information about the networks
514 that will be instantiated to materialize a Network Service
515 (scenario) instance.
516 Returns:
517 dict: Keys are tuples (instance_scenario_id, sce_net_id) and values
518 are list of networks.
519 """
520 criteria = itemgetter('instance_scenario_id', 'sce_net_id')
521
522 networks = sorted(networks, key=criteria)
523 return {k: list(v) for k, v in groupby(networks, key=criteria)}
524
525
526 def _count_datacenters(grouped_networks):
527 """Count the number of datacenters in each group of networks
528
529 Returns:
530 list of tuples: the first element is the group key, while the second
531 element is the number of datacenters in each group.
532 """
533 return ((key, len(set(n['datacenter_id'] for n in group)))
534 for key, group in grouped_networks.items())