feature 8029 change RO to python3. Using vim plugins
[osm/RO.git] / RO / osm_ro / wim / engine.py
1 # -*- coding: utf-8 -*-
2 ##
3 # Copyright 2018 University of Bristol - High Performance Networks Research
4 # Group
5 # All Rights Reserved.
6 #
7 # Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
8 # Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
9 #
10 # Licensed under the Apache License, Version 2.0 (the "License"); you may
11 # not use this file except in compliance with the License. You may obtain
12 # a copy of the License at
13 #
14 # http://www.apache.org/licenses/LICENSE-2.0
15 #
16 # Unless required by applicable law or agreed to in writing, software
17 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
18 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
19 # License for the specific language governing permissions and limitations
20 # under the License.
21 #
22 # For those usages not covered by the Apache License, Version 2.0 please
23 # contact with: <highperformance-networks@bristol.ac.uk>
24 #
25 # Neither the name of the University of Bristol nor the names of its
26 # contributors may be used to endorse or promote products derived from
27 # this software without specific prior written permission.
28 #
29 # This work has been performed in the context of DCMS UK 5G Testbeds
30 # & Trials Programme and in the framework of the Metro-Haul project -
31 # funded by the European Commission under Grant number 761727 through the
32 # Horizon 2020 and 5G-PPP programmes.
33 ##
34
35 """This module contains the domain logic, and the implementation of the
36 required steps to perform VNF management and orchestration in a WAN
37 environment.
38
39 It works as an extension/complement to the main functions contained in the
40 ``nfvo.py`` file and avoids interacting directly with the database, by relying
41 on the `persistence` module.
42
43 No http request handling/direct interaction with the database should be present
44 in this file.
45 """
46 import json
47 import logging
48 from contextlib import contextmanager
49 from itertools import groupby
50 from operator import itemgetter
51 from sys import exc_info
52 from uuid import uuid4
53
54 from ..utils import remove_none_items
55 from .actions import Action
56 from .errors import (
57 DbBaseException,
58 NoWimConnectedToDatacenters,
59 UnexpectedDatabaseError,
60 WimAccountNotActive
61 )
62 from .wim_thread import WimThread
63
64
class WimEngine(object):
    """Logic supporting the establishment of WAN links when NS spans across
    different datacenters.

    The engine keeps one entry per WIM account in ``self.threads`` (keyed by
    the account uuid) and delegates every database operation to the
    ``persistence`` object received in the constructor.
    """

    def __init__(self, persistence, logger=None, ovim=None):
        """Store the collaborators and start with no registered thread.

        Arguments:
            persistence: object used for every database operation
            logger: optional logger; defaults to ``openmano.wim.engine``
            ovim: opaque object forwarded to every spawned ``WimThread``
        """
        self.persist = persistence
        self.logger = logger or logging.getLogger('openmano.wim.engine')
        self.threads = {}
        self.connectors = {}
        self.ovim = ovim

    def create_wim(self, properties):
        """Create a new wim record according to the properties

        Please check the wim schema to have more information about
        ``properties``.

        The ``config`` property might contain a ``wim_port_mapping`` dict,
        In this case, the method ``create_wim_port_mappings`` will be
        automatically invoked. Note that the ``wim_port_mapping`` key is
        removed (popped) from ``properties['config']`` before the record is
        persisted.

        Returns:
            str: uuid of the newly created WIM record

        Raises:
            UnexpectedDatabaseError: if the port mappings cannot be stored;
                the freshly created WIM is deleted before raising.
        """
        port_mapping = ((properties.get('config', {}) or {})
                        .pop('wim_port_mapping', {}))
        uuid = self.persist.create_wim(properties)

        if port_mapping:
            try:
                self.create_wim_port_mappings(uuid, port_mapping)
            except DbBaseException as e:
                # Rollback
                self.delete_wim(uuid)
                ex = UnexpectedDatabaseError(
                    'Failed to create port mappings. '
                    'Rolling back wim creation')
                self.logger.exception(str(ex))
                raise ex from e

        return uuid

    def get_wim(self, uuid_or_name, tenant_id=None):
        """Retrieve existing WIM record by name or id.

        If ``tenant_id`` is specified, the query will be
        limited to the WIM associated to the given tenant.
        """
        # Since it is a pure DB operation, we can delegate it directly
        return self.persist.get_wim(uuid_or_name, tenant_id)

    def update_wim(self, uuid_or_name, properties):
        """Edit an existing WIM record.

        ``properties`` is a dictionary with the properties being changed,
        if a property is not present, the old value will be preserved

        Similarly to create_wim, the ``config`` property might contain a
        ``wim_port_mapping`` dict, In this case, port mappings will be
        automatically updated (the key is popped from ``config`` first).

        Returns:
            whatever ``persist.update_wim`` returns for the update

        Raises:
            UnexpectedDatabaseError: if the port mappings cannot be replaced;
                a rollback to the original properties is attempted first.
        """
        port_mapping = ((properties.get('config', {}) or {})
                        .pop('wim_port_mapping', {}))
        orig_props = self.persist.get_by_name_or_uuid('wims', uuid_or_name)
        uuid = orig_props['uuid']

        response = self.persist.update_wim(uuid, properties)

        if port_mapping:
            try:
                # It is very complex to diff and update individually all the
                # port mappings. Therefore a practical approach is just delete
                # and create it again.
                self.persist.delete_wim_port_mappings(uuid)
                # ^  Calling from persistence avoid reloading twice the thread
                self.create_wim_port_mappings(uuid, port_mapping)
            except DbBaseException as e:
                # Rollback. The record is addressed by its uuid because the
                # update above may have changed the name used by the caller.
                self.update_wim(uuid, orig_props)
                ex = UnexpectedDatabaseError(
                    'Failed to update port mappings. '
                    'Rolling back wim updates\n')
                self.logger.exception(str(ex))
                raise ex from e

        return response

    def delete_wim(self, uuid_or_name):
        """Kill the corresponding wim threads and erase the WIM record"""
        # Theoretically, we can rely on the database to drop the wim_accounts
        # automatically, since we have configured 'ON CASCADE DELETE'.
        # However, we use `delete_wim_accounts` to kill all the running
        # threads.
        self.delete_wim_accounts(uuid_or_name)
        return self.persist.delete_wim(uuid_or_name)

    def create_wim_account(self, wim, tenant, properties):
        """Create an account that associates a tenant to a WIM.

        As a side effect this function will spawn a new thread

        Arguments:
            wim (str): name or uuid of the WIM related to the account being
                created
            tenant (str): name or uuid of the nfvo tenant to which the account
                will be created
            properties (dict): properties of the account
                (eg. username, password, ...)

        Returns:
            dict: Created record
        """
        uuid = self.persist.create_wim_account(wim, tenant, properties)
        account = self.persist.get_wim_account_by(uuid=uuid)
        # ^  We need to use get_wim_account_by here, since this methods returns
        #    all the associations, and we need the wim to create the thread
        self._spawn_thread(account)
        return account

    def _update_single_wim_account(self, account, properties):
        """Update WIM Account, taking care to reload the corresponding thread

        Arguments:
            account (dict): Current account record
            properties (dict): Properties to be updated

        Returns:
            dict: updated record

        Raises:
            KeyError: if no thread is registered for the account uuid
        """
        account = self.persist.update_wim_account(account['uuid'], properties)
        self.threads[account['uuid']].reload()
        return account

    def update_wim_accounts(self, wim, tenant, properties):
        """Update all the accounts related to a WIM and a tenant,
        taking care of reloading threads.

        Arguments:
            wim (str): uuid or name of a WIM record
            tenant (str): uuid or name of a NFVO tenant record
            properties (dict): attributes with values to be updated

        Returns
            list: Records that were updated
        """
        accounts = self.persist.get_wim_accounts_by(wim, tenant)
        return [self._update_single_wim_account(account, properties)
                for account in accounts]

    def _delete_single_wim_account(self, account):
        """Delete WIM Account, taking care to remove the corresponding thread.

        The database record is deleted first; only afterwards the thread
        registry is checked.

        Arguments:
            account (dict): Current account record

        Returns:
            dict: current record (same as input)

        Raises:
            WimAccountNotActive: if no thread is registered for the account
                (the database record has already been deleted at that point)
        """
        account_id = account['uuid']
        self.persist.delete_wim_account(account_id)

        if account_id not in self.threads:
            # Exceptions do not interpolate printf-style arguments the way
            # logging calls do, so the message is formatted explicitly.
            raise WimAccountNotActive(
                'Requests send to the WIM Account %s are not currently '
                'being processed.' % account_id)

        self.threads[account_id].exit()
        del self.threads[account_id]

        return account

    def delete_wim_accounts(self, wim, tenant=None, **kwargs):
        """Delete all the accounts related to a WIM (and a tenant),
        taking care of threads.

        Arguments:
            wim (str): uuid or name of a WIM record
            tenant (str): uuid or name of a NFVO tenant record
            **kwargs: forwarded to ``persist.get_wim_accounts_by``;
                ``error_if_none`` defaults to ``False`` here

        Returns
            list: Records that were deleted
        """
        kwargs.setdefault('error_if_none', False)
        accounts = self.persist.get_wim_accounts_by(wim, tenant, **kwargs)
        return [self._delete_single_wim_account(a) for a in accounts]

    def _reload_wim_threads(self, wim_id):
        """Reload every registered thread whose account belongs to the WIM"""
        for thread in self.threads.values():
            if thread.wim_account['wim_id'] == wim_id:
                thread.reload()

    def create_wim_port_mappings(self, wim, properties, tenant=None):
        """Store information about port mappings in the database and reload
        the threads associated with the WIM.
        """
        # TODO: Review tenants... WIMs can exist across different tenants,
        #       and the port_mappings are a WIM property, not a wim_account
        #       property, so the concepts are not related
        wim = self.persist.get_by_name_or_uuid('wims', wim)
        result = self.persist.create_wim_port_mappings(wim, properties, tenant)
        self._reload_wim_threads(wim['uuid'])
        return result

    def get_wim_port_mappings(self, wim):
        """Retrieve information about port mappings from Database"""
        return self.persist.get_wim_port_mappings(wim)

    def delete_wim_port_mappings(self, wim):
        """Erase the port mapping records associated with the WIM and reload
        the threads associated with it.
        """
        wim = self.persist.get_by_name_or_uuid('wims', wim)
        message = self.persist.delete_wim_port_mappings(wim['uuid'])
        self._reload_wim_threads(wim['uuid'])
        return message

    def find_common_wims(self, datacenter_ids, tenant):
        """Find WIMs that are common to all datacenters listed

        Returns:
            list: wim ids, ordered by wim id, whose port mappings cover every
                datacenter in ``datacenter_ids``
        """
        mappings = self.persist.get_wim_port_mappings(
            datacenter=datacenter_ids, tenant=tenant, error_if_none=False)

        wim_id_of = itemgetter('wim_id')
        # groupby only merges adjacent items, so the input has to be sorted
        # by the same key beforehand
        grouped_mappings = groupby(sorted(mappings, key=wim_id_of),
                                   key=wim_id_of)
        mapped_datacenters = {
            wim_id: {m['datacenter_id'] for m in group}
            for wim_id, group in grouped_mappings
        }

        required = set(datacenter_ids)
        return [
            wim_id
            for wim_id, connected_datacenters in mapped_datacenters.items()
            if connected_datacenters >= required
        ]

    def find_common_wim(self, datacenter_ids, tenant):
        """Find a single WIM that is able to connect all the datacenters
        listed

        Raises:
            NoWimConnectedToDatacenters: if no WIM connected to all datacenters
                at once is found
        """
        suitable_wim_ids = self.find_common_wims(datacenter_ids, tenant)

        if not suitable_wim_ids:
            raise NoWimConnectedToDatacenters(datacenter_ids)

        # TODO: use a criteria to determine which WIM is going to be used,
        #       instead of always using the first one (strategy pattern can be
        #       used here)
        return suitable_wim_ids[0]

    def find_suitable_wim_account(self, datacenter_ids, tenant):
        """Find a WIM account that is able to connect all the datacenters
        listed

        Arguments:
            datacenter_ids (list): List of UUIDs of all the datacenters (vims),
                that need to be connected.
            tenant (str): UUID of the OSM tenant

        Returns:
            object with the WIM account that is able to connect all the
                 datacenters.
        """
        wim_id = self.find_common_wim(datacenter_ids, tenant)
        return self.persist.get_wim_account_by(wim_id, tenant)

    def derive_wan_link(self,
                        wim_usage,
                        instance_scenario_id, sce_net_id,
                        networks, tenant, related=None):
        """Create a instance_wim_nets record for the given information

        When ``sce_net_id`` is a key of ``wim_usage`` the associated value is
        used as the uuid of the WIM account; otherwise a WIM common to the
        datacenters of ``networks`` is searched for.
        """
        if sce_net_id in wim_usage:
            account_id = wim_usage[sce_net_id]
            account = self.persist.get_wim_account_by(uuid=account_id)
            wim_id = account['wim_id']
        else:
            datacenters = [n['datacenter_id'] for n in networks]
            wim_id = self.find_common_wim(datacenters, tenant)
            account = self.persist.get_wim_account_by(wim_id, tenant)

        return {
            'uuid': str(uuid4()),
            'instance_scenario_id': instance_scenario_id,
            'sce_net_id': sce_net_id,
            'wim_id': wim_id,
            'wim_account_id': account['uuid'],
            'related': related
        }

    def derive_wan_links(self, wim_usage, networks, tenant=None):
        """Discover and return what are the wan_links that have to be created
        considering a set of networks (VLDs) required for a scenario instance
        (NSR).

        Arguments:
            wim_usage(dict): Mapping between sce_net_id and the uuid of the
                WIM account to be used. If the value is ``False``, no
                wan_link is created for that sce_net_id.
            networks(list): Dicts containing the information about the networks
                that will be instantiated to materialize a Network Service
                (scenario) instance.
                Corresponding to the ``instance_net`` record.
            tenant: forwarded to ``derive_wan_link``

        Returns:
            list: list of WAN links to be written to the database
        """
        # The "related" value is taken from the first network only
        related = None
        if networks:
            related = networks[0].get("related")
        # Group networks by key=(instance_scenario_id, sce_net_id)
        filtered = _filter_multi_vim(networks)
        grouped_networks = _group_networks(filtered)
        datacenters_per_group = _count_datacenters(grouped_networks)
        # For each group count the number of datacenters. If greater than 1,
        # we have to create a wan link connecting them.
        wan_groups = [key
                      for key, counter in datacenters_per_group
                      if counter > 1]
        # Keys are tuples(instance_scenario_id, sce_net_id)
        return [
            self.derive_wan_link(wim_usage,
                                 key[0], key[1], grouped_networks[key],
                                 tenant, related)
            for key in wan_groups if wim_usage.get(key[1]) is not False
        ]

    def create_action(self, wan_link):
        """For a single wan_link create the corresponding create action"""
        return {
            'action': 'CREATE',
            'status': 'SCHEDULED',
            'item': 'instance_wim_nets',
            'item_id': wan_link['uuid'],
            'wim_account_id': wan_link['wim_account_id']
        }

    def create_actions(self, wan_links):
        """For an array of wan_links, create all the corresponding actions"""
        return [self.create_action(link) for link in wan_links]

    def delete_action(self, wan_link):
        """For a single wan_link create the corresponding delete action"""
        return {
            'action': 'DELETE',
            'status': 'SCHEDULED',
            'item': 'instance_wim_nets',
            'item_id': wan_link['uuid'],
            'wim_account_id': wan_link['wim_account_id'],
            'extra': json.dumps({'wan_link': wan_link})
            # We serialize and cache the wan_link here, because it can be
            # deleted during the delete process
        }

    def delete_actions(self, wan_links=(), instance_scenario_id=None):
        """Given a Instance Scenario, remove all the WAN Links created in the
        past.

        When ``instance_scenario_id`` is given, the ``wan_links`` argument is
        ignored and the links are read from the database instead.
        """
        if instance_scenario_id:
            wan_links = self.persist.get_wan_links(
                instance_scenario_id=instance_scenario_id)
        return [self.delete_action(link) for link in wan_links]

    def incorporate_actions(self, wim_actions, instance_action):
        """Make the instance action consider new WIM actions and make the WIM
        actions aware of the instance action

        Both arguments are modified in place and also returned as a tuple
        ``(wim_actions, instance_action)``.
        """
        current = instance_action.setdefault('number_tasks', 0)
        for i, action in enumerate(wim_actions):
            action['task_index'] = current + i
            action['instance_action_id'] = instance_action['uuid']
        instance_action['number_tasks'] += len(wim_actions)

        return wim_actions, instance_action

    def dispatch(self, tasks):
        """Enqueue a list of tasks for further processing.

        Tasks whose WIM account has no registered thread are marked as
        failed (both the action and the related wan link) instead of being
        enqueued.

        This function is supposed to be called outside from the WIM Thread.
        """
        for task in tasks:
            if task['wim_account_id'] not in self.threads:
                # The message is formatted explicitly: exception constructors
                # do not interpolate printf-style arguments.
                error_msg = str(WimAccountNotActive(
                    'Requests send to the WIM Account %s are not currently '
                    'being processed.' % task['wim_account_id']))
                Action(task, self.logger).fail(self.persist, error_msg)
                self.persist.update_wan_link(task['item_id'],
                                             {'status': 'ERROR',
                                              'error_msg': error_msg})
                self.logger.error('Task %s %s %s not dispatched.\n%s',
                                  task['action'], task['item'],
                                  task['instance_action_id'], error_msg)
            else:
                self.threads[task['wim_account_id']].insert_task(task)
                self.logger.debug('Task %s %s %s dispatched',
                                  task['action'], task['item'],
                                  task['instance_action_id'])

    def _spawn_thread(self, wim_account):
        """Spawn a WIM thread

        Arguments:
            wim_account (dict): WIM information (usually persisted)
                The `wim` field is required to be set with a valid WIM record
                inside the `wim_account` dict

        Return:
            threading.Thread: Thread object, or ``None`` if the thread could
                not be created or started (the error is logged, not raised)
        """
        thread = None
        try:
            thread = WimThread(self.persist, wim_account, ovim=self.ovim)
            self.threads[wim_account['uuid']] = thread
            thread.start()
        except Exception:
            self.logger.error('Error when spawning WIM thread for %s',
                              wim_account['uuid'], exc_info=True)
            # Do not keep a thread that failed to start in the registry,
            # otherwise ``dispatch`` would treat the account as active.
            # Only the entry created by this call is removed.
            if (thread is not None and
                    self.threads.get(wim_account['uuid']) is thread):
                del self.threads[wim_account['uuid']]
            thread = None

        return thread

    def start_threads(self):
        """Start the threads responsible for processing WIM Actions"""
        accounts = self.persist.get_wim_accounts(error_if_none=False)
        self.threads = remove_none_items(
            {a['uuid']: self._spawn_thread(a) for a in accounts})

    def stop_threads(self):
        """Stop the threads responsible for processing WIM Actions"""
        for thread in self.threads.values():
            thread.exit()
        self.threads.clear()

    @contextmanager
    def threads_running(self):
        """Ensure no thread will be left running"""
        # This method is particularly important for testing :)
        try:
            self.start_threads()
            yield
        finally:
            self.stop_threads()
499
500
501 def _filter_multi_vim(networks):
502 """Ignore networks without sce_net_id (all VNFs go to the same VIM)"""
503 return [n for n in networks if 'sce_net_id' in n and n['sce_net_id']]
504
505
506 def _group_networks(networks):
507 """Group networks that correspond to the same instance_scenario_id and
508 sce_net_id (NSR and VLD).
509
510 Arguments:
511 networks(list): Dicts containing the information about the networks
512 that will be instantiated to materialize a Network Service
513 (scenario) instance.
514 Returns:
515 dict: Keys are tuples (instance_scenario_id, sce_net_id) and values
516 are list of networks.
517 """
518 criteria = itemgetter('instance_scenario_id', 'sce_net_id')
519
520 networks = sorted(networks, key=criteria)
521 return {k: list(v) for k, v in groupby(networks, key=criteria)}
522
523
524 def _count_datacenters(grouped_networks):
525 """Count the number of datacenters in each group of networks
526
527 Returns:
528 list of tuples: the first element is the group key, while the second
529 element is the number of datacenters in each group.
530 """
531 return ((key, len(set(n['datacenter_id'] for n in group)))
532 for key, group in grouped_networks.items())