Merge "Add N2VC support"
[osm/RO.git] / osm_ro / vim_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
5 # This file is part of openvim
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
22 ##
23
"""
This is a thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
The tasks are stored at database in table vim_actions
The task content is (M: stored at memory, D: stored at database):
    MD  instance_action_id: reference a global action over an instance-scenario: database instance_actions
    MD  task_index:     index number of the task. This together with the previous forms a unique key identifier
    MD  datacenter_vim_id: should contain the uuid of the VIM managed by this thread
    MD  vim_id:         id of the vm,net,etc at VIM
    MD  action:         CREATE, DELETE, FIND
    MD  item:           database table name, can be instance_vms, instance_nets, TODO: datacenter_flavors,
                        datacenter_images
    MD  item_id:        uuid of the referenced entry in the previous table
    MD  status:         SCHEDULED,BUILD,DONE,FAILED,SUPERSEDED
    MD  extra:          text with yaml format at database, dict at memory with:
            params:     list with the params to be sent to the VIM for CREATE or FIND. For DELETE the vim_id is
                        taken from other related tasks
            find:       (only for CREATE tasks) if present it should FIND before creating and use if existing.
                        Contains the FIND params
            depends_on: list with the 'task_index'es of tasks that must be completed before. e.g. a vm creation
                        depends on a net creation
            sdn_net_id: used for net.
            tries:
            interfaces: used for VMs. Each key is the uuid of the instance_interfaces entry at database
                iface_id:    uuid of instance_interfaces
                sdn_port_id:
                sdn_net_id:
            created_items: dictionary with extra elements created that need to be deleted. e.g. ports, volumes,...
            created:    False if the VIM element is not created by other actions, and it should not be deleted
            vim_status: VIM status of the element. Stored also at database in the instance_XXX
    M   depends:        dict with task_index(from depends_on) to task class
    M   params:         same as extra[params] but with the resolved dependencies
    M   vim_interfaces: similar to extra[interfaces] but with VIM information. Stored at database in the
                        instance_XXX but not at vim_actions
    M   vim_info:       Detailed information of a vm,net from the VIM. Stored at database in the instance_XXX but
                        not at vim_actions
    MD  error_msg:      descriptive text upon an error. Stored also at database instance_XXX
    MD  created_at:     task creation time
    MD  modified_at:    last task update time. On refresh it contains when this task need to be refreshed

"""
58
59 import threading
60 import time
61 import Queue
62 import logging
63 import vimconn
64 import yaml
65 from db_base import db_base_Exception
66 from lib_osm_openvim.ovim import ovimException
67
# Module metadata: authors and timestamp string of this file
__author__ = "Alfonso Tierno, Pablo Montes"
__date__ = "$28-Sep-2017 12:07:15$"
70
71
def is_task_id(task_id):
    """Tell whether a text is a reference to a task instead of a final identifier.

    :param task_id: text to check
    :return: True if the text starts with the "TASK-" prefix, False otherwise
    """
    task_prefix = "TASK-"
    return task_id.startswith(task_prefix)
74
75
class VimThreadException(Exception):
    """Error raised by the VIM thread when a task cannot be processed, e.g. a failed or missing dependency."""
78
79
class VimThreadExceptionNotFound(VimThreadException):
    """Specialization of VimThreadException; presumably used when an element is not found (to be confirmed, it is
    not raised in this part of the file)."""
82
83
84 class vim_thread(threading.Thread):
85 REFRESH_BUILD = 5 # 5 seconds
86 REFRESH_ACTIVE = 60 # 1 minute
87
def __init__(self, myvimconn, task_lock, name=None, datacenter_name=None, datacenter_tenant_id=None,
             db=None, db_lock=None, ovim=None):
    """Init a thread.

    :param myvimconn: VIM connector used by this thread, or the vimconn.vimconnException obtained when trying to
        get it. In the second case the thread is created without VIM (self.vim is None) and the text of the
        exception is stored at self.error_status
    :param task_lock: lock used in exclusion to check or change the status of a task
    :param name: name of the thread. If not provided it is generated from the connector "id" and
        "datacenter_tenant_id", or from datacenter_name and datacenter_tenant_id when there is no connector
    :param datacenter_name: name of the datacenter, stored at self.datacenter_name
    :param datacenter_tenant_id: identifier compared against the 'datacenter_vim_id' of the tasks
    :param db: database class
    :param db_lock: lock to use the database in exclusion
    :param ovim: object used for the SDN operations (ports and networks)
    """
    threading.Thread.__init__(self)
    if isinstance(myvimconn, vimconn.vimconnException):
        self.vim = None
        self.error_status = "Error accesing to VIM: {}".format(myvimconn)
    else:
        self.vim = myvimconn
        self.error_status = None
    self.datacenter_name = datacenter_name
    self.datacenter_tenant_id = datacenter_tenant_id
    self.ovim = ovim
    if name:
        self.name = name
    elif self.vim is not None:
        # the name is built from the connector instance, not from the 'vimconn' module
        self.name = myvimconn["id"] + "." + myvimconn["config"]["datacenter_tenant_id"]
    else:
        # without connector (myvimconn is an exception) the name can only be built from the arguments
        self.name = "{}.{}".format(datacenter_name, datacenter_tenant_id)

    self.logger = logging.getLogger('openmano.vim.'+self.name)
    self.db = db
    self.db_lock = db_lock

    self.task_lock = task_lock
    self.task_queue = Queue.Queue(2000)

    # Contains time ordered task list for refreshing the status of VIM VMs and nets
    self.refresh_tasks = []

    # Contains time ordered task list for creation, deletion of VIM VMs and nets
    self.pending_tasks = []

    # It contains all the creation/deletion pending tasks grouped by its concrete vm, net, etc
    #     <item><item_id>:
    #         -   <task1>  # e.g. CREATE task
    #             <task2>  # e.g. DELETE task
    self.grouped_tasks = {}
131
def _reload_vim_actions(self):
    """
    Read the actions of this VIM from the database table vim_actions and reload them at memory.
    Rows are read in pages of 'database_limit' entries ordered by item_id, item and created_at, so that all the
    tasks over the same element (same item and item_id) are consecutive. Each group of tasks is passed to
    self._insert_pending_tasks, that is the one that fills the memory structures.
    A group is discarded when it contains a DELETE task with a status different from SCHEDULED.
    Exceptions are logged and not propagated.
    :return: None
    """
    try:
        action_completed = False  # True when the current group does not need to be loaded
        task_list = []            # tasks of the current group (same item + item_id)
        old_action_key = None     # key (item + item_id) of the group being collected

        # last row of the previous page, used to continue the read at the next page
        old_item_id = ""
        old_item = ""
        old_created_at = 0.0
        database_limit = 200
        while True:
            # get 200 (database_limit) entries each time
            with self.db_lock:
                vim_actions = self.db.get_rows(FROM="vim_actions",
                                               WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
                                                      "item_id>=": old_item_id},
                                               ORDER_BY=("item_id", "item", "created_at",),
                                               LIMIT=database_limit)
            for task in vim_actions:
                item = task["item"]
                item_id = task["item_id"]

                # skip the first entries that are already processed in the previous pool of 200
                if old_item_id:
                    if item_id == old_item_id and item == old_item and task["created_at"] == old_created_at:
                        old_item_id = False  # next one will be a new un-processed task
                    continue

                action_key = item + item_id
                if old_action_key != action_key:
                    # a new group starts: flush the previous one unless it was completed
                    if not action_completed and task_list:
                        # This will fill needed task parameters into memory, and insert the task if needed in
                        # self.pending_tasks or self.refresh_tasks
                        try:
                            self._insert_pending_tasks(task_list)
                        except Exception as e:
                            self.logger.critical(
                                "Unexpected exception at _reload_vim_actions:_insert_pending_tasks: " + str(e),
                                exc_info=True)
                    task_list = []
                    old_action_key = action_key
                    action_completed = False
                elif action_completed:
                    # rest of tasks of a group already completed
                    continue

                if task["status"] == "SCHEDULED" or task["action"] == "CREATE" or task["action"] == "FIND":
                    task_list.append(task)
                elif task["action"] == "DELETE":
                    # action completed because deleted and status is not SCHEDULED. Not needed anything
                    action_completed = True
            if len(vim_actions) == database_limit:
                # update variables for get the next database iteration
                old_item_id = item_id
                old_item = item
                old_created_at = task["created_at"]
            else:
                # a page not full means that there are no more rows
                break
        # Last actions group need to be inserted too
        if not action_completed and task_list:
            try:
                self._insert_pending_tasks(task_list)
            except Exception as e:
                self.logger.critical("Unexpected exception at _reload_vim_actions:_insert_pending_tasks: " + str(e),
                                     exc_info=True)
        self.logger.debug("reloaded vim actions pending:{} refresh:{}".format(
            len(self.pending_tasks), len(self.refresh_tasks)))
    except Exception as e:
        self.logger.critical("Unexpected exception at _reload_vim_actions: " + str(e), exc_info=True)
204
def _refres_elements(self):
    """Call VIM to get VMs and networks status until 10 elements.
    It takes from self.refresh_tasks the tasks whose 'modified_at' time has been reached, asks the VIM for the
    status of their VMs and nets, updates the SDN ports of the VM interfaces that have changed, stores the changes
    at database and inserts the tasks again at self.refresh_tasks with the time of the next refresh.
    :return: number of tasks taken from self.refresh_tasks to be refreshed
    """
    now = time.time()
    nb_processed = 0
    vm_to_refresh_list = []   # vim_id of the VMs to refresh
    net_to_refresh_list = []  # vim_id of the nets to refresh
    vm_to_refresh_dict = {}   # vim_id of the VM: task
    net_to_refresh_dict = {}  # vim_id of the net: task
    items_to_refresh = 0
    while self.refresh_tasks:
        task = self.refresh_tasks[0]
        with self.task_lock:
            if task['status'] == 'SUPERSEDED':
                # not needed to refresh anymore
                self.refresh_tasks.pop(0)
                continue
            if task['modified_at'] > now:
                # the list is time ordered, so the rest of tasks do not need to be refreshed yet
                break
            # task["status"] = "processing"
            nb_processed += 1
        self.refresh_tasks.pop(0)
        if task["item"] == 'instance_vms':
            vm_to_refresh_list.append(task["vim_id"])
            vm_to_refresh_dict[task["vim_id"]] = task
        elif task["item"] == 'instance_nets':
            net_to_refresh_list.append(task["vim_id"])
            net_to_refresh_dict[task["vim_id"]] = task
        else:
            task_id = task["instance_action_id"] + "." + str(task["task_index"])
            self.logger.critical("task={}: unknown task {}".format(task_id, task["item"]), exc_info=True)
        items_to_refresh += 1
        if items_to_refresh == 10:
            break

    if vm_to_refresh_list:
        now = time.time()
        try:
            vim_dict = self.vim.refresh_vms_status(vm_to_refresh_list)
        except vimconn.vimconnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error("task=several get-VM: vimconnException when trying to refresh vms " + str(e))
            vim_dict = {}
            for vim_id in vm_to_refresh_list:
                vim_dict[vim_id] = {"status": "VIM_ERROR", "error_msg": str(e)}

        for vim_id, vim_info in vim_dict.items():
            # look for task
            task_need_update = False  # True when the vim_actions entry must be updated at database
            task = vm_to_refresh_dict[vim_id]
            task_id = task["instance_action_id"] + "." + str(task["task_index"])
            self.logger.debug("task={} get-VM: vim_vm_id={} result={}".format(task_id, task["vim_id"], vim_info))

            # check and update interfaces
            task_warning_msg = ""  # accumulates the SDN errors, reported at the error_msg of the VM
            for interface in vim_info.get("interfaces", ()):
                vim_interface_id = interface["vim_interface_id"]
                if vim_interface_id not in task["extra"]["interfaces"]:
                    self.logger.critical("task={} get-VM: Interface not found {} on task info {}".format(
                        task_id, vim_interface_id, task["extra"]["interfaces"]), exc_info=True)
                    continue
                task_interface = task["extra"]["interfaces"][vim_interface_id]
                task_vim_interface = task["vim_interfaces"].get(vim_interface_id)
                if task_vim_interface != interface:
                    # the information from the VIM differs from the one at memory
                    # delete old port
                    if task_interface.get("sdn_port_id"):
                        try:
                            with self.db_lock:
                                self.ovim.delete_port(task_interface["sdn_port_id"])
                                task_interface["sdn_port_id"] = None
                                task_need_update = True
                        except ovimException as e:
                            error_text = "ovimException deleting external_port={}: {}".format(
                                task_interface["sdn_port_id"], e)
                            self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
                            task_warning_msg += error_text
                            # TODO Set error_msg at instance_nets instead of instance VMs

                    # Create SDN port
                    sdn_net_id = task_interface.get("sdn_net_id")
                    if sdn_net_id and interface.get("compute_node") and interface.get("pci"):
                        sdn_port_name = sdn_net_id + "." + task["vim_id"]
                        sdn_port_name = sdn_port_name[:63]  # name truncated to 63 characters
                        try:
                            with self.db_lock:
                                sdn_port_id = self.ovim.new_external_port(
                                    {"compute_node": interface["compute_node"],
                                     "pci": interface["pci"],
                                     "vlan": interface.get("vlan"),
                                     "net_id": sdn_net_id,
                                     "region": self.vim["config"]["datacenter_id"],
                                     "name": sdn_port_name,
                                     "mac": interface.get("mac_address")})
                                task_interface["sdn_port_id"] = sdn_port_id
                                task_need_update = True
                        except (ovimException, Exception) as e:
                            error_text = "ovimException creating new_external_port compute_node={}"\
                                         " pci={} vlan={} {}".format(
                                             interface["compute_node"],
                                             interface["pci"],
                                             interface.get("vlan"), e)
                            self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
                            task_warning_msg += error_text
                            # TODO Set error_msg at instance_nets instead of instance VMs

                    with self.db_lock:
                        self.db.update_rows(
                            'instance_interfaces',
                            UPDATE={"mac_address": interface.get("mac_address"),
                                    "ip_address": interface.get("ip_address"),
                                    "vim_info": interface.get("vim_info"),
                                    "sdn_port_id": task_interface.get("sdn_port_id"),
                                    "compute_node": interface.get("compute_node"),
                                    "pci": interface.get("pci"),
                                    "vlan": interface.get("vlan")},
                            WHERE={'uuid': task_interface["iface_id"]})
                    task["vim_interfaces"][vim_interface_id] = interface

            # check and update task and instance_vms database
            if vim_info.get("error_msg"):
                vim_info["error_msg"] = self._format_vim_error_msg(vim_info["error_msg"] + task_warning_msg)
            elif task_warning_msg:
                vim_info["error_msg"] = self._format_vim_error_msg(task_warning_msg)

            task_vim_info = task.get("vim_info")
            task_error_msg = task.get("error_msg")
            task_vim_status = task["extra"].get("vim_status")
            # database is only written when status, error_msg or vim_info have changed
            if task_vim_status != vim_info["status"] or task_error_msg != vim_info.get("error_msg") or \
                    (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
                temp_dict = {"status": vim_info["status"], "error_msg": vim_info.get("error_msg")}
                if vim_info.get("vim_info"):
                    temp_dict["vim_info"] = vim_info["vim_info"]
                with self.db_lock:
                    self.db.update_rows('instance_vms', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]})
                task["extra"]["vim_status"] = vim_info["status"]
                task["error_msg"] = vim_info.get("error_msg")
                if vim_info.get("vim_info"):
                    task["vim_info"] = vim_info["vim_info"]
                task_need_update = True

            if task_need_update:
                with self.db_lock:
                    self.db.update_rows(
                        'vim_actions',
                        UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256),
                                "error_msg": task.get("error_msg"), "modified_at": now},
                        WHERE={'instance_action_id': task['instance_action_id'],
                               'task_index': task['task_index']})
            # schedule the next refresh: sooner while the VM is at BUILD status
            if task["extra"].get("vim_status") == "BUILD":
                self._insert_refresh(task, now + self.REFRESH_BUILD)
            else:
                self._insert_refresh(task, now + self.REFRESH_ACTIVE)

    if net_to_refresh_list:
        now = time.time()
        try:
            vim_dict = self.vim.refresh_nets_status(net_to_refresh_list)
        except vimconn.vimconnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error("task=several get-net: vimconnException when trying to refresh nets " + str(e))
            vim_dict = {}
            for vim_id in net_to_refresh_list:
                vim_dict[vim_id] = {"status": "VIM_ERROR", "error_msg": str(e)}

        for vim_id, vim_info in vim_dict.items():
            # look for task
            task = net_to_refresh_dict[vim_id]
            task_id = task["instance_action_id"] + "." + str(task["task_index"])
            self.logger.debug("task={} get-net: vim_net_id={} result={}".format(task_id, task["vim_id"], vim_info))

            task_vim_info = task.get("vim_info")
            task_vim_status = task["extra"].get("vim_status")
            task_error_msg = task.get("error_msg")
            task_sdn_net_id = task["extra"].get("sdn_net_id")

            # get ovim status
            if task_sdn_net_id:
                try:
                    with self.db_lock:
                        sdn_net = self.ovim.show_network(task_sdn_net_id)
                except (ovimException, Exception) as e:
                    text_error = "ovimException getting network snd_net_id={}: {}".format(task_sdn_net_id, e)
                    self.logger.error("task={} get-net: {}".format(task_id, text_error), exc_info=True)
                    sdn_net = {"status": "ERROR", "error_msg": text_error}
                if sdn_net["status"] == "ERROR":
                    # merge the SDN error with the one of the VIM, if any
                    if not vim_info.get("error_msg"):
                        vim_info["error_msg"] = sdn_net["error_msg"]
                    else:
                        vim_info["error_msg"] = "VIM_ERROR: {} && SDN_ERROR: {}".format(
                            self._format_vim_error_msg(vim_info["error_msg"], 1024//2-14),
                            self._format_vim_error_msg(sdn_net["error_msg"], 1024//2-14))
                    if vim_info["status"] == "VIM_ERROR":
                        vim_info["status"] = "VIM_SDN_ERROR"
                    else:
                        vim_info["status"] = "SDN_ERROR"

            # update database
            if vim_info.get("error_msg"):
                vim_info["error_msg"] = self._format_vim_error_msg(vim_info["error_msg"])
            # database is only written when status, error_msg or vim_info have changed
            if task_vim_status != vim_info["status"] or task_error_msg != vim_info.get("error_msg") or \
                    (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
                task["extra"]["vim_status"] = vim_info["status"]
                task["error_msg"] = vim_info.get("error_msg")
                if vim_info.get("vim_info"):
                    task["vim_info"] = vim_info["vim_info"]
                temp_dict = {"status": vim_info["status"], "error_msg": vim_info.get("error_msg")}
                if vim_info.get("vim_info"):
                    temp_dict["vim_info"] = vim_info["vim_info"]
                with self.db_lock:
                    self.db.update_rows('instance_nets', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]})
                    self.db.update_rows(
                        'vim_actions',
                        UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256),
                                "error_msg": task.get("error_msg"), "modified_at": now},
                        WHERE={'instance_action_id': task['instance_action_id'],
                               'task_index': task['task_index']})
            # schedule the next refresh: sooner while the net is at BUILD status
            if task["extra"].get("vim_status") == "BUILD":
                self._insert_refresh(task, now + self.REFRESH_BUILD)
            else:
                self._insert_refresh(task, now + self.REFRESH_ACTIVE)

    return nb_processed
425
426 def _insert_refresh(self, task, threshold_time=None):
427 """Insert a task at list of refreshing elements. The refreshing list is ordered by threshold_time (task['modified_at']
428 It is assumed that this is called inside this thread
429 """
430 if not self.vim:
431 return
432 if not threshold_time:
433 threshold_time = time.time()
434 task["modified_at"] = threshold_time
435 task_name = task["item"][9:] + "-" + task["action"]
436 task_id = task["instance_action_id"] + "." + str(task["task_index"])
437 for index in range(0, len(self.refresh_tasks)):
438 if self.refresh_tasks[index]["modified_at"] > threshold_time:
439 self.refresh_tasks.insert(index, task)
440 break
441 else:
442 index = len(self.refresh_tasks)
443 self.refresh_tasks.append(task)
444 self.logger.debug("task={} new refresh name={}, modified_at={} index={}".format(
445 task_id, task_name, task["modified_at"], index))
446
447 def _remove_refresh(self, task_name, vim_id):
448 """Remove a task with this name and vim_id from the list of refreshing elements.
449 It is assumed that this is called inside this thread outside _refres_elements method
450 Return True if self.refresh_list is modified, task is found
451 Return False if not found
452 """
453 index_to_delete = None
454 for index in range(0, len(self.refresh_tasks)):
455 if self.refresh_tasks[index]["name"] == task_name and self.refresh_tasks[index]["vim_id"] == vim_id:
456 index_to_delete = index
457 break
458 else:
459 return False
460 if not index_to_delete:
461 del self.refresh_tasks[index_to_delete]
462 return True
463
464 def _proccess_pending_tasks(self):
465 nb_created = 0
466 nb_processed = 0
467 while self.pending_tasks:
468 task = self.pending_tasks.pop(0)
469 nb_processed += 1
470 try:
471 # check if tasks that this depends on have been completed
472 dependency_not_completed = False
473 for task_index in task["extra"].get("depends_on", ()):
474 task_dependency = task["depends"].get("TASK-" + str(task_index))
475 if not task_dependency:
476 task_dependency = self._look_for_task(task["instance_action_id"], "TASK-" + str(task_index))
477 if not task_dependency:
478 raise VimThreadException(
479 "Cannot get depending net task trying to get depending task {}.{}".format(
480 task["instance_action_id"], task_index))
481 # task["depends"]["TASK-" + str(task_index)] = task_dependency #it references another object,so database must be look again
482 if task_dependency["status"] == "SCHEDULED":
483 dependency_not_completed = True
484 break
485 elif task_dependency["status"] == "FAILED":
486 raise VimThreadException(
487 "Cannot {} {}, (task {}.{}) because depends on failed {}.{}, (task{}.{}): {}".format(
488 task["action"], task["item"],
489 task["instance_action_id"], task["task_index"],
490 task_dependency["instance_action_id"], task_dependency["task_index"],
491 task_dependency["action"], task_dependency["item"], task_dependency.get("error_msg")))
492 if dependency_not_completed:
493 # Move this task to the end.
494 task["extra"]["tries"] = task["extra"].get("tries", 0) + 1
495 if task["extra"]["tries"] <= 3:
496 self.pending_tasks.append(task)
497 continue
498 else:
499 raise VimThreadException(
500 "Cannot {} {}, (task {}.{}) because timeout waiting to complete {} {}, "
501 "(task {}.{})".format(task["action"], task["item"],
502 task["instance_action_id"], task["task_index"],
503 task_dependency["instance_action_id"], task_dependency["task_index"],
504 task_dependency["action"], task_dependency["item"]))
505
506 if task["status"] == "SUPERSEDED":
507 # not needed to do anything but update database with the new status
508 result = True
509 database_update = None
510 elif not self.vim:
511 task["status"] = "ERROR"
512 task["error_msg"] = self.error_status
513 result = False
514 database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
515 elif task["item"] == 'instance_vms':
516 if task["action"] == "CREATE":
517 result, database_update = self.new_vm(task)
518 nb_created += 1
519 elif task["action"] == "DELETE":
520 result, database_update = self.del_vm(task)
521 else:
522 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
523 elif task["item"] == 'instance_nets':
524 if task["action"] == "CREATE":
525 result, database_update = self.new_net(task)
526 nb_created += 1
527 elif task["action"] == "DELETE":
528 result, database_update = self.del_net(task)
529 elif task["action"] == "FIND":
530 result, database_update = self.get_net(task)
531 else:
532 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
533 elif task["item"] == 'instance_sfis':
534 if task["action"] == "CREATE":
535 result, database_update = self.new_sfi(task)
536 nb_created += 1
537 elif task["action"] == "DELETE":
538 result, database_update = self.del_sfi(task)
539 else:
540 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
541 elif task["item"] == 'instance_sfs':
542 if task["action"] == "CREATE":
543 result, database_update = self.new_sf(task)
544 nb_created += 1
545 elif task["action"] == "DELETE":
546 result, database_update = self.del_sf(task)
547 else:
548 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
549 elif task["item"] == 'instance_classifications':
550 if task["action"] == "CREATE":
551 result, database_update = self.new_classification(task)
552 nb_created += 1
553 elif task["action"] == "DELETE":
554 result, database_update = self.del_classification(task)
555 else:
556 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
557 elif task["item"] == 'instance_sfps':
558 if task["action"] == "CREATE":
559 result, database_update = self.new_sfp(task)
560 nb_created += 1
561 elif task["action"] == "DELETE":
562 result, database_update = self.del_sfp(task)
563 else:
564 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
565 else:
566 raise vimconn.vimconnException(self.name + "unknown task item {}".format(task["item"]))
567 # TODO
568 except VimThreadException as e:
569 result = False
570 task["error_msg"] = str(e)
571 task["status"] = "FAILED"
572 database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
573 if task["item"] == 'instance_vms':
574 database_update["vim_vm_id"] = None
575 elif task["item"] == 'instance_nets':
576 database_update["vim_net_id"] = None
577
578 no_refresh_tasks = ['instance_sfis', 'instance_sfs',
579 'instance_classifications', 'instance_sfps']
580 if task["action"] == "DELETE":
581 action_key = task["item"] + task["item_id"]
582 del self.grouped_tasks[action_key]
583 elif task["action"] in ("CREATE", "FIND") and task["status"] in ("DONE", "BUILD"):
584 if task["item"] not in no_refresh_tasks:
585 self._insert_refresh(task)
586
587 task_id = task["instance_action_id"] + "." + str(task["task_index"])
588 self.logger.debug("task={} item={} action={} result={}:'{}' params={}".format(
589 task_id, task["item"], task["action"], task["status"],
590 task["vim_id"] if task["status"] == "DONE" else task.get("error_msg"), task["params"]))
591 try:
592 now = time.time()
593 with self.db_lock:
594 self.db.update_rows(
595 table="vim_actions",
596 UPDATE={"status": task["status"], "vim_id": task.get("vim_id"), "modified_at": now,
597 "error_msg": task["error_msg"],
598 "extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256)},
599 WHERE={"instance_action_id": task["instance_action_id"], "task_index": task["task_index"]})
600 if result is not None:
601 self.db.update_rows(
602 table="instance_actions",
603 UPDATE={("number_done" if result else "number_failed"): {"INCREMENT": 1},
604 "modified_at": now},
605 WHERE={"uuid": task["instance_action_id"]})
606 if database_update:
607 self.db.update_rows(table=task["item"],
608 UPDATE=database_update,
609 WHERE={"uuid": task["item_id"]})
610 except db_base_Exception as e:
611 self.logger.error("task={} Error updating database {}".format(task_id, e), exc_info=True)
612
613 if nb_created == 10:
614 break
615 return nb_processed
616
def _insert_pending_tasks(self, vim_actions_list):
    """Fill the memory fields of the tasks and insert them at the lists of this thread.
    Tasks whose 'datacenter_vim_id' is not the one of this thread are ignored.
    For each task the 'extra' yaml text is loaded into a dict, and 'params', 'depends' and, if needed,
    'vim_interfaces', 'error_msg' and 'vim_id' are initialized. Then:
      - DELETE tasks supersede the rest of tasks over the same element, and are inserted at self.pending_tasks
      - SCHEDULED tasks are inserted at self.pending_tasks
      - CREATE or FIND tasks not SCHEDULED are inserted for refreshing if their status is DONE or BUILD
    In all the cases the task is appended to self.grouped_tasks[item + item_id].
    :param vim_actions_list: list of tasks, as read from database table vim_actions
    :return: None
    :raises vimconn.vimconnException: if the task action is unknown
    """
    for task in vim_actions_list:
        if task["datacenter_vim_id"] != self.datacenter_tenant_id:
            continue
        item = task["item"]
        item_id = task["item_id"]
        action_key = item + item_id
        if action_key not in self.grouped_tasks:
            self.grouped_tasks[action_key] = []
        task["params"] = None
        task["depends"] = {}
        if task["extra"]:
            # NOTE(review): yaml.load can build arbitrary objects. The text written by this file is generated
            # with yaml.safe_dump, so yaml.safe_load is probably enough; confirm what other writers store
            extra = yaml.load(task["extra"])
            task["extra"] = extra
            task["params"] = extra.get("params")
            depends_on_list = extra.get("depends_on")
            if depends_on_list:
                for index in depends_on_list:
                    # the dependency is taken from this list only if the task at position 'index' is really
                    # the task 'index' of the same instance action
                    if index < len(vim_actions_list) and vim_actions_list[index]["task_index"] == index and\
                            vim_actions_list[index]["instance_action_id"] == task["instance_action_id"]:
                        task["depends"]["TASK-" + str(index)] = vim_actions_list[index]
            if extra.get("interfaces"):
                task["vim_interfaces"] = {}
        else:
            task["extra"] = {}
        if "error_msg" not in task:
            task["error_msg"] = None
        if "vim_id" not in task:
            task["vim_id"] = None

        if task["action"] == "DELETE":
            need_delete_action = False  # True if there is something created that must be deleted
            for to_supersede in self.grouped_tasks.get(action_key, ()):
                if to_supersede["action"] == "FIND" and to_supersede.get("vim_id"):
                    task["vim_id"] = to_supersede["vim_id"]
                if to_supersede["action"] == "CREATE" and to_supersede["extra"].get("created", True) and \
                        (to_supersede.get("vim_id") or to_supersede["extra"].get("sdn_net_id")):
                    need_delete_action = True
                    # copy to the DELETE task the information needed to delete what was created
                    task["vim_id"] = to_supersede["vim_id"]
                    if to_supersede["extra"].get("sdn_net_id"):
                        task["extra"]["sdn_net_id"] = to_supersede["extra"]["sdn_net_id"]
                    if to_supersede["extra"].get("interfaces"):
                        task["extra"]["interfaces"] = to_supersede["extra"]["interfaces"]
                    if to_supersede["extra"].get("created_items"):
                        if not task["extra"].get("created_items"):
                            task["extra"]["created_items"] = {}
                        task["extra"]["created_items"].update(to_supersede["extra"]["created_items"])
                # Mark task as SUPERSEDED.
                #   If task is in self.pending_tasks, it will be removed and database will be update
                #   If task is in self.refresh_tasks, it will be removed
                to_supersede["status"] = "SUPERSEDED"
            if not need_delete_action:
                # nothing to delete at VIM: only the database needs to be updated
                task["status"] = "SUPERSEDED"

            self.grouped_tasks[action_key].append(task)
            self.pending_tasks.append(task)
        elif task["status"] == "SCHEDULED":
            self.grouped_tasks[action_key].append(task)
            self.pending_tasks.append(task)
        elif task["action"] in ("CREATE", "FIND"):
            self.grouped_tasks[action_key].append(task)
            if task["status"] in ("DONE", "BUILD"):
                self._insert_refresh(task)
        # TODO add VM reset, get console, etc...
        else:
            raise vimconn.vimconnException(self.name + "unknown vim_action action {}".format(task["action"]))
683
def insert_task(self, task):
    """Put a task at the queue of this thread without blocking.
    :param task: element to insert. The run method processes lists of tasks and the texts 'exit' and 'reload'
    :return: None
    :raises vimconn.vimconnException: if the queue is full
    """
    try:
        self.task_queue.put_nowait(task)
    except Queue.Full:
        raise vimconn.vimconnException(self.name + ": timeout inserting a task")
    return None
690
def del_task(self, task):
    """Cancel a task that has not been processed yet, marking it as SUPERSEDED.
    The status is checked and changed holding self.task_lock.
    :param task: task to cancel
    :return: True if the task was SCHEDULED and it is now SUPERSEDED, False otherwise
    """
    with self.task_lock:
        if task["status"] == "SCHEDULED":
            task["status"] = "SUPERSEDED"
            return True
        # task["status"] == "processing"
        # the lock is released by the 'with' statement; an explicit release here would release it twice
        return False
699
def run(self):
    """Main loop of the thread.
    It reloads the actions from database and then, forever: gets the elements of self.task_queue, processes the
    pending tasks and refreshes the elements. It sleeps one second when nothing has been processed.
    Elements of the queue can be:
      - a list of tasks, that are inserted with self._insert_pending_tasks
      - the text 'exit', that finishes the thread
      - the text 'reload', that reloads the actions from database
    Unexpected exceptions are logged and the loop continues.
    :return: 0 when 'exit' is received
    """
    self.logger.debug("Starting")
    while True:
        self._reload_vim_actions()
        reload_thread = False

        while True:
            try:
                while not self.task_queue.empty():
                    task = self.task_queue.get()
                    if isinstance(task, list):
                        self._insert_pending_tasks(task)
                    elif isinstance(task, str):
                        if task == 'exit':
                            return 0
                        elif task == 'reload':
                            reload_thread = True
                            break
                    self.task_queue.task_done()
                if reload_thread:
                    # go to the outer loop in order to reload the actions from database
                    break
                nb_processed = self._proccess_pending_tasks()
                nb_processed += self._refres_elements()
                if not nb_processed:
                    time.sleep(1)

            except Exception as e:
                self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)

    # NOTE(review): not reachable, the outer loop is only left by the 'return' of the 'exit' command
    self.logger.debug("Finishing")
730
731 def _look_for_task(self, instance_action_id, task_id):
732 task_index = task_id.split("-")[-1]
733 with self.db_lock:
734 tasks = self.db.get_rows(FROM="vim_actions", WHERE={"instance_action_id": instance_action_id,
735 "task_index": task_index})
736 if not tasks:
737 return None
738 task = tasks[0]
739 task["params"] = None
740 task["depends"] = {}
741 if task["extra"]:
742 extra = yaml.load(task["extra"])
743 task["extra"] = extra
744 task["params"] = extra.get("params")
745 if extra.get("interfaces"):
746 task["vim_interfaces"] = {}
747 else:
748 task["extra"] = {}
749 return task
750
751 @staticmethod
752 def _format_vim_error_msg(error_text, max_length=1024):
753 if error_text and len(error_text) >= max_length:
754 return error_text[:max_length//2-3] + " ... " + error_text[-max_length//2+3:]
755 return error_text
756
def new_vm(self, task):
    """Create a VM at the VIM.
    The networks of the VM that reference a task ("TASK-<task_index>" at 'net_id') are replaced by the 'vim_id' of
    that task, taken from task["depends"] or from database. After the creation, task["extra"]["interfaces"] is
    filled with the uuid and the sdn_net_id of each interface.
    :param task: CREATE task. task["params"] are the arguments passed to the VIM connector new_vminstance; the
        element at index 5 is used as the list of networks
    :return: tuple (result, database_update). result is True if created, False on error. database_update is the
        dict with the changes for the instance_vms entry
    """
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        params = task["params"]
        net_list = params[5]
        for net in net_list:
            if "net_id" in net and is_task_id(net["net_id"]):  # change task_id into network_id
                task_dependency = task["depends"].get(net["net_id"])
                if not task_dependency:
                    task_dependency = self._look_for_task(task["instance_action_id"], net["net_id"])
                    if not task_dependency:
                        raise VimThreadException(
                            "Cannot get depending net task trying to get depending task {}.{}".format(
                                task["instance_action_id"], net["net_id"]))
                network_id = task_dependency.get("vim_id")
                if not network_id:
                    # the error is taken from the dependency found, that may come from database and not be
                    # at task["depends"]
                    raise VimThreadException(
                        "Cannot create VM because depends on a network not created or found: " +
                        str(task_dependency.get("error_msg")))
                net["net_id"] = network_id
        vim_vm_id, created_items = self.vim.new_vminstance(*params)

        # fill task_interfaces. Look for snd_net_id at database for each interface
        task_interfaces = {}
        for iface in net_list:
            task_interfaces[iface["vim_id"]] = {"iface_id": iface["uuid"]}
            with self.db_lock:
                result = self.db.get_rows(
                    SELECT=('sdn_net_id',),
                    FROM='instance_nets as ine join instance_interfaces as ii on ii.instance_net_id=ine.uuid',
                    WHERE={'ii.uuid': iface["uuid"]})
            if result:
                task_interfaces[iface["vim_id"]]["sdn_net_id"] = result[0]['sdn_net_id']
            else:
                self.logger.critical("task={} new-VM: instance_nets uuid={} not found at DB".format(task_id,
                                     iface["uuid"]), exc_info=True)

        task["vim_info"] = {}
        task["vim_interfaces"] = {}
        task["extra"]["interfaces"] = task_interfaces
        task["extra"]["created"] = True
        task["extra"]["created_items"] = created_items
        task["error_msg"] = None
        task["status"] = "DONE"
        task["vim_id"] = vim_vm_id
        instance_element_update = {"status": "BUILD", "vim_vm_id": vim_vm_id, "error_msg": None}
        return True, instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("task={} new-VM: {}".format(task_id, e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_vm_id": None, "error_msg": error_text}
        return False, instance_element_update
814
def del_vm(self, task):
    """
    Delete a VM from the VIM, removing first the SDN ports attached to its interfaces.
    A failure deleting an SDN port is only logged; the deletion of the VM continues. A 'not found' answer from
    the VIM is considered a successful deletion, but its text is kept at task["error_msg"].
    :param task: task to process. The VM is task["vim_id"]; the interfaces and the created_items are taken from
        task["extra"]. Its "status" and "error_msg" are modified with the result
    :return: tuple (True/False for success/failure, None)
    """
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    vm_vim_id = task["vim_id"]
    # A dict is needed as default because ".values()" is used below
    interfaces = task["extra"].get("interfaces") or {}
    try:
        for iface in interfaces.values():
            if iface.get("sdn_port_id"):
                try:
                    with self.db_lock:
                        self.ovim.delete_port(iface["sdn_port_id"])
                except ovimException as e:
                    self.logger.error("task={} del-VM: ovimException when deleting external_port={}: {} ".format(
                        task_id, iface["sdn_port_id"], e), exc_info=True)
                    # TODO Set error_msg at instance_nets

        self.vim.delete_vminstance(vm_vim_id, task["extra"].get("created_items"))
        task["status"] = "DONE"
        task["error_msg"] = None
        return True, None

    except vimconn.vimconnException as e:
        task["error_msg"] = self._format_vim_error_msg(str(e))
        if isinstance(e, vimconn.vimconnNotFoundException):
            # If not found mark as Done and fill error_msg
            task["status"] = "DONE"
            return True, None
        task["status"] = "FAILED"
        return False, None
843
def _get_net_internal(self, task, filter_param):
    """
    Shared implementation of get_net and of the find stage of new_net: look for exactly one network at the VIM.
    :param task: task for this find or find-or-create action. It is filled in with the network found
    :param filter_param: filter sent to the vimconnector to list the networks
    :return: a dict with the content to update the instance_nets database table. Raises
        VimThreadExceptionNotFound when no network matches and VimThreadException when more than one does
    """
    found_nets = self.vim.get_network_list(filter_param)
    if not found_nets:
        raise VimThreadExceptionNotFound("Network not found with this criteria: '{}'".format(filter_param))
    if len(found_nets) > 1:
        raise VimThreadException("More than one network found with this criteria: '{}'".format(filter_param))
    vim_net_id = found_nets[0]["id"]

    # An instance_nets entry without scenario for this VIM network tells if a sdn controller manages it
    with self.db_lock:
        rows = self.db.get_rows(SELECT=('sdn_net_id',), FROM='instance_nets',
                                WHERE={'vim_net_id': vim_net_id, 'instance_scenario_id': None,
                                       'datacenter_tenant_id': self.datacenter_tenant_id})
    sdn_net_id = rows[0]['sdn_net_id'] if rows else None

    task["status"] = "DONE"
    task["extra"].update({"vim_info": {}, "created": False, "sdn_net_id": sdn_net_id})
    task["error_msg"] = None
    task["vim_id"] = vim_net_id
    return {
        "vim_net_id": vim_net_id,
        "sdn_net_id": sdn_net_id,
        "created": False,
        "status": "BUILD",
        "error_msg": None,
    }
877
def get_net(self, task):
    """
    Find an existing network at the VIM, using the filter stored as first item of task["params"].
    :param task: task to process. On failure its "status", "vim_id" and "error_msg" are filled in here; on
        success it is filled in by _get_net_internal
    :return: tuple (True/False for success/failure, dict with the content to update the instance_nets entry)
    """
    task_id = ".".join((task["instance_action_id"], str(task["task_index"])))
    try:
        net_update = self._get_net_internal(task, task["params"][0])
    except (vimconn.vimconnException, VimThreadException) as exc:
        self.logger.error("task={} get-net: {}".format(task_id, exc))
        task["status"] = "FAILED"
        task["vim_id"] = None
        error_text = self._format_vim_error_msg(str(exc))
        task["error_msg"] = error_text
        return False, {"vim_net_id": None, "status": "VIM_ERROR", "error_msg": error_text}
    return True, net_update
894
def new_net(self, task):
    """
    Create a network at the VIM, optionally trying first to find an existing one.
    If task["extra"]["find"] is present, its first item is used as filter to look for the network; when exactly
    one is found it is used and nothing is created. When none is found the network is created with
    task["params"]. If the VIM config contains a 'sdn-controller' and the network type is "data" or "ptp", a
    network is also created at the SDN controller (self.ovim), which requires vlan encapsulation at the VIM net.
    :param task: task to process. Its "status", "vim_id", "error_msg" and "extra" are modified with the result
    :return: tuple (True/False for success/failure, dict with the content to update the instance_nets entry)
    """
    vim_net_id = None
    sdn_net_id = None
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    action_text = ""
    try:
        # FIND
        if task["extra"].get("find"):
            action_text = "finding"
            filter_param = task["extra"]["find"][0]
            try:
                instance_element_update = self._get_net_internal(task, filter_param)
                return True, instance_element_update
            except VimThreadExceptionNotFound:
                # Not found is not an error here: continue with the creation
                pass
        # CREATE
        params = task["params"]
        action_text = "creating VIM"
        vim_net_id = self.vim.new_network(*params)

        net_name = params[0]
        net_type = params[1]

        sdn_controller = self.vim.config.get('sdn-controller')
        if sdn_controller and (net_type == "data" or net_type == "ptp"):
            network = {"name": net_name, "type": net_type, "region": self.vim["config"]["datacenter_id"]}

            vim_net = self.vim.get_network(vim_net_id)
            if vim_net.get('encapsulation') != 'vlan':
                # .get() is used because the key can be missing, which is one of the ways to get here
                raise vimconn.vimconnException(
                    "net '{}' defined as type '{}' has not vlan encapsulation '{}'".format(
                        net_name, net_type, vim_net.get('encapsulation')))
            network["vlan"] = vim_net.get('segmentation_id')
            action_text = "creating SDN"
            with self.db_lock:
                sdn_net_id = self.ovim.new_network(network)
        task["status"] = "DONE"
        task["extra"]["vim_info"] = {}
        task["extra"]["sdn_net_id"] = sdn_net_id
        task["extra"]["created"] = True
        task["error_msg"] = None
        task["vim_id"] = vim_net_id
        instance_element_update = {"vim_net_id": vim_net_id, "sdn_net_id": sdn_net_id, "status": "BUILD",
                                   "created": True, "error_msg": None}
        return True, instance_element_update
    except (vimconn.vimconnException, ovimException, VimThreadException) as e:
        # VimThreadException is raised by the find stage, e.g. when more than one network matches
        self.logger.error("task={} new-net: Error {}: {}".format(task_id, action_text, e))
        task["status"] = "FAILED"
        # vim_net_id is kept when the VIM network was created but a later step failed
        task["vim_id"] = vim_net_id
        task["error_msg"] = self._format_vim_error_msg(str(e))
        task["extra"]["sdn_net_id"] = sdn_net_id
        instance_element_update = {"vim_net_id": vim_net_id, "sdn_net_id": sdn_net_id, "status": "VIM_ERROR",
                                   "error_msg": task["error_msg"]}
        return False, instance_element_update
949
def del_net(self, task):
    """
    Delete a network: first its SDN counterpart, if any, and then the VIM network.
    A 'not found' answer from the VIM is considered a successful deletion, but its text is kept at
    task["error_msg"].
    :param task: task to process. The VIM network is task["vim_id"] and the SDN one task["extra"]["sdn_net_id"].
        Its "status" and "error_msg" are modified with the result
    :return: tuple (True/False for success/failure, None)
    """
    vim_net_id = task["vim_id"]
    sdn_net_id = task["extra"].get("sdn_net_id")
    try:
        if sdn_net_id:
            # Ports may have been attached to this sdn network manually with 'openmano vim-net-sdn-attach',
            # so they are removed before deleting the network
            with self.db_lock:
                attached_ports = self.ovim.get_ports(columns={'uuid'},
                                                     filter={'name': 'external_port', 'net_id': sdn_net_id})
                for attached_port in attached_ports:
                    self.ovim.delete_port(attached_port['uuid'])
                self.ovim.delete_network(sdn_net_id)
        if vim_net_id:
            self.vim.delete_network(vim_net_id)
    except ovimException as exc:
        task["error_msg"] = self._format_vim_error_msg("ovimException obtaining and deleting external "
                                                       "ports for net {}: {}".format(sdn_net_id, str(exc)))
        deleted = False
    except vimconn.vimconnException as exc:
        task["error_msg"] = self._format_vim_error_msg(str(exc))
        # A network that is not at the VIM counts as deleted; error_msg keeps the VIM answer
        deleted = isinstance(exc, vimconn.vimconnNotFoundException)
    else:
        task["error_msg"] = None
        deleted = True
    task["status"] = "DONE" if deleted else "FAILED"
    return deleted, None
979
980 ## Service Function Instances
981
def new_sfi(self, task):
    """
    Create a Service Function Instance (SFI) at the VIM.
    The port used is the "vim_id" of the first interface (item 0 of extra params[5]) of the first task found at
    task["depends"].
    :param task: task to process. Its "status", "vim_id" and "error_msg" are modified with the result and, on
        success, task["extra"]["created"] is set to True
    :return: tuple (True/False for success/failure, dict with the "status", "vim_sfi_id" and "error_msg"
        obtained for the SFI)
    """
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        depends = task.get("depends")
        if not depends:
            raise VimThreadException("Cannot create SFI because the task does not depend on any other task")
        # next(iter(...)) works both with python2 lists and with python3 dict views, which cannot be indexed
        vm_task = next(iter(depends.values()))
        interfaces = vm_task.get("extra").get("params")[5]
        # At the moment, only the port of the first interface is used, both as ingress and as egress port.
        # Bear in mind that different VIM connectors might support SFI differently. In the case of OpenStack, only
        # the first ingress and first egress ports will be used to create the SFI (Port Pair).
        port_id_list = [interfaces[0].get("vim_id")]
        name = "sfi-%s" % task["item_id"][:8]
        # By default no form of IETF SFC Encapsulation will be used
        vim_sfi_id = self.vim.new_sfi(name, port_id_list, port_id_list, sfc_encap=False)

        task["extra"]["created"] = True
        task["error_msg"] = None
        task["status"] = "DONE"
        task["vim_id"] = vim_sfi_id
        instance_element_update = {"status": "ACTIVE", "vim_sfi_id": vim_sfi_id, "error_msg": None}
        return True, instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("Error creating Service Function Instance, task=%s: %s", task_id, str(e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_sfi_id": None, "error_msg": error_text}
        return False, instance_element_update
1013
def del_sfi(self, task):
    """
    Delete a Service Function Instance from the VIM.
    A 'not found' answer from the VIM is considered a successful deletion, but its text is kept at
    task["error_msg"].
    :param task: task to process. The SFI is task["vim_id"]. Its "status" and "error_msg" are modified
    :return: tuple (True/False for success/failure, None)
    """
    try:
        self.vim.delete_sfi(task["vim_id"])
    except vimconn.vimconnException as exc:
        task["error_msg"] = self._format_vim_error_msg(str(exc))
        # Nothing left to delete at the VIM means the task is done anyway
        already_gone = isinstance(exc, vimconn.vimconnNotFoundException)
        task["status"] = "DONE" if already_gone else "FAILED"
        return already_gone, None
    task["status"] = "DONE"
    task["error_msg"] = None
    return True, None
1030
def new_sf(self, task):
    """
    Create a Service Function (SF) at the VIM, made of the SFIs created by the tasks this one depends on.
    :param task: task to process. The "vim_id" of every task at task["depends"] is used as SFI identifier. Its
        "status", "vim_id" and "error_msg" are modified with the result and, on success,
        task["extra"]["created"] is set to True
    :return: tuple (True/False for success/failure, dict with the "status", "vim_sf_id" and "error_msg"
        obtained for the SF)
    """
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        sfi_id_list = [sfi.get("vim_id") for sfi in task.get("depends").values()]
        name = "sf-%s" % task["item_id"][:8]
        # By default no form of IETF SFC Encapsulation will be used
        vim_sf_id = self.vim.new_sf(name, sfi_id_list, sfc_encap=False)

        task["extra"]["created"] = True
        task["error_msg"] = None
        task["status"] = "DONE"
        task["vim_id"] = vim_sf_id
        instance_element_update = {"status": "ACTIVE", "vim_sf_id": vim_sf_id, "error_msg": None}
        return True, instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("Error creating Service Function, task=%s: %s", task_id, str(e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_sf_id": None, "error_msg": error_text}
        return False, instance_element_update
1062
def del_sf(self, task):
    """
    Delete a Service Function from the VIM.
    A 'not found' answer from the VIM is considered a successful deletion, but its text is kept at
    task["error_msg"].
    :param task: task to process. The SF is task["vim_id"]. Its "status" and "error_msg" are modified
    :return: tuple (True/False for success/failure, None)
    """
    try:
        self.vim.delete_sf(task["vim_id"])
    except vimconn.vimconnException as exc:
        task["error_msg"] = self._format_vim_error_msg(str(exc))
        # Nothing left to delete at the VIM means the task is done anyway
        already_gone = isinstance(exc, vimconn.vimconnNotFoundException)
        task["status"] = "DONE" if already_gone else "FAILED"
        return already_gone, None
    task["status"] = "DONE"
    task["error_msg"] = None
    return True, None
1079
def new_classification(self, task):
    """
    Create a Classification at the VIM, as a 'legacy_flow_classifier'.
    The logical source port is the "vim_id" of the first interface (item 0 of extra params[5]) of the first task
    found at task["depends"]. The match is taken from task["params"]: "ip_proto", "source_ip", "destination_ip",
    "source_port" and "destination_port".
    :param task: task to process. Its "status", "vim_id" and "error_msg" are modified with the result and, on
        success, task["extra"]["created"] is set to True
    :return: tuple (True/False for success/failure, dict with the "status", "vim_classification_id" and
        "error_msg" obtained for the classification)
    """
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        params = task["params"]
        depends = task.get("depends")
        if not depends:
            raise VimThreadException(
                "Cannot create Classification because the task does not depend on any other task")
        # next(iter(...)) works both with python2 lists and with python3 dict views, which cannot be indexed
        vnf_task = next(iter(depends.values()))
        interfaces = vnf_task.get("extra").get("params")[5]
        # Bear in mind that different VIM connectors might support Classifications differently.
        # In the case of OpenStack, only the first VNF attached to the classifier will be used
        # to create the Classification(s) (the "logical source port" of the "Flow Classifier").
        # Since the VNFFG classifier match lacks the ethertype, classification defaults to
        # using the IPv4 flow classifier.
        name = "c-%s" % task["item_id"][:8]

        # The usual protocol numbers are sent by name; any other number is sent as it is. A match without
        # protocol is sent without it instead of failing on the conversion
        ip_proto = params.get("ip_proto")
        if ip_proto is not None:
            ip_proto = int(ip_proto)
            ip_proto = {1: 'icmp', 6: 'tcp', 17: 'udp'}.get(ip_proto, ip_proto)

        # if not CIDR is given for the IP addresses, add /32. Addresses not present are left untouched
        source_ip = params.get("source_ip")
        destination_ip = params.get("destination_ip")
        if source_ip and '/' not in source_ip:
            source_ip += '/32'
        if destination_ip and '/' not in destination_ip:
            destination_ip += '/32'

        definition = {
            "logical_source_port": interfaces[0].get("vim_id"),
            "protocol": ip_proto,
            "source_ip_prefix": source_ip,
            "destination_ip_prefix": destination_ip,
            "source_port_range_min": params.get("source_port"),
            "source_port_range_max": params.get("source_port"),
            "destination_port_range_min": params.get("destination_port"),
            "destination_port_range_max": params.get("destination_port"),
        }

        vim_classification_id = self.vim.new_classification(
            name, 'legacy_flow_classifier', definition)

        task["extra"]["created"] = True
        task["error_msg"] = None
        task["status"] = "DONE"
        task["vim_id"] = vim_classification_id
        instance_element_update = {"status": "ACTIVE", "vim_classification_id": vim_classification_id,
                                   "error_msg": None}
        return True, instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("Error creating Classification, task=%s: %s", task_id, str(e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_classification_id": None,
                                   "error_msg": error_text}
        return False, instance_element_update
1137
def del_classification(self, task):
    """
    Delete a Classification from the VIM.
    A 'not found' answer from the VIM is considered a successful deletion, but its text is kept at
    task["error_msg"].
    :param task: task to process. The classification is task["vim_id"]. Its "status" and "error_msg" are
        modified
    :return: tuple (True/False for success/failure, None)
    """
    try:
        self.vim.delete_classification(task["vim_id"])
    except vimconn.vimconnException as exc:
        task["error_msg"] = self._format_vim_error_msg(str(exc))
        # Nothing left to delete at the VIM means the task is done anyway
        already_gone = isinstance(exc, vimconn.vimconnNotFoundException)
        task["status"] = "DONE" if already_gone else "FAILED"
        return already_gone, None
    task["status"] = "DONE"
    task["error_msg"] = None
    return True, None
1154
def new_sfp(self, task):
    """
    Create a Service Function Path (SFP) at the VIM.
    The tasks at task["depends"] are classified by their "item": the "vim_id" of those over "instance_sfs" are
    the service functions of the path, and the "vim_id" of those over "instance_classifications" are its
    classifications. Any other dependency is ignored.
    :param task: task to process. Its "status", "vim_id" and "error_msg" are modified with the result and, on
        success, task["extra"]["created"] is set to True
    :return: tuple (True/False for success/failure, dict with the "status", "vim_sfp_id" and "error_msg"
        obtained for the SFP)
    """
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        sf_id_list = []
        classification_id_list = []
        for dep in task.get("depends").values():
            vim_id = dep.get("vim_id")
            resource = dep.get("item")
            if resource == "instance_sfs":
                sf_id_list.append(vim_id)
            elif resource == "instance_classifications":
                classification_id_list.append(vim_id)

        name = "sfp-%s" % task["item_id"][:8]
        # By default no form of IETF SFC Encapsulation will be used
        vim_sfp_id = self.vim.new_sfp(name, classification_id_list, sf_id_list, sfc_encap=False)

        task["extra"]["created"] = True
        task["error_msg"] = None
        task["status"] = "DONE"
        task["vim_id"] = vim_sfp_id
        instance_element_update = {"status": "ACTIVE", "vim_sfp_id": vim_sfp_id, "error_msg": None}
        return True, instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("Error creating Service Function Path, task=%s: %s", task_id, str(e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_sfp_id": None, "error_msg": error_text}
        return False, instance_element_update
1193
def del_sfp(self, task):
    """
    Delete a Service Function Path from the VIM.
    A 'not found' answer from the VIM is considered a successful deletion, but its text is kept at
    task["error_msg"].
    :param task: task to process. The SFP is task["vim_id"]. Its "status" and "error_msg" are modified
    :return: tuple (True/False for success/failure, None)
    """
    try:
        self.vim.delete_sfp(task["vim_id"])
    except vimconn.vimconnException as exc:
        task["error_msg"] = self._format_vim_error_msg(str(exc))
        # Nothing left to delete at the VIM means the task is done anyway
        already_gone = isinstance(exc, vimconn.vimconnNotFoundException)
        task["status"] = "DONE" if already_gone else "FAILED"
        return already_gone, None
    task["status"] = "DONE"
    task["error_msg"] = None
    return True, None