Implement feature 5949
[osm/RO.git] / osm_ro / wim / engine.py
1 # -*- coding: utf-8 -*-
2 ##
3 # Copyright 2018 University of Bristol - High Performance Networks Research
4 # Group
5 # All Rights Reserved.
6 #
7 # Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
8 # Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
9 #
10 # Licensed under the Apache License, Version 2.0 (the "License"); you may
11 # not use this file except in compliance with the License. You may obtain
12 # a copy of the License at
13 #
14 # http://www.apache.org/licenses/LICENSE-2.0
15 #
16 # Unless required by applicable law or agreed to in writing, software
17 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
18 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
19 # License for the specific language governing permissions and limitations
20 # under the License.
21 #
22 # For those usages not covered by the Apache License, Version 2.0 please
23 # contact with: <highperformance-networks@bristol.ac.uk>
24 #
25 # Neither the name of the University of Bristol nor the names of its
26 # contributors may be used to endorse or promote products derived from
27 # this software without specific prior written permission.
28 #
29 # This work has been performed in the context of DCMS UK 5G Testbeds
30 # & Trials Programme and in the framework of the Metro-Haul project -
31 # funded by the European Commission under Grant number 761727 through the
32 # Horizon 2020 and 5G-PPP programmes.
33 ##
34
35 """This module contains the domain logic, and the implementation of the
36 required steps to perform VNF management and orchestration in a WAN
37 environment.
38
39 It works as an extension/complement to the main functions contained in the
40 ``nfvo.py`` file and avoids interacting directly with the database, by relying
41 on the `persistence` module.
42
43 No http request handling/direct interaction with the database should be present
44 in this file.
45 """
46 import json
47 import logging
48 from contextlib import contextmanager
49 from itertools import groupby
50 from operator import itemgetter
51 from uuid import uuid4
52
53 from ..utils import remove_none_items
54 from .actions import Action
55 from .errors import (
56 NoWimConnectedToDatacenters,
57 WimAccountNotActive
58 )
59 from .wim_thread import WimThread
60
61
class WimEngine(object):
    """Logic supporting the establishment of WAN links when NS spans across
    different datacenters.

    The engine delegates every storage operation to the ``persistence``
    object received in the constructor and keeps one ``WimThread`` per WIM
    account in ``self.threads`` (a dict keyed by the WIM account uuid).
    """
    def __init__(self, persistence, logger=None, ovim=None):
        """Store the collaborators and start with no running thread.

        Arguments:
            persistence: object used for all the storage operations
            logger: optional logger, defaults to ``openmano.wim.engine``
            ovim: optional object forwarded to every spawned ``WimThread``
        """
        self.persist = persistence
        self.logger = logger or logging.getLogger('openmano.wim.engine')
        self.threads = {}
        self.connectors = {}
        self.ovim = ovim

    def create_wim(self, properties):
        """Create a new wim record according to the properties

        Please check the wim schema to have more information about
        ``properties``.

        Returns:
            str: uuid of the newly created WIM record
        """
        return self.persist.create_wim(properties)

    def get_wim(self, uuid_or_name, tenant_id=None):
        """Retrieve existing WIM record by name or id.

        If ``tenant_id`` is specified, the query will be
        limited to the WIM associated to the given tenant.
        """
        # Since it is a pure DB operation, we can delegate it directly
        return self.persist.get_wim(uuid_or_name, tenant_id)

    def update_wim(self, uuid_or_name, properties):
        """Edit an existing WIM record.

        ``properties`` is a dictionary with the properties being changed,
        if a property is not present, the old value will be preserved
        """
        return self.persist.update_wim(uuid_or_name, properties)

    def delete_wim(self, uuid_or_name):
        """Kill the corresponding wim threads and erase the WIM record"""
        # Theoretically, we can rely on the database to drop the wim_accounts
        # automatically, since we have configured 'ON CASCADE DELETE'.
        # However, we use `delete_wim_accounts` to kill all the running
        # threads.
        self.delete_wim_accounts(uuid_or_name)
        return self.persist.delete_wim(uuid_or_name)

    def create_wim_account(self, wim, tenant, properties):
        """Create an account that associates a tenant to a WIM.

        As a side effect this function will spawn a new thread

        Arguments:
            wim (str): name or uuid of the WIM related to the account being
                created
            tenant (str): name or uuid of the nfvo tenant to which the account
                will be created
            properties (dict): properties of the account
                (eg. username, password, ...)

        Returns:
            dict: Created record
        """
        uuid = self.persist.create_wim_account(wim, tenant, properties)
        account = self.persist.get_wim_account_by(uuid=uuid)
        # ^  We need to use get_wim_account_by here, since this method
        #    returns all the associations, and we need the wim to create the
        #    thread
        self._spawn_thread(account)
        return account

    def _update_single_wim_account(self, account, properties):
        """Update WIM Account, taking care to reload the corresponding thread

        Arguments:
            account (dict): Current account record
            properties (dict): Properties to be updated

        Returns:
            dict: updated record

        Raises:
            KeyError: if no thread is registered for the account
        """
        account = self.persist.update_wim_account(account['uuid'], properties)
        self.threads[account['uuid']].reload()
        return account

    def update_wim_accounts(self, wim, tenant, properties):
        """Update all the accounts related to a WIM and a tenant,
        taking care of reloading threads.

        Arguments:
            wim (str): uuid or name of a WIM record
            tenant (str): uuid or name of a NFVO tenant record
            properties (dict): attributes with values to be updated

        Returns
            list: Records that were updated
        """
        accounts = self.persist.get_wim_accounts_by(wim, tenant)
        return [self._update_single_wim_account(account, properties)
                for account in accounts]

    def _delete_single_wim_account(self, account):
        """Delete WIM Account, taking care to remove the corresponding thread.

        The record is erased from the persistence layer *before* the thread
        registry is checked, so the record is gone even when the exception
        below is raised.

        Arguments:
            account (dict): Current account record

        Returns:
            dict: current record (same as input)

        Raises:
            WimAccountNotActive: if there is no thread registered for the
                account
        """
        uuid = account['uuid']
        self.persist.delete_wim_account(uuid)

        if uuid not in self.threads:
            # The message is interpolated here: exceptions do not apply
            # %-formatting to their arguments the way logging calls do.
            raise WimAccountNotActive(
                'Requests send to the WIM Account %s are not currently '
                'being processed.' % uuid)

        self.threads[uuid].exit()
        del self.threads[uuid]

        return account

    def delete_wim_accounts(self, wim, tenant=None, **kwargs):
        """Delete all the accounts related to a WIM (and a tenant),
        taking care of the corresponding threads.

        Arguments:
            wim (str): uuid or name of a WIM record
            tenant (str): uuid or name of a NFVO tenant record
            **kwargs: forwarded to ``persist.get_wim_accounts_by``
                (``error_if_none`` defaults to ``False`` here)

        Returns
            list: Records that were deleted
        """
        kwargs.setdefault('error_if_none', False)
        accounts = self.persist.get_wim_accounts_by(wim, tenant, **kwargs)
        return [self._delete_single_wim_account(a) for a in accounts]

    def _reload_wim_threads(self, wim_id):
        """Reload every running thread whose account belongs to ``wim_id``"""
        for thread in self.threads.values():
            if thread.wim_account['wim_id'] == wim_id:
                thread.reload()

    def create_wim_port_mappings(self, wim, properties, tenant=None):
        """Store port mappings for a WIM and reload the related threads"""
        # TODO: Review tenants... WIMs can exist across different tenants,
        #       and the port_mappings are a WIM property, not a wim_account
        #       property, so the concepts are not related
        wim = self.persist.get_by_name_or_uuid('wims', wim)
        result = self.persist.create_wim_port_mappings(wim, properties, tenant)
        self._reload_wim_threads(wim['uuid'])
        return result

    def get_wim_port_mappings(self, wim):
        """Retrieve information about port mappings from Database"""
        return self.persist.get_wim_port_mappings(wim)

    def delete_wim_port_mappings(self, wim):
        """Erase the port mapping records associated with the WIM"""
        wim = self.persist.get_by_name_or_uuid('wims', wim)
        message = self.persist.delete_wim_port_mappings(wim['uuid'])
        self._reload_wim_threads(wim['uuid'])
        return message

    def find_common_wims(self, datacenter_ids, tenant):
        """Find WIMs that are common to all datacenters listed

        Returns:
            list: ids of the WIMs that have port mappings for every
                datacenter in ``datacenter_ids``
        """
        port_mappings = self.persist.get_wim_port_mappings(
            datacenter=datacenter_ids, tenant=tenant, error_if_none=False)

        wim_id_of = itemgetter('wim_id')
        sorted_mappings = sorted(port_mappings, key=wim_id_of)
        # ^  sorting is needed by groupby
        mapped_datacenters = {
            wim_id: [m['datacenter_id'] for m in wim_mappings]
            for wim_id, wim_mappings in groupby(sorted_mappings, key=wim_id_of)
        }

        # Built once, instead of once per candidate WIM
        required = set(datacenter_ids)

        return [
            wim_id
            for wim_id, connected_datacenters in mapped_datacenters.items()
            if set(connected_datacenters) >= required
        ]

    def find_common_wim(self, datacenter_ids, tenant):
        """Find a single WIM that is able to connect all the datacenters
        listed

        Raises
            NoWimConnectedToDatacenters: if no WIM connected to all datacenters
                at once is found
        """
        suitable_wim_ids = self.find_common_wims(datacenter_ids, tenant)

        if not suitable_wim_ids:
            raise NoWimConnectedToDatacenters(datacenter_ids)

        # TODO: use a criteria to determine which WIM is going to be used,
        #       instead of always using the first one (strategy pattern can be
        #       used here)
        return suitable_wim_ids[0]

    def derive_wan_link(self,
                        instance_scenario_id, sce_net_id,
                        networks, tenant):
        """Create a instance_wim_nets record for the given information

        Arguments:
            instance_scenario_id: value copied to the resulting record
            sce_net_id: value copied to the resulting record
            networks (list): dicts with (at least) a ``datacenter_id`` key
            tenant: used to select the WIM and the WIM account

        Returns:
            dict: record with a freshly generated ``uuid``

        Raises
            NoWimConnectedToDatacenters: if no WIM connected to all datacenters
                at once is found
        """
        datacenters = [n['datacenter_id'] for n in networks]
        wim_id = self.find_common_wim(datacenters, tenant)

        account = self.persist.get_wim_account_by(wim_id, tenant)

        return {
            'uuid': str(uuid4()),
            'instance_scenario_id': instance_scenario_id,
            'sce_net_id': sce_net_id,
            'wim_id': wim_id,
            'wim_account_id': account['uuid']
        }

    def derive_wan_links(self, networks, tenant=None):
        """Discover and return what are the wan_links that have to be created
        considering a set of networks (VLDs) required for a scenario instance
        (NSR).

        Arguments:
            networks(list): Dicts containing the information about the networks
                that will be instantiated to materialize a Network Service
                (scenario) instance.

        Returns:
            list: list of WAN links to be written to the database
        """
        # Group networks by key=(instance_scenario_id, sce_net_id)
        filtered = _filter_multi_vim(networks)
        grouped_networks = _group_networks(filtered)
        datacenters_per_group = _count_datacenters(grouped_networks)
        # For each group count the number of distinct datacenters. If greater
        # than 1, we have to create a wan link connecting them.
        wan_groups = [key
                      for key, counter in datacenters_per_group
                      if counter > 1]

        return [
            self.derive_wan_link(key[0], key[1], grouped_networks[key], tenant)
            for key in wan_groups
        ]

    def create_action(self, wan_link):
        """For a single wan_link create the corresponding create action"""
        return {
            'action': 'CREATE',
            'status': 'SCHEDULED',
            'item': 'instance_wim_nets',
            'item_id': wan_link['uuid'],
            'wim_account_id': wan_link['wim_account_id']
        }

    def create_actions(self, wan_links):
        """For an array of wan_links, create all the corresponding actions"""
        return [self.create_action(link) for link in wan_links]

    def delete_action(self, wan_link):
        """For a single wan_link create the corresponding delete action"""
        return {
            'action': 'DELETE',
            'status': 'SCHEDULED',
            'item': 'instance_wim_nets',
            'item_id': wan_link['uuid'],
            'wim_account_id': wan_link['wim_account_id'],
            'extra': json.dumps({'wan_link': wan_link})
            # We serialize and cache the wan_link here, because it can be
            # deleted during the delete process
        }

    def delete_actions(self, wan_links=(), instance_scenario_id=None):
        """Create the delete actions for a collection of WAN links.

        When ``instance_scenario_id`` is given, ``wan_links`` is ignored and
        the WAN links created in the past for that Instance Scenario are
        retrieved from the persistence layer instead.
        """
        if instance_scenario_id:
            wan_links = self.persist.get_wan_links(
                instance_scenario_id=instance_scenario_id)
        return [self.delete_action(link) for link in wan_links]

    def incorporate_actions(self, wim_actions, instance_action):
        """Make the instance action consider new WIM actions and make the WIM
        actions aware of the instance action

        Both arguments are modified in place and also returned.
        """
        current = instance_action.setdefault('number_tasks', 0)
        for i, action in enumerate(wim_actions):
            action['task_index'] = current + i
            action['instance_action_id'] = instance_action['uuid']
        instance_action['number_tasks'] += len(wim_actions)

        return wim_actions, instance_action

    def dispatch(self, tasks):
        """Enqueue a list of tasks for further processing.

        Tasks whose WIM account has no registered thread are not enqueued:
        the action is marked as failed, the WAN link is updated with an
        ``ERROR`` status and the problem is logged.

        This function is supposed to be called outside from the WIM Thread.
        """
        for task in tasks:
            account_id = task['wim_account_id']
            if account_id not in self.threads:
                error_msg = str(WimAccountNotActive(
                    'Requests send to the WIM Account %s are not currently '
                    'being processed.' % account_id))
                Action(task, self.logger).fail(self.persist, error_msg)
                self.persist.update_wan_link(task['item_id'],
                                             {'status': 'ERROR',
                                              'error_msg': error_msg})
                # `instance_action_id` is the key written by
                # `incorporate_actions` (and used in the branch below)
                self.logger.error('Task %s %s %s not dispatched.\n%s',
                                  task['action'], task['item'],
                                  task['instance_action_id'], error_msg)
            else:
                self.threads[account_id].insert_task(task)
                self.logger.debug('Task %s %s %s dispatched',
                                  task['action'], task['item'],
                                  task['instance_action_id'])

    def _spawn_thread(self, wim_account):
        """Spawn a WIM thread

        Arguments:
            wim_account (dict): WIM information (usually persisted)
                The `wim` field is required to be set with a valid WIM record
                inside the `wim_account` dict

        Return:
            threading.Thread: Thread object, or ``None`` if the thread could
                not be created or started (the error is logged, not raised)
        """
        uuid = wim_account['uuid']
        thread = None
        try:
            thread = WimThread(self.persist, wim_account, ovim=self.ovim)
            self.threads[uuid] = thread
            thread.start()
        except Exception:
            self.logger.error('Error when spawning WIM thread for %s',
                              uuid, exc_info=True)
            # Do not leave a thread that never started in the registry,
            # otherwise `dispatch` would enqueue tasks nobody will process
            if thread is not None and self.threads.get(uuid) is thread:
                del self.threads[uuid]
            return None

        return thread

    def start_threads(self):
        """Start the threads responsible for processing WIM Actions"""
        accounts = self.persist.get_wim_accounts(error_if_none=False)
        self.threads = remove_none_items(
            {a['uuid']: self._spawn_thread(a) for a in accounts})

    def stop_threads(self):
        """Stop the threads responsible for processing WIM Actions"""
        # Iterate over a snapshot: entries are removed inside the loop, and
        # changing the size of a dict while iterating over it is an error
        for uuid, thread in list(self.threads.items()):
            thread.exit()
            del self.threads[uuid]

    @contextmanager
    def threads_running(self):
        """Ensure no thread will be left running"""
        # This method is particularly important for testing :)
        try:
            self.start_threads()
            yield
        finally:
            self.stop_threads()
422
423
424 def _filter_multi_vim(networks):
425 """Ignore networks without sce_net_id (all VNFs go to the same VIM)"""
426 return [n for n in networks if 'sce_net_id' in n and n['sce_net_id']]
427
428
429 def _group_networks(networks):
430 """Group networks that correspond to the same instance_scenario_id and
431 sce_net_id (NSR and VLD).
432
433 Arguments:
434 networks(list): Dicts containing the information about the networks
435 that will be instantiated to materialize a Network Service
436 (scenario) instance.
437 Returns:
438 dict: Keys are tuples (instance_scenario_id, sce_net_id) and values
439 are lits of networks.
440 """
441 criteria = itemgetter('instance_scenario_id', 'sce_net_id')
442
443 networks = sorted(networks, key=criteria)
444 return {k: list(v) for k, v in groupby(networks, key=criteria)}
445
446
447 def _count_datacenters(grouped_networks):
448 """Count the number of datacenters in each group of networks
449
450 Returns:
451 list of tuples: the first element is the group key, while the second
452 element is the number of datacenters in each group.
453 """
454 return ((key, len(set(n['datacenter_id'] for n in group)))
455 for key, group in grouped_networks.items())