Adding cover to tox.ini default envs
[osm/RO.git] / RO / osm_ro / wim / engine.py
1 # -*- coding: utf-8 -*-
2 ##
3 # Copyright 2018 University of Bristol - High Performance Networks Research
4 # Group
5 # All Rights Reserved.
6 #
7 # Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
8 # Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
9 #
10 # Licensed under the Apache License, Version 2.0 (the "License"); you may
11 # not use this file except in compliance with the License. You may obtain
12 # a copy of the License at
13 #
14 # http://www.apache.org/licenses/LICENSE-2.0
15 #
16 # Unless required by applicable law or agreed to in writing, software
17 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
18 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
19 # License for the specific language governing permissions and limitations
20 # under the License.
21 #
22 # For those usages not covered by the Apache License, Version 2.0 please
23 # contact with: <highperformance-networks@bristol.ac.uk>
24 #
25 # Neither the name of the University of Bristol nor the names of its
26 # contributors may be used to endorse or promote products derived from
27 # this software without specific prior written permission.
28 #
29 # This work has been performed in the context of DCMS UK 5G Testbeds
30 # & Trials Programme and in the framework of the Metro-Haul project -
31 # funded by the European Commission under Grant number 761727 through the
32 # Horizon 2020 and 5G-PPP programmes.
33 ##
34
35 """This module contains the domain logic, and the implementation of the
36 required steps to perform VNF management and orchestration in a WAN
37 environment.
38
39 It works as an extension/complement to the main functions contained in the
40 ``nfvo.py`` file and avoids interacting directly with the database, by relying
41 on the `persistence` module.
42
43 No http request handling/direct interaction with the database should be present
44 in this file.
45 """
46 import json
47 import logging
48 from contextlib import contextmanager
49 from itertools import groupby
50 from operator import itemgetter
51 # from sys import exc_info
52 from uuid import uuid4
53
54 from ..utils import remove_none_items
55 from .actions import Action
56 from .errors import (
57 DbBaseException,
58 NoWimConnectedToDatacenters,
59 UnexpectedDatabaseError,
60 WimAccountNotActive,
61 UndefinedWimConnector
62 )
63 from .wim_thread import WimThread
64 # from ..http_tools.errors import Bad_Request
65 from pkg_resources import iter_entry_points
66
67
class WimEngine(object):
    """Logic supporting the establishment of WAN links when NS spans across
    different datacenters.

    The engine keeps one worker thread per WIM account (see ``self.threads``)
    and delegates every database access to the ``persistence`` object it
    receives.
    """
    def __init__(self, persistence, plugins, logger=None, ovim=None):
        """Store the collaborators; no thread is started here.

        Arguments:
            persistence: object used for every database operation
            plugins (dict): loaded connector plugins, indexed by plugin name.
                ``None`` is replaced by an empty dict.
            logger: optional logger, defaults to ``openmano.wim.engine``
            ovim: opaque object forwarded to each ``WimThread``
        """
        self.persist = persistence
        self.plugins = plugins if plugins is not None else {}
        self.logger = logger or logging.getLogger('openmano.wim.engine')
        # Running threads, indexed by the uuid of the WIM account
        self.threads = {}
        self.connectors = {}
        self.ovim = ovim

    def _load_plugin(self, name, type="sdn"):
        """Load the entry point ``name`` into ``self.plugins``.

        Arguments:
            name (str): name of the entry point / plugin
            type (str): suffix used to build the entry point group
                (``osm_ro<type>.plugins``); can be vim or sdn

        Raises:
            UndefinedWimConnector: if no entry point provided the plugin
        """
        # NOTE: ``type`` shadows the builtin, but it is part of the keyword
        # interface used by the callers, so the name is preserved.
        for v in iter_entry_points('osm_ro{}.plugins'.format(type), name):
            self.plugins[name] = v.load()
        if name and name not in self.plugins:
            raise UndefinedWimConnector(type, name)

    def create_wim(self, properties):
        """Create a new wim record according to the properties

        Please check the wim schema to have more information about
        ``properties``.

        The ``config`` property might contain a ``wim_port_mapping`` dict,
        In this case, the method ``create_wim_port_mappings`` will be
        automatically invoked.

        Note that ``wim_port_mapping`` is removed (popped) from
        ``properties['config']`` before the record is persisted.

        Returns:
            str: uuid of the newly created WIM record

        Raises:
            UndefinedWimConnector: if the plugin for the WIM type is missing
            UnexpectedDatabaseError: if the port mappings cannot be stored
                (the WIM record is removed before raising)
        """
        port_mapping = ((properties.get('config', {}) or {})
                        .pop('wim_port_mapping', {}))
        plugin_name = "rosdn_" + properties["type"]
        if plugin_name not in self.plugins:
            self._load_plugin(plugin_name, type="sdn")

        uuid = self.persist.create_wim(properties)

        if port_mapping:
            try:
                self.create_wim_port_mappings(uuid, port_mapping)
            except DbBaseException as e:
                # Rollback
                self.delete_wim(uuid)
                # Explicit separator: adjacent literals are concatenated
                # verbatim, which used to glue the two sentences together.
                ex = UnexpectedDatabaseError('Failed to create port mappings. '
                                             'Rolling back wim creation')
                self.logger.exception(str(ex))
                raise ex from e

        return uuid

    def get_wim(self, uuid_or_name, tenant_id=None):
        """Retrieve existing WIM record by name or id.

        If ``tenant_id`` is specified, the query will be
        limited to the WIM associated to the given tenant.
        """
        # Since it is a pure DB operation, we can delegate it directly
        return self.persist.get_wim(uuid_or_name, tenant_id)

    def update_wim(self, uuid_or_name, properties):
        """Edit an existing WIM record.

        ``properties`` is a dictionary with the properties being changed,
        if a property is not present, the old value will be preserved

        Similarly to create_wim, the ``config`` property might contain a
        ``wim_port_mapping`` dict, In this case, port mappings will be
        automatically updated.

        Returns:
            whatever ``persistence.update_wim`` returns

        Raises:
            UnexpectedDatabaseError: if the port mappings cannot be replaced
                (the previous WIM properties are written back before raising)
        """
        port_mapping = ((properties.get('config', {}) or {})
                        .pop('wim_port_mapping', {}))
        orig_props = self.persist.get_by_name_or_uuid('wims', uuid_or_name)
        uuid = orig_props['uuid']

        response = self.persist.update_wim(uuid, properties)

        if port_mapping:
            try:
                # It is very complex to diff and update individually all the
                # port mappings. Therefore a practical approach is just delete
                # and create it again.
                self.persist.delete_wim_port_mappings(uuid)
                # ^ Calling from persistence avoid reloading twice the thread
                self.create_wim_port_mappings(uuid, port_mapping)
            except DbBaseException as e:
                # Rollback. The uuid is used (instead of ``uuid_or_name``)
                # because the update above may have renamed the WIM, in which
                # case the old name would no longer resolve.
                self.update_wim(uuid, orig_props)
                ex = UnexpectedDatabaseError('Failed to update port mappings. '
                                             'Rolling back wim updates\n')
                self.logger.exception(str(ex))
                raise ex from e

        return response

    def delete_wim(self, uuid_or_name):
        """Kill the corresponding wim threads and erase the WIM record"""
        # Theoretically, we can rely on the database to drop the wim_accounts
        # automatically, since we have configured 'ON CASCADE DELETE'.
        # However, we use `delete_wim_accounts` to kill all the running
        # threads.
        self.delete_wim_accounts(uuid_or_name)
        return self.persist.delete_wim(uuid_or_name)

    def create_wim_account(self, wim, tenant, properties):
        """Create an account that associates a tenant to a WIM.

        As a side effect this function will spawn a new thread

        Arguments:
            wim (str): name or uuid of the WIM related to the account being
                created
            tenant (str): name or uuid of the nfvo tenant to which the account
                will be created
            properties (dict): properties of the account
                (eg. username, password, ...)

        Returns:
            dict: Created record
        """
        uuid = self.persist.create_wim_account(wim, tenant, properties)
        account = self.persist.get_wim_account_by(uuid=uuid)
        # ^ We need to use get_wim_account_by here, since this methods returns
        #   all the associations, and we need the wim to create the thread
        self._spawn_thread(account)
        return account

    def _update_single_wim_account(self, account, properties):
        """Update WIM Account, taking care to reload the corresponding thread

        Arguments:
            account (dict): Current account record
            properties (dict): Properties to be updated

        Returns:
            dict: updated record

        Raises:
            KeyError: if no thread is registered for the account
        """
        account = self.persist.update_wim_account(account['uuid'], properties)
        self.threads[account['uuid']].reload()
        return account

    def update_wim_accounts(self, wim, tenant, properties):
        """Update all the accounts related to a WIM and a tenant,
        taking care of reloading threads.

        Arguments:
            wim (str): uuid or name of a WIM record
            tenant (str): uuid or name of a NFVO tenant record
            properties (dict): attributes with values to be updated

        Returns
            list: Records that were updated
        """
        accounts = self.persist.get_wim_accounts_by(wim, tenant)
        return [self._update_single_wim_account(account, properties)
                for account in accounts]

    def _delete_single_wim_account(self, account):
        """Delete WIM Account, taking care to remove the corresponding thread.

        The database record is removed first; the thread is stopped and
        unregistered afterwards.

        Arguments:
            account (dict): Current account record

        Returns:
            dict: current record (same as input)

        Raises:
            WimAccountNotActive: if no thread is registered for the account
                (the database record has already been deleted at that point)
        """
        self.persist.delete_wim_account(account['uuid'])

        if account['uuid'] not in self.threads:
            # NOTE(review): the format string and its argument are passed
            # separately; whether '%s' gets interpolated depends on the
            # WimAccountNotActive constructor - confirm in errors.py.
            raise WimAccountNotActive(
                'Requests send to the WIM Account %s are not currently '
                'being processed.', account['uuid'])
        else:
            self.threads[account['uuid']].exit()
            del self.threads[account['uuid']]

        return account

    def delete_wim_accounts(self, wim, tenant=None, **kwargs):
        """Delete all the accounts related to a WIM (and a tenant),
        taking care of threads.

        Arguments:
            wim (str): uuid or name of a WIM record
            tenant (str): uuid or name of a NFVO tenant record
            **kwargs: forwarded to ``persistence.get_wim_accounts_by``;
                ``error_if_none`` defaults to ``False``

        Returns
            list: Records that were deleted
        """
        kwargs.setdefault('error_if_none', False)
        accounts = self.persist.get_wim_accounts_by(wim, tenant, **kwargs)
        return [self._delete_single_wim_account(a) for a in accounts]

    def _reload_wim_threads(self, wim_id):
        """Reload every running thread whose account belongs to ``wim_id``"""
        for thread in self.threads.values():
            if thread.wim_account['wim_id'] == wim_id:
                thread.reload()

    def create_wim_port_mappings(self, wim, properties, tenant=None):
        """Store information about port mappings in the database and reload
        the threads of the affected WIM.
        """
        # TODO: Review tenants... WIMs can exist across different tenants,
        #       and the port_mappings are a WIM property, not a wim_account
        #       property, so the concepts are not related
        wim = self.persist.get_by_name_or_uuid('wims', wim)
        result = self.persist.create_wim_port_mappings(wim, properties, tenant)
        self._reload_wim_threads(wim['uuid'])
        return result

    def get_wim_port_mappings(self, wim):
        """Retrieve information about port mappings from Database"""
        return self.persist.get_wim_port_mappings(wim)

    def delete_wim_port_mappings(self, wim):
        """Erase the port mapping records associated with the WIM"""
        wim = self.persist.get_by_name_or_uuid('wims', wim)
        message = self.persist.delete_wim_port_mappings(wim['uuid'])
        self._reload_wim_threads(wim['uuid'])
        return message

    def find_common_wims(self, datacenter_ids, tenant):
        """Find WIMs that are common to all datacenters listed

        Returns:
            list: ids of the WIMs with port mappings to every datacenter in
                ``datacenter_ids``, sorted by WIM id
        """
        mappings = self.persist.get_wim_port_mappings(
            datacenter=datacenter_ids, tenant=tenant, error_if_none=False)

        wim_id_of = itemgetter('wim_id')
        sorted_mappings = sorted(mappings, key=wim_id_of)  # needed by groupby
        mapped_datacenters = {
            wim_id: {m['datacenter_id'] for m in group}
            for wim_id, group in groupby(sorted_mappings, key=wim_id_of)
        }

        required = set(datacenter_ids)  # loop invariant, built only once
        return [
            wim_id
            for wim_id, connected_datacenters in mapped_datacenters.items()
            if connected_datacenters >= required
        ]

    def find_common_wim(self, datacenter_ids, tenant):
        """Find a single WIM that is able to connect all the datacenters
        listed

        Raises:
            NoWimConnectedToDatacenters: if no WIM connected to all datacenters
                at once is found
        """
        suitable_wim_ids = self.find_common_wims(datacenter_ids, tenant)

        if not suitable_wim_ids:
            raise NoWimConnectedToDatacenters(datacenter_ids)

        # TODO: use a criteria to determine which WIM is going to be used,
        #       instead of always using the first one (strategy pattern can be
        #       used here)
        return suitable_wim_ids[0]

    def find_suitable_wim_account(self, datacenter_ids, tenant):
        """Find a WIM account that is able to connect all the datacenters
        listed

        Arguments:
            datacenter_ids (list): List of UUIDs of all the datacenters (vims),
                that need to be connected.
            tenant (str): UUID of the OSM tenant

        Returns:
            object with the WIM account that is able to connect all the
            datacenters.
        """
        wim_id = self.find_common_wim(datacenter_ids, tenant)
        return self.persist.get_wim_account_by(wim_id, tenant)

    def derive_wan_link(self,
                        wim_usage,
                        instance_scenario_id, sce_net_id,
                        networks, tenant, related=None):
        """Create a instance_wim_nets record for the given information

        If ``wim_usage`` has an entry for ``sce_net_id`` it is taken as the
        uuid of the WIM account to be used; otherwise a WIM common to the
        datacenters of ``networks`` is searched.

        Returns:
            dict: record with a freshly generated ``uuid``
        """
        if sce_net_id in wim_usage:
            account_id = wim_usage[sce_net_id]
            account = self.persist.get_wim_account_by(uuid=account_id)
            wim_id = account['wim_id']
        else:
            datacenters = [n['datacenter_id'] for n in networks]
            wim_id = self.find_common_wim(datacenters, tenant)
            account = self.persist.get_wim_account_by(wim_id, tenant)

        return {
            'uuid': str(uuid4()),
            'instance_scenario_id': instance_scenario_id,
            'sce_net_id': sce_net_id,
            'wim_id': wim_id,
            'wim_account_id': account['uuid'],
            'related': related
        }

    def derive_wan_links(self, wim_usage, networks, tenant=None):
        """Discover and return what are the wan_links that have to be created
        considering a set of networks (VLDs) required for a scenario instance
        (NSR).

        Arguments:
            wim_usage(dict): Mapping indexed by sce_net_id. If the value is
                ``False``, no wan_link is created for that network.
            networks(list): Dicts containing the information about the networks
                that will be instantiated to materialize a Network Service
                (scenario) instance.
                Corresponding to the ``instance_net`` record.

        Returns:
            list: list of WAN links to be written to the database
        """
        # The ``related`` value of the first network is used for every link
        related = None
        if networks:
            related = networks[0].get("related")
        # Group networks by key=(instance_scenario_id, sce_net_id)
        filtered = _filter_multi_vim(networks)
        grouped_networks = _group_networks(filtered)
        datacenters_per_group = _count_datacenters(grouped_networks)
        # For each group count the number of datacenters. If greater than 1,
        # we have to create a wan link connecting them.
        wan_groups = [key
                      for key, counter in datacenters_per_group
                      if counter > 1]
        # Keys are tuples(instance_scenario_id, sce_net_id)
        return [
            self.derive_wan_link(wim_usage,
                                 key[0], key[1], grouped_networks[key],
                                 tenant, related)
            for key in wan_groups if wim_usage.get(key[1]) is not False
        ]

    def create_action(self, wan_link):
        """For a single wan_link create the corresponding create action"""
        return {
            'action': 'CREATE',
            'status': 'SCHEDULED',
            'item': 'instance_wim_nets',
            'item_id': wan_link['uuid'],
            'wim_account_id': wan_link['wim_account_id']
        }

    def create_actions(self, wan_links):
        """For an array of wan_links, create all the corresponding actions"""
        return [self.create_action(li) for li in wan_links]

    def delete_action(self, wan_link):
        """For a single wan_link create the corresponding delete action"""
        return {
            'action': 'DELETE',
            'status': 'SCHEDULED',
            'item': 'instance_wim_nets',
            'item_id': wan_link['uuid'],
            'wim_account_id': wan_link['wim_account_id'],
            'extra': json.dumps({'wan_link': wan_link})
            # We serialize and cache the wan_link here, because it can be
            # deleted during the delete process
        }

    def delete_actions(self, wan_links=(), instance_scenario_id=None):
        """Given a Instance Scenario, remove all the WAN Links created in the
        past.

        When ``instance_scenario_id`` is given, the ``wan_links`` argument is
        ignored and the links are read from the database instead.
        """
        if instance_scenario_id:
            wan_links = self.persist.get_wan_links(
                instance_scenario_id=instance_scenario_id, sdn='false')
        return [self.delete_action(li) for li in wan_links]

    def incorporate_actions(self, wim_actions, instance_action):
        """Make the instance action consider new WIM actions and make the WIM
        actions aware of the instance action

        Both arguments are modified in place and also returned.
        """
        current = instance_action.setdefault('number_tasks', 0)
        for i, action in enumerate(wim_actions):
            action['task_index'] = current + i
            action['instance_action_id'] = instance_action['uuid']
        instance_action['number_tasks'] += len(wim_actions)

        return wim_actions, instance_action

    def dispatch(self, tasks):
        """Enqueue a list of tasks for further processing.

        Tasks whose WIM account has no running thread are marked as failed
        (together with the related wan link) instead of being enqueued.

        This function is supposed to be called outside from the WIM Thread.
        """
        for task in tasks:
            if task['wim_account_id'] not in self.threads:
                # NOTE(review): format string and argument are passed
                # separately to the exception - confirm in errors.py that the
                # message is interpolated.
                error_msg = str(WimAccountNotActive(
                    'Requests send to the WIM Account %s are not currently '
                    'being processed.', task['wim_account_id']))
                Action(task, self.logger).fail(self.persist, error_msg)
                self.persist.update_wan_link(task['item_id'],
                                             {'status': 'ERROR',
                                              'error_msg': error_msg})
                # The key is 'instance_action_id' (see incorporate_actions);
                # the former 'instance_account_id' raised KeyError here.
                self.logger.error('Task %s %s %s not dispatched.\n%s',
                                  task['action'], task['item'],
                                  task['instance_action_id'], error_msg)
            else:
                self.threads[task['wim_account_id']].insert_task(task)
                self.logger.debug('Task %s %s %s dispatched',
                                  task['action'], task['item'],
                                  task['instance_action_id'])

    def _spawn_thread(self, wim_account):
        """Spawn a WIM thread

        Arguments:
            wim_account (dict): WIM information (usually persisted)
                The `wim` field is required to be set with a valid WIM record
                inside the `wim_account` dict

        Return:
            threading.Thread: Thread object, or ``None`` if the thread object
                could not be created
        """
        thread = None
        try:
            thread = WimThread(self.persist, self.plugins, wim_account,
                               ovim=self.ovim)
            self.threads[wim_account['uuid']] = thread
            thread.start()
        except Exception:
            # Best effort: a failing account must not prevent the others from
            # starting. KeyboardInterrupt/SystemExit are deliberately not
            # swallowed.
            self.logger.error('Error when spawning WIM thread for %s',
                              wim_account['uuid'], exc_info=True)

        return thread

    def start_threads(self):
        """Start the threads responsible for processing WIM Actions"""
        accounts = self.persist.get_wim_accounts(error_if_none=False)
        thread_dict = {}
        for account in accounts:
            try:
                plugin_name = "rosdn_" + account["wim"]["type"]
                if plugin_name not in self.plugins:
                    self._load_plugin(plugin_name, type="sdn")
                thread_dict[account["uuid"]] = self._spawn_thread(account)
            except UndefinedWimConnector as e:
                # Accounts without an available connector are skipped
                self.logger.error(e)
        self.threads = remove_none_items(thread_dict)

    def stop_threads(self):
        """Stop the threads responsible for processing WIM Actions"""
        for thread in self.threads.values():
            thread.exit()
        self.threads.clear()

    @contextmanager
    def threads_running(self):
        """Ensure no thread will be left running"""
        # This method is particularly important for testing :)
        try:
            self.start_threads()
            yield
        finally:
            self.stop_threads()
522
523
def _filter_multi_vim(networks):
    """Keep only the networks that carry a truthy ``sce_net_id``.

    Networks without it are ignored (all VNFs go to the same VIM).
    """
    selected = []
    for net in networks:
        # A missing key and a falsy value are both discarded
        if net.get('sce_net_id'):
            selected.append(net)
    return selected
527
528
def _group_networks(networks):
    """Bucket networks by their NSR and VLD, i.e. by the pair
    (instance_scenario_id, sce_net_id).

    Arguments:
        networks(list): Dicts containing the information about the networks
            that will be instantiated to materialize a Network Service
            (scenario) instance.
    Returns:
        dict: Keys are tuples (instance_scenario_id, sce_net_id) and values
            are list of networks. Keys appear in sorted order.
    """
    key_of = itemgetter('instance_scenario_id', 'sce_net_id')

    buckets = {}
    # Sorting first keeps both the key order and the order inside each bucket
    # the same as a sort followed by grouping of consecutive items.
    for net in sorted(networks, key=key_of):
        buckets.setdefault(key_of(net), []).append(net)
    return buckets
545
546
def _count_datacenters(grouped_networks):
    """Tell how many distinct datacenters each group of networks touches.

    Returns:
        generator of tuples: the first element is the group key, while the
            second element is the number of datacenters in each group.
    """
    entries = grouped_networks.items()
    return (
        (group_key, len({net['datacenter_id'] for net in members}))
        for group_key, members in entries
    )