Merge "Fixed accidential override of `sfc_encap`"
[osm/RO.git] / osm_ro / vim_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2015 Telefonica Investigacion y Desarrollo, S.A.U.
5 # This file is part of openvim
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
22 ##
23
24 """
25 This is the thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
26 The tasks are stored at database in table vim_wim_actions
27 The task content is (M: stored at memory, D: stored at database):
28 MD instance_action_id: reference a global action over an instance-scenario: database instance_actions
29 MD task_index: index number of the task. This together with the previous forms a unique key identifier
30 MD datacenter_vim_id: should contain the uuid of the VIM managed by this thread
31 MD vim_id: id of the vm,net,etc at VIM
32 MD action: CREATE, DELETE, FIND
33 MD item: database table name, can be instance_vms, instance_nets, TODO: datacenter_flavors, datacenter_images
34 MD item_id: uuid of the referenced entry in the previous table
35 MD status: SCHEDULED,BUILD,DONE,FAILED,SUPERSEDED
36 MD extra: text with yaml format at database, dict at memory with:
37 params: list with the params to be sent to the VIM for CREATE or FIND. For DELETE the vim_id is taken from other related tasks
38 find: (only for CREATE tasks) if present it should FIND before creating and use if existing. Contains the FIND params
39 depends_on: list with the 'task_index'es of tasks that must be completed before. e.g. a vm creation depends on a net creation
40 can contain an int (single index on the same instance-action) or str (complete action ID)
41 sdn_net_id: used for net.
42 tries:
43 interfaces: used for VMs. Each key is the uuid of the instance_interfaces entry at database
44 iface_id: uuid of instance_interfaces
45 sdn_port_id:
46 sdn_net_id:
47 created_items: dictionary with extra elements created that need to be deleted. e.g. ports, volumes,...
48 created: False if the VIM element is not created by other actions, and it should not be deleted
49 vim_status: VIM status of the element. Stored also at database in the instance_XXX
50 M depends: dict with task_index(from depends_on) to task class
51 M params: same as extra[params] but with the resolved dependencies
52 M vim_interfaces: similar to extra[interfaces] but with VIM information. Stored at database in the instance_XXX but not at vim_wim_actions
53 M vim_info: Detailed information of a vm,net from the VIM. Stored at database in the instance_XXX but not at vim_wim_actions
54 MD error_msg: descriptive text upon an error.Stored also at database instance_XXX
55 MD created_at: task creation time
56 MD modified_at: last task update time. On refresh it contains when this task need to be refreshed
57
58 """
59
60 import threading
61 import time
62 import Queue
63 import logging
64 import vimconn
65 import vimconn_openvim
66 import vimconn_aws
67 import vimconn_opennebula
68 import vimconn_openstack
69 import vimconn_vmware
70 import yaml
71 from db_base import db_base_Exception
72 from lib_osm_openvim.ovim import ovimException
73 from copy import deepcopy
74
# Module metadata: authors and date stamp of this file
__author__ = "Alfonso Tierno, Pablo Montes"
__date__ = "$28-Sep-2017 12:07:15$"
77
# Connector module to use for each VIM type; it is indexed with the 'type' column read from the database
vim_module = dict(
    openvim=vimconn_openvim,
    aws=vimconn_aws,
    opennebula=vimconn_opennebula,
    openstack=vimconn_openstack,
    vmware=vimconn_vmware,
)
85
86
def is_task_id(task_id):
    """Tell whether an identifier is written as a task reference.

    :param task_id: text to check
    :return: True if the text starts with the "TASK-" prefix, False otherwise
    """
    task_prefix = "TASK-"
    return task_id.startswith(task_prefix)
89
90
class VimThreadException(Exception):
    """Error raised by the VIM thread when a task cannot be processed."""
93
94
class VimThreadExceptionNotFound(VimThreadException):
    """VimThreadException subclass; judging by its name it signals an element that is not found."""
97
98
99 class vim_thread(threading.Thread):
100 REFRESH_BUILD = 5 # 5 seconds
101 REFRESH_ACTIVE = 60 # 1 minute
102
103 def __init__(self, task_lock, name=None, datacenter_name=None, datacenter_tenant_id=None,
104 db=None, db_lock=None, ovim=None):
105 """Init a thread.
106 Arguments:
107 'id' number of thead
108 'name' name of thread
109 'host','user': host ip or name to manage and user
110 'db', 'db_lock': database class and lock to use it in exclusion
111 """
112 threading.Thread.__init__(self)
113 self.vim = None
114 self.error_status = None
115 self.datacenter_name = datacenter_name
116 self.datacenter_tenant_id = datacenter_tenant_id
117 self.ovim = ovim
118 if not name:
119 self.name = vimconn["id"] + "." + vimconn["config"]["datacenter_tenant_id"]
120 else:
121 self.name = name
122 self.vim_persistent_info = {}
123
124 self.logger = logging.getLogger('openmano.vim.' + self.name)
125 self.db = db
126 self.db_lock = db_lock
127
128 self.task_lock = task_lock
129 self.task_queue = Queue.Queue(2000)
130
131 self.refresh_tasks = []
132 """Contains time ordered task list for refreshing the status of VIM VMs and nets"""
133
134 self.pending_tasks = []
135 """Contains time ordered task list for creation, deletion of VIM VMs and nets"""
136
137 self.grouped_tasks = {}
138 """ It contains all the creation/deletion pending tasks grouped by its concrete vm, net, etc
139 <item><item_id>:
140 - <task1> # e.g. CREATE task
141 <task2> # e.g. DELETE task
142 """
143
def get_vimconnector(self):
    """Read from database the VIM account of this thread and build its connector.

    On success the connector is stored at self.vim and self.error_status is cleaned. Upon any exception the
    error is logged, self.vim is set to None and self.error_status stores a descriptive text.
    :return: None
    """
    try:
        tables = "datacenter_tenants as dt join datacenters as d on dt.datacenter_id=d.uuid"
        columns = ('type', 'd.config as config', 'd.uuid as datacenter_id', 'vim_url', 'vim_url_admin',
                   'd.name as datacenter_name', 'dt.uuid as datacenter_tenant_id',
                   'dt.vim_tenant_name as vim_tenant_name', 'dt.vim_tenant_id as vim_tenant_id',
                   'user', 'passwd', 'dt.config as dt_config')
        filter_ = {"dt.uuid": self.datacenter_tenant_id}
        with self.db_lock:
            rows = self.db.get_rows(FROM=tables, SELECT=columns, WHERE=filter_)
        vim = rows[0]

        # datacenter config is applied first and the datacenter-tenant one later, so that the second one
        # overrides the keys present at both
        vim_config = {}
        for config_column in ("config", "dt_config"):
            config_text = vim[config_column]
            if config_text:
                # NOTE(review): yaml.load is able to build arbitrary objects; the text comes from the database
                vim_config.update(yaml.load(config_text))
        vim_config['datacenter_tenant_id'] = vim.get('datacenter_tenant_id')
        vim_config['datacenter_id'] = vim.get('datacenter_id')

        # get port_mapping
        vim_config["wim_external_ports"] = self.ovim.get_of_port_mappings(
            db_filter={"region": vim_config['datacenter_id'], "pci": None})

        connector_class = vim_module[vim["type"]].vimconnector
        self.vim = connector_class(
            uuid=vim['datacenter_id'], name=vim['datacenter_name'],
            tenant_id=vim['vim_tenant_id'], tenant_name=vim['vim_tenant_name'],
            url=vim['vim_url'], url_admin=vim['vim_url_admin'],
            user=vim['user'], passwd=vim['passwd'],
            config=vim_config, persistent_info=self.vim_persistent_info
        )
        self.error_status = None
    except Exception as e:
        self.logger.error("Cannot load vimconnector for vim_account {}: {}".format(self.datacenter_tenant_id, e))
        self.vim = None
        self.error_status = "Error loading vimconnector: {}".format(e)
179
def _reload_vim_actions(self):
    """
    Read from database table vim_wim_actions the actions of this VIM account and reload them at memory.
    Rows are obtained in pages of 'database_limit' entries ordered by item_id, item and created_at, so that the
    tasks of the same element (item + item_id) are consecutive. Each group of tasks is passed to
    self._insert_pending_tasks, which fills self.pending_tasks, self.refresh_tasks and self.grouped_tasks.
    A group is discarded when it contains a DELETE task whose status is not SCHEDULED.
    Exceptions are logged and not propagated.
    :return: None
    """
    try:
        action_completed = False   # True when the current group contains an already executed DELETE
        task_list = []             # tasks collected for the current group
        old_action_key = None      # key (item + item_id) of the group being collected

        # last row of the previous page, used to know where the new page must start to be processed
        old_item_id = ""
        old_item = ""
        old_created_at = 0.0
        database_limit = 200
        while True:
            # get 200 (database_limit) entries each time
            with self.db_lock:
                vim_actions = self.db.get_rows(FROM="vim_wim_actions",
                                               WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
                                                      "item_id>=": old_item_id},
                                               ORDER_BY=("item_id", "item", "created_at",),
                                               LIMIT=database_limit)
            for task in vim_actions:
                item = task["item"]
                item_id = task["item_id"]

                # skip the first entries that are already processed in the previous pool of 200
                if old_item_id:
                    if item_id == old_item_id and item == old_item and task["created_at"] == old_created_at:
                        old_item_id = False  # next one will be a new un-processed task
                    continue

                action_key = item + item_id
                if old_action_key != action_key:
                    # a new group starts: insert the previous one, unless it was completed
                    if not action_completed and task_list:
                        # This will fill needed task parameters into memory, and insert the task if needed in
                        # self.pending_tasks or self.refresh_tasks
                        try:
                            self._insert_pending_tasks(task_list)
                        except Exception as e:
                            self.logger.critical(
                                "Unexpected exception at _reload_vim_actions:_insert_pending_tasks: " + str(e),
                                exc_info=True)
                    task_list = []
                    old_action_key = action_key
                    action_completed = False
                elif action_completed:
                    # remaining tasks of a completed group are ignored
                    continue

                if task["status"] == "SCHEDULED" or task["action"] == "CREATE" or task["action"] == "FIND":
                    task_list.append(task)
                elif task["action"] == "DELETE":
                    # action completed because deleted and status is not SCHEDULED. Not needed anything
                    action_completed = True
            if len(vim_actions) == database_limit:
                # update variables for get the next database iteration
                old_item_id = item_id
                old_item = item
                old_created_at = task["created_at"]
            else:
                # a page that is not full means that there are no more rows
                break
        # Last actions group need to be inserted too
        if not action_completed and task_list:
            try:
                self._insert_pending_tasks(task_list)
            except Exception as e:
                self.logger.critical("Unexpected exception at _reload_vim_actions:_insert_pending_tasks: " + str(e),
                                     exc_info=True)
        self.logger.debug("reloaded vim actions pending:{} refresh:{}".format(
            len(self.pending_tasks), len(self.refresh_tasks)))
    except Exception as e:
        self.logger.critical("Unexpected exception at _reload_vim_actions: " + str(e), exc_info=True)
252
def _refres_elements(self):
    """Call VIM to get VMs and networks status until 10 elements

    Tasks are taken from the head of self.refresh_tasks while their 'modified_at' is not later than now.
    VMs and nets are refreshed with a single VIM call for each type. Database tables instance_interfaces,
    instance_vms, instance_nets and vim_wim_actions are updated with the changes, and every task is inserted
    again at self.refresh_tasks to be refreshed after REFRESH_BUILD or REFRESH_ACTIVE seconds.
    :return: number of tasks taken from self.refresh_tasks to be refreshed
    """
    now = time.time()
    nb_processed = 0
    # lists contain the vim_id to be sent to the VIM; dicts map each vim_id to the tasks that reference it
    vm_to_refresh_list = []
    net_to_refresh_list = []
    vm_to_refresh_dict = {}
    net_to_refresh_dict = {}
    items_to_refresh = 0
    while self.refresh_tasks:
        task = self.refresh_tasks[0]
        with self.task_lock:
            if task['status'] == 'SUPERSEDED':
                self.refresh_tasks.pop(0)
                continue
            if task['modified_at'] > now:
                # the list is time ordered, so the rest of tasks do not need to be refreshed yet
                break
            # task["status"] = "processing"
            nb_processed += 1
        self.refresh_tasks.pop(0)
        if task["item"] == 'instance_vms':
            if task["vim_id"] not in vm_to_refresh_dict:
                vm_to_refresh_dict[task["vim_id"]] = [task]
                vm_to_refresh_list.append(task["vim_id"])
            else:
                vm_to_refresh_dict[task["vim_id"]].append(task)
        elif task["item"] == 'instance_nets':
            if task["vim_id"] not in net_to_refresh_dict:
                net_to_refresh_dict[task["vim_id"]] = [task]
                net_to_refresh_list.append(task["vim_id"])
            else:
                net_to_refresh_dict[task["vim_id"]].append(task)
        else:
            task_id = task["instance_action_id"] + "." + str(task["task_index"])
            self.logger.critical("task={}: unknown task {}".format(task_id, task["item"]), exc_info=True)
        items_to_refresh += 1
        if items_to_refresh == 10:
            break

    if vm_to_refresh_list:
        now = time.time()
        try:
            vim_dict = self.vim.refresh_vms_status(vm_to_refresh_list)
        except vimconn.vimconnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error("task=several get-VM: vimconnException when trying to refresh vms " + str(e))
            vim_dict = {}
            for vim_id in vm_to_refresh_list:
                vim_dict[vim_id] = {"status": "VIM_ERROR", "error_msg": str(e)}

        for vim_id, vim_info in vim_dict.items():

            # look for task
            for task in vm_to_refresh_dict[vim_id]:
                task_need_update = False  # True when the vim_wim_actions entry must be written
                task_id = task["instance_action_id"] + "." + str(task["task_index"])
                self.logger.debug("task={} get-VM: vim_vm_id={} result={}".format(task_id, task["vim_id"], vim_info))

                # check and update interfaces
                task_warning_msg = ""
                for interface in vim_info.get("interfaces", ()):
                    vim_interface_id = interface["vim_interface_id"]
                    if vim_interface_id not in task["extra"]["interfaces"]:
                        self.logger.critical("task={} get-VM: Interface not found {} on task info {}".format(
                            task_id, vim_interface_id, task["extra"]["interfaces"]), exc_info=True)
                        continue
                    task_interface = task["extra"]["interfaces"][vim_interface_id]
                    task_vim_interface = task["vim_interfaces"].get(vim_interface_id)
                    # only interfaces whose VIM information differs from the stored one are processed
                    if task_vim_interface != interface:
                        # delete old port
                        if task_interface.get("sdn_port_id"):
                            try:
                                with self.db_lock:
                                    self.ovim.delete_port(task_interface["sdn_port_id"], idempotent=True)
                                    task_interface["sdn_port_id"] = None
                                    task_need_update = True
                            except ovimException as e:
                                error_text = "ovimException deleting external_port={}: {}".format(
                                    task_interface["sdn_port_id"], e)
                                self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
                                task_warning_msg += error_text
                                # TODO Set error_msg at instance_nets instead of instance VMs

                        # Create SDN port
                        sdn_net_id = task_interface.get("sdn_net_id")
                        if sdn_net_id and interface.get("compute_node") and interface.get("pci"):
                            sdn_port_name = sdn_net_id + "." + task["vim_id"]
                            sdn_port_name = sdn_port_name[:63]
                            try:
                                with self.db_lock:
                                    sdn_port_id = self.ovim.new_external_port(
                                        {"compute_node": interface["compute_node"],
                                         "pci": interface["pci"],
                                         "vlan": interface.get("vlan"),
                                         "net_id": sdn_net_id,
                                         "region": self.vim["config"]["datacenter_id"],
                                         "name": sdn_port_name,
                                         "mac": interface.get("mac_address")})
                                    task_interface["sdn_port_id"] = sdn_port_id
                                    task_need_update = True
                            except (ovimException, Exception) as e:
                                error_text = "ovimException creating new_external_port compute_node={}" \
                                             " pci={} vlan={} {}".format(
                                                 interface["compute_node"],
                                                 interface["pci"],
                                                 interface.get("vlan"), e)
                                self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
                                task_warning_msg += error_text
                                # TODO Set error_msg at instance_nets instead of instance VMs

                        with self.db_lock:
                            self.db.update_rows(
                                'instance_interfaces',
                                UPDATE={"mac_address": interface.get("mac_address"),
                                        "ip_address": interface.get("ip_address"),
                                        "vim_interface_id": interface.get("vim_interface_id"),
                                        "vim_info": interface.get("vim_info"),
                                        "sdn_port_id": task_interface.get("sdn_port_id"),
                                        "compute_node": interface.get("compute_node"),
                                        "pci": interface.get("pci"),
                                        "vlan": interface.get("vlan")},
                                WHERE={'uuid': task_interface["iface_id"]})
                        task["vim_interfaces"][vim_interface_id] = interface

                # check and update task and instance_vms database
                vim_info_error_msg = None
                if vim_info.get("error_msg"):
                    vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"] + task_warning_msg)
                elif task_warning_msg:
                    vim_info_error_msg = self._format_vim_error_msg(task_warning_msg)
                task_vim_info = task.get("vim_info")
                task_error_msg = task.get("error_msg")
                task_vim_status = task["extra"].get("vim_status")
                # database is written only if status, error message or vim_info have changed
                if task_vim_status != vim_info["status"] or task_error_msg != vim_info_error_msg or \
                        (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
                    temp_dict = {"status": vim_info["status"], "error_msg": vim_info_error_msg}
                    if vim_info.get("vim_info"):
                        temp_dict["vim_info"] = vim_info["vim_info"]
                    with self.db_lock:
                        self.db.update_rows('instance_vms', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]})
                    task["extra"]["vim_status"] = vim_info["status"]
                    task["error_msg"] = vim_info_error_msg
                    if vim_info.get("vim_info"):
                        task["vim_info"] = vim_info["vim_info"]
                    task_need_update = True

                if task_need_update:
                    with self.db_lock:
                        self.db.update_rows(
                            'vim_wim_actions',
                            UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256),
                                    "error_msg": task.get("error_msg"), "modified_at": now},
                            WHERE={'instance_action_id': task['instance_action_id'],
                                   'task_index': task['task_index']})
                # schedule the next refresh; elements at BUILD status use the REFRESH_BUILD period
                if task["extra"].get("vim_status") == "BUILD":
                    self._insert_refresh(task, now + self.REFRESH_BUILD)
                else:
                    self._insert_refresh(task, now + self.REFRESH_ACTIVE)

    if net_to_refresh_list:
        now = time.time()
        try:
            vim_dict = self.vim.refresh_nets_status(net_to_refresh_list)
        except vimconn.vimconnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error("task=several get-net: vimconnException when trying to refresh nets " + str(e))
            vim_dict = {}
            for vim_id in net_to_refresh_list:
                vim_dict[vim_id] = {"status": "VIM_ERROR", "error_msg": str(e)}

        for vim_id, vim_info in vim_dict.items():
            # look for task
            for task in net_to_refresh_dict[vim_id]:
                task_id = task["instance_action_id"] + "." + str(task["task_index"])
                self.logger.debug("task={} get-net: vim_net_id={} result={}".format(task_id, task["vim_id"], vim_info))

                task_vim_info = task.get("vim_info")
                task_vim_status = task["extra"].get("vim_status")
                task_error_msg = task.get("error_msg")
                task_sdn_net_id = task["extra"].get("sdn_net_id")

                vim_info_status = vim_info["status"]
                vim_info_error_msg = vim_info.get("error_msg")
                # get ovim status
                if task_sdn_net_id:
                    try:
                        with self.db_lock:
                            sdn_net = self.ovim.show_network(task_sdn_net_id)
                    except (ovimException, Exception) as e:
                        text_error = "ovimException getting network snd_net_id={}: {}".format(task_sdn_net_id, e)
                        self.logger.error("task={} get-net: {}".format(task_id, text_error), exc_info=True)
                        sdn_net = {"status": "ERROR", "last_error": text_error}
                    # the status obtained from the VIM is combined with the one of the SDN network
                    if sdn_net["status"] == "ERROR":
                        if not vim_info_error_msg:
                            vim_info_error_msg = str(sdn_net.get("last_error"))
                        else:
                            vim_info_error_msg = "VIM_ERROR: {} && SDN_ERROR: {}".format(
                                self._format_vim_error_msg(vim_info_error_msg, 1024 // 2 - 14),
                                self._format_vim_error_msg(sdn_net["last_error"], 1024 // 2 - 14))
                        vim_info_status = "ERROR"
                    elif sdn_net["status"] == "BUILD":
                        if vim_info_status == "ACTIVE":
                            vim_info_status = "BUILD"

                # update database
                if vim_info_error_msg:
                    vim_info_error_msg = self._format_vim_error_msg(vim_info_error_msg)
                if task_vim_status != vim_info_status or task_error_msg != vim_info_error_msg or \
                        (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
                    task["extra"]["vim_status"] = vim_info_status
                    task["error_msg"] = vim_info_error_msg
                    if vim_info.get("vim_info"):
                        task["vim_info"] = vim_info["vim_info"]
                    temp_dict = {"status": vim_info_status, "error_msg": vim_info_error_msg}
                    if vim_info.get("vim_info"):
                        temp_dict["vim_info"] = vim_info["vim_info"]
                    with self.db_lock:
                        self.db.update_rows('instance_nets', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]})
                        self.db.update_rows(
                            'vim_wim_actions',
                            UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256),
                                    "error_msg": task.get("error_msg"), "modified_at": now},
                            WHERE={'instance_action_id': task['instance_action_id'],
                                   'task_index': task['task_index']})
                # schedule the next refresh; elements at BUILD status use the REFRESH_BUILD period
                if task["extra"].get("vim_status") == "BUILD":
                    self._insert_refresh(task, now + self.REFRESH_BUILD)
                else:
                    self._insert_refresh(task, now + self.REFRESH_ACTIVE)

    return nb_processed
483
484 def _insert_refresh(self, task, threshold_time=None):
485 """Insert a task at list of refreshing elements. The refreshing list is ordered by threshold_time (task['modified_at']
486 It is assumed that this is called inside this thread
487 """
488 if not self.vim:
489 return
490 if not threshold_time:
491 threshold_time = time.time()
492 task["modified_at"] = threshold_time
493 task_name = task["item"][9:] + "-" + task["action"]
494 task_id = task["instance_action_id"] + "." + str(task["task_index"])
495 for index in range(0, len(self.refresh_tasks)):
496 if self.refresh_tasks[index]["modified_at"] > threshold_time:
497 self.refresh_tasks.insert(index, task)
498 break
499 else:
500 index = len(self.refresh_tasks)
501 self.refresh_tasks.append(task)
502 self.logger.debug("task={} new refresh name={}, modified_at={} index={}".format(
503 task_id, task_name, task["modified_at"], index))
504
505 def _remove_refresh(self, task_name, vim_id):
506 """Remove a task with this name and vim_id from the list of refreshing elements.
507 It is assumed that this is called inside this thread outside _refres_elements method
508 Return True if self.refresh_list is modified, task is found
509 Return False if not found
510 """
511 index_to_delete = None
512 for index in range(0, len(self.refresh_tasks)):
513 if self.refresh_tasks[index]["name"] == task_name and self.refresh_tasks[index]["vim_id"] == vim_id:
514 index_to_delete = index
515 break
516 else:
517 return False
518 if not index_to_delete:
519 del self.refresh_tasks[index_to_delete]
520 return True
521
def _proccess_pending_tasks(self):
    """Execute against the VIM the tasks stored at self.pending_tasks and update the database with the result.

    A task whose dependencies are still SCHEDULED is moved to the end of the list, up to 3 times. Processing
    stops when the list is empty or when 10 elements have been sent to be created.
    :return: number of tasks taken from self.pending_tasks (a task moved to the end is counted each time)
    """
    nb_created = 0
    nb_processed = 0
    while self.pending_tasks:
        task = self.pending_tasks.pop(0)
        nb_processed += 1
        try:
            # check if tasks that this depends on have been completed
            dependency_not_completed = False
            for task_index in task["extra"].get("depends_on", ()):
                task_dependency = task["depends"].get("TASK-" + str(task_index))
                if not task_dependency:
                    task_dependency = self._look_for_task(task["instance_action_id"], task_index)
                    if not task_dependency:
                        raise VimThreadException(
                            "Cannot get depending net task trying to get depending task {}.{}".format(
                                task["instance_action_id"], task_index))
                    # task["depends"]["TASK-" + str(task_index)] = task_dependency #it references another object,so database must be look again
                if task_dependency["status"] == "SCHEDULED":
                    dependency_not_completed = True
                    break
                elif task_dependency["status"] == "FAILED":
                    # arguments follow the order of the text: action and item first, task identifier later
                    raise VimThreadException(
                        "Cannot {} {}, (task {}.{}) because depends on failed {} {}, (task {}.{}): {}".format(
                            task["action"], task["item"],
                            task["instance_action_id"], task["task_index"],
                            task_dependency["action"], task_dependency["item"],
                            task_dependency["instance_action_id"], task_dependency["task_index"],
                            task_dependency.get("error_msg")))
            if dependency_not_completed:
                # Move this task to the end.
                task["extra"]["tries"] = task["extra"].get("tries", 0) + 1
                if task["extra"]["tries"] <= 3:
                    self.pending_tasks.append(task)
                    continue
                else:
                    raise VimThreadException(
                        "Cannot {} {}, (task {}.{}) because timeout waiting to complete {} {}, "
                        "(task {}.{})".format(task["action"], task["item"],
                                              task["instance_action_id"], task["task_index"],
                                              task_dependency["action"], task_dependency["item"],
                                              task_dependency["instance_action_id"],
                                              task_dependency["task_index"]))

            if task["status"] == "SUPERSEDED":
                # not needed to do anything but update database with the new status
                result = True
                database_update = None
            elif not self.vim:
                # "FAILED" is the status used for the rest of failures and the one checked at dependencies;
                # "ERROR" is not a task status
                task["status"] = "FAILED"
                task["error_msg"] = self.error_status
                result = False
                database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
            elif task["item"] == 'instance_vms':
                if task["action"] == "CREATE":
                    result, database_update = self.new_vm(task)
                    nb_created += 1
                elif task["action"] == "DELETE":
                    result, database_update = self.del_vm(task)
                else:
                    raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
            elif task["item"] == 'instance_nets':
                if task["action"] == "CREATE":
                    result, database_update = self.new_net(task)
                    nb_created += 1
                elif task["action"] == "DELETE":
                    result, database_update = self.del_net(task)
                elif task["action"] == "FIND":
                    result, database_update = self.get_net(task)
                else:
                    raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
            elif task["item"] == 'instance_sfis':
                if task["action"] == "CREATE":
                    result, database_update = self.new_sfi(task)
                    nb_created += 1
                elif task["action"] == "DELETE":
                    result, database_update = self.del_sfi(task)
                else:
                    raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
            elif task["item"] == 'instance_sfs':
                if task["action"] == "CREATE":
                    result, database_update = self.new_sf(task)
                    nb_created += 1
                elif task["action"] == "DELETE":
                    result, database_update = self.del_sf(task)
                else:
                    raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
            elif task["item"] == 'instance_classifications':
                if task["action"] == "CREATE":
                    result, database_update = self.new_classification(task)
                    nb_created += 1
                elif task["action"] == "DELETE":
                    result, database_update = self.del_classification(task)
                else:
                    raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
            elif task["item"] == 'instance_sfps':
                if task["action"] == "CREATE":
                    result, database_update = self.new_sfp(task)
                    nb_created += 1
                elif task["action"] == "DELETE":
                    result, database_update = self.del_sfp(task)
                else:
                    raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
            else:
                raise vimconn.vimconnException(self.name + "unknown task item {}".format(task["item"]))
                # TODO
        except VimThreadException as e:
            result = False
            task["error_msg"] = str(e)
            task["status"] = "FAILED"
            database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
            if task["item"] == 'instance_vms':
                database_update["vim_vm_id"] = None
            elif task["item"] == 'instance_nets':
                database_update["vim_net_id"] = None

        # these items are not inserted at the list of elements to be refreshed
        no_refresh_tasks = ['instance_sfis', 'instance_sfs',
                            'instance_classifications', 'instance_sfps']
        if task["action"] == "DELETE":
            action_key = task["item"] + task["item_id"]
            # pop instead of del: a missing group must not prevent the database update done below
            self.grouped_tasks.pop(action_key, None)
        elif task["action"] in ("CREATE", "FIND") and task["status"] in ("DONE", "BUILD"):
            if task["item"] not in no_refresh_tasks:
                self._insert_refresh(task)

        task_id = task["instance_action_id"] + "." + str(task["task_index"])
        self.logger.debug("task={} item={} action={} result={}:'{}' params={}".format(
            task_id, task["item"], task["action"], task["status"],
            task["vim_id"] if task["status"] == "DONE" else task.get("error_msg"), task["params"]))
        try:
            now = time.time()
            with self.db_lock:
                self.db.update_rows(
                    table="vim_wim_actions",
                    UPDATE={"status": task["status"], "vim_id": task.get("vim_id"), "modified_at": now,
                            "error_msg": task["error_msg"],
                            "extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256)},
                    WHERE={"instance_action_id": task["instance_action_id"], "task_index": task["task_index"]})
                if result is not None:
                    self.db.update_rows(
                        table="instance_actions",
                        UPDATE={("number_done" if result else "number_failed"): {"INCREMENT": 1},
                                "modified_at": now},
                        WHERE={"uuid": task["instance_action_id"]})
                if database_update:
                    self.db.update_rows(table=task["item"],
                                        UPDATE=database_update,
                                        WHERE={"uuid": task["item_id"]})
        except db_base_Exception as e:
            self.logger.error("task={} Error updating database {}".format(task_id, e), exc_info=True)

        if nb_created == 10:
            break
    return nb_processed
674
def _insert_pending_tasks(self, vim_actions_list):
    """Fill the memory-only fields of the tasks and insert them at the lists of this thread.

    Tasks whose 'datacenter_vim_id' is not the one of this thread are ignored. Every accepted task is added to
    self.grouped_tasks. DELETE and SCHEDULED tasks are added to self.pending_tasks; CREATE and FIND tasks with
    status DONE or BUILD are added to the refresh list. A DELETE task marks as SUPERSEDED the previous tasks
    of the same element.
    :param vim_actions_list: list of task dictionaries, as read from database table vim_wim_actions
    :return: None
    :raises vimconn.vimconnException: if a not SCHEDULED task contains an unknown action
    """
    for task in vim_actions_list:
        if task["datacenter_vim_id"] != self.datacenter_tenant_id:
            continue
        item = task["item"]
        item_id = task["item_id"]
        action_key = item + item_id
        if action_key not in self.grouped_tasks:
            self.grouped_tasks[action_key] = []
        task["params"] = None
        task["depends"] = {}
        if task["extra"]:
            # 'extra' is a yaml text at database and a dictionary at memory
            # NOTE(review): yaml.load is able to build arbitrary objects; the text comes from the database
            extra = yaml.load(task["extra"])
            task["extra"] = extra
            task["params"] = extra.get("params")
            depends_on_list = extra.get("depends_on")
            if depends_on_list:
                for dependency_task in depends_on_list:
                    if isinstance(dependency_task, int):
                        index = dependency_task
                    else:
                        # text format <instance_action_id>.<task_index>; dependencies over other
                        # instance_action are not resolved here
                        instance_action_id, _, task_id = dependency_task.rpartition(".")
                        if instance_action_id != task["instance_action_id"]:
                            continue
                        index = int(task_id)

                    # the dependency is linked only if it is found at this same list, at the position of its index
                    if index < len(vim_actions_list) and vim_actions_list[index]["task_index"] == index and \
                            vim_actions_list[index]["instance_action_id"] == task["instance_action_id"]:
                        task["depends"]["TASK-" + str(index)] = vim_actions_list[index]
                        task["depends"]["TASK-{}.{}".format(task["instance_action_id"], index)] = vim_actions_list[index]
            if extra.get("interfaces"):
                task["vim_interfaces"] = {}
        else:
            task["extra"] = {}
        if "error_msg" not in task:
            task["error_msg"] = None
        if "vim_id" not in task:
            task["vim_id"] = None

        if task["action"] == "DELETE":
            need_delete_action = False
            for to_supersede in self.grouped_tasks.get(action_key, ()):
                if to_supersede["action"] == "FIND" and to_supersede.get("vim_id"):
                    task["vim_id"] = to_supersede["vim_id"]
                if to_supersede["action"] == "CREATE" and to_supersede["extra"].get("created", True) and \
                        (to_supersede.get("vim_id") or to_supersede["extra"].get("sdn_net_id")):
                    # something was created, so the deletion must be executed. The information needed for
                    # deleting is copied from the CREATE task
                    need_delete_action = True
                    task["vim_id"] = to_supersede["vim_id"]
                    if to_supersede["extra"].get("sdn_net_id"):
                        task["extra"]["sdn_net_id"] = to_supersede["extra"]["sdn_net_id"]
                    if to_supersede["extra"].get("interfaces"):
                        task["extra"]["interfaces"] = to_supersede["extra"]["interfaces"]
                    if to_supersede["extra"].get("created_items"):
                        if not task["extra"].get("created_items"):
                            task["extra"]["created_items"] = {}
                        task["extra"]["created_items"].update(to_supersede["extra"]["created_items"])
                # Mark task as SUPERSEDED.
                #   If task is in self.pending_tasks, it will be removed and database will be update
                #   If task is in self.refresh_tasks, it will be removed
                to_supersede["status"] = "SUPERSEDED"
            if not need_delete_action:
                # nothing to delete at VIM; the DELETE task is also SUPERSEDED and only updates the database
                task["status"] = "SUPERSEDED"

            self.grouped_tasks[action_key].append(task)
            self.pending_tasks.append(task)
        elif task["status"] == "SCHEDULED":
            self.grouped_tasks[action_key].append(task)
            self.pending_tasks.append(task)
        elif task["action"] in ("CREATE", "FIND"):
            self.grouped_tasks[action_key].append(task)
            if task["status"] in ("DONE", "BUILD"):
                self._insert_refresh(task)
        # TODO add VM reset, get console, etc...
        else:
            raise vimconn.vimconnException(self.name + "unknown vim_action action {}".format(task["action"]))
750
def insert_task(self, task):
    """Put a task at the queue of this thread without waiting.

    :param task: element to be inserted at self.task_queue
    :return: None
    :raises vimconn.vimconnException: if the queue is full
    """
    try:
        self.task_queue.put_nowait(task)
    except Queue.Full:
        raise vimconn.vimconnException(self.name + ": timeout inserting a task")
    return None
757
def del_task(self, task):
    """Mark a task as SUPERSEDED if it is still SCHEDULED.

    The check and the change are done with self.task_lock taken.
    :param task: task dictionary; its 'status' is modified in place
    :return: True if the task has been marked as SUPERSEDED, False if its status was not SCHEDULED
    """
    with self.task_lock:
        if task["status"] == "SCHEDULED":
            task["status"] = "SUPERSEDED"
            return True
        # task["status"] == "processing"
        # The lock is released by the 'with' statement; releasing it also by hand raised an error on exit
        return False
766
def run(self):
    """Main loop of the thread.

    It loads the VIM connector and the tasks stored at database, and then it repeats: read the orders
    received at self.task_queue, process the pending tasks and refresh the elements. It sleeps one second when
    there is nothing to do. Orders can be a list of tasks to be inserted, 'reload' to load again the connector
    and the tasks, or 'exit' to finish the thread.
    :return: 0 when 'exit' is received
    """
    self.logger.debug("Starting")
    while True:
        self.get_vimconnector()
        self.logger.debug("Vimconnector loaded")
        self._reload_vim_actions()
        reload_thread = False

        while True:
            try:
                while not self.task_queue.empty():
                    task = self.task_queue.get()
                    try:
                        if isinstance(task, list):
                            self._insert_pending_tasks(task)
                        elif isinstance(task, str):
                            if task == 'exit':
                                # logged here because this return is the only way of leaving the loops
                                self.logger.debug("Finishing")
                                return 0
                            elif task == 'reload':
                                reload_thread = True
                                break
                    finally:
                        # every element obtained from the queue is marked as done, also 'exit' and 'reload'
                        self.task_queue.task_done()
                if reload_thread:
                    break
                nb_processed = self._proccess_pending_tasks()
                nb_processed += self._refres_elements()
                if not nb_processed:
                    time.sleep(1)

            except Exception as e:
                self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)
799
def _look_for_task(self, instance_action_id, task_id):
    """
    Read a concrete task from the vim_wim_actions database table and prepare it to be used at memory
    :param instance_action_id: The instance_action_id of the task, unless task_id contains its own one
    :param task_id: Reference to the task. Accepted formats:
        an integer with the task index
        "TASK-" followed by the task index (backward compatibility)
        the instance_action_id, a dot and the task index, optionally prefixed by "TASK-". This
            instance_action_id is used instead of the one provided in the parameter
    :return: Task dictionary with "params", "depends" and "extra" (as a dict) filled; or None if not found
    """
    if isinstance(task_id, int):
        task_index = task_id
    else:
        reference = task_id[5:] if task_id.startswith("TASK-") else task_id
        embedded_action_id, _, task_index = reference.rpartition(".")
        instance_action_id = embedded_action_id or instance_action_id

    row_filter = {"instance_action_id": instance_action_id, "task_index": task_index}
    with self.db_lock:
        rows = self.db.get_rows(FROM="vim_wim_actions", WHERE=row_filter)
    if not rows:
        return None

    found = rows[0]
    found["params"] = None
    found["depends"] = {}
    stored_extra = found["extra"]
    if not stored_extra:
        found["extra"] = {}
        return found

    # "extra" is stored as yaml text at database
    # NOTE(review): yaml.load without an explicit Loader can build arbitrary python objects. Presumably this
    # column is only written by this application; confirm it before processing any other source
    extra = yaml.load(stored_extra)
    found["extra"] = extra
    found["params"] = extra.get("params")
    if extra.get("interfaces"):
        found["vim_interfaces"] = {}
    return found
836
@staticmethod
def _format_vim_error_msg(error_text, max_length=1024):
    """
    Shorten a too long error text keeping its beginning and its end, joined by " ... "
    :param error_text: text to shorten. It can be None or empty
    :param max_length: a text with this length or longer is shortened
    :return: error_text untouched if it is empty or shorter than max_length; otherwise a text not longer than
        max_length
    """
    if not error_text or len(error_text) < max_length:
        return error_text
    head_length = max_length // 2 - 3
    tail_length = (max_length + 1) // 2 - 3  # half of max_length rounded up, minus the room for the separator
    if tail_length <= 0:
        # max_length is too small to fit the separator and a tail. Besides, a tail_length of 0 would make the
        # slice [-0:] return the whole text. The text is simply cut in this case
        return error_text[:max(max_length, 0)]
    return error_text[:head_length] + " ... " + error_text[-tail_length:]
842
def new_vm(self, task):
    """
    Create a VM at VIM for a CREATE task
    :param task: task dictionary. task["params"] contains the positional arguments for the vim connector
        new_vminstance; its item 5 is the list of networks/interfaces of the VM. A "net_id" that references
        another task is replaced, in place, by the vim_id of that task. The task "status", "vim_id", "error_msg",
        "vim_info", "vim_interfaces" and "extra" are updated
    :return: (True, dictionary to update the instance element at database) if the VM is created;
        (False, dictionary to update the instance element at database) on error
    """
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        params = task["params"]
        net_list = params[5]
        for net in net_list:
            if "net_id" in net and is_task_id(net["net_id"]):  # change task_id into network_id
                task_dependency = task["depends"].get(net["net_id"])
                if not task_dependency:
                    # not loaded at memory, look for it at database
                    task_dependency = self._look_for_task(task["instance_action_id"], net["net_id"])
                if not task_dependency:
                    raise VimThreadException(
                        "Cannot get depending net task trying to get depending task {}.{}".format(
                            task["instance_action_id"], net["net_id"]))
                network_id = task_dependency.get("vim_id")
                if not network_id:
                    # The error is taken from the dependency obtained above. It cannot be read from
                    # task["depends"], because the dependency is not there when it comes from database
                    raise VimThreadException(
                        "Cannot create VM because depends on a network not created or found: " +
                        str(task_dependency.get("error_msg")))
                net["net_id"] = network_id
        # The connector receives a copy of the params
        # presumably it fills "vim_id" on each interface of this copy -- TODO confirm
        params_copy = deepcopy(params)
        vim_vm_id, created_items = self.vim.new_vminstance(*params_copy)

        # fill task_interfaces. Look for snd_net_id at database for each interface
        task_interfaces = {}
        for iface in params_copy[5]:
            task_interfaces[iface["vim_id"]] = {"iface_id": iface["uuid"]}
            with self.db_lock:
                result = self.db.get_rows(
                    SELECT=('sdn_net_id', 'interface_id'),
                    FROM='instance_nets as ine join instance_interfaces as ii on ii.instance_net_id=ine.uuid',
                    WHERE={'ii.uuid': iface["uuid"]})
            if result:
                task_interfaces[iface["vim_id"]]["sdn_net_id"] = result[0]['sdn_net_id']
                task_interfaces[iface["vim_id"]]["interface_id"] = result[0]['interface_id']
            else:
                self.logger.critical("task={} new-VM: instance_nets uuid={} not found at DB".format(task_id,
                                     iface["uuid"]), exc_info=True)

        task["vim_info"] = {}
        task["vim_interfaces"] = {}
        task["extra"]["interfaces"] = task_interfaces
        task["extra"]["created"] = True
        task["extra"]["created_items"] = created_items
        task["error_msg"] = None
        task["status"] = "DONE"
        task["vim_id"] = vim_vm_id
        instance_element_update = {"status": "BUILD", "vim_vm_id": vim_vm_id, "error_msg": None}
        return True, instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("task={} new-VM: {}".format(task_id, e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_vm_id": None, "error_msg": error_text}
        return False, instance_element_update
902
def del_vm(self, task):
    """
    Delete a VM from VIM, deleting before the SDN ports attached to its interfaces
    :param task: task dictionary. "vim_id" is the VM to delete; "extra" can contain "interfaces" (dictionary
        whose values can have "sdn_port_id") and "created_items", which is passed to the vim connector.
        The task "status" and "error_msg" are updated
    :return: (True, None) if the VM is deleted or it is not found at VIM; (False, None) on any other VIM error
    """
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    vm_vim_id = task["vim_id"]
    # An empty dictionary is used when the task has no interfaces, so that .values() below is always valid
    interfaces = task["extra"].get("interfaces") or {}
    try:
        for iface in interfaces.values():
            if iface.get("sdn_port_id"):
                try:
                    with self.db_lock:
                        self.ovim.delete_port(iface["sdn_port_id"], idempotent=True)
                except ovimException as e:
                    # a failure deleting the SDN port is logged, but it does not avoid deleting the VM
                    self.logger.error("task={} del-VM: ovimException when deleting external_port={}: {} ".format(
                        task_id, iface["sdn_port_id"], e), exc_info=True)
                    # TODO Set error_msg at instance_nets

        self.vim.delete_vminstance(vm_vim_id, task["extra"].get("created_items"))
        task["status"] = "DONE"
        task["error_msg"] = None
        return True, None

    except vimconn.vimconnException as e:
        task["error_msg"] = self._format_vim_error_msg(str(e))
        if isinstance(e, vimconn.vimconnNotFoundException):
            # If not found mark as Done and fill error_msg
            task["status"] = "DONE"
            return True, None
    task["status"] = "FAILED"
    return False, None
931
def _get_net_internal(self, task, filter_param):
    """
    Look for a network at VIM that matches the filter. Code shared by get_net and new_net
    :param task: task for this find or find-or-create action. Its "status", "vim_id", "error_msg" and "extra"
        are updated when the network is found
    :param filter_param: filter sent to the vim connector to get the list of networks
    :return: a dict with the content to update the instance_nets database table. Raises
        VimThreadExceptionNotFound if no network matches, VimThreadException if more than one matches
    """
    matching_nets = self.vim.get_network_list(filter_param)
    if not matching_nets:
        raise VimThreadExceptionNotFound("Network not found with this criteria: '{}'".format(filter_param))
    if len(matching_nets) > 1:
        raise VimThreadException("More than one network found with this criteria: '{}'".format(filter_param))
    vim_net_id = matching_nets[0]["id"]

    # Discover if this network is managed by a sdn controller: take the sdn_net_id of the first entry of
    # instance_nets that uses this same VIM network
    with self.db_lock:
        rows = self.db.get_rows(SELECT=('sdn_net_id',), FROM='instance_nets',
                                WHERE={'vim_net_id': vim_net_id,
                                       'datacenter_tenant_id': self.datacenter_tenant_id},
                                ORDER="instance_scenario_id")
    sdn_net_id = rows[0]['sdn_net_id'] if rows else None

    task["status"] = "DONE"
    extra = task["extra"]
    extra["vim_info"] = {}
    extra["created"] = False  # the network already existed, it has not been created by this task
    extra["sdn_net_id"] = sdn_net_id
    task["error_msg"] = None
    task["vim_id"] = vim_net_id
    return {"vim_net_id": vim_net_id, "created": False, "status": "BUILD",
            "error_msg": None, "sdn_net_id": sdn_net_id}
966
def get_net(self, task):
    """
    Look for an existing network at VIM using the filter provided at the task
    :param task: task dictionary. The first item of task["params"] is the filter. On error the task "status",
        "vim_id" and "error_msg" are updated here; on success they are updated by _get_net_internal
    :return: (True, dict to update the database) if exactly one network is found;
        (False, dict to update the database) on error
    """
    task_ref = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        net_filter = task["params"][0]
        found_update = self._get_net_internal(task, net_filter)
    except (vimconn.vimconnException, VimThreadException) as exc:
        self.logger.error("task={} get-net: {}".format(task_ref, exc))
        task["status"] = "FAILED"
        task["vim_id"] = None
        task["error_msg"] = self._format_vim_error_msg(str(exc))
        return False, {"vim_net_id": None, "status": "VIM_ERROR", "error_msg": task["error_msg"]}
    return True, found_update
983
def new_net(self, task):
    """
    Create a network at VIM and, for "data" and "ptp" networks when the VIM has a sdn-controller configured,
    also at the SDN controller. If task["extra"]["find"] is present, it looks first for an existing network at
    VIM and uses it instead of creating a new one
    :param task: task dictionary. task["params"] contains the net name (item 0), the net type (item 1) and the
        wim account name (item 3). The task "status", "vim_id", "error_msg" and "extra" are updated
    :return: (True, dict to update the instance_nets database table) if found or created;
        (False, dict to update the instance_nets database table) on error
    """
    vim_net_id = None
    sdn_net_id = None
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    action_text = ""
    try:
        # FIND
        if task["extra"].get("find"):
            action_text = "finding"
            filter_param = task["extra"]["find"][0]
            try:
                instance_element_update = self._get_net_internal(task, filter_param)
                return True, instance_element_update
            except VimThreadExceptionNotFound:
                pass  # it does not exist, so it is created below
        # CREATE
        params = task["params"]
        action_text = "creating VIM"
        vim_net_id = self.vim.new_network(*params[0:2])

        net_name = params[0]
        net_type = params[1]
        wim_account_name = params[3]

        sdn_controller = self.vim.config.get('sdn-controller')
        if sdn_controller and net_type in ("data", "ptp"):
            network = {"name": net_name, "type": net_type, "region": self.vim["config"]["datacenter_id"]}

            vim_net = self.vim.get_network(vim_net_id)
            if vim_net.get('encapsulation') != 'vlan':
                # .get() is used also for the message, because 'encapsulation' can be missing at vim_net
                raise vimconn.vimconnException(
                    "net '{}' defined as type '{}' has not vlan encapsulation '{}'".format(
                        net_name, net_type, vim_net.get('encapsulation')))
            network["vlan"] = vim_net.get('segmentation_id')
            action_text = "creating SDN"
            with self.db_lock:
                sdn_net_id = self.ovim.new_network(network)

            if wim_account_name and self.vim.config.get("wim_external_ports"):
                # add external port to connect WIM. Try with compute node __WIM:wim_name and __WIM
                action_text = "attaching external port to ovim network"
                # The name uses the id of the network just created; task["vim_id"] is not assigned until the
                # end of this method
                sdn_port_name = "{}.{}".format(sdn_net_id, vim_net_id)
                sdn_port_name = sdn_port_name[:63]
                sdn_port_data = {
                    "compute_node": "__WIM:" + wim_account_name[0:58],
                    "pci": None,
                    "vlan": network["vlan"],
                    "net_id": sdn_net_id,
                    "region": self.vim["config"]["datacenter_id"],
                    "name": sdn_port_name,
                }
                try:
                    sdn_external_port_id = self.ovim.new_external_port(sdn_port_data)
                except ovimException:
                    sdn_port_data["compute_node"] = "__WIM"
                    sdn_external_port_id = self.ovim.new_external_port(sdn_port_data)
                self.logger.debug("Added sdn_external_port {} to sdn_network {}".format(sdn_external_port_id,
                                                                                        sdn_net_id))

        task["status"] = "DONE"
        task["extra"]["vim_info"] = {}
        task["extra"]["sdn_net_id"] = sdn_net_id
        task["extra"]["created"] = True
        task["error_msg"] = None
        task["vim_id"] = vim_net_id
        instance_element_update = {"vim_net_id": vim_net_id, "sdn_net_id": sdn_net_id, "status": "BUILD",
                                   "created": True, "error_msg": None}
        return True, instance_element_update
    except (vimconn.vimconnException, ovimException, VimThreadException) as e:
        # VimThreadException is raised by _get_net_internal when more than one network matches the find filter
        self.logger.error("task={} new-net: Error {}: {}".format(task_id, action_text, e))
        task["status"] = "FAILED"
        task["vim_id"] = vim_net_id
        task["error_msg"] = self._format_vim_error_msg(str(e))
        task["extra"]["sdn_net_id"] = sdn_net_id
        instance_element_update = {"vim_net_id": vim_net_id, "sdn_net_id": sdn_net_id, "status": "VIM_ERROR",
                                   "error_msg": task["error_msg"]}
        return False, instance_element_update
1061
def del_net(self, task):
    """
    Delete a network from the SDN controller (if task["extra"] has a "sdn_net_id") and from VIM (if the task
    has a "vim_id")
    :param task: task dictionary. Its "status" and "error_msg" are updated
    :return: (True, None) if deleted or if the network is not found at VIM; (False, None) on any other error
    """
    vim_net_id = task["vim_id"]
    sdn_net_id = task["extra"].get("sdn_net_id")
    try:
        if sdn_net_id:
            # Delete any attached port to this sdn network. There can be ports associated to this network in case
            # it was manually done using 'openmano vim-net-sdn-attach'
            with self.db_lock:
                attached_ports = self.ovim.get_ports(columns={'uuid'},
                                                     filter={'name': 'external_port', 'net_id': sdn_net_id})
                for attached_port in attached_ports:
                    self.ovim.delete_port(attached_port['uuid'], idempotent=True)
                self.ovim.delete_network(sdn_net_id, idempotent=True)
        if vim_net_id:
            self.vim.delete_network(vim_net_id)
    except ovimException as exc:
        task["error_msg"] = self._format_vim_error_msg("ovimException obtaining and deleting external "
                                                       "ports for net {}: {}".format(sdn_net_id, str(exc)))
    except vimconn.vimconnException as exc:
        task["error_msg"] = self._format_vim_error_msg(str(exc))
        if isinstance(exc, vimconn.vimconnNotFoundException):
            # If not found mark as Done and fill error_msg
            task["status"] = "DONE"
            return True, None
    else:
        task["status"] = "DONE"
        task["error_msg"] = None
        return True, None
    task["status"] = "FAILED"
    return False, None
1091
1092 ## Service Function Instances
1093
def new_sfi(self, task, wait_time=1):
    """
    Create a Service Function Instance at VIM using the interfaces of the VM created by the first task this
    one depends on
    :param task: task dictionary. task["extra"]["depends_on"][0] is the index of the depending task, whose
        "extra"."interfaces" are used; task["extra"]["params"] contains "ingress_interface_id" and
        "egress_interface_id". The task "status", "vim_id", "error_msg" and "extra" are updated
    :param wait_time: seconds to wait before starting, in order to give time to the interfaces to be ready
    :return: (True, dict to update the database) if created; (False, dict to update the database) on error,
        including when the ingress or egress interface cannot be found
    """
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        # Waits for interfaces to be ready (avoids failure)
        if wait_time:
            time.sleep(wait_time)
        dep_id = "TASK-" + str(task["extra"]["depends_on"][0])
        # a missing dependency, or without interfaces, ends at the 'interface not found' error below
        dependency = (task.get("depends") or {}).get(dep_id) or {}
        interfaces = (dependency.get("extra") or {}).get("interfaces") or {}
        sfi_params = task["extra"].get("params") or {}
        ingress_interface_id = sfi_params.get("ingress_interface_id")
        egress_interface_id = sfi_params.get("egress_interface_id")
        ingress_vim_interface_id = None
        egress_vim_interface_id = None
        # interfaces is indexed by the interface id at VIM. items() is valid for both python2 and python3
        for vim_interface, interface_data in interfaces.items():
            if interface_data.get("interface_id") == ingress_interface_id:
                ingress_vim_interface_id = vim_interface
                break
        if ingress_interface_id != egress_interface_id:
            for vim_interface, interface_data in interfaces.items():
                if interface_data.get("interface_id") == egress_interface_id:
                    egress_vim_interface_id = vim_interface
                    break
        else:
            egress_vim_interface_id = ingress_vim_interface_id
        if not ingress_vim_interface_id or not egress_vim_interface_id:
            self.logger.error("Error creating Service Function Instance, Ingress: %s, Egress: %s",
                              ingress_vim_interface_id, egress_vim_interface_id)
            # the task is marked as FAILED, as in any other error, so that it is not left in its previous status
            error_text = "Error creating Service Function Instance, Ingress: {}, Egress: {}".format(
                ingress_vim_interface_id, egress_vim_interface_id)
            task["error_msg"] = error_text
            task["status"] = "FAILED"
            task["vim_id"] = None
            return False, {"status": "VIM_ERROR", "vim_sfi_id": None, "error_msg": error_text}
        # At the moment, every port associated with the VM will be used both as ingress and egress ports.
        # Bear in mind that different VIM connectors might support SFI differently. In the case of OpenStack, only the
        # first ingress and first egress ports will be used to create the SFI (Port Pair).
        ingress_port_id_list = [ingress_vim_interface_id]
        egress_port_id_list = [egress_vim_interface_id]
        name = "sfi-%s" % task["item_id"][:8]
        # By default no form of IETF SFC Encapsulation will be used
        vim_sfi_id = self.vim.new_sfi(name, ingress_port_id_list, egress_port_id_list, sfc_encap=False)

        task["extra"]["created"] = True
        task["error_msg"] = None
        task["status"] = "DONE"
        task["vim_id"] = vim_sfi_id
        instance_element_update = {"status": "ACTIVE", "vim_sfi_id": vim_sfi_id, "error_msg": None}
        return True, instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("Error creating Service Function Instance, task=%s: %s", task_id, str(e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_sfi_id": None, "error_msg": error_text}
        return False, instance_element_update
1146
def del_sfi(self, task):
    """
    Delete a Service Function Instance from VIM
    :param task: task dictionary. "vim_id" is the element to delete; "status" and "error_msg" are updated
    :return: (True, None) if deleted or if it is not found at VIM; (False, None) on any other VIM error
    """
    vim_id = task["vim_id"]
    try:
        self.vim.delete_sfi(vim_id)
    except vimconn.vimconnException as exc:
        task["error_msg"] = self._format_vim_error_msg(str(exc))
        # If not found mark as Done and fill error_msg
        already_gone = isinstance(exc, vimconn.vimconnNotFoundException)
        task["status"] = "DONE" if already_gone else "FAILED"
        return already_gone, None
    task["status"] = "DONE"
    task["error_msg"] = None
    return True, None
1163
def new_sf(self, task):
    """
    Create a Service Function at VIM grouping the Service Function Instances created by the tasks this one
    depends on
    :param task: task dictionary. task["extra"]["depends_on"] contains the indexes of the depending tasks, that
        are looked for at task["depends"]. The task "status", "vim_id", "error_msg" and "extra" are updated
    :return: (True, dict to update the database) if created; (False, dict to update the database) on VIM error
    """
    try:
        task_id = task["instance_action_id"] + "." + str(task["task_index"])
        dependency_keys = ["TASK-" + str(index) for index in task["extra"]["depends_on"]]
        dependencies = [task.get("depends").get(key) for key in dependency_keys]
        sfi_id_list = [dependency.get("vim_id") for dependency in dependencies]
        # By default no form of IETF SFC Encapsulation will be used
        vim_sf_id = self.vim.new_sf("sf-%s" % task["item_id"][:8], sfi_id_list, sfc_encap=False)

        task["extra"]["created"] = True
        task["error_msg"] = None
        task["status"] = "DONE"
        task["vim_id"] = vim_sf_id
        return True, {"status": "ACTIVE", "vim_sf_id": vim_sf_id, "error_msg": None}

    except (vimconn.vimconnException, VimThreadException) as exc:
        self.logger.error("Error creating Service Function, task=%s: %s", task_id, str(exc))
        error_text = self._format_vim_error_msg(str(exc))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        return False, {"status": "VIM_ERROR", "vim_sf_id": None, "error_msg": error_text}
1194
def del_sf(self, task):
    """
    Delete a Service Function from VIM
    :param task: task dictionary. "vim_id" is the element to delete; "status" and "error_msg" are updated
    :return: (True, None) if deleted or if it is not found at VIM; (False, None) on any other VIM error
    """
    target_id = task["vim_id"]
    try:
        self.vim.delete_sf(target_id)
    except vimconn.vimconnException as exc:
        task["error_msg"] = self._format_vim_error_msg(str(exc))
        if not isinstance(exc, vimconn.vimconnNotFoundException):
            task["status"] = "FAILED"
            return False, None
        # If not found mark as Done and fill error_msg
        task["status"] = "DONE"
        return True, None
    else:
        task["status"] = "DONE"
        task["error_msg"] = None
        return True, None
1211
def new_classification(self, task):
    """
    Create a Classification at VIM (type 'legacy_flow_classifier') whose logical source port is the first
    interface of the VM created by the first task this one depends on
    :param task: task dictionary. task["params"] is a dictionary that can contain "ip_proto", "source_ip",
        "destination_ip", "source_port" and "destination_port"; task["extra"]["depends_on"][0] is the index of
        the depending task. The task "status", "vim_id", "error_msg" and "extra" are updated
    :return: (True, dict to update the database) if created; (False, dict to update the database) on error
    """
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        params = task["params"]
        dep_id = "TASK-" + str(task["extra"]["depends_on"][0])
        dependency = (task.get("depends") or {}).get(dep_id) or {}
        # list() over the dictionary provides its keys in both python2 and python3
        interfaces = list((dependency.get("extra") or {}).get("interfaces") or ())
        if not interfaces:
            # handled below as any other error, marking the task as FAILED
            raise VimThreadException(
                "Cannot create Classification because depending task {} has no interfaces".format(dep_id))
        # Bear in mind that different VIM connectors might support Classifications differently.
        # In the case of OpenStack, only the first VNF attached to the classifier will be used
        # to create the Classification(s) (the "logical source port" of the "Flow Classifier").
        # Since the VNFFG classifier match lacks the ethertype, classification defaults to
        # using the IPv4 flow classifier.
        name = "c-%s" % task["item_id"][:8]
        # The protocols known by name are translated; any other one is sent as a number. A field that is not
        # provided is sent as None instead of failing with an uncaught TypeError
        protocol_names = {1: 'icmp', 6: 'tcp', 17: 'udp'}
        ip_proto = params.get("ip_proto")
        if ip_proto is not None:
            ip_proto = int(ip_proto)
            ip_proto = protocol_names.get(ip_proto, ip_proto)
        # if not CIDR is given for the IP addresses, add /32:
        source_ip = params.get("source_ip")
        if source_ip and '/' not in source_ip:
            source_ip += '/32'
        destination_ip = params.get("destination_ip")
        if destination_ip and '/' not in destination_ip:
            destination_ip += '/32'
        definition = {
            "logical_source_port": interfaces[0],
            "protocol": ip_proto,
            "source_ip_prefix": source_ip,
            "destination_ip_prefix": destination_ip,
            "source_port_range_min": params.get("source_port"),
            "source_port_range_max": params.get("source_port"),
            "destination_port_range_min": params.get("destination_port"),
            "destination_port_range_max": params.get("destination_port"),
        }

        vim_classification_id = self.vim.new_classification(
            name, 'legacy_flow_classifier', definition)

        task["extra"]["created"] = True
        task["error_msg"] = None
        task["status"] = "DONE"
        task["vim_id"] = vim_classification_id
        instance_element_update = {"status": "ACTIVE", "vim_classification_id": vim_classification_id,
                                   "error_msg": None}
        return True, instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("Error creating Classification, task=%s: %s", task_id, str(e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_classification_id": None, "error_msg": error_text}
        return False, instance_element_update
1269
def del_classification(self, task):
    """
    Delete a Classification from VIM
    :param task: task dictionary. "vim_id" is the element to delete; "status" and "error_msg" are updated
    :return: (True, None) if deleted or if it is not found at VIM; (False, None) on any other VIM error
    """
    vim_id = task["vim_id"]
    try:
        self.vim.delete_classification(vim_id)
    except vimconn.vimconnException as exc:
        task["error_msg"] = self._format_vim_error_msg(str(exc))
        if isinstance(exc, vimconn.vimconnNotFoundException):
            # If not found mark as Done and fill error_msg
            outcome = ("DONE", True)
        else:
            outcome = ("FAILED", False)
        task["status"] = outcome[0]
        return outcome[1], None
    task["status"] = "DONE"
    task["error_msg"] = None
    return True, None
1286
def new_sfp(self, task):
    """
    Create a Service Function Path at VIM with the Service Functions and Classifications created by the tasks
    this one depends on
    :param task: task dictionary. task["extra"]["depends_on"] contains the indexes of the depending tasks, that
        are looked for at task["depends"]; the "item" of each one tells if it is a service function
        ("instance_sfs") or a classification ("instance_classifications"). The task "status", "vim_id",
        "error_msg" and "extra" are updated
    :return: (True, dict to update the database) if created; (False, dict to update the database) on VIM error
    """
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        depending_tasks = [task.get("depends").get("TASK-" + str(tsk_id))
                           for tsk_id in task.get("extra").get("depends_on")]
        sf_id_list = []
        classification_id_list = []
        for dep in depending_tasks:
            vim_id = dep.get("vim_id")
            resource = dep.get("item")
            if resource == "instance_sfs":
                sf_id_list.append(vim_id)
            elif resource == "instance_classifications":
                classification_id_list.append(vim_id)

        name = "sfp-%s" % task["item_id"][:8]
        # By default no form of IETF SFC Encapsulation will be used
        vim_sfp_id = self.vim.new_sfp(name, classification_id_list, sf_id_list, sfc_encap=False)

        task["extra"]["created"] = True
        task["error_msg"] = None
        task["status"] = "DONE"
        task["vim_id"] = vim_sfp_id
        instance_element_update = {"status": "ACTIVE", "vim_sfp_id": vim_sfp_id, "error_msg": None}
        return True, instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        # the message names the Service Function Path, so that it is not mistaken for an error of new_sf
        self.logger.error("Error creating Service Function Path, task=%s: %s", task_id, str(e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_sfp_id": None, "error_msg": error_text}
        return False, instance_element_update
1324
def del_sfp(self, task):
    """
    Delete a Service Function Path from VIM
    :param task: task dictionary. "vim_id" is the element to delete; "status" and "error_msg" are updated
    :return: (True, None) if deleted or if it is not found at VIM; (False, None) on any other VIM error
    """
    path_id = task["vim_id"]
    deleted = False
    try:
        self.vim.delete_sfp(path_id)
        task["status"] = "DONE"
        task["error_msg"] = None
        deleted = True
    except vimconn.vimconnException as exc:
        task["error_msg"] = self._format_vim_error_msg(str(exc))
        if isinstance(exc, vimconn.vimconnNotFoundException):
            # If not found mark as Done and fill error_msg
            task["status"] = "DONE"
            deleted = True
    if deleted:
        return True, None
    task["status"] = "FAILED"
    return False, None