vdu scaling
[osm/RO.git] / osm_ro / vim_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
5 # This file is part of openvim
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
22 ##
23
24 """"
25 This is a thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
26 The tasks are stored at database in table vim_actions
27 The task content is (M: stored at memory, D: stored at database):
28 MD instance_action_id: reference a global action over an instance-scenario: database instance_actions
29 MD task_index: index number of the task. This together with the previous forms a unique key identifier
30 MD datacenter_vim_id: should contain the uuid of the VIM managed by this thread
31 MD vim_id: id of the vm,net,etc at VIM
32 MD action: CREATE, DELETE, FIND
33 MD item: database table name, can be instance_vms, instance_nets, TODO: datacenter_flavors, datacenter_images
34 MD item_id: uuid of the referenced entry in the previous table
35 MD status: SCHEDULED,BUILD,DONE,FAILED,SUPERSEDED
36 MD extra: text with yaml format at database, dict at memory with:
37 params: list with the params to be sent to the VIM for CREATE or FIND. For DELETE the vim_id is taken from other related tasks
38 find: (only for CREATE tasks) if present it should FIND before creating and use if existing. Contains the FIND params
39 depends_on: list with the 'task_index'es of tasks that must be completed before. e.g. a vm creation depends on a net creation
40 can contain an int (single index on the same instance-action) or str (complete action ID)
41 sdn_net_id: used for net.
42 tries:
43 interfaces: used for VMs. Each key is the uuid of the instance_interfaces entry at database
44 iface_id: uuid of instance_interfaces
45 sdn_port_id:
46 sdn_net_id:
47 created_items: dictionary with extra elements created that need to be deleted. e.g. ports, volumes,...
48 created: False if the VIM element is not created by other actions, and it should not be deleted
49 vim_status: VIM status of the element. Stored also at database in the instance_XXX
50 M depends: dict with task_index(from depends_on) to task class
51 M params: same as extra[params] but with the resolved dependencies
52 M vim_interfaces: similar to extra[interfaces] but with VIM information. Stored at database in the instance_XXX but not at vim_actions
53 M vim_info: Detailed information of a vm,net from the VIM. Stored at database in the instance_XXX but not at vim_actions
54 MD error_msg: descriptive text upon an error.Stored also at database instance_XXX
55 MD created_at: task creation time
56 MD modified_at: last task update time. On refresh it contains when this task need to be refreshed
57
58 """
59
60 import threading
61 import time
62 import Queue
63 import logging
64 import vimconn
65 import yaml
66 from db_base import db_base_Exception
67 from lib_osm_openvim.ovim import ovimException
68 from copy import deepcopy
69
# Module metadata: author list and date stamp string.
__author__ = "Alfonso Tierno, Pablo Montes"
__date__ = "$28-Sep-2017 12:07:15$"
72
73
def is_task_id(task_id):
    """Tell whether *task_id* is a task reference, i.e. a string starting with "TASK-"."""
    task_prefix = "TASK-"
    return task_id.startswith(task_prefix)
76
77
class VimThreadException(Exception):
    """Base error raised by the VIM thread code while processing a task."""


class VimThreadExceptionNotFound(VimThreadException):
    """Specialization of VimThreadException (by its name, intended for elements that are not found)."""
84
85
86 class vim_thread(threading.Thread):
87 REFRESH_BUILD = 5 # 5 seconds
88 REFRESH_ACTIVE = 60 # 1 minute
89
def __init__(self, myvimconn, task_lock, name=None, datacenter_name=None, datacenter_tenant_id=None,
             db=None, db_lock=None, ovim=None):
    """Init a thread.

    Arguments:
        'myvimconn': VIM connector used to talk to the VIM, or a vimconn.vimconnException instance. In the
            latter case the thread keeps no connector (self.vim is None) and stores the error text in
            self.error_status.
        'task_lock': lock taken while task status is inspected or changed
        'name': name of the thread. When omitted it is built from the connector as
            <id>.<config.datacenter_tenant_id>
        'datacenter_name': stored as-is at self.datacenter_name
        'datacenter_tenant_id': identifier compared against the 'datacenter_vim_id' of the tasks
        'db', 'db_lock': database class and lock to use it in exclusion
        'ovim': object used for the SDN port and network operations
    """
    threading.Thread.__init__(self)
    if isinstance(myvimconn, vimconn.vimconnException):
        self.vim = None
        self.error_status = "Error accesing to VIM: {}".format(myvimconn)
    else:
        self.vim = myvimconn
        self.error_status = None
    self.datacenter_name = datacenter_name
    self.datacenter_tenant_id = datacenter_tenant_id
    self.ovim = ovim
    if name:
        self.name = name
    elif self.vim is not None:
        # The original code subscripted the 'vimconn' module here instead of the connector instance,
        # which always failed when no name was supplied.
        self.name = myvimconn["id"] + "." + myvimconn["config"]["datacenter_tenant_id"]
    # else: there is no connector to derive a name from; keep the default name set by threading.Thread

    self.logger = logging.getLogger('openmano.vim.'+self.name)
    self.db = db
    self.db_lock = db_lock

    self.task_lock = task_lock
    # incoming requests for this thread; bounded to 2000 entries
    self.task_queue = Queue.Queue(2000)

    # Contains time ordered task list for refreshing the status of VIM VMs and nets
    self.refresh_tasks = []

    # Contains time ordered task list for creation, deletion of VIM VMs and nets
    self.pending_tasks = []

    # It contains all the creation/deletion pending tasks grouped by its concrete vm, net, etc
    #     <item><item_id>:
    #         -  <task1>  # e.g. CREATE task
    #            <task2>  # e.g. DELETE task
    self.grouped_tasks = {}
133
def _reload_vim_actions(self):
    """
    Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions

    The 'vim_actions' rows of this datacenter_tenant_id are read in pages of 'database_limit' entries, ordered
    by item_id, item and created_at. Consecutive rows that share the same item+item_id are collected in a
    group and handed over to self._insert_pending_tasks. A group is dropped when it contains a DELETE action
    whose status is not SCHEDULED. Any exception is logged as critical and not propagated.
    :return: None
    """
    try:
        action_completed = False
        task_list = []
        old_action_key = None

        # values of the last row of the previous page; used to resume reading at the same point
        old_item_id = ""
        old_item = ""
        old_created_at = 0.0
        database_limit = 200
        while True:
            # get 200 (database_limit) entries each time
            with self.db_lock:
                vim_actions = self.db.get_rows(FROM="vim_actions",
                                               WHERE={"datacenter_vim_id":  self.datacenter_tenant_id,
                                                      "item_id>=": old_item_id},
                                               ORDER_BY=("item_id", "item", "created_at",),
                                               LIMIT=database_limit)
            for task in vim_actions:
                item = task["item"]
                item_id = task["item_id"]

                # skip the first entries that are already processed in the previous pool of 200
                if old_item_id:
                    if item_id == old_item_id and item == old_item and task["created_at"] == old_created_at:
                        old_item_id = False  # next one will be a new un-processed task
                    continue

                action_key = item + item_id
                if old_action_key != action_key:
                    # a new group starts here: flush the previous one unless it was already completed
                    if not action_completed and task_list:
                        # This will fill needed task parameters into memory, and insert the task if needed in
                        # self.pending_tasks or self.refresh_tasks
                        try:
                            self._insert_pending_tasks(task_list)
                        except Exception as e:
                            self.logger.critical(
                                "Unexpected exception at _reload_vim_actions:_insert_pending_tasks: " + str(e),
                                exc_info=True)
                    task_list = []
                    old_action_key = action_key
                    action_completed = False
                elif action_completed:
                    # remaining rows of a group that is already completed are ignored
                    continue

                if task["status"] == "SCHEDULED" or task["action"] == "CREATE" or task["action"] == "FIND":
                    task_list.append(task)
                elif task["action"] == "DELETE":
                    # action completed because deleted and status is not SCHEDULED. Not needed anything
                    action_completed = True
            if len(vim_actions) == database_limit:
                # update variables for get the next database iteration
                old_item_id = item_id
                old_item = item
                old_created_at = task["created_at"]
            else:
                # a page that is not full means there are no more rows to read
                break
        # Last actions group need to be inserted too
        if not action_completed and task_list:
            try:
                self._insert_pending_tasks(task_list)
            except Exception as e:
                self.logger.critical("Unexpected exception at _reload_vim_actions:_insert_pending_tasks: " + str(e),
                                     exc_info=True)
        self.logger.debug("reloaded vim actions pending:{} refresh:{}".format(
            len(self.pending_tasks), len(self.refresh_tasks)))
    except Exception as e:
        self.logger.critical("Unexpected exception at _reload_vim_actions: " + str(e), exc_info=True)
206
def _refres_elements(self):
    """Call VIM to get VMs and networks status until 10 elements

    Tasks whose 'modified_at' is not later than the current time are taken from the head of
    self.refresh_tasks, their VMs and nets are queried at the VIM in one call per kind, and the results are
    written to the database tables and to the in-memory tasks. Every refreshed task is re-inserted in the
    refresh list with a new threshold time (REFRESH_BUILD or REFRESH_ACTIVE seconds ahead).
    :return: number of tasks taken from the refresh list for processing
    """
    now = time.time()
    nb_processed = 0
    vm_to_refresh_list = []
    net_to_refresh_list = []
    # vim_id -> list of tasks referencing that element; several tasks can share the same vim_id
    vm_to_refresh_dict = {}
    net_to_refresh_dict = {}
    items_to_refresh = 0
    while self.refresh_tasks:
        task = self.refresh_tasks[0]
        with self.task_lock:
            if task['status'] == 'SUPERSEDED':
                self.refresh_tasks.pop(0)
                continue
            # list is ordered by 'modified_at', so nothing else is due yet
            if task['modified_at'] > now:
                break
            # task["status"] = "processing"
            nb_processed += 1
        self.refresh_tasks.pop(0)
        if task["item"] == 'instance_vms':
            if task["vim_id"] not in vm_to_refresh_dict:
                vm_to_refresh_dict[task["vim_id"]] = [task]
                vm_to_refresh_list.append(task["vim_id"])
            else:
                vm_to_refresh_dict[task["vim_id"]].append(task)
        elif task["item"] == 'instance_nets':
            if task["vim_id"] not in net_to_refresh_dict:
                net_to_refresh_dict[task["vim_id"]] = [task]
                net_to_refresh_list.append(task["vim_id"])
            else:
                net_to_refresh_dict[task["vim_id"]].append(task)
        else:
            # NOTE(review): the task has already been popped, so it is dropped from the refresh list here
            task_id = task["instance_action_id"] + "." + str(task["task_index"])
            self.logger.critical("task={}: unknown task {}".format(task_id, task["item"]), exc_info=True)
        items_to_refresh += 1
        if items_to_refresh == 10:
            break

    if vm_to_refresh_list:
        now = time.time()
        try:
            vim_dict = self.vim.refresh_vms_status(vm_to_refresh_list)
        except vimconn.vimconnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error("task=several get-VM: vimconnException when trying to refresh vms " + str(e))
            vim_dict = {}
            for vim_id in vm_to_refresh_list:
                vim_dict[vim_id] = {"status": "VIM_ERROR", "error_msg": str(e)}

        for vim_id, vim_info in vim_dict.items():

            # look for task
            for task in vm_to_refresh_dict[vim_id]:
                task_need_update = False
                task_id = task["instance_action_id"] + "." + str(task["task_index"])
                self.logger.debug("task={} get-VM: vim_vm_id={} result={}".format(task_id, task["vim_id"], vim_info))

                # check and update interfaces
                task_warning_msg = ""
                for interface in vim_info.get("interfaces", ()):
                    vim_interface_id = interface["vim_interface_id"]
                    if vim_interface_id not in task["extra"]["interfaces"]:
                        self.logger.critical("task={} get-VM: Interface not found {} on task info {}".format(
                            task_id, vim_interface_id, task["extra"]["interfaces"]), exc_info=True)
                        continue
                    task_interface = task["extra"]["interfaces"][vim_interface_id]
                    task_vim_interface = task["vim_interfaces"].get(vim_interface_id)
                    # only act when the information reported by the VIM differs from the last one stored
                    if task_vim_interface != interface:
                        # delete old port
                        if task_interface.get("sdn_port_id"):
                            try:
                                with self.db_lock:
                                    self.ovim.delete_port(task_interface["sdn_port_id"])
                                    task_interface["sdn_port_id"] = None
                                    task_need_update = True
                            except ovimException as e:
                                error_text = "ovimException deleting external_port={}: {}".format(
                                    task_interface["sdn_port_id"], e)
                                self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
                                task_warning_msg += error_text
                                # TODO Set error_msg at instance_nets instead of instance VMs

                        # Create SDN port
                        sdn_net_id = task_interface.get("sdn_net_id")
                        if sdn_net_id and interface.get("compute_node") and interface.get("pci"):
                            sdn_port_name = sdn_net_id + "." + task["vim_id"]
                            # port name is cut to 63 characters
                            sdn_port_name = sdn_port_name[:63]
                            try:
                                with self.db_lock:
                                    sdn_port_id = self.ovim.new_external_port(
                                        {"compute_node": interface["compute_node"],
                                         "pci": interface["pci"],
                                         "vlan": interface.get("vlan"),
                                         "net_id": sdn_net_id,
                                         "region": self.vim["config"]["datacenter_id"],
                                         "name": sdn_port_name,
                                         "mac": interface.get("mac_address")})
                                    task_interface["sdn_port_id"] = sdn_port_id
                                    task_need_update = True
                            except (ovimException, Exception) as e:
                                error_text = "ovimException creating new_external_port compute_node={}"\
                                             " pci={} vlan={} {}".format(
                                    interface["compute_node"],
                                    interface["pci"],
                                    interface.get("vlan"), e)
                                self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
                                task_warning_msg += error_text
                                # TODO Set error_msg at instance_nets instead of instance VMs

                        with self.db_lock:
                            self.db.update_rows(
                                'instance_interfaces',
                                UPDATE={"mac_address": interface.get("mac_address"),
                                        "ip_address": interface.get("ip_address"),
                                        "vim_info": interface.get("vim_info"),
                                        "sdn_port_id": task_interface.get("sdn_port_id"),
                                        "compute_node": interface.get("compute_node"),
                                        "pci": interface.get("pci"),
                                        "vlan": interface.get("vlan")},
                                WHERE={'uuid': task_interface["iface_id"]})
                        task["vim_interfaces"][vim_interface_id] = interface

                # check and update task and instance_vms database
                vim_info_error_msg = None
                if vim_info.get("error_msg"):
                    vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"] + task_warning_msg)
                elif task_warning_msg:
                    vim_info_error_msg = self._format_vim_error_msg(task_warning_msg)
                task_vim_info = task.get("vim_info")
                task_error_msg = task.get("error_msg")
                task_vim_status = task["extra"].get("vim_status")
                if task_vim_status != vim_info["status"] or task_error_msg != vim_info_error_msg or \
                        (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
                    temp_dict = {"status": vim_info["status"], "error_msg": vim_info_error_msg}
                    if vim_info.get("vim_info"):
                        temp_dict["vim_info"] = vim_info["vim_info"]
                    with self.db_lock:
                        self.db.update_rows('instance_vms', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]})
                    task["extra"]["vim_status"] = vim_info["status"]
                    task["error_msg"] = vim_info_error_msg
                    if vim_info.get("vim_info"):
                        task["vim_info"] = vim_info["vim_info"]
                    task_need_update = True

                if task_need_update:
                    with self.db_lock:
                        self.db.update_rows(
                            'vim_actions',
                            UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256),
                                    "error_msg": task.get("error_msg"), "modified_at": now},
                            WHERE={'instance_action_id': task['instance_action_id'],
                                   'task_index': task['task_index']})
                # elements at BUILD are polled more often than the rest
                if task["extra"].get("vim_status") == "BUILD":
                    self._insert_refresh(task, now + self.REFRESH_BUILD)
                else:
                    self._insert_refresh(task, now + self.REFRESH_ACTIVE)

    if net_to_refresh_list:
        now = time.time()
        try:
            vim_dict = self.vim.refresh_nets_status(net_to_refresh_list)
        except vimconn.vimconnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error("task=several get-net: vimconnException when trying to refresh nets " + str(e))
            vim_dict = {}
            for vim_id in net_to_refresh_list:
                vim_dict[vim_id] = {"status": "VIM_ERROR", "error_msg": str(e)}

        for vim_id, vim_info in vim_dict.items():
            # look for task
            for task in net_to_refresh_dict[vim_id]:
                task_id = task["instance_action_id"] + "." + str(task["task_index"])
                self.logger.debug("task={} get-net: vim_net_id={} result={}".format(task_id, task["vim_id"], vim_info))

                task_vim_info = task.get("vim_info")
                task_vim_status = task["extra"].get("vim_status")
                task_error_msg = task.get("error_msg")
                task_sdn_net_id = task["extra"].get("sdn_net_id")

                vim_info_status = vim_info["status"]
                vim_info_error_msg = vim_info.get("error_msg")
                # get ovim status
                if task_sdn_net_id:
                    try:
                        with self.db_lock:
                            sdn_net = self.ovim.show_network(task_sdn_net_id)
                    except (ovimException, Exception) as e:
                        text_error = "ovimException getting network snd_net_id={}: {}".format(task_sdn_net_id, e)
                        self.logger.error("task={} get-net: {}".format(task_id, text_error), exc_info=True)
                        sdn_net = {"status": "ERROR", "error_msg": text_error}
                    if sdn_net["status"] == "ERROR":
                        if not vim_info_error_msg:
                            vim_info_error_msg = sdn_net["error_msg"]
                        else:
                            # both VIM and SDN report an error: each text gets about half of the space
                            vim_info_error_msg = "VIM_ERROR: {} && SDN_ERROR: {}".format(
                                self._format_vim_error_msg(vim_info_error_msg, 1024//2-14),
                                self._format_vim_error_msg(sdn_net["error_msg"], 1024//2-14))
                        if vim_info_status == "VIM_ERROR":
                            vim_info_status = "VIM_SDN_ERROR"
                        else:
                            vim_info_status = "SDN_ERROR"

                # update database
                if vim_info_error_msg:
                    vim_info_error_msg = self._format_vim_error_msg(vim_info_error_msg)
                if task_vim_status != vim_info_status or task_error_msg != vim_info_error_msg or \
                        (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
                    task["extra"]["vim_status"] = vim_info_status
                    task["error_msg"] = vim_info_error_msg
                    if vim_info.get("vim_info"):
                        task["vim_info"] = vim_info["vim_info"]
                    temp_dict = {"status": vim_info_status, "error_msg": vim_info_error_msg}
                    if vim_info.get("vim_info"):
                        temp_dict["vim_info"] = vim_info["vim_info"]
                    with self.db_lock:
                        self.db.update_rows('instance_nets', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]})
                        self.db.update_rows(
                            'vim_actions',
                            UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256),
                                    "error_msg": task.get("error_msg"), "modified_at": now},
                            WHERE={'instance_action_id': task['instance_action_id'],
                                   'task_index': task['task_index']})
                if task["extra"].get("vim_status") == "BUILD":
                    self._insert_refresh(task, now + self.REFRESH_BUILD)
                else:
                    self._insert_refresh(task, now + self.REFRESH_ACTIVE)

    return nb_processed
436
437 def _insert_refresh(self, task, threshold_time=None):
438 """Insert a task at list of refreshing elements. The refreshing list is ordered by threshold_time (task['modified_at']
439 It is assumed that this is called inside this thread
440 """
441 if not self.vim:
442 return
443 if not threshold_time:
444 threshold_time = time.time()
445 task["modified_at"] = threshold_time
446 task_name = task["item"][9:] + "-" + task["action"]
447 task_id = task["instance_action_id"] + "." + str(task["task_index"])
448 for index in range(0, len(self.refresh_tasks)):
449 if self.refresh_tasks[index]["modified_at"] > threshold_time:
450 self.refresh_tasks.insert(index, task)
451 break
452 else:
453 index = len(self.refresh_tasks)
454 self.refresh_tasks.append(task)
455 self.logger.debug("task={} new refresh name={}, modified_at={} index={}".format(
456 task_id, task_name, task["modified_at"], index))
457
458 def _remove_refresh(self, task_name, vim_id):
459 """Remove a task with this name and vim_id from the list of refreshing elements.
460 It is assumed that this is called inside this thread outside _refres_elements method
461 Return True if self.refresh_list is modified, task is found
462 Return False if not found
463 """
464 index_to_delete = None
465 for index in range(0, len(self.refresh_tasks)):
466 if self.refresh_tasks[index]["name"] == task_name and self.refresh_tasks[index]["vim_id"] == vim_id:
467 index_to_delete = index
468 break
469 else:
470 return False
471 if not index_to_delete:
472 del self.refresh_tasks[index_to_delete]
473 return True
474
def _proccess_pending_tasks(self):
    """Process the tasks queued at self.pending_tasks against the VIM and store the outcome at database.

    For every task: its dependencies ('depends_on') are checked, the handler matching item/action is
    called, the task is queued for refreshing when applicable, and the tables vim_actions,
    instance_actions and the item table are updated. A task whose dependency is still SCHEDULED is moved
    to the end of the list, up to 3 tries. Processing stops after 10 creations.
    :return: number of tasks taken from the pending list (re-queued ones included)
    :raise vimconn.vimconnException: on unknown task item or action (not handled here)
    """
    nb_created = 0
    nb_processed = 0
    while self.pending_tasks:
        task = self.pending_tasks.pop(0)
        nb_processed += 1
        try:
            # check if tasks that this depends on have been completed
            dependency_not_completed = False
            for task_index in task["extra"].get("depends_on", ()):
                task_dependency = task["depends"].get("TASK-" + str(task_index))
                if not task_dependency:
                    task_dependency = self._look_for_task(task["instance_action_id"], task_index)
                    if not task_dependency:
                        raise VimThreadException(
                            "Cannot get depending net task trying to get depending task {}.{}".format(
                                task["instance_action_id"], task_index))
                    # task["depends"]["TASK-" + str(task_index)] = task_dependency #it references another object,so database must be look again
                if task_dependency["status"] == "SCHEDULED":
                    dependency_not_completed = True
                    break
                elif task_dependency["status"] == "FAILED":
                    # arguments follow the same order in both halves: action, item, action id, task index
                    raise VimThreadException(
                        "Cannot {} {}, (task {}.{}) because depends on failed {} {}, (task {}.{}): {}".format(
                            task["action"], task["item"],
                            task["instance_action_id"], task["task_index"],
                            task_dependency["action"], task_dependency["item"],
                            task_dependency["instance_action_id"], task_dependency["task_index"],
                            task_dependency.get("error_msg")))
            if dependency_not_completed:
                # Move this task to the end.
                task["extra"]["tries"] = task["extra"].get("tries", 0) + 1
                if task["extra"]["tries"] <= 3:
                    self.pending_tasks.append(task)
                    continue
                else:
                    raise VimThreadException(
                        "Cannot {} {}, (task {}.{}) because timeout waiting to complete {} {}, "
                        "(task {}.{})".format(task["action"], task["item"],
                                              task["instance_action_id"], task["task_index"],
                                              task_dependency["action"], task_dependency["item"],
                                              task_dependency["instance_action_id"],
                                              task_dependency["task_index"]))

            if task["status"] == "SUPERSEDED":
                # not needed to do anything but update database with the new status
                result = True
                database_update = None
            elif not self.vim:
                task["status"] = "ERROR"
                task["error_msg"] = self.error_status
                result = False
                database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
            elif task["item"] == 'instance_vms':
                if task["action"] == "CREATE":
                    result, database_update = self.new_vm(task)
                    nb_created += 1
                elif task["action"] == "DELETE":
                    result, database_update = self.del_vm(task)
                else:
                    raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
            elif task["item"] == 'instance_nets':
                if task["action"] == "CREATE":
                    result, database_update = self.new_net(task)
                    nb_created += 1
                elif task["action"] == "DELETE":
                    result, database_update = self.del_net(task)
                elif task["action"] == "FIND":
                    result, database_update = self.get_net(task)
                else:
                    raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
            elif task["item"] == 'instance_sfis':
                if task["action"] == "CREATE":
                    result, database_update = self.new_sfi(task)
                    nb_created += 1
                elif task["action"] == "DELETE":
                    result, database_update = self.del_sfi(task)
                else:
                    raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
            elif task["item"] == 'instance_sfs':
                if task["action"] == "CREATE":
                    result, database_update = self.new_sf(task)
                    nb_created += 1
                elif task["action"] == "DELETE":
                    result, database_update = self.del_sf(task)
                else:
                    raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
            elif task["item"] == 'instance_classifications':
                if task["action"] == "CREATE":
                    result, database_update = self.new_classification(task)
                    nb_created += 1
                elif task["action"] == "DELETE":
                    result, database_update = self.del_classification(task)
                else:
                    raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
            elif task["item"] == 'instance_sfps':
                if task["action"] == "CREATE":
                    result, database_update = self.new_sfp(task)
                    nb_created += 1
                elif task["action"] == "DELETE":
                    result, database_update = self.del_sfp(task)
                else:
                    raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
            else:
                raise vimconn.vimconnException(self.name + "unknown task item {}".format(task["item"]))
                # TODO
        except VimThreadException as e:
            result = False
            task["error_msg"] = str(e)
            task["status"] = "FAILED"
            database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
            if task["item"] == 'instance_vms':
                database_update["vim_vm_id"] = None
            elif task["item"] == 'instance_nets':
                database_update["vim_net_id"] = None

        no_refresh_tasks = ['instance_sfis', 'instance_sfs',
                            'instance_classifications', 'instance_sfps']
        if task["action"] == "DELETE":
            action_key = task["item"] + task["item_id"]
            # the group may already be gone (e.g. a previous DELETE over the same element); a missing
            # key must not abort the loop before the database is updated
            self.grouped_tasks.pop(action_key, None)
        elif task["action"] in ("CREATE", "FIND") and task["status"] in ("DONE", "BUILD"):
            if task["item"] not in no_refresh_tasks:
                self._insert_refresh(task)

        task_id = task["instance_action_id"] + "." + str(task["task_index"])
        self.logger.debug("task={} item={} action={} result={}:'{}' params={}".format(
            task_id, task["item"], task["action"], task["status"],
            task["vim_id"] if task["status"] == "DONE" else task.get("error_msg"), task["params"]))
        try:
            now = time.time()
            with self.db_lock:
                self.db.update_rows(
                    table="vim_actions",
                    UPDATE={"status": task["status"], "vim_id": task.get("vim_id"), "modified_at": now,
                            "error_msg": task["error_msg"],
                            "extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256)},
                    WHERE={"instance_action_id": task["instance_action_id"], "task_index": task["task_index"]})
                if result is not None:
                    # count this task as done or failed in the global action
                    self.db.update_rows(
                        table="instance_actions",
                        UPDATE={("number_done" if result else "number_failed"): {"INCREMENT": 1},
                                "modified_at": now},
                        WHERE={"uuid": task["instance_action_id"]})
                if database_update:
                    self.db.update_rows(table=task["item"],
                                        UPDATE=database_update,
                                        WHERE={"uuid": task["item_id"]})
        except db_base_Exception as e:
            self.logger.error("task={} Error updating database {}".format(task_id, e), exc_info=True)

        if nb_created == 10:
            break
    return nb_processed
627
def _insert_pending_tasks(self, vim_actions_list):
    """Load a list of tasks into memory, filling self.grouped_tasks, self.pending_tasks and the refresh list.

    Tasks whose 'datacenter_vim_id' differs from self.datacenter_tenant_id are ignored. For the rest, the
    in-memory fields ('params', 'depends', 'extra' as dict, ...) are filled, and then:
      - DELETE tasks mark the previous tasks over the same element as SUPERSEDED, and are themselves
        SUPERSEDED when no previous CREATE task requires a real deletion. They are always appended to
        the pending list
      - SCHEDULED tasks are appended to the pending list
      - other CREATE/FIND tasks are inserted at the refresh list when their status is DONE or BUILD
    :param vim_actions_list: list of task dictionaries (rows with the vim_actions table fields)
    :raise vimconn.vimconnException: if the task action is unknown
    """
    for task in vim_actions_list:
        if task["datacenter_vim_id"] != self.datacenter_tenant_id:
            continue
        item = task["item"]
        item_id = task["item_id"]
        action_key = item + item_id
        if action_key not in self.grouped_tasks:
            self.grouped_tasks[action_key] = []
        task["params"] = None
        task["depends"] = {}
        if task["extra"]:
            # NOTE(review): yaml.load can build arbitrary objects; the text appears to be written with
            # yaml.safe_dump by this same class, so yaml.safe_load looks like a drop-in - confirm
            extra = yaml.load(task["extra"])
            task["extra"] = extra
            task["params"] = extra.get("params")
            depends_on_list = extra.get("depends_on")
            if depends_on_list:
                for dependency_task in depends_on_list:
                    if isinstance(dependency_task, int):
                        index = dependency_task
                    else:
                        # text format: <instance_action_id>.<task index>
                        instance_action_id, _, task_id = dependency_task.rpartition(".")
                        if instance_action_id != task["instance_action_id"]:
                            continue
                        index = int(task_id)

                    # only resolved when the list position of the dependency equals its task_index
                    if index < len(vim_actions_list) and vim_actions_list[index]["task_index"] == index and\
                            vim_actions_list[index]["instance_action_id"] == task["instance_action_id"]:
                        task["depends"]["TASK-" + str(index)] = vim_actions_list[index]
                        task["depends"]["TASK-{}.{}".format(task["instance_action_id"], index)] = vim_actions_list[index]
            if extra.get("interfaces"):
                task["vim_interfaces"] = {}
        else:
            task["extra"] = {}
        if "error_msg" not in task:
            task["error_msg"] = None
        if "vim_id" not in task:
            task["vim_id"] = None

        if task["action"] == "DELETE":
            need_delete_action = False
            for to_supersede in self.grouped_tasks.get(action_key, ()):
                if to_supersede["action"] == "FIND" and to_supersede.get("vim_id"):
                    task["vim_id"] = to_supersede["vim_id"]
                if to_supersede["action"] == "CREATE" and to_supersede["extra"].get("created", True) and \
                        (to_supersede.get("vim_id") or to_supersede["extra"].get("sdn_net_id")):
                    # something was really created: the DELETE inherits what it needs to remove it
                    need_delete_action = True
                    task["vim_id"] = to_supersede["vim_id"]
                    if to_supersede["extra"].get("sdn_net_id"):
                        task["extra"]["sdn_net_id"] = to_supersede["extra"]["sdn_net_id"]
                    if to_supersede["extra"].get("interfaces"):
                        task["extra"]["interfaces"] = to_supersede["extra"]["interfaces"]
                    if to_supersede["extra"].get("created_items"):
                        if not task["extra"].get("created_items"):
                            task["extra"]["created_items"] = {}
                        task["extra"]["created_items"].update(to_supersede["extra"]["created_items"])
                # Mark task as SUPERSEDED.
                #   If task is in self.pending_tasks, it will be removed and database will be update
                #   If task is in self.refresh_tasks, it will be removed
                to_supersede["status"] = "SUPERSEDED"
            if not need_delete_action:
                task["status"] = "SUPERSEDED"

            self.grouped_tasks[action_key].append(task)
            self.pending_tasks.append(task)
        elif task["status"] == "SCHEDULED":
            self.grouped_tasks[action_key].append(task)
            self.pending_tasks.append(task)
        elif task["action"] in ("CREATE", "FIND"):
            self.grouped_tasks[action_key].append(task)
            if task["status"] in ("DONE", "BUILD"):
                self._insert_refresh(task)
        # TODO add VM reset, get console, etc...
        else:
            raise vimconn.vimconnException(self.name + "unknown vim_action action {}".format(task["action"]))
703
def insert_task(self, task):
    """Put a task in the queue of this thread without blocking.

    :param task: element to queue
    :return: None
    :raise vimconn.vimconnException: if the queue is full
    """
    try:
        self.task_queue.put_nowait(task)
    except Queue.Full:
        raise vimconn.vimconnException(self.name + ": timeout inserting a task")
    return None
710
def del_task(self, task):
    """Cancel a task that has not been processed yet.

    :param task: task dictionary; its 'status' is changed to SUPERSEDED when it was SCHEDULED
    :return: True if the task has been marked as SUPERSEDED, False if its status was not SCHEDULED
    """
    with self.task_lock:
        if task["status"] == "SCHEDULED":
            task["status"] = "SUPERSEDED"
            return True
        # task["status"] == "processing"
        # The lock is released by the 'with' statement; the explicit release that was here released it
        # twice, which makes the 'with' exit fail.
        return False
719
def run(self):
    """Main loop of the thread.

    The actions are reloaded from database, and then repeatedly: the task queue is drained, pending tasks
    are processed and elements are refreshed, sleeping one second when there was nothing to do.
    Queue entries can be:
        a list: tasks handed over to self._insert_pending_tasks
        'exit': the thread finishes
        'reload': the actions are reloaded from database
    Unexpected exceptions are logged as critical and the loop goes on.
    :return: 0 when 'exit' is received
    """
    self.logger.debug("Starting")
    while True:
        self._reload_vim_actions()
        reload_thread = False

        while True:
            try:
                while not self.task_queue.empty():
                    task = self.task_queue.get()
                    if isinstance(task, list):
                        self._insert_pending_tasks(task)
                    elif isinstance(task, str):
                        if task == 'exit':
                            # logged here because this return is the only way out of the loops;
                            # a statement placed after them is never reached
                            self.logger.debug("Finishing")
                            return 0
                        elif task == 'reload':
                            reload_thread = True
                            break
                    self.task_queue.task_done()
                if reload_thread:
                    break
                nb_processed = self._proccess_pending_tasks()
                nb_processed += self._refres_elements()
                if not nb_processed:
                    time.sleep(1)

            except Exception as e:
                self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)
750
751 def _look_for_task(self, instance_action_id, task_id):
752 """
753 Look for a concrete task at vim_actions database table
754 :param instance_action_id: The instance_action_id
755 :param task_id: Can have several formats:
756 <task index>: integer
757 TASK-<task index> :backward compatibility,
758 [TASK-]<instance_action_id>.<task index>: this instance_action_id overrides the one in the parameter
759 :return: Task dictionary or None if not found
760 """
761 if isinstance(task_id, int):
762 task_index = task_id
763 else:
764 if task_id.startswith("TASK-"):
765 task_id = task_id[5:]
766 ins_action_id, _, task_index = task_id.rpartition(".")
767 if ins_action_id:
768 instance_action_id = ins_action_id
769
770 with self.db_lock:
771 tasks = self.db.get_rows(FROM="vim_actions", WHERE={"instance_action_id": instance_action_id,
772 "task_index": task_index})
773 if not tasks:
774 return None
775 task = tasks[0]
776 task["params"] = None
777 task["depends"] = {}
778 if task["extra"]:
779 extra = yaml.load(task["extra"])
780 task["extra"] = extra
781 task["params"] = extra.get("params")
782 if extra.get("interfaces"):
783 task["vim_interfaces"] = {}
784 else:
785 task["extra"] = {}
786 return task
787
788 @staticmethod
789 def _format_vim_error_msg(error_text, max_length=1024):
790 if error_text and len(error_text) >= max_length:
791 return error_text[:max_length//2-3] + " ... " + error_text[-max_length//2+3:]
792 return error_text
793
def new_vm(self, task):
    """
    Create a VM at the VIM, replacing first the task references of its networks by VIM network ids.

    :param task: task dictionary. "params" holds the positional arguments for vim.new_vminstance, with the
        list of networks/interfaces at index 5. "depends" maps task ids to the tasks this one depends on.
        "status", "error_msg", "vim_id", "vim_info", "vim_interfaces" and "extra" are updated in place
    :return: (True, dict) on success or (False, dict) on vimconn.vimconnException/VimThreadException. The
        dict has "status", "vim_vm_id" and "error_msg"; the caller presumably stores it in the VM database row
    """
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        params = task["params"]
        depends = task.get("depends") or {}
        net_list = params[5]
        for net in net_list:
            if "net_id" in net and is_task_id(net["net_id"]):  # change task_id into network_id
                task_dependency = depends.get(net["net_id"])
                if not task_dependency:
                    # Not loaded in memory: read the depending task from the database
                    task_dependency = self._look_for_task(task["instance_action_id"], net["net_id"])
                    if not task_dependency:
                        raise VimThreadException(
                            "Cannot get depending net task trying to get depending task {}.{}".format(
                                task["instance_action_id"], net["net_id"]))
                network_id = task_dependency.get("vim_id")
                if not network_id:
                    # Read the error from the dependency itself: it may come from the database and not be
                    # present in "depends", where indexing it would raise an uncaught KeyError
                    raise VimThreadException(
                        "Cannot create VM because depends on a network not created or found: " +
                        str(task_dependency.get("error_msg")))
                net["net_id"] = network_id
        # The call below receives a copy; the interface entries of that copy are read back afterwards
        params_copy = deepcopy(params)
        vim_vm_id, created_items = self.vim.new_vminstance(*params_copy)

        # fill task_interfaces. Look for sdn_net_id at database for each interface
        task_interfaces = {}
        for iface in params_copy[5]:
            task_interfaces[iface["vim_id"]] = {"iface_id": iface["uuid"]}
            with self.db_lock:
                result = self.db.get_rows(
                    SELECT=('sdn_net_id',),
                    FROM='instance_nets as ine join instance_interfaces as ii on ii.instance_net_id=ine.uuid',
                    WHERE={'ii.uuid': iface["uuid"]})
            if result:
                task_interfaces[iface["vim_id"]]["sdn_net_id"] = result[0]['sdn_net_id']
            else:
                self.logger.critical("task={} new-VM: instance_nets uuid={} not found at DB".format(
                    task_id, iface["uuid"]), exc_info=True)

        task["vim_info"] = {}
        task["vim_interfaces"] = {}
        task["extra"]["interfaces"] = task_interfaces
        task["extra"]["created"] = True
        task["extra"]["created_items"] = created_items
        task["error_msg"] = None
        task["status"] = "DONE"
        task["vim_id"] = vim_vm_id
        instance_element_update = {"status": "BUILD", "vim_vm_id": vim_vm_id, "error_msg": None}
        return True, instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("task={} new-VM: {}".format(task_id, e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_vm_id": None, "error_msg": error_text}
        return False, instance_element_update
852
def del_vm(self, task):
    """
    Delete a VM at the VIM, removing first the SDN ports recorded for its interfaces.

    :param task: task dictionary. Reads "instance_action_id", "task_index", "vim_id" and, inside "extra",
        "interfaces" and "created_items". "status" and "error_msg" are updated in place
    :return: (True, None) when the VM is deleted or the VIM reports it as not found; (False, None) on any
        other vimconn.vimconnException
    """
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    vm_vim_id = task["vim_id"]
    # A task without interfaces must yield an empty mapping: the previous default, an empty tuple, has no
    # values() method and made the deletion fail with AttributeError
    interfaces = task["extra"].get("interfaces") or {}
    try:
        for iface in interfaces.values():
            if iface.get("sdn_port_id"):
                try:
                    with self.db_lock:
                        self.ovim.delete_port(iface["sdn_port_id"])
                except ovimException as e:
                    # Best effort: a port that cannot be removed does not stop the VM deletion
                    self.logger.error("task={} del-VM: ovimException when deleting external_port={}: {} ".format(
                        task_id, iface["sdn_port_id"], e), exc_info=True)
                    # TODO Set error_msg at instance_nets

        self.vim.delete_vminstance(vm_vim_id, task["extra"].get("created_items"))
        task["status"] = "DONE"
        task["error_msg"] = None
        return True, None

    except vimconn.vimconnException as e:
        task["error_msg"] = self._format_vim_error_msg(str(e))
        if isinstance(e, vimconn.vimconnNotFoundException):
            # If not found mark as Done and fill error_msg
            task["status"] = "DONE"
            return True, None
        task["status"] = "FAILED"
        return False, None
881
882 def _get_net_internal(self, task, filter_param):
883 """
884 Common code for get_net and new_net. It looks for a network on VIM with the filter_params
885 :param task: task for this find or find-or-create action
886 :param filter_param: parameters to send to the vimconnector
887 :return: a dict with the content to update the instance_nets database table. Raises an exception on error, or
888 when network is not found or found more than one
889 """
890 vim_nets = self.vim.get_network_list(filter_param)
891 if not vim_nets:
892 raise VimThreadExceptionNotFound("Network not found with this criteria: '{}'".format(filter_param))
893 elif len(vim_nets) > 1:
894 raise VimThreadException("More than one network found with this criteria: '{}'".format(filter_param))
895 vim_net_id = vim_nets[0]["id"]
896
897 # Discover if this network is managed by a sdn controller
898 sdn_net_id = None
899 with self.db_lock:
900 result = self.db.get_rows(SELECT=('sdn_net_id',), FROM='instance_nets',
901 WHERE={'vim_net_id': vim_net_id, 'instance_scenario_id': None,
902 'datacenter_tenant_id': self.datacenter_tenant_id})
903 if result:
904 sdn_net_id = result[0]['sdn_net_id']
905
906 task["status"] = "DONE"
907 task["extra"]["vim_info"] = {}
908 task["extra"]["created"] = False
909 task["extra"]["sdn_net_id"] = sdn_net_id
910 task["error_msg"] = None
911 task["vim_id"] = vim_net_id
912 instance_element_update = {"vim_net_id": vim_net_id, "created": False, "status": "BUILD",
913 "error_msg": None, "sdn_net_id": sdn_net_id}
914 return instance_element_update
915
def get_net(self, task):
    """
    Look for an existing network at the VIM using the first element of the task "params" as filter.

    :param task: task dictionary; on failure "status", "vim_id" and "error_msg" are set in place
    :return: (True, dict returned by _get_net_internal) on success, or (False, dict) with "vim_net_id" None,
        "status" "VIM_ERROR" and the error text on vimconn.vimconnException/VimThreadException
    """
    task_id = ".".join((task["instance_action_id"], str(task["task_index"])))
    try:
        return True, self._get_net_internal(task, task["params"][0])
    except (vimconn.vimconnException, VimThreadException) as exc:
        self.logger.error("task={} get-net: {}".format(task_id, exc))
        task["status"] = "FAILED"
        task["vim_id"] = None
        error_text = self._format_vim_error_msg(str(exc))
        task["error_msg"] = error_text
        return False, {"vim_net_id": None, "status": "VIM_ERROR", "error_msg": error_text}
932
def new_net(self, task):
    """
    Create a network at the VIM, optionally reusing an existing one and creating its SDN counterpart.

    When the task "extra" contains "find", the network is looked for first with that filter and reused if
    exactly one is found. Otherwise it is created with the task "params" (name at index 0, type at index 1).
    For "data" and "ptp" networks on a VIM configured with 'sdn-controller', a network is also created at ovim.

    :param task: task dictionary; "status", "vim_id", "error_msg" and "extra" are updated in place
    :return: (True, dict) on success or (False, dict) on error. The dict carries "vim_net_id", "sdn_net_id",
        "status" and "error_msg" (plus "created" on success). On error the ids obtained so far are kept
    """
    vim_net_id = None
    sdn_net_id = None
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    action_text = ""
    try:
        # FIND
        if task["extra"].get("find"):
            action_text = "finding"
            filter_param = task["extra"]["find"][0]
            try:
                instance_element_update = self._get_net_internal(task, filter_param)
                return True, instance_element_update
            except VimThreadExceptionNotFound:
                pass  # nothing to reuse: go on and create it
        # CREATE
        params = task["params"]
        action_text = "creating VIM"
        vim_net_id = self.vim.new_network(*params)

        net_name = params[0]
        net_type = params[1]

        sdn_controller = self.vim.config.get('sdn-controller')
        if sdn_controller and (net_type == "data" or net_type == "ptp"):
            network = {"name": net_name, "type": net_type, "region": self.vim["config"]["datacenter_id"]}

            vim_net = self.vim.get_network(vim_net_id)
            if vim_net.get('encapsulation') != 'vlan':
                # Use get() also here: with the key missing, indexing raised an uncaught KeyError
                raise vimconn.vimconnException(
                    "net '{}' defined as type '{}' has not vlan encapsulation '{}'".format(
                        net_name, net_type, vim_net.get('encapsulation')))
            network["vlan"] = vim_net.get('segmentation_id')
            action_text = "creating SDN"
            with self.db_lock:
                sdn_net_id = self.ovim.new_network(network)
        task["status"] = "DONE"
        task["extra"]["vim_info"] = {}
        task["extra"]["sdn_net_id"] = sdn_net_id
        task["extra"]["created"] = True
        task["error_msg"] = None
        task["vim_id"] = vim_net_id
        instance_element_update = {"vim_net_id": vim_net_id, "sdn_net_id": sdn_net_id, "status": "BUILD",
                                   "created": True, "error_msg": None}
        return True, instance_element_update
    except (vimconn.vimconnException, ovimException, VimThreadException) as e:
        # VimThreadException is raised by the find step when several networks match; handle it as get_net
        # does instead of letting it escape
        self.logger.error("task={} new-net: Error {}: {}".format(task_id, action_text, e))
        task["status"] = "FAILED"
        task["vim_id"] = vim_net_id
        task["error_msg"] = self._format_vim_error_msg(str(e))
        task["extra"]["sdn_net_id"] = sdn_net_id
        instance_element_update = {"vim_net_id": vim_net_id, "sdn_net_id": sdn_net_id, "status": "VIM_ERROR",
                                   "error_msg": task["error_msg"]}
        return False, instance_element_update
987
def del_net(self, task):
    """
    Delete a network: first its SDN network with the attached external ports, then the VIM network.

    :param task: task dictionary. Reads "vim_id" and "sdn_net_id" inside "extra"; "status" and "error_msg"
        are updated in place
    :return: (True, None) when everything is deleted or the VIM reports the network as not found;
        (False, None) on ovimException or on any other vimconn.vimconnException
    """
    net_vim_id = task["vim_id"]
    sdn_net_id = task["extra"].get("sdn_net_id")
    try:
        if sdn_net_id:
            # Delete any attached port to this sdn network. There can be ports associated to this network in case
            # it was manually done using 'openmano vim-net-sdn-attach'
            with self.db_lock:
                attached_ports = self.ovim.get_ports(columns={'uuid'},
                                                     filter={'name': 'external_port', 'net_id': sdn_net_id})
                for attached in attached_ports:
                    self.ovim.delete_port(attached['uuid'])
                self.ovim.delete_network(sdn_net_id)
        if net_vim_id:
            self.vim.delete_network(net_vim_id)
    except ovimException as exc:
        task["error_msg"] = self._format_vim_error_msg("ovimException obtaining and deleting external "
                                                       "ports for net {}: {}".format(sdn_net_id, str(exc)))
    except vimconn.vimconnException as exc:
        task["error_msg"] = self._format_vim_error_msg(str(exc))
        if isinstance(exc, vimconn.vimconnNotFoundException):
            # If not found mark as Done and fill error_msg
            task["status"] = "DONE"
            return True, None
    else:
        task["status"] = "DONE"
        task["error_msg"] = None
        return True, None
    task["status"] = "FAILED"
    return False, None
1017
1018 ## Service Function Instances
1019
def new_sfi(self, task):
    """
    Create a Service Function Instance at the VIM using the first interface of the task it depends on.

    :param task: task dictionary. The first entry of "depends" must carry, in extra.params[5], the list of
        interfaces. "status", "error_msg", "vim_id" and "extra" are updated in place
    :return: (True, dict) on success or (False, dict) on vimconn.vimconnException/VimThreadException. The
        dict has "status", "vim_sfi_id" and "error_msg"
    """
    # Built before the try block so that it is always bound when the except clause logs it
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        # list() keeps the lookup valid on Python 3, where dict.values() cannot be indexed
        first_dependency = list(task.get("depends").values())[0]
        interfaces = first_dependency.get("extra").get("params")[5]
        # At the moment, every port associated with the VM will be used both as ingress and egress ports.
        # Bear in mind that different VIM connectors might support SFI differently. In the case of OpenStack, only the
        # first ingress and first egress ports will be used to create the SFI (Port Pair).
        port_id_list = [interfaces[0].get("vim_id")]
        name = "sfi-%s" % task["item_id"][:8]
        # By default no form of IETF SFC Encapsulation will be used
        vim_sfi_id = self.vim.new_sfi(name, port_id_list, port_id_list, sfc_encap=False)

        task["extra"]["created"] = True
        task["error_msg"] = None
        task["status"] = "DONE"
        task["vim_id"] = vim_sfi_id
        instance_element_update = {"status": "ACTIVE", "vim_sfi_id": vim_sfi_id, "error_msg": None}
        return True, instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("Error creating Service Function Instance, task=%s: %s", task_id, str(e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_sfi_id": None, "error_msg": error_text}
        return False, instance_element_update
1051
def del_sfi(self, task):
    """
    Delete at the VIM the Service Function Instance referenced by the task "vim_id".

    :param task: task dictionary; "status" and "error_msg" are updated in place
    :return: (True, None) when deleted or reported as not found by the VIM; (False, None) on any other
        vimconn.vimconnException
    """
    try:
        self.vim.delete_sfi(task["vim_id"])
    except vimconn.vimconnException as exc:
        task["error_msg"] = self._format_vim_error_msg(str(exc))
        # If not found mark as Done and keep the error_msg
        already_gone = isinstance(exc, vimconn.vimconnNotFoundException)
        task["status"] = "DONE" if already_gone else "FAILED"
        return already_gone, None
    task["status"] = "DONE"
    task["error_msg"] = None
    return True, None
1068
def new_sf(self, task):
    """
    Create a Service Function at the VIM grouping the instances created by the tasks it depends on.

    :param task: task dictionary. The "vim_id" of every entry of "depends" is used as an instance id.
        "status", "error_msg", "vim_id" and "extra" are updated in place
    :return: (True, dict) on success or (False, dict) on vimconn.vimconnException/VimThreadException. The
        dict has "status", "vim_sf_id" and "error_msg"
    """
    # Built before the try block so that it is always bound when the except clause logs it
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        sfi_id_list = [sfi.get("vim_id") for sfi in task.get("depends").values()]
        name = "sf-%s" % task["item_id"][:8]
        # By default no form of IETF SFC Encapsulation will be used
        vim_sf_id = self.vim.new_sf(name, sfi_id_list, sfc_encap=False)

        task["extra"]["created"] = True
        task["error_msg"] = None
        task["status"] = "DONE"
        task["vim_id"] = vim_sf_id
        instance_element_update = {"status": "ACTIVE", "vim_sf_id": vim_sf_id, "error_msg": None}
        return True, instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("Error creating Service Function, task=%s: %s", task_id, str(e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_sf_id": None, "error_msg": error_text}
        return False, instance_element_update
1100
def del_sf(self, task):
    """
    Delete at the VIM the Service Function referenced by the task "vim_id".

    :param task: task dictionary; "status" and "error_msg" are updated in place
    :return: (True, None) when deleted or reported as not found by the VIM; (False, None) on any other
        vimconn.vimconnException
    """
    sf_vim_id = task["vim_id"]
    succeeded = True
    try:
        self.vim.delete_sf(sf_vim_id)
        task["error_msg"] = None
    except vimconn.vimconnException as exc:
        task["error_msg"] = self._format_vim_error_msg(str(exc))
        # A missing element counts as deleted, although the error_msg is kept
        succeeded = isinstance(exc, vimconn.vimconnNotFoundException)
    if succeeded:
        task["status"] = "DONE"
    else:
        task["status"] = "FAILED"
    return succeeded, None
1117
def new_classification(self, task):
    """
    Create a 'legacy_flow_classifier' classification at the VIM from the match fields in the task "params".

    :param task: task dictionary. "params" is a dictionary that may contain "ip_proto", "source_ip",
        "destination_ip", "source_port" and "destination_port". The first entry of "depends" must carry, in
        extra.params[5], the list of interfaces; the first one is used as logical source port.
        "status", "error_msg", "vim_id" and "extra" are updated in place
    :return: (True, dict) on success or (False, dict) on vimconn.vimconnException/VimThreadException. The
        dict has "status", "vim_classification_id" and "error_msg"
    """
    # Built before the try block so that it is always bound when the except clause logs it
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        params = task["params"]
        # list() keeps the lookup valid on Python 3, where dict.values() cannot be indexed
        first_dependency = list(task.get("depends").values())[0]
        interfaces = first_dependency.get("extra").get("params")[5]
        # Bear in mind that different VIM connectors might support Classifications differently.
        # In the case of OpenStack, only the first VNF attached to the classifier will be used
        # to create the Classification(s) (the "logical source port" of the "Flow Classifier").
        # Since the VNFFG classifier match lacks the ethertype, classification defaults to
        # using the IPv4 flow classifier.
        name = "c-%s" % task["item_id"][:8]

        # Fields that are not provided are passed as None instead of failing with TypeError
        ip_proto = params.get("ip_proto")
        if ip_proto is not None:
            ip_proto = int(ip_proto)
            # Known protocol numbers are sent by name; any other number is sent unchanged
            ip_proto = {1: 'icmp', 6: 'tcp', 17: 'udp'}.get(ip_proto, ip_proto)

        # if not CIDR is given for the IP addresses, add /32:
        source_ip = params.get("source_ip")
        if source_ip and '/' not in source_ip:
            source_ip += '/32'
        destination_ip = params.get("destination_ip")
        if destination_ip and '/' not in destination_ip:
            destination_ip += '/32'

        definition = {
            "logical_source_port": interfaces[0].get("vim_id"),
            "protocol": ip_proto,
            "source_ip_prefix": source_ip,
            "destination_ip_prefix": destination_ip,
            "source_port_range_min": params.get("source_port"),
            "source_port_range_max": params.get("source_port"),
            "destination_port_range_min": params.get("destination_port"),
            "destination_port_range_max": params.get("destination_port"),
        }

        vim_classification_id = self.vim.new_classification(
            name, 'legacy_flow_classifier', definition)

        task["extra"]["created"] = True
        task["error_msg"] = None
        task["status"] = "DONE"
        task["vim_id"] = vim_classification_id
        instance_element_update = {"status": "ACTIVE", "vim_classification_id": vim_classification_id,
                                   "error_msg": None}
        return True, instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("Error creating Classification, task=%s: %s", task_id, str(e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_classification_id": None, "error_msg": error_text}
        return False, instance_element_update
1175
def del_classification(self, task):
    """
    Delete at the VIM the classification referenced by the task "vim_id".

    :param task: task dictionary; "status" and "error_msg" are updated in place
    :return: (True, None) when deleted or reported as not found by the VIM; (False, None) on any other
        vimconn.vimconnException
    """
    try:
        self.vim.delete_classification(task["vim_id"])
    except vimconn.vimconnException as exc:
        task["error_msg"] = self._format_vim_error_msg(str(exc))
        if not isinstance(exc, vimconn.vimconnNotFoundException):
            task["status"] = "FAILED"
            return False, None
        # Not found at the VIM: mark as Done and keep the error_msg
        task["status"] = "DONE"
        return True, None
    else:
        task["status"] = "DONE"
        task["error_msg"] = None
        return True, None
1192
def new_sfp(self, task):
    """
    Create a Service Function Path at the VIM with the elements created by the tasks it depends on.

    :param task: task dictionary. Every entry of "depends" whose "item" is "instance_sfs" contributes its
        "vim_id" as a service function, and every "instance_classifications" one as a classification;
        other entries are ignored. "status", "error_msg", "vim_id" and "extra" are updated in place
    :return: (True, dict) on success or (False, dict) on vimconn.vimconnException/VimThreadException. The
        dict has "status", "vim_sfp_id" and "error_msg"
    """
    # Built before the try block so that it is always bound when the except clause logs it
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        sf_id_list = []
        classification_id_list = []
        for dep in task.get("depends").values():
            vim_id = dep.get("vim_id")
            resource = dep.get("item")
            if resource == "instance_sfs":
                sf_id_list.append(vim_id)
            elif resource == "instance_classifications":
                classification_id_list.append(vim_id)

        name = "sfp-%s" % task["item_id"][:8]
        # By default no form of IETF SFC Encapsulation will be used
        vim_sfp_id = self.vim.new_sfp(name, classification_id_list, sf_id_list, sfc_encap=False)

        task["extra"]["created"] = True
        task["error_msg"] = None
        task["status"] = "DONE"
        task["vim_id"] = vim_sfp_id
        instance_element_update = {"status": "ACTIVE", "vim_sfp_id": vim_sfp_id, "error_msg": None}
        return True, instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        # The message names the element really being created (it used to say "Service Function")
        self.logger.error("Error creating Service Function Path, task=%s: %s", task_id, str(e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_sfp_id": None, "error_msg": error_text}
        return False, instance_element_update
1231
def del_sfp(self, task):
    """
    Delete at the VIM the Service Function Path referenced by the task "vim_id".

    :param task: task dictionary; "status" and "error_msg" are updated in place
    :return: (True, None) when deleted or reported as not found by the VIM; (False, None) on any other
        vimconn.vimconnException
    """
    failure = None
    try:
        self.vim.delete_sfp(task["vim_id"])
    except vimconn.vimconnException as exc:
        failure = exc

    if failure is None:
        task["status"] = "DONE"
        task["error_msg"] = None
        return True, None

    task["error_msg"] = self._format_vim_error_msg(str(failure))
    if isinstance(failure, vimconn.vimconnNotFoundException):
        # If not found mark as Done and fill error_msg
        task["status"] = "DONE"
        return True, None
    task["status"] = "FAILED"
    return False, None