fcb3477fbae7c2d33b60d5455c85cae4311d8120
[osm/RO.git] / osm_ro / wim / engine.py
1 # -*- coding: utf-8 -*-
2 ##
3 # Copyright 2018 University of Bristol - High Performance Networks Research
4 # Group
5 # All Rights Reserved.
6 #
7 # Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
8 # Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
9 #
10 # Licensed under the Apache License, Version 2.0 (the "License"); you may
11 # not use this file except in compliance with the License. You may obtain
12 # a copy of the License at
13 #
14 # http://www.apache.org/licenses/LICENSE-2.0
15 #
16 # Unless required by applicable law or agreed to in writing, software
17 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
18 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
19 # License for the specific language governing permissions and limitations
20 # under the License.
21 #
22 # For those usages not covered by the Apache License, Version 2.0 please
23 # contact with: <highperformance-networks@bristol.ac.uk>
24 #
25 # Neither the name of the University of Bristol nor the names of its
26 # contributors may be used to endorse or promote products derived from
27 # this software without specific prior written permission.
28 #
29 # This work has been performed in the context of DCMS UK 5G Testbeds
30 # & Trials Programme and in the framework of the Metro-Haul project -
31 # funded by the European Commission under Grant number 761727 through the
32 # Horizon 2020 and 5G-PPP programmes.
33 ##
34
35 """This module contains the domain logic, and the implementation of the
36 required steps to perform VNF management and orchestration in a WAN
37 environment.
38
39 It works as an extension/complement to the main functions contained in the
40 ``nfvo.py`` file and avoids interacting directly with the database, by relying
41 on the `persistence` module.
42
43 No http request handling/direct interaction with the database should be present
44 in this file.
45 """
46 import json
47 import logging
48 from contextlib import contextmanager
49 from itertools import groupby
50 from operator import itemgetter
51 from uuid import uuid4
52
53 from ..utils import remove_none_items
54 from .actions import Action
55 from .errors import (
56 NoWimConnectedToDatacenters,
57 WimAccountNotActive
58 )
59 from .wim_thread import WimThread
60
61
62 class WimEngine(object):
63 """Logic supporting the establishment of WAN links when NS spans across
64 different datacenters.
65 """
66 def __init__(self, persistence, logger=None, ovim=None):
67 self.persist = persistence
68 self.logger = logger or logging.getLogger('openmano.wim.engine')
69 self.threads = {}
70 self.connectors = {}
71 self.ovim = ovim
72
73 def create_wim(self, properties):
74 """Create a new wim record according to the properties
75
76 Please check the wim schema to have more information about
77 ``properties``.
78
79 Returns:
80 str: uuid of the newly created WIM record
81 """
82 return self.persist.create_wim(properties)
83
84 def get_wim(self, uuid_or_name, tenant_id=None):
85 """Retrieve existing WIM record by name or id.
86
87 If ``tenant_id`` is specified, the query will be
88 limited to the WIM associated to the given tenant.
89 """
90 # Since it is a pure DB operation, we can delegate it directly
91 return self.persist.get_wim(uuid_or_name, tenant_id)
92
93 def update_wim(self, uuid_or_name, properties):
94 """Edit an existing WIM record.
95
96 ``properties`` is a dictionary with the properties being changed,
97 if a property is not present, the old value will be preserved
98 """
99 return self.persist.update_wim(uuid_or_name, properties)
100
101 def delete_wim(self, uuid_or_name):
102 """Kill the corresponding wim threads and erase the WIM record"""
103 # Theoretically, we can rely on the database to drop the wim_accounts
104 # automatically, since we have configures 'ON CASCADE DELETE'.
105 # However, use use `delete_wim_accounts` to kill all the running
106 # threads.
107 self.delete_wim_accounts(uuid_or_name)
108 return self.persist.delete_wim(uuid_or_name)
109
110 def create_wim_account(self, wim, tenant, properties):
111 """Create an account that associates a tenant to a WIM.
112
113 As a side effect this function will spawn a new thread
114
115 Arguments:
116 wim (str): name or uuid of the WIM related to the account being
117 created
118 tenant (str): name or uuid of the nfvo tenant to which the account
119 will be created
120 properties (dict): properties of the account
121 (eg. username, password, ...)
122
123 Returns:
124 dict: Created record
125 """
126 uuid = self.persist.create_wim_account(wim, tenant, properties)
127 account = self.persist.get_wim_account_by(uuid=uuid)
128 # ^ We need to use get_wim_account_by here, since this methods returns
129 # all the associations, and we need the wim to create the thread
130 self._spawn_thread(account)
131 return account
132
133 def _update_single_wim_account(self, account, properties):
134 """Update WIM Account, taking care to reload the corresponding thread
135
136 Arguments:
137 account (dict): Current account record
138 properties (dict): Properties to be updated
139
140 Returns:
141 dict: updated record
142 """
143 account = self.persist.update_wim_account(account['uuid'], properties)
144 self.threads[account['uuid']].reload()
145 return account
146
147 def update_wim_accounts(self, wim, tenant, properties):
148 """Update all the accounts related to a WIM and a tenant,
149 thanking care of reloading threads.
150
151 Arguments:
152 wim (str): uuid or name of a WIM record
153 tenant (str): uuid or name of a NFVO tenant record
154 properties (dict): attributes with values to be updated
155
156 Returns
157 list: Records that were updated
158 """
159 accounts = self.persist.get_wim_accounts_by(wim, tenant)
160 return [self._update_single_wim_account(account, properties)
161 for account in accounts]
162
163 def _delete_single_wim_account(self, account):
164 """Delete WIM Account, taking care to remove the corresponding thread
165 and delete the internal WIM account, if it was automatically generated.
166
167 Arguments:
168 account (dict): Current account record
169 properties (dict): Properties to be updated
170
171 Returns:
172 dict: current record (same as input)
173 """
174 self.persist.delete_wim_account(account['uuid'])
175
176 if account['uuid'] not in self.threads:
177 raise WimAccountNotActive(
178 'Requests send to the WIM Account %s are not currently '
179 'being processed.', account['uuid'])
180 else:
181 self.threads[account['uuid']].exit()
182 del self.threads[account['uuid']]
183
184 return account
185
186 def delete_wim_accounts(self, wim, tenant=None, **kwargs):
187 """Delete all the accounts related to a WIM (and a tenant),
188 thanking care of threads and internal WIM accounts.
189
190 Arguments:
191 wim (str): uuid or name of a WIM record
192 tenant (str): uuid or name of a NFVO tenant record
193
194 Returns
195 list: Records that were deleted
196 """
197 kwargs.setdefault('error_if_none', False)
198 accounts = self.persist.get_wim_accounts_by(wim, tenant, **kwargs)
199 return [self._delete_single_wim_account(a) for a in accounts]
200
201 def _reload_wim_threads(self, wim_id):
202 for thread in self.threads.values():
203 if thread.wim_account['wim_id'] == wim_id:
204 thread.reload()
205
206 def create_wim_port_mappings(self, wim, properties, tenant=None):
207 """Store information about port mappings from Database"""
208 # TODO: Review tenants... WIMs can exist across different tenants,
209 # and the port_mappings are a WIM property, not a wim_account
210 # property, so the concepts are not related
211 wim = self.persist.get_by_name_or_uuid('wims', wim)
212 result = self.persist.create_wim_port_mappings(wim, properties, tenant)
213 self._reload_wim_threads(wim['uuid'])
214 return result
215
216 def get_wim_port_mappings(self, wim):
217 """Retrive information about port mappings from Database"""
218 return self.persist.get_wim_port_mappings(wim)
219
220 def delete_wim_port_mappings(self, wim):
221 """Erase the port mapping records associated with the WIM"""
222 wim = self.persist.get_by_name_or_uuid('wims', wim)
223 message = self.persist.delete_wim_port_mappings(wim['uuid'])
224 self._reload_wim_threads(wim['uuid'])
225 return message
226
227 def find_common_wims(self, datacenter_ids, tenant):
228 """Find WIMs that are common to all datacenters listed"""
229 mappings = self.persist.get_wim_port_mappings(
230 datacenter=datacenter_ids, tenant=tenant, error_if_none=False)
231
232 wim_id_of = itemgetter('wim_id')
233 sorted_mappings = sorted(mappings, key=wim_id_of) # needed by groupby
234 grouped_mappings = groupby(sorted_mappings, key=wim_id_of)
235 mapped_datacenters = {
236 wim_id: [m['datacenter_id'] for m in mappings]
237 for wim_id, mappings in grouped_mappings
238 }
239
240 return [
241 wim_id
242 for wim_id, connected_datacenters in mapped_datacenters.items()
243 if set(connected_datacenters) >= set(datacenter_ids)
244 ]
245
246 def find_common_wim(self, datacenter_ids, tenant):
247 """Find a single WIM that is able to connect all the datacenters
248 listed
249
250 Raises:
251 NoWimConnectedToDatacenters: if no WIM connected to all datacenters
252 at once is found
253 """
254 suitable_wim_ids = self.find_common_wims(datacenter_ids, tenant)
255
256 if not suitable_wim_ids:
257 raise NoWimConnectedToDatacenters(datacenter_ids)
258
259 # TODO: use a criteria to determine which WIM is going to be used,
260 # instead of always using the first one (strategy pattern can be
261 # used here)
262 return suitable_wim_ids[0]
263
264 def find_suitable_wim_account(self, datacenter_ids, tenant):
265 """Find a WIM account that is able to connect all the datacenters
266 listed
267
268 Arguments:
269 datacenter_ids (list): List of UUIDs of all the datacenters (vims),
270 that need to be connected.
271 tenant (str): UUID of the OSM tenant
272
273 Returns:
274 str: UUID of the WIM account that is able to connect all the
275 datacenters.
276 """
277 wim_id = self.find_common_wim(datacenter_ids, tenant)
278 return self.persist.get_wim_account_by(wim_id, tenant)['uuid']
279
280 def derive_wan_link(self,
281 wim_usage,
282 instance_scenario_id, sce_net_id,
283 networks, tenant):
284 """Create a instance_wim_nets record for the given information"""
285 if sce_net_id in wim_usage:
286 account_id = wim_usage[sce_net_id]
287 account = self.persist.get_wim_account_by(uuid=account_id)
288 wim_id = account['wim_id']
289 else:
290 datacenters = [n['datacenter_id'] for n in networks]
291 wim_id = self.find_common_wim(datacenters, tenant)
292 account = self.persist.get_wim_account_by(wim_id, tenant)
293
294 return {
295 'uuid': str(uuid4()),
296 'instance_scenario_id': instance_scenario_id,
297 'sce_net_id': sce_net_id,
298 'wim_id': wim_id,
299 'wim_account_id': account['uuid']
300 }
301
302 def derive_wan_links(self, wim_usage, networks, tenant=None):
303 """Discover and return what are the wan_links that have to be created
304 considering a set of networks (VLDs) required for a scenario instance
305 (NSR).
306
307 Arguments:
308 wim_usage(dict): Mapping between sce_net_id and wim_id
309 networks(list): Dicts containing the information about the networks
310 that will be instantiated to materialize a Network Service
311 (scenario) instance.
312 Corresponding to the ``instance_net`` record.
313
314 Returns:
315 list: list of WAN links to be written to the database
316 """
317 # Group networks by key=(instance_scenario_id, sce_net_id)
318 filtered = _filter_multi_vim(networks)
319 grouped_networks = _group_networks(filtered)
320 datacenters_per_group = _count_datacenters(grouped_networks)
321 # For each group count the number of networks. If greater then 1,
322 # we have to create a wan link connecting them.
323 wan_groups = [key
324 for key, counter in datacenters_per_group
325 if counter > 1]
326
327 return [
328 self.derive_wan_link(wim_usage,
329 key[0], key[1], grouped_networks[key], tenant)
330 for key in wan_groups
331 ]
332
333 def create_action(self, wan_link):
334 """For a single wan_link create the corresponding create action"""
335 return {
336 'action': 'CREATE',
337 'status': 'SCHEDULED',
338 'item': 'instance_wim_nets',
339 'item_id': wan_link['uuid'],
340 'wim_account_id': wan_link['wim_account_id']
341 }
342
343 def create_actions(self, wan_links):
344 """For an array of wan_links, create all the corresponding actions"""
345 return [self.create_action(l) for l in wan_links]
346
347 def delete_action(self, wan_link):
348 """For a single wan_link create the corresponding create action"""
349 return {
350 'action': 'DELETE',
351 'status': 'SCHEDULED',
352 'item': 'instance_wim_nets',
353 'item_id': wan_link['uuid'],
354 'wim_account_id': wan_link['wim_account_id'],
355 'extra': json.dumps({'wan_link': wan_link})
356 # We serialize and cache the wan_link here, because it can be
357 # deleted during the delete process
358 }
359
360 def delete_actions(self, wan_links=(), instance_scenario_id=None):
361 """Given a Instance Scenario, remove all the WAN Links created in the
362 past"""
363 if instance_scenario_id:
364 wan_links = self.persist.get_wan_links(
365 instance_scenario_id=instance_scenario_id)
366 return [self.delete_action(l) for l in wan_links]
367
368 def incorporate_actions(self, wim_actions, instance_action):
369 """Make the instance action consider new WIM actions and make the WIM
370 actions aware of the instance action
371 """
372 current = instance_action.setdefault('number_tasks', 0)
373 for i, action in enumerate(wim_actions):
374 action['task_index'] = current + i
375 action['instance_action_id'] = instance_action['uuid']
376 instance_action['number_tasks'] += len(wim_actions)
377
378 return wim_actions, instance_action
379
380 def dispatch(self, tasks):
381 """Enqueue a list of tasks for further processing.
382
383 This function is supposed to be called outside from the WIM Thread.
384 """
385 for task in tasks:
386 if task['wim_account_id'] not in self.threads:
387 error_msg = str(WimAccountNotActive(
388 'Requests send to the WIM Account %s are not currently '
389 'being processed.', task['wim_account_id']))
390 Action(task, self.logger).fail(self.persist, error_msg)
391 self.persist.update_wan_link(task['item_id'],
392 {'status': 'ERROR',
393 'error_msg': error_msg})
394 self.logger.error('Task %s %s %s not dispatched.\n%s',
395 task['action'], task['item'],
396 task['instance_account_id'], error_msg)
397 else:
398 self.threads[task['wim_account_id']].insert_task(task)
399 self.logger.debug('Task %s %s %s dispatched',
400 task['action'], task['item'],
401 task['instance_action_id'])
402
403 def _spawn_thread(self, wim_account):
404 """Spawn a WIM thread
405
406 Arguments:
407 wim_account (dict): WIM information (usually persisted)
408 The `wim` field is required to be set with a valid WIM record
409 inside the `wim_account` dict
410
411 Return:
412 threading.Thread: Thread object
413 """
414 thread = None
415 try:
416 thread = WimThread(self.persist, wim_account, ovim=self.ovim)
417 self.threads[wim_account['uuid']] = thread
418 thread.start()
419 except: # noqa
420 self.logger.error('Error when spawning WIM thread for %s',
421 wim_account['uuid'], exc_info=True)
422
423 return thread
424
425 def start_threads(self):
426 """Start the threads responsible for processing WIM Actions"""
427 accounts = self.persist.get_wim_accounts(error_if_none=False)
428 self.threads = remove_none_items(
429 {a['uuid']: self._spawn_thread(a) for a in accounts})
430
431 def stop_threads(self):
432 """Stop the threads responsible for processing WIM Actions"""
433 for uuid, thread in self.threads.items():
434 thread.exit()
435 del self.threads[uuid]
436
437 @contextmanager
438 def threads_running(self):
439 """Ensure no thread will be left running"""
440 # This method is particularly important for testing :)
441 try:
442 self.start_threads()
443 yield
444 finally:
445 self.stop_threads()
446
447
448 def _filter_multi_vim(networks):
449 """Ignore networks without sce_net_id (all VNFs go to the same VIM)"""
450 return [n for n in networks if 'sce_net_id' in n and n['sce_net_id']]
451
452
453 def _group_networks(networks):
454 """Group networks that correspond to the same instance_scenario_id and
455 sce_net_id (NSR and VLD).
456
457 Arguments:
458 networks(list): Dicts containing the information about the networks
459 that will be instantiated to materialize a Network Service
460 (scenario) instance.
461 Returns:
462 dict: Keys are tuples (instance_scenario_id, sce_net_id) and values
463 are lits of networks.
464 """
465 criteria = itemgetter('instance_scenario_id', 'sce_net_id')
466
467 networks = sorted(networks, key=criteria)
468 return {k: list(v) for k, v in groupby(networks, key=criteria)}
469
470
471 def _count_datacenters(grouped_networks):
472 """Count the number of datacenters in each group of networks
473
474 Returns:
475 list of tuples: the first element is the group key, while the second
476 element is the number of datacenters in each group.
477 """
478 return ((key, len(set(n['datacenter_id'] for n in group)))
479 for key, group in grouped_networks.items())