Fix issue where existing VIM network does not get right status
[osm/RO.git] / osm_ro / vim_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
5 # This file is part of openvim
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
22 ##
23
24 """
25 This is a thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
26 The tasks are stored at database in table vim_actions
27 The task content is (M: stored at memory, D: stored at database):
28 MD instance_action_id: reference a global action over an instance-scenario: database instance_actions
29 MD task_index: index number of the task. This together with the previous forms a unique key identifier
30 MD datacenter_vim_id: should contain the uuid of the VIM managed by this thread
31 MD vim_id: id of the vm,net,etc at VIM
32 MD action: CREATE, DELETE, FIND
33 MD item: database table name, can be instance_vms, instance_nets, TODO: datacenter_flavors, datacenter_images
34 MD item_id: uuid of the referenced entry in the previous table
35 MD status: SCHEDULED,BUILD,DONE,FAILED,SUPERSEDED
36 MD extra: text with yaml format at database, dict at memory with:
37 params: list with the params to be sent to the VIM for CREATE or FIND. For DELETE the vim_id is taken from other related tasks
38 find: (only for CREATE tasks) if present it should FIND before creating and use if existing. Contains the FIND params
39 depends_on: list with the 'task_index'es of tasks that must be completed before. e.g. a vm creation depends on a net creation
40 sdn_net_id: used for net.
41 tries:
42 interfaces: used for VMs. Each key is the uuid of the instance_interfaces entry at database
43 iface_id: uuid of instance_interfaces
44 sdn_port_id:
45 sdn_net_id:
46 created_items: dictionary with extra elements created that need to be deleted. e.g. ports, volumes,...
47 created: False if the VIM element is not created by other actions, and it should not be deleted
48 vim_status: VIM status of the element. Stored also at database in the instance_XXX
49 M depends: dict with task_index(from depends_on) to task class
50 M params: same as extra[params] but with the resolved dependencies
51 M vim_interfaces: similar to extra[interfaces] but with VIM information. Stored at database in the instance_XXX but not at vim_actions
52 M vim_info: Detailed information of a vm,net from the VIM. Stored at database in the instance_XXX but not at vim_actions
53 MD error_msg: descriptive text upon an error.Stored also at database instance_XXX
54 MD created_at: task creation time
55 MD modified_at: last task update time. On refresh it contains when this task need to be refreshed
56
57 """
58
59 import threading
60 import time
61 import Queue
62 import logging
63 import vimconn
64 import yaml
65 from db_base import db_base_Exception
66 from lib_osm_openvim.ovim import ovimException
67
68 __author__ = "Alfonso Tierno, Pablo Montes"
69 __date__ = "$28-Sep-2017 12:07:15$"
70
71
def is_task_id(task_id):
    """Tell whether 'task_id' is a reference to a task, that is, a text beginning with "TASK-"."""
    task_prefix = "TASK-"
    return task_id.startswith(task_prefix)
74
75
class VimThreadException(Exception):
    """Base error raised by the VIM thread while processing a task."""
78
79
class VimThreadExceptionNotFound(VimThreadException):
    """VimThreadException specialization for an element that cannot be found."""
82
83
84 class vim_thread(threading.Thread):
85 REFRESH_BUILD = 5 # 5 seconds
86 REFRESH_ACTIVE = 60 # 1 minute
87
def __init__(self, myvimconn, task_lock, name=None, datacenter_name=None, datacenter_tenant_id=None,
             db=None, db_lock=None, ovim=None):
    """Init a thread that processes the tasks of a single VIM.
    Arguments:
        'myvimconn': VIM connector used to talk with the VIM, or the vimconn.vimconnException obtained when
            trying to get it. In the latter case the thread has no VIM (self.vim is None) and the exception text
            is stored at self.error_status
        'task_lock': lock used to protect the task status changes
        'name': name of thread. If not provided it is built from the VIM connector "id" and its
            config "datacenter_tenant_id"
        'datacenter_name', 'datacenter_tenant_id': name of the datacenter and uuid of the datacenter tenant
            whose vim_actions are managed by this thread
        'db', 'db_lock': database class and lock to use it in exclusion
        'ovim': openvim library object used for the SDN networks and ports
    """
    threading.Thread.__init__(self)
    if isinstance(myvimconn, vimconn.vimconnException):
        self.vim = None
        self.error_status = "Error accesing to VIM: {}".format(myvimconn)
    else:
        self.vim = myvimconn
        self.error_status = None
    self.datacenter_name = datacenter_name
    self.datacenter_tenant_id = datacenter_tenant_id
    self.ovim = ovim
    if name:
        self.name = name
    elif self.vim is not None:
        # the name must be taken from the connector object received, not from the 'vimconn' module
        self.name = myvimconn["id"] + "." + myvimconn["config"]["datacenter_tenant_id"]
    else:
        # there is no connector to ask (myvimconn is an exception); use the datacenter identifiers instead
        self.name = "{}.{}".format(datacenter_name, datacenter_tenant_id)

    self.logger = logging.getLogger('openmano.vim.'+self.name)
    self.db = db
    self.db_lock = db_lock

    self.task_lock = task_lock
    self.task_queue = Queue.Queue(2000)

    # Contains time ordered task list for refreshing the status of VIM VMs and nets
    self.refresh_tasks = []

    # Contains time ordered task list for creation, deletion of VIM VMs and nets
    self.pending_tasks = []

    # It contains all the creation/deletion pending tasks grouped by its concrete vm, net, etc
    #     <item><item_id>:
    #         -   <task1>  # e.g. CREATE task
    #             <task2>  # e.g. DELETE task
    self.grouped_tasks = {}
131
def _reload_vim_actions(self):
    """
    Read the actions of this VIM from database table vim_actions and reload them at memory.
    Rows are read in pages of 'database_limit' entries, ordered by item_id, item and created_at, and grouped by
    item + item_id. Every group is passed to self._insert_pending_tasks, which is the one that fills the
    in-memory lists. A group is discarded when it contains a DELETE task whose status is not SCHEDULED.
    Any exception is logged as critical and not propagated.
    :return: None
    """
    try:
        action_completed = False
        task_list = []
        old_action_key = None

        # values of the last row of the previous page; used to know where the new page really starts
        old_item_id = ""
        old_item = ""
        old_created_at = 0.0
        database_limit = 200
        while True:
            # get 200 (database_limit) entries each time
            with self.db_lock:
                vim_actions = self.db.get_rows(FROM="vim_actions",
                                               WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
                                                      "item_id>=": old_item_id},
                                               ORDER_BY=("item_id", "item", "created_at",),
                                               LIMIT=database_limit)
            for task in vim_actions:
                item = task["item"]
                item_id = task["item_id"]

                # skip the first entries that are already processed in the previous pool of 200
                if old_item_id:
                    if item_id == old_item_id and item == old_item and task["created_at"] == old_created_at:
                        old_item_id = False  # next one will be a new un-processed task
                    continue

                action_key = item + item_id
                if old_action_key != action_key:
                    # a new group starts: flush the previous one unless it was already completed
                    if not action_completed and task_list:
                        # This will fill needed task parameters into memory, and insert the task if needed in
                        # self.pending_tasks or self.refresh_tasks
                        try:
                            self._insert_pending_tasks(task_list)
                        except Exception as e:
                            self.logger.critical(
                                "Unexpected exception at _reload_vim_actions:_insert_pending_tasks: " + str(e),
                                exc_info=True)
                    task_list = []
                    old_action_key = action_key
                    action_completed = False
                elif action_completed:
                    # the rest of tasks of a completed group are ignored
                    continue

                if task["status"] == "SCHEDULED" or task["action"] == "CREATE" or task["action"] == "FIND":
                    task_list.append(task)
                elif task["action"] == "DELETE":
                    # action completed because deleted and status is not SCHEDULED. Not needed anything
                    action_completed = True
            if len(vim_actions) == database_limit:
                # a full page was obtained, so there can be more rows
                # update variables for get the next database iteration
                old_item_id = item_id
                old_item = item
                old_created_at = task["created_at"]
            else:
                break
        # Last actions group need to be inserted too
        if not action_completed and task_list:
            try:
                self._insert_pending_tasks(task_list)
            except Exception as e:
                self.logger.critical("Unexpected exception at _reload_vim_actions:_insert_pending_tasks: " + str(e),
                                     exc_info=True)
        self.logger.debug("reloaded vim actions pending:{} refresh:{}".format(
            len(self.pending_tasks), len(self.refresh_tasks)))
    except Exception as e:
        self.logger.critical("Unexpected exception at _reload_vim_actions: " + str(e), exc_info=True)
204
def _refres_elements(self):
    """Call VIM to get VMs and networks status until 10 elements
    It takes from self.refresh_tasks the tasks whose 'modified_at' time has been reached (10 as maximum per
    call), asks the VIM for the status of their VMs and nets in one request per type, updates the database
    when something has changed and inserts every task again at the refresh list: after REFRESH_BUILD seconds
    if its vim_status is BUILD, after REFRESH_ACTIVE seconds otherwise.
    :return: number of tasks taken from the refresh list
    """
    now = time.time()
    nb_processed = 0
    # the lists keep the vim_id to ask for; the dicts map each vim_id to the tasks that reference it
    vm_to_refresh_list = []
    net_to_refresh_list = []
    vm_to_refresh_dict = {}
    net_to_refresh_dict = {}
    items_to_refresh = 0
    while self.refresh_tasks:
        task = self.refresh_tasks[0]
        with self.task_lock:
            if task['status'] == 'SUPERSEDED':
                self.refresh_tasks.pop(0)
                continue
            # the list is ordered by time, so the rest of tasks are not ready either
            if task['modified_at'] > now:
                break
            # task["status"] = "processing"
            nb_processed += 1
        self.refresh_tasks.pop(0)
        if task["item"] == 'instance_vms':
            if task["vim_id"] not in vm_to_refresh_dict:
                vm_to_refresh_dict[task["vim_id"]] = [task]
                vm_to_refresh_list.append(task["vim_id"])
            else:
                vm_to_refresh_dict[task["vim_id"]].append(task)
        elif task["item"] == 'instance_nets':
            if task["vim_id"] not in net_to_refresh_dict:
                net_to_refresh_dict[task["vim_id"]] = [task]
                net_to_refresh_list.append(task["vim_id"])
            else:
                net_to_refresh_dict[task["vim_id"]].append(task)
        else:
            task_id = task["instance_action_id"] + "." + str(task["task_index"])
            self.logger.critical("task={}: unknown task {}".format(task_id, task["item"]), exc_info=True)
        items_to_refresh += 1
        if items_to_refresh == 10:
            break

    if vm_to_refresh_list:
        now = time.time()
        try:
            vim_dict = self.vim.refresh_vms_status(vm_to_refresh_list)
        except vimconn.vimconnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error("task=several get-VM: vimconnException when trying to refresh vms " + str(e))
            vim_dict = {}
            for vim_id in vm_to_refresh_list:
                vim_dict[vim_id] = {"status": "VIM_ERROR", "error_msg": str(e)}

        for vim_id, vim_info in vim_dict.items():

            # look for task
            for task in vm_to_refresh_dict[vim_id]:
                # set to True when task["extra"] or task["error_msg"] change and vim_actions must be updated
                task_need_update = False
                task_id = task["instance_action_id"] + "." + str(task["task_index"])
                self.logger.debug("task={} get-VM: vim_vm_id={} result={}".format(task_id, task["vim_id"], vim_info))

                # check and update interfaces
                task_warning_msg = ""
                for interface in vim_info.get("interfaces", ()):
                    vim_interface_id = interface["vim_interface_id"]
                    if vim_interface_id not in task["extra"]["interfaces"]:
                        self.logger.critical("task={} get-VM: Interface not found {} on task info {}".format(
                            task_id, vim_interface_id, task["extra"]["interfaces"]), exc_info=True)
                        continue
                    task_interface = task["extra"]["interfaces"][vim_interface_id]
                    task_vim_interface = task["vim_interfaces"].get(vim_interface_id)
                    # only when the VIM reports something different from what is stored at memory
                    if task_vim_interface != interface:
                        # delete old port
                        if task_interface.get("sdn_port_id"):
                            try:
                                with self.db_lock:
                                    self.ovim.delete_port(task_interface["sdn_port_id"])
                                    task_interface["sdn_port_id"] = None
                                    task_need_update = True
                            except ovimException as e:
                                error_text = "ovimException deleting external_port={}: {}".format(
                                    task_interface["sdn_port_id"], e)
                                self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
                                task_warning_msg += error_text
                                # TODO Set error_msg at instance_nets instead of instance VMs

                        # Create SDN port
                        sdn_net_id = task_interface.get("sdn_net_id")
                        if sdn_net_id and interface.get("compute_node") and interface.get("pci"):
                            sdn_port_name = sdn_net_id + "." + task["vim_id"]
                            sdn_port_name = sdn_port_name[:63]
                            try:
                                with self.db_lock:
                                    sdn_port_id = self.ovim.new_external_port(
                                        {"compute_node": interface["compute_node"],
                                         "pci": interface["pci"],
                                         "vlan": interface.get("vlan"),
                                         "net_id": sdn_net_id,
                                         "region": self.vim["config"]["datacenter_id"],
                                         "name": sdn_port_name,
                                         "mac": interface.get("mac_address")})
                                    task_interface["sdn_port_id"] = sdn_port_id
                                    task_need_update = True
                            except (ovimException, Exception) as e:
                                error_text = "ovimException creating new_external_port compute_node={}"\
                                             " pci={} vlan={} {}".format(
                                    interface["compute_node"],
                                    interface["pci"],
                                    interface.get("vlan"), e)
                                self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
                                task_warning_msg += error_text
                                # TODO Set error_msg at instance_nets instead of instance VMs

                        with self.db_lock:
                            self.db.update_rows(
                                'instance_interfaces',
                                UPDATE={"mac_address": interface.get("mac_address"),
                                        "ip_address": interface.get("ip_address"),
                                        "vim_info": interface.get("vim_info"),
                                        "sdn_port_id": task_interface.get("sdn_port_id"),
                                        "compute_node": interface.get("compute_node"),
                                        "pci": interface.get("pci"),
                                        "vlan": interface.get("vlan")},
                                WHERE={'uuid': task_interface["iface_id"]})
                        task["vim_interfaces"][vim_interface_id] = interface

                # check and update task and instance_vms database
                vim_info_error_msg = None
                if vim_info.get("error_msg"):
                    vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"] + task_warning_msg)
                elif task_warning_msg:
                    vim_info_error_msg = self._format_vim_error_msg(task_warning_msg)
                task_vim_info = task.get("vim_info")
                task_error_msg = task.get("error_msg")
                task_vim_status = task["extra"].get("vim_status")
                if task_vim_status != vim_info["status"] or task_error_msg != vim_info_error_msg or \
                        (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
                    temp_dict = {"status": vim_info["status"], "error_msg": vim_info_error_msg}
                    if vim_info.get("vim_info"):
                        temp_dict["vim_info"] = vim_info["vim_info"]
                    with self.db_lock:
                        self.db.update_rows('instance_vms', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]})
                    task["extra"]["vim_status"] = vim_info["status"]
                    task["error_msg"] = vim_info_error_msg
                    if vim_info.get("vim_info"):
                        task["vim_info"] = vim_info["vim_info"]
                    task_need_update = True

                if task_need_update:
                    with self.db_lock:
                        self.db.update_rows(
                            'vim_actions',
                            UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256),
                                    "error_msg": task.get("error_msg"), "modified_at": now},
                            WHERE={'instance_action_id': task['instance_action_id'],
                                   'task_index': task['task_index']})
                # schedule the next refresh; elements in BUILD are polled more frequently
                if task["extra"].get("vim_status") == "BUILD":
                    self._insert_refresh(task, now + self.REFRESH_BUILD)
                else:
                    self._insert_refresh(task, now + self.REFRESH_ACTIVE)

    if net_to_refresh_list:
        now = time.time()
        try:
            vim_dict = self.vim.refresh_nets_status(net_to_refresh_list)
        except vimconn.vimconnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error("task=several get-net: vimconnException when trying to refresh nets " + str(e))
            vim_dict = {}
            for vim_id in net_to_refresh_list:
                vim_dict[vim_id] = {"status": "VIM_ERROR", "error_msg": str(e)}

        for vim_id, vim_info in vim_dict.items():
            # look for task
            for task in net_to_refresh_dict[vim_id]:
                task_id = task["instance_action_id"] + "." + str(task["task_index"])
                self.logger.debug("task={} get-net: vim_net_id={} result={}".format(task_id, task["vim_id"], vim_info))

                task_vim_info = task.get("vim_info")
                task_vim_status = task["extra"].get("vim_status")
                task_error_msg = task.get("error_msg")
                task_sdn_net_id = task["extra"].get("sdn_net_id")

                vim_info_status = vim_info["status"]
                vim_info_error_msg = vim_info.get("error_msg")
                # get ovim status
                if task_sdn_net_id:
                    try:
                        with self.db_lock:
                            sdn_net = self.ovim.show_network(task_sdn_net_id)
                    except (ovimException, Exception) as e:
                        text_error = "ovimException getting network snd_net_id={}: {}".format(task_sdn_net_id, e)
                        self.logger.error("task={} get-net: {}".format(task_id, text_error), exc_info=True)
                        sdn_net = {"status": "ERROR", "error_msg": text_error}
                    if sdn_net["status"] == "ERROR":
                        # combine the SDN error with the VIM one, both at the message and at the status
                        if not vim_info_error_msg:
                            vim_info_error_msg = sdn_net["error_msg"]
                        else:
                            vim_info_error_msg = "VIM_ERROR: {} && SDN_ERROR: {}".format(
                                self._format_vim_error_msg(vim_info_error_msg, 1024//2-14),
                                self._format_vim_error_msg(sdn_net["error_msg"], 1024//2-14))
                        if vim_info_status == "VIM_ERROR":
                            vim_info_status = "VIM_SDN_ERROR"
                        else:
                            vim_info_status = "SDN_ERROR"

                # update database
                if vim_info_error_msg:
                    vim_info_error_msg = self._format_vim_error_msg(vim_info_error_msg)
                if task_vim_status != vim_info_status or task_error_msg != vim_info_error_msg or \
                        (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
                    task["extra"]["vim_status"] = vim_info_status
                    task["error_msg"] = vim_info_error_msg
                    if vim_info.get("vim_info"):
                        task["vim_info"] = vim_info["vim_info"]
                    temp_dict = {"status": vim_info_status, "error_msg": vim_info_error_msg}
                    if vim_info.get("vim_info"):
                        temp_dict["vim_info"] = vim_info["vim_info"]
                    with self.db_lock:
                        self.db.update_rows('instance_nets', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]})
                        self.db.update_rows(
                            'vim_actions',
                            UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256),
                                    "error_msg": task.get("error_msg"), "modified_at": now},
                            WHERE={'instance_action_id': task['instance_action_id'],
                                   'task_index': task['task_index']})
                # schedule the next refresh; elements in BUILD are polled more frequently
                if task["extra"].get("vim_status") == "BUILD":
                    self._insert_refresh(task, now + self.REFRESH_BUILD)
                else:
                    self._insert_refresh(task, now + self.REFRESH_ACTIVE)

    return nb_processed
434
435 def _insert_refresh(self, task, threshold_time=None):
436 """Insert a task at list of refreshing elements. The refreshing list is ordered by threshold_time (task['modified_at']
437 It is assumed that this is called inside this thread
438 """
439 if not self.vim:
440 return
441 if not threshold_time:
442 threshold_time = time.time()
443 task["modified_at"] = threshold_time
444 task_name = task["item"][9:] + "-" + task["action"]
445 task_id = task["instance_action_id"] + "." + str(task["task_index"])
446 for index in range(0, len(self.refresh_tasks)):
447 if self.refresh_tasks[index]["modified_at"] > threshold_time:
448 self.refresh_tasks.insert(index, task)
449 break
450 else:
451 index = len(self.refresh_tasks)
452 self.refresh_tasks.append(task)
453 self.logger.debug("task={} new refresh name={}, modified_at={} index={}".format(
454 task_id, task_name, task["modified_at"], index))
455
456 def _remove_refresh(self, task_name, vim_id):
457 """Remove a task with this name and vim_id from the list of refreshing elements.
458 It is assumed that this is called inside this thread outside _refres_elements method
459 Return True if self.refresh_list is modified, task is found
460 Return False if not found
461 """
462 index_to_delete = None
463 for index in range(0, len(self.refresh_tasks)):
464 if self.refresh_tasks[index]["name"] == task_name and self.refresh_tasks[index]["vim_id"] == vim_id:
465 index_to_delete = index
466 break
467 else:
468 return False
469 if not index_to_delete:
470 del self.refresh_tasks[index_to_delete]
471 return True
472
def _proccess_pending_tasks(self):
    """Process the tasks of self.pending_tasks sequentially, stopping after 10 element creations.
    For every task: check that the tasks it depends on are completed (the task is moved to the end of the list,
    three times as maximum, while a dependency is still SCHEDULED), call the method that performs the action
    at the VIM, insert it at the refresh list when needed and update the database tables vim_actions,
    instance_actions and the one of the item.
    :return: number of tasks taken from the pending list
    """
    nb_created = 0
    nb_processed = 0
    while self.pending_tasks:
        task = self.pending_tasks.pop(0)
        nb_processed += 1
        try:
            # check if tasks that this depends on have been completed
            dependency_not_completed = False
            for task_index in task["extra"].get("depends_on", ()):
                task_dependency = task["depends"].get("TASK-" + str(task_index))
                if not task_dependency:
                    task_dependency = self._look_for_task(task["instance_action_id"], "TASK-" + str(task_index))
                    if not task_dependency:
                        raise VimThreadException(
                            "Cannot get depending net task trying to get depending task {}.{}".format(
                                task["instance_action_id"], task_index))
                    # task["depends"]["TASK-" + str(task_index)] = task_dependency #it references another object,so database must be look again
                if task_dependency["status"] == "SCHEDULED":
                    dependency_not_completed = True
                    break
                elif task_dependency["status"] == "FAILED":
                    raise VimThreadException(
                        "Cannot {} {}, (task {}.{}) because depends on failed {} {}, (task {}.{}): {}".format(
                            task["action"], task["item"],
                            task["instance_action_id"], task["task_index"],
                            task_dependency["action"], task_dependency["item"],
                            task_dependency["instance_action_id"], task_dependency["task_index"],
                            task_dependency.get("error_msg")))
            if dependency_not_completed:
                # Move this task to the end.
                task["extra"]["tries"] = task["extra"].get("tries", 0) + 1
                if task["extra"]["tries"] <= 3:
                    self.pending_tasks.append(task)
                    continue
                else:
                    raise VimThreadException(
                        "Cannot {} {}, (task {}.{}) because timeout waiting to complete {} {}, "
                        "(task {}.{})".format(task["action"], task["item"],
                                              task["instance_action_id"], task["task_index"],
                                              task_dependency["action"], task_dependency["item"],
                                              task_dependency["instance_action_id"],
                                              task_dependency["task_index"]))

            if task["status"] == "SUPERSEDED":
                # not needed to do anything but update database with the new status
                result = True
                database_update = None
            elif not self.vim:
                # there is no VIM connector; use the same failed status as any other failing task
                task["status"] = "FAILED"
                task["error_msg"] = self.error_status
                result = False
                database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
            elif task["item"] == 'instance_vms':
                if task["action"] == "CREATE":
                    result, database_update = self.new_vm(task)
                    nb_created += 1
                elif task["action"] == "DELETE":
                    result, database_update = self.del_vm(task)
                else:
                    raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
            elif task["item"] == 'instance_nets':
                if task["action"] == "CREATE":
                    result, database_update = self.new_net(task)
                    nb_created += 1
                elif task["action"] == "DELETE":
                    result, database_update = self.del_net(task)
                elif task["action"] == "FIND":
                    result, database_update = self.get_net(task)
                else:
                    raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
            elif task["item"] == 'instance_sfis':
                if task["action"] == "CREATE":
                    result, database_update = self.new_sfi(task)
                    nb_created += 1
                elif task["action"] == "DELETE":
                    result, database_update = self.del_sfi(task)
                else:
                    raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
            elif task["item"] == 'instance_sfs':
                if task["action"] == "CREATE":
                    result, database_update = self.new_sf(task)
                    nb_created += 1
                elif task["action"] == "DELETE":
                    result, database_update = self.del_sf(task)
                else:
                    raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
            elif task["item"] == 'instance_classifications':
                if task["action"] == "CREATE":
                    result, database_update = self.new_classification(task)
                    nb_created += 1
                elif task["action"] == "DELETE":
                    result, database_update = self.del_classification(task)
                else:
                    raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
            elif task["item"] == 'instance_sfps':
                if task["action"] == "CREATE":
                    result, database_update = self.new_sfp(task)
                    nb_created += 1
                elif task["action"] == "DELETE":
                    result, database_update = self.del_sfp(task)
                else:
                    raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
            else:
                raise vimconn.vimconnException(self.name + "unknown task item {}".format(task["item"]))
                # TODO
        except VimThreadException as e:
            result = False
            task["error_msg"] = str(e)
            task["status"] = "FAILED"
            database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
            if task["item"] == 'instance_vms':
                database_update["vim_vm_id"] = None
            elif task["item"] == 'instance_nets':
                database_update["vim_net_id"] = None

        # items whose status is not refreshed periodically
        no_refresh_tasks = ['instance_sfis', 'instance_sfs',
                            'instance_classifications', 'instance_sfps']
        if task["action"] == "DELETE":
            action_key = task["item"] + task["item_id"]
            # the group could be missing; that must not abort the update of the database done below
            self.grouped_tasks.pop(action_key, None)
        elif task["action"] in ("CREATE", "FIND") and task["status"] in ("DONE", "BUILD"):
            if task["item"] not in no_refresh_tasks:
                self._insert_refresh(task)

        task_id = task["instance_action_id"] + "." + str(task["task_index"])
        self.logger.debug("task={} item={} action={} result={}:'{}' params={}".format(
            task_id, task["item"], task["action"], task["status"],
            task["vim_id"] if task["status"] == "DONE" else task.get("error_msg"), task["params"]))
        try:
            now = time.time()
            with self.db_lock:
                self.db.update_rows(
                    table="vim_actions",
                    UPDATE={"status": task["status"], "vim_id": task.get("vim_id"), "modified_at": now,
                            "error_msg": task["error_msg"],
                            "extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256)},
                    WHERE={"instance_action_id": task["instance_action_id"], "task_index": task["task_index"]})
                if result is not None:
                    self.db.update_rows(
                        table="instance_actions",
                        UPDATE={("number_done" if result else "number_failed"): {"INCREMENT": 1},
                                "modified_at": now},
                        WHERE={"uuid": task["instance_action_id"]})
                if database_update:
                    self.db.update_rows(table=task["item"],
                                        UPDATE=database_update,
                                        WHERE={"uuid": task["item_id"]})
        except db_base_Exception as e:
            self.logger.error("task={} Error updating database {}".format(task_id, e), exc_info=True)

        # limit the number of creations per call, so that the refresh of elements is not starved
        if nb_created == 10:
            break
    return nb_processed
625
def _insert_pending_tasks(self, vim_actions_list):
    """Prepare the tasks of the list to be used at memory and insert them at the lists of this thread.
    For every task that belongs to this VIM the entries 'params', 'depends' and 'extra' (as a dict) are filled,
    the task is appended to its group at self.grouped_tasks and:
    - DELETE tasks supersede the previous tasks of its group and are appended to self.pending_tasks
    - SCHEDULED tasks are appended to self.pending_tasks
    - CREATE and FIND tasks with status DONE or BUILD are inserted at the refresh list
    :param vim_actions_list: list of tasks (dictionaries) with the content of the vim_actions database rows
    :return: None. It raises vimconn.vimconnException if the action of a not SCHEDULED task is unknown
    """
    for task in vim_actions_list:
        if task["datacenter_vim_id"] != self.datacenter_tenant_id:
            continue
        item = task["item"]
        item_id = task["item_id"]
        action_key = item + item_id
        if action_key not in self.grouped_tasks:
            self.grouped_tasks[action_key] = []
        task["params"] = None
        task["depends"] = {}
        if task["extra"]:
            # NOTE(review): yaml.load without Loader can build arbitrary objects. This text is written by this
            # module with yaml.safe_dump, so yaml.safe_load is presumably enough - confirm
            extra = yaml.load(task["extra"])
            task["extra"] = extra
            task["params"] = extra.get("params")
            depends_on_list = extra.get("depends_on")
            if depends_on_list:
                for index in depends_on_list:
                    # only resolved here when the list position matches the task_index of the same action;
                    # other dependencies are looked for at database when the task is processed
                    if index < len(vim_actions_list) and vim_actions_list[index]["task_index"] == index and\
                            vim_actions_list[index]["instance_action_id"] == task["instance_action_id"]:
                        task["depends"]["TASK-" + str(index)] = vim_actions_list[index]
            if extra.get("interfaces"):
                task["vim_interfaces"] = {}
        else:
            task["extra"] = {}
        if "error_msg" not in task:
            task["error_msg"] = None
        if "vim_id" not in task:
            task["vim_id"] = None

        if task["action"] == "DELETE":
            # False while nothing created by a previous task of the group needs to be deleted at the VIM
            need_delete_action = False
            for to_supersede in self.grouped_tasks.get(action_key, ()):
                if to_supersede["action"] == "FIND" and to_supersede.get("vim_id"):
                    task["vim_id"] = to_supersede["vim_id"]
                if to_supersede["action"] == "CREATE" and to_supersede["extra"].get("created", True) and \
                        (to_supersede.get("vim_id") or to_supersede["extra"].get("sdn_net_id")):
                    need_delete_action = True
                    # copy to the DELETE task what is needed to remove the element created
                    task["vim_id"] = to_supersede["vim_id"]
                    if to_supersede["extra"].get("sdn_net_id"):
                        task["extra"]["sdn_net_id"] = to_supersede["extra"]["sdn_net_id"]
                    if to_supersede["extra"].get("interfaces"):
                        task["extra"]["interfaces"] = to_supersede["extra"]["interfaces"]
                    if to_supersede["extra"].get("created_items"):
                        if not task["extra"].get("created_items"):
                            task["extra"]["created_items"] = {}
                        task["extra"]["created_items"].update(to_supersede["extra"]["created_items"])
                # Mark task as SUPERSEDED.
                #   If task is in self.pending_tasks, it will be removed and database will be update
                #   If task is in self.refresh_tasks, it will be removed
                to_supersede["status"] = "SUPERSEDED"
            if not need_delete_action:
                task["status"] = "SUPERSEDED"

            self.grouped_tasks[action_key].append(task)
            self.pending_tasks.append(task)
        elif task["status"] == "SCHEDULED":
            self.grouped_tasks[action_key].append(task)
            self.pending_tasks.append(task)
        elif task["action"] in ("CREATE", "FIND"):
            self.grouped_tasks[action_key].append(task)
            if task["status"] in ("DONE", "BUILD"):
                self._insert_refresh(task)
        # TODO add VM reset, get console, etc...
        else:
            raise vimconn.vimconnException(self.name + "unknown vim_action action {}".format(task["action"]))
692
def insert_task(self, task):
    """Put a task into the queue of this thread without blocking.
    :param task: element to be queued
    :return: None. It raises vimconn.vimconnException if the queue is full
    """
    try:
        self.task_queue.put(task, False)
    except Queue.Full:
        raise vimconn.vimconnException(self.name + ": timeout inserting a task")
    return None
699
def del_task(self, task):
    """Cancel a task that has not been processed yet, marking it as SUPERSEDED.
    :param task: task dictionary. Its "status" is changed under self.task_lock
    :return: True if the task was SCHEDULED and it is now SUPERSEDED. False if it cannot be cancelled because
        it has another status
    """
    with self.task_lock:
        if task["status"] == "SCHEDULED":
            task["status"] = "SUPERSEDED"
            return True
        # task["status"] == "processing"
        # the lock is released by the 'with' statement; releasing it here too would raise an exception
        return False
708
def run(self):
    """Main loop of the thread.
    It reloads the actions from database and then, until a 'reload' order is received, it gets the new orders
    from the queue (a list of tasks to insert, 'exit' or 'reload'), processes the pending tasks and refreshes
    the elements, sleeping one second when nothing has been done.
    :return: 0 when the 'exit' order is received
    """
    self.logger.debug("Starting")
    while True:
        self._reload_vim_actions()
        must_reload = False

        while not must_reload:
            try:
                while not self.task_queue.empty():
                    order = self.task_queue.get()
                    if isinstance(order, list):
                        self._insert_pending_tasks(order)
                    elif isinstance(order, str):
                        if order == 'exit':
                            return 0
                        if order == 'reload':
                            must_reload = True
                            break
                    self.task_queue.task_done()
                if must_reload:
                    # go to the outer loop in order to read again the actions from database
                    continue
                nb_done = self._proccess_pending_tasks() + self._refres_elements()
                if not nb_done:
                    time.sleep(1)

            except Exception as e:
                self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)

    self.logger.debug("Finishing")
739
740 def _look_for_task(self, instance_action_id, task_id):
741 task_index = task_id.split("-")[-1]
742 with self.db_lock:
743 tasks = self.db.get_rows(FROM="vim_actions", WHERE={"instance_action_id": instance_action_id,
744 "task_index": task_index})
745 if not tasks:
746 return None
747 task = tasks[0]
748 task["params"] = None
749 task["depends"] = {}
750 if task["extra"]:
751 extra = yaml.load(task["extra"])
752 task["extra"] = extra
753 task["params"] = extra.get("params")
754 if extra.get("interfaces"):
755 task["vim_interfaces"] = {}
756 else:
757 task["extra"] = {}
758 return task
759
760 @staticmethod
761 def _format_vim_error_msg(error_text, max_length=1024):
762 if error_text and len(error_text) >= max_length:
763 return error_text[:max_length//2-3] + " ... " + error_text[-max_length//2+3:]
764 return error_text
765
def new_vm(self, task):
    """Create a VM at the VIM.
    The networks of the VM params that reference a task ("TASK-x") are replaced by the VIM network id obtained
    by that task before calling the VIM.
    :param task: CREATE task of an instance_vms item. Its 'status', 'vim_id', 'error_msg', 'vim_info',
        'vim_interfaces' and 'extra' (interfaces, created, created_items) are updated
    :return: a tuple (result, database_update): result is True if created, False upon a VIM or dependency
        error; database_update is the dictionary with the changes for the instance_vms database row
    """
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        params = task["params"]
        net_list = params[5]
        for net in net_list:
            if "net_id" in net and is_task_id(net["net_id"]):  # change task_id into network_id
                task_dependency = task["depends"].get(net["net_id"])
                if not task_dependency:
                    task_dependency = self._look_for_task(task["instance_action_id"], net["net_id"])
                if not task_dependency:
                    raise VimThreadException(
                        "Cannot get depending net task trying to get depending task {}.{}".format(
                            task["instance_action_id"], net["net_id"]))
                network_id = task_dependency.get("vim_id")
                if not network_id:
                    # use the dependency obtained above: it is not at task["depends"] when it comes from
                    # the database
                    raise VimThreadException(
                        "Cannot create VM because depends on a network not created or found: " +
                        str(task_dependency.get("error_msg")))
                net["net_id"] = network_id
        vim_vm_id, created_items = self.vim.new_vminstance(*params)

        # fill task_interfaces. Look for snd_net_id at database for each interface
        task_interfaces = {}
        for iface in net_list:
            task_interfaces[iface["vim_id"]] = {"iface_id": iface["uuid"]}
            with self.db_lock:
                result = self.db.get_rows(
                    SELECT=('sdn_net_id',),
                    FROM='instance_nets as ine join instance_interfaces as ii on ii.instance_net_id=ine.uuid',
                    WHERE={'ii.uuid': iface["uuid"]})
            if result:
                task_interfaces[iface["vim_id"]]["sdn_net_id"] = result[0]['sdn_net_id']
            else:
                self.logger.critical("task={} new-VM: instance_nets uuid={} not found at DB".format(task_id,
                                     iface["uuid"]), exc_info=True)

        task["vim_info"] = {}
        task["vim_interfaces"] = {}
        task["extra"]["interfaces"] = task_interfaces
        task["extra"]["created"] = True
        task["extra"]["created_items"] = created_items
        task["error_msg"] = None
        task["status"] = "DONE"
        task["vim_id"] = vim_vm_id
        instance_element_update = {"status": "BUILD", "vim_vm_id": vim_vm_id, "error_msg": None}
        return True, instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("task={} new-VM: {}".format(task_id, e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_vm_id": None, "error_msg": error_text}
        return False, instance_element_update
823
def del_vm(self, task):
    """
    Delete a VM from the VIM. The sdn ports stored at the interfaces of the task, if any, are deleted first
    through self.ovim; an ovimException deleting one of them is only logged.
    :param task: task to process. task["vim_id"] is the VIM identifier of the VM; task["extra"] can contain the
        "interfaces" and "created_items" entries. status and error_msg are updated in place
    :return: (True, None) if deleted or if the VIM reports that it is not found; (False, None) on any other
        vimconnException
    """
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    vm_vim_id = task["vim_id"]
    # fall back to an empty dict, so that .values() below works when the task has no interfaces stored
    interfaces = task["extra"].get("interfaces") or {}
    try:
        for iface in interfaces.values():
            if iface.get("sdn_port_id"):
                try:
                    with self.db_lock:
                        self.ovim.delete_port(iface["sdn_port_id"])
                except ovimException as e:
                    self.logger.error("task={} del-VM: ovimException when deleting external_port={}: {} ".format(
                        task_id, iface["sdn_port_id"], e), exc_info=True)
                    # TODO Set error_msg at instance_nets

        self.vim.delete_vminstance(vm_vim_id, task["extra"].get("created_items"))
        task["status"] = "DONE"
        task["error_msg"] = None
        return True, None

    except vimconn.vimconnException as e:
        task["error_msg"] = self._format_vim_error_msg(str(e))
        if isinstance(e, vimconn.vimconnNotFoundException):
            # If not found mark as Done and fill error_msg
            task["status"] = "DONE"
            return True, None
    task["status"] = "FAILED"
    return False, None
852
def _get_net_internal(self, task, filter_param):
    """
    Shared implementation of get_net and new_net: look at the VIM for the single network matching filter_param
    and mark the task as done with it.
    :param task: task for this find or find-or-create action. It is updated in place
    :param filter_param: parameters to send to the vimconnector
    :return: a dict with the content to update the instance_nets database table. Raises
        VimThreadExceptionNotFound when no network matches and VimThreadException when more than one does
    """
    candidates = self.vim.get_network_list(filter_param)
    if not candidates:
        raise VimThreadExceptionNotFound("Network not found with this criteria: '{}'".format(filter_param))
    if len(candidates) > 1:
        raise VimThreadException("More than one network found with this criteria: '{}'".format(filter_param))
    vim_net_id = candidates[0]["id"]

    # Discover if this network is managed by a sdn controller
    where = {'vim_net_id': vim_net_id, 'instance_scenario_id': None,
             'datacenter_tenant_id': self.datacenter_tenant_id}
    with self.db_lock:
        rows = self.db.get_rows(SELECT=('sdn_net_id',), FROM='instance_nets', WHERE=where)
    sdn_net_id = rows[0]['sdn_net_id'] if rows else None

    task["status"] = "DONE"
    extra = task["extra"]
    extra["vim_info"] = {}
    extra["created"] = False
    extra["sdn_net_id"] = sdn_net_id
    task["error_msg"] = None
    task["vim_id"] = vim_net_id
    return {"vim_net_id": vim_net_id, "created": False, "status": "BUILD",
            "error_msg": None, "sdn_net_id": sdn_net_id}
886
def get_net(self, task):
    """
    Find an existing network at the VIM using task["params"][0] as the filter.
    :param task: task to process. On error its status, vim_id and error_msg are updated here; on success they
        are updated by _get_net_internal
    :return: (True, instance_element_update) when exactly one network is found; (False, instance_element_update)
        when a vimconnException or a VimThreadException is raised
    """
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        net_filter = task["params"][0]
        return True, self._get_net_internal(task, net_filter)

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("task={} get-net: {}".format(task_id, e))
        task["status"] = "FAILED"
        task["vim_id"] = None
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        return False, {"vim_net_id": None, "status": "VIM_ERROR", "error_msg": error_text}
903
def new_net(self, task):
    """
    Create a network at the VIM. When task["extra"]["find"] is present, the network is first looked for at the
    VIM with those params and, if exactly one is found, it is used instead of creating a new one.
    If the VIM config contains 'sdn-controller' and the network type is "data" or "ptp", the network is created
    as well through self.ovim, using the segmentation_id reported by the VIM for the just created network.
    :param task: task to process. task["params"] is the positional argument list for self.vim.new_network, with
        the name at index 0 and the type at index 1. task is updated in place
    :return: (True, instance_element_update) on success; (False, instance_element_update) when a
        vimconnException, an ovimException or a VimThreadException is raised. On error the dict contains
        "vim_net_id", "sdn_net_id", "status" and "error_msg"
    """
    vim_net_id = None
    sdn_net_id = None
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    action_text = ""
    try:
        # FIND
        if task["extra"].get("find"):
            action_text = "finding"
            filter_param = task["extra"]["find"][0]
            try:
                instance_element_update = self._get_net_internal(task, filter_param)
                return True, instance_element_update
            except VimThreadExceptionNotFound:
                pass
        # CREATE
        params = task["params"]
        action_text = "creating VIM"
        vim_net_id = self.vim.new_network(*params)

        net_name = params[0]
        net_type = params[1]

        sdn_controller = self.vim.config.get('sdn-controller')
        if sdn_controller and net_type in ("data", "ptp"):
            network = {"name": net_name, "type": net_type, "region": self.vim["config"]["datacenter_id"]}

            vim_net = self.vim.get_network(vim_net_id)
            if vim_net.get('encapsulation') != 'vlan':
                # use get() for the message too: the key can be missing, and indexing would raise a KeyError
                raise vimconn.vimconnException(
                    "net '{}' defined as type '{}' has not vlan encapsulation '{}'".format(
                        net_name, net_type, vim_net.get('encapsulation')))
            network["vlan"] = vim_net.get('segmentation_id')
            action_text = "creating SDN"
            with self.db_lock:
                sdn_net_id = self.ovim.new_network(network)
        task["status"] = "DONE"
        task["extra"]["vim_info"] = {}
        task["extra"]["sdn_net_id"] = sdn_net_id
        task["extra"]["created"] = True
        task["error_msg"] = None
        task["vim_id"] = vim_net_id
        instance_element_update = {"vim_net_id": vim_net_id, "sdn_net_id": sdn_net_id, "status": "BUILD",
                                   "created": True, "error_msg": None}
        return True, instance_element_update
    except (vimconn.vimconnException, ovimException, VimThreadException) as e:
        # VimThreadException is raised by _get_net_internal when more than one network matches the find params
        self.logger.error("task={} new-net: Error {}: {}".format(task_id, action_text, e))
        task["status"] = "FAILED"
        # vim_net_id and sdn_net_id keep whatever was created before the failure
        task["vim_id"] = vim_net_id
        task["error_msg"] = self._format_vim_error_msg(str(e))
        task["extra"]["sdn_net_id"] = sdn_net_id
        instance_element_update = {"vim_net_id": vim_net_id, "sdn_net_id": sdn_net_id, "status": "VIM_ERROR",
                                   "error_msg": task["error_msg"]}
        return False, instance_element_update
958
def del_net(self, task):
    """
    Delete a network: first at self.ovim when the task has a "sdn_net_id" stored, together with its external
    ports, and then at the VIM when the task has a vim_id.
    :param task: task to process. Uses task["vim_id"] and task["extra"]["sdn_net_id"]; status and error_msg are
        updated in place
    :return: (True, None) if deleted or if the VIM reports that it is not found; (False, None) on an
        ovimException or on any other vimconnException
    """
    net_vim_id = task["vim_id"]
    sdn_net_id = task["extra"].get("sdn_net_id")
    try:
        if sdn_net_id:
            # Delete any attached port to this sdn network. There can be ports associated to this network in case
            # it was manually done using 'openmano vim-net-sdn-attach'
            port_filter = {'name': 'external_port', 'net_id': sdn_net_id}
            with self.db_lock:
                for attached_port in self.ovim.get_ports(columns={'uuid'}, filter=port_filter):
                    self.ovim.delete_port(attached_port['uuid'])
                self.ovim.delete_network(sdn_net_id)
        if net_vim_id:
            self.vim.delete_network(net_vim_id)
    except ovimException as e:
        task["error_msg"] = self._format_vim_error_msg("ovimException obtaining and deleting external "
                                                       "ports for net {}: {}".format(sdn_net_id, str(e)))
    except vimconn.vimconnException as e:
        task["error_msg"] = self._format_vim_error_msg(str(e))
        if isinstance(e, vimconn.vimconnNotFoundException):
            # If not found mark as Done and fill error_msg
            task["status"] = "DONE"
            return True, None
    else:
        task["status"] = "DONE"
        task["error_msg"] = None
        return True, None
    task["status"] = "FAILED"
    return False, None
988
989 ## Service Function Instances
990
def new_sfi(self, task):
    """
    Create a Service Function Instance at the VIM, using the first interface of the first task this one
    depends on.
    :param task: task to process. task["depends"] must contain at least one task whose extra "params" holds a
        list of interfaces at index 5. task is updated in place (status, vim_id, error_msg, extra)
    :return: (True, instance_element_update) on success; (False, instance_element_update) when a vimconnException
        or a VimThreadException is raised. instance_element_update is a dict with "status", "vim_sfi_id" and
        "error_msg"
    """
    # computed before the try, as the error handler needs it
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        # list() allows indexing also where dict.values() returns a view instead of a list
        # NOTE(review): presumably the dependency is the task creating the VM - confirm
        dependency = list(task.get("depends").values())[0]
        interfaces = dependency.get("extra").get("params")[5]
        # At the moment, every port associated with the VM will be used both as ingress and egress ports.
        # Bear in mind that different VIM connectors might support SFI differently. In the case of OpenStack,
        # only the first ingress and first egress ports will be used to create the SFI (Port Pair).
        port_id_list = [interfaces[0].get("vim_id")]
        name = "sfi-%s" % task["item_id"][:8]
        # By default no form of IETF SFC Encapsulation will be used
        vim_sfi_id = self.vim.new_sfi(name, port_id_list, port_id_list, sfc_encap=False)

        task["extra"]["created"] = True
        task["error_msg"] = None
        task["status"] = "DONE"
        task["vim_id"] = vim_sfi_id
        instance_element_update = {"status": "ACTIVE", "vim_sfi_id": vim_sfi_id, "error_msg": None}
        return True, instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("Error creating Service Function Instance, task=%s: %s", task_id, str(e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_sfi_id": None, "error_msg": error_text}
        return False, instance_element_update
1022
def del_sfi(self, task):
    """
    Delete a Service Function Instance from the VIM.
    :param task: task to process. task["vim_id"] is the VIM identifier of the element; status and error_msg are
        updated in place
    :return: (True, None) if deleted or if the VIM reports that it is not found; (False, None) on any other
        vimconnException
    """
    sfi_vim_id = task["vim_id"]
    try:
        self.vim.delete_sfi(sfi_vim_id)
    except vimconn.vimconnException as e:
        task["error_msg"] = self._format_vim_error_msg(str(e))
        # If not found mark as Done and fill error_msg
        not_found = isinstance(e, vimconn.vimconnNotFoundException)
        task["status"] = "DONE" if not_found else "FAILED"
        return not_found, None
    task["status"] = "DONE"
    task["error_msg"] = None
    return True, None
1039
def new_sf(self, task):
    """
    Create a Service Function at the VIM, grouping the elements obtained by all the tasks this one depends on.
    :param task: task to process. The vim_id of every task at task["depends"] is sent to the VIM as the list of
        Service Function Instances. task is updated in place (status, vim_id, error_msg, extra)
    :return: (True, instance_element_update) on success; (False, instance_element_update) when a vimconnException
        or a VimThreadException is raised. instance_element_update is a dict with "status", "vim_sf_id" and
        "error_msg"
    """
    # computed before the try, as the error handler needs it
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        sfi_id_list = [sfi.get("vim_id") for sfi in task.get("depends").values()]
        name = "sf-%s" % task["item_id"][:8]
        # By default no form of IETF SFC Encapsulation will be used
        vim_sf_id = self.vim.new_sf(name, sfi_id_list, sfc_encap=False)

        task["extra"]["created"] = True
        task["error_msg"] = None
        task["status"] = "DONE"
        task["vim_id"] = vim_sf_id
        instance_element_update = {"status": "ACTIVE", "vim_sf_id": vim_sf_id, "error_msg": None}
        return True, instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("Error creating Service Function, task=%s: %s", task_id, str(e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_sf_id": None, "error_msg": error_text}
        return False, instance_element_update
1071
def del_sf(self, task):
    """
    Delete a Service Function from the VIM.
    :param task: task to process. task["vim_id"] is the VIM identifier of the element; status and error_msg are
        updated in place
    :return: (True, None) if deleted or if the VIM reports that it is not found; (False, None) on any other
        vimconnException
    """
    sf_vim_id = task["vim_id"]
    try:
        self.vim.delete_sf(sf_vim_id)
    except vimconn.vimconnException as e:
        task["error_msg"] = self._format_vim_error_msg(str(e))
        if not isinstance(e, vimconn.vimconnNotFoundException):
            task["status"] = "FAILED"
            return False, None
        # If not found mark as Done and fill error_msg
        task["status"] = "DONE"
        return True, None
    else:
        task["status"] = "DONE"
        task["error_msg"] = None
        return True, None
1088
def new_classification(self, task):
    """
    Create a Classification at the VIM of type 'legacy_flow_classifier', built from task["params"] and from the
    first interface of the first task this one depends on.
    :param task: task to process. task["params"] is a dict with "ip_proto", "source_ip", "destination_ip",
        "source_port" and "destination_port". task["depends"] must contain at least one task whose extra
        "params" holds a list of interfaces at index 5. task is updated in place
    :return: (True, instance_element_update) on success; (False, instance_element_update) when a vimconnException
        or a VimThreadException is raised. instance_element_update is a dict with "status",
        "vim_classification_id" and "error_msg"
    """
    # computed before the try, as the error handler needs it
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        params = task["params"]
        # list() allows indexing also where dict.values() returns a view instead of a list
        # NOTE(review): presumably the dependency is the task creating the VM - confirm
        dependency = list(task.get("depends").values())[0]
        interfaces = dependency.get("extra").get("params")[5]
        # Bear in mind that different VIM connectors might support Classifications differently.
        # In the case of OpenStack, only the first VNF attached to the classifier will be used
        # to create the Classification(s) (the "logical source port" of the "Flow Classifier").
        # Since the VNFFG classifier match lacks the ethertype, classification defaults to
        # using the IPv4 flow classifier.
        name = "c-%s" % task["item_id"][:8]
        # known protocol numbers are sent by name, any other one is sent as the number
        ip_proto = int(params.get("ip_proto"))
        ip_proto = {1: 'icmp', 6: 'tcp', 17: 'udp'}.get(ip_proto, ip_proto)
        # if not CIDR is given for the IP addresses, add /32:
        source_ip = params.get("source_ip")
        destination_ip = params.get("destination_ip")
        if '/' not in source_ip:
            source_ip += '/32'
        if '/' not in destination_ip:
            destination_ip += '/32'
        definition = {
            "logical_source_port": interfaces[0].get("vim_id"),
            "protocol": ip_proto,
            "source_ip_prefix": source_ip,
            "destination_ip_prefix": destination_ip,
            "source_port_range_min": params.get("source_port"),
            "source_port_range_max": params.get("source_port"),
            "destination_port_range_min": params.get("destination_port"),
            "destination_port_range_max": params.get("destination_port"),
        }

        vim_classification_id = self.vim.new_classification(
            name, 'legacy_flow_classifier', definition)

        task["extra"]["created"] = True
        task["error_msg"] = None
        task["status"] = "DONE"
        task["vim_id"] = vim_classification_id
        instance_element_update = {"status": "ACTIVE", "vim_classification_id": vim_classification_id,
                                   "error_msg": None}
        return True, instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("Error creating Classification, task=%s: %s", task_id, str(e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_classification_id": None, "error_msg": error_text}
        return False, instance_element_update
1146
def del_classification(self, task):
    """
    Delete a Classification from the VIM.
    :param task: task to process. task["vim_id"] is the VIM identifier of the element; status and error_msg are
        updated in place
    :return: (True, None) if deleted or if the VIM reports that it is not found; (False, None) on any other
        vimconnException
    """
    classification_vim_id = task["vim_id"]
    try:
        self.vim.delete_classification(classification_vim_id)
    except vimconn.vimconnException as e:
        task["error_msg"] = self._format_vim_error_msg(str(e))
        # If not found mark as Done and fill error_msg
        not_found = isinstance(e, vimconn.vimconnNotFoundException)
        task["status"] = "DONE" if not_found else "FAILED"
        return not_found, None
    task["status"] = "DONE"
    task["error_msg"] = None
    return True, None
1163
def new_sfp(self, task):
    """
    Create a Service Function Path at the VIM from the elements obtained by the tasks this one depends on.
    :param task: task to process. The tasks at task["depends"] whose "item" is "instance_sfs" provide the
        Service Functions and those whose "item" is "instance_classifications" provide the Classifications;
        other dependencies are ignored. task is updated in place (status, vim_id, error_msg, extra)
    :return: (True, instance_element_update) on success; (False, instance_element_update) when a vimconnException
        or a VimThreadException is raised. instance_element_update is a dict with "status", "vim_sfp_id" and
        "error_msg"
    """
    # computed before the try, as the error handler needs it
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        sf_id_list = []
        classification_id_list = []
        for dep in task.get("depends").values():
            vim_id = dep.get("vim_id")
            resource = dep.get("item")
            if resource == "instance_sfs":
                sf_id_list.append(vim_id)
            elif resource == "instance_classifications":
                classification_id_list.append(vim_id)

        name = "sfp-%s" % task["item_id"][:8]
        # By default no form of IETF SFC Encapsulation will be used
        vim_sfp_id = self.vim.new_sfp(name, classification_id_list, sf_id_list, sfc_encap=False)

        task["extra"]["created"] = True
        task["error_msg"] = None
        task["status"] = "DONE"
        task["vim_id"] = vim_sfp_id
        instance_element_update = {"status": "ACTIVE", "vim_sfp_id": vim_sfp_id, "error_msg": None}
        return True, instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("Error creating Service Function Path, task=%s: %s", task_id, str(e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_sfp_id": None, "error_msg": error_text}
        return False, instance_element_update
1202
def del_sfp(self, task):
    """
    Delete a Service Function Path from the VIM.
    :param task: task to process. task["vim_id"] is the VIM identifier of the element; status and error_msg are
        updated in place
    :return: (True, None) if deleted or if the VIM reports that it is not found; (False, None) on any other
        vimconnException
    """
    sfp_vim_id = task["vim_id"]
    try:
        self.vim.delete_sfp(sfp_vim_id)
    except vimconn.vimconnException as e:
        task["error_msg"] = self._format_vim_error_msg(str(e))
        if not isinstance(e, vimconn.vimconnNotFoundException):
            task["status"] = "FAILED"
            return False, None
        # If not found mark as Done and fill error_msg
        task["status"] = "DONE"
        return True, None
    else:
        task["status"] = "DONE"
        task["error_msg"] = None
        return True, None