fix ip-profile ignored at openstack
[osm/RO.git] / osm_ro / vim_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2015 Telefonica Investigacion y Desarrollo, S.A.U.
5 # This file is part of openvim
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
22 ##
23
24 """"
25 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
26 The tasks are stored at database in table vim_wim_actions
27 The task content is (M: stored at memory, D: stored at database):
28 MD instance_action_id: reference a global action over an instance-scenario: database instance_actions
29 MD task_index: index number of the task. This together with the previous forms a unique key identifier
30 MD datacenter_vim_id: should contain the uuid of the VIM managed by this thread
31 MD vim_id: id of the vm,net,etc at VIM
32 MD action: CREATE, DELETE, FIND
33 MD item: database table name, can be instance_vms, instance_nets, TODO: datacenter_flavors, datacenter_images
34 MD item_id: uuid of the referenced entry in the previous table
35 MD status: SCHEDULED,BUILD,DONE,FAILED,SUPERSEDED
36 MD extra: text with yaml format at database, dict at memory with:
37 params: list with the params to be sent to the VIM for CREATE or FIND. For DELETE the vim_id is taken from other related tasks
38 find: (only for CREATE tasks) if present it should FIND before creating and use if existing. Contains the FIND params
39 depends_on: list with the 'task_index'es of tasks that must be completed before. e.g. a vm creation depends on a net creation
40 can contain an int (single index on the same instance-action) or str (compete action ID)
41 sdn_net_id: used for net.
42 tries:
43 interfaces: used for VMs. Each key is the uuid of the instance_interfaces entry at database
44 iface_id: uuid of intance_interfaces
45 sdn_port_id:
46 sdn_net_id:
47 created_items: dictionary with extra elements created that need to be deleted. e.g. ports, volumes,...
48 created: False if the VIM element is not created by other actions, and it should not be deleted
49 vim_status: VIM status of the element. Stored also at database in the instance_XXX
50 M depends: dict with task_index(from depends_on) to task class
51 M params: same as extra[params] but with the resolved dependencies
52 M vim_interfaces: similar to extra[interfaces] but with VIM information. Stored at database in the instance_XXX but not at vim_wim_actions
53 M vim_info: Detailed information of a vm,net from the VIM. Stored at database in the instance_XXX but not at vim_wim_actions
54 MD error_msg: descriptive text upon an error.Stored also at database instance_XXX
55 MD created_at: task creation time
56 MD modified_at: last task update time. On refresh it contains when this task need to be refreshed
57
58 """
59
60 import threading
61 import time
62 import Queue
63 import logging
64 import vimconn
65 import vimconn_openvim
66 import vimconn_aws
67 import vimconn_opennebula
68 import vimconn_openstack
69 import vimconn_vmware
70 import yaml
71 from db_base import db_base_Exception
72 from lib_osm_openvim.ovim import ovimException
73 from copy import deepcopy
74
75 __author__ = "Alfonso Tierno, Pablo Montes"
76 __date__ = "$28-Sep-2017 12:07:15$"
77
78 vim_module = {
79 "openvim": vimconn_openvim,
80 "aws": vimconn_aws,
81 "opennebula": vimconn_opennebula,
82 "openstack": vimconn_openstack,
83 "vmware": vimconn_vmware,
84 }
85
86
87 def is_task_id(task_id):
88 return task_id.startswith("TASK-")
89
90
91 class VimThreadException(Exception):
92 pass
93
94
95 class VimThreadExceptionNotFound(VimThreadException):
96 pass
97
98
99 class vim_thread(threading.Thread):
100 REFRESH_BUILD = 5 # 5 seconds
101 REFRESH_ACTIVE = 60 # 1 minute
102
103 def __init__(self, task_lock, name=None, datacenter_name=None, datacenter_tenant_id=None,
104 db=None, db_lock=None, ovim=None):
105 """Init a thread.
106 Arguments:
107 'id' number of thead
108 'name' name of thread
109 'host','user': host ip or name to manage and user
110 'db', 'db_lock': database class and lock to use it in exclusion
111 """
112 threading.Thread.__init__(self)
113 self.vim = None
114 self.error_status = None
115 self.datacenter_name = datacenter_name
116 self.datacenter_tenant_id = datacenter_tenant_id
117 self.ovim = ovim
118 if not name:
119 self.name = vimconn["id"] + "." + vimconn["config"]["datacenter_tenant_id"]
120 else:
121 self.name = name
122 self.vim_persistent_info = {}
123
124 self.logger = logging.getLogger('openmano.vim.' + self.name)
125 self.db = db
126 self.db_lock = db_lock
127
128 self.task_lock = task_lock
129 self.task_queue = Queue.Queue(2000)
130
131 self.refresh_tasks = []
132 """Contains time ordered task list for refreshing the status of VIM VMs and nets"""
133
134 self.pending_tasks = []
135 """Contains time ordered task list for creation, deletion of VIM VMs and nets"""
136
137 self.grouped_tasks = {}
138 """ It contains all the creation/deletion pending tasks grouped by its concrete vm, net, etc
139 <item><item_id>:
140 - <task1> # e.g. CREATE task
141 <task2> # e.g. DELETE task
142 """
143
144 def get_vimconnector(self):
145 try:
146 from_ = "datacenter_tenants as dt join datacenters as d on dt.datacenter_id=d.uuid"
147 select_ = ('type', 'd.config as config', 'd.uuid as datacenter_id', 'vim_url', 'vim_url_admin',
148 'd.name as datacenter_name', 'dt.uuid as datacenter_tenant_id',
149 'dt.vim_tenant_name as vim_tenant_name', 'dt.vim_tenant_id as vim_tenant_id',
150 'user', 'passwd', 'dt.config as dt_config')
151 where_ = {"dt.uuid": self.datacenter_tenant_id}
152 with self.db_lock:
153 vims = self.db.get_rows(FROM=from_, SELECT=select_, WHERE=where_)
154 vim = vims[0]
155 vim_config = {}
156 if vim["config"]:
157 vim_config.update(yaml.load(vim["config"]))
158 if vim["dt_config"]:
159 vim_config.update(yaml.load(vim["dt_config"]))
160 vim_config['datacenter_tenant_id'] = vim.get('datacenter_tenant_id')
161 vim_config['datacenter_id'] = vim.get('datacenter_id')
162
163 # get port_mapping
164 vim_config["wim_external_ports"] = self.ovim.get_of_port_mappings(
165 db_filter={"region": vim_config['datacenter_id'], "pci": None})
166
167 self.vim = vim_module[vim["type"]].vimconnector(
168 uuid=vim['datacenter_id'], name=vim['datacenter_name'],
169 tenant_id=vim['vim_tenant_id'], tenant_name=vim['vim_tenant_name'],
170 url=vim['vim_url'], url_admin=vim['vim_url_admin'],
171 user=vim['user'], passwd=vim['passwd'],
172 config=vim_config, persistent_info=self.vim_persistent_info
173 )
174 self.error_status = None
175 except Exception as e:
176 self.logger.error("Cannot load vimconnector for vim_account {}: {}".format(self.datacenter_tenant_id, e))
177 self.vim = None
178 self.error_status = "Error loading vimconnector: {}".format(e)
179
180 def _reload_vim_actions(self):
181 """
182 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
183 :return: None
184 """
185 try:
186 action_completed = False
187 task_list = []
188 old_action_key = None
189
190 old_item_id = ""
191 old_item = ""
192 old_created_at = 0.0
193 database_limit = 200
194 while True:
195 # get 200 (database_limit) entries each time
196 with self.db_lock:
197 vim_actions = self.db.get_rows(FROM="vim_wim_actions",
198 WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
199 "item_id>=": old_item_id},
200 ORDER_BY=("item_id", "item", "created_at",),
201 LIMIT=database_limit)
202 for task in vim_actions:
203 item = task["item"]
204 item_id = task["item_id"]
205
206 # skip the first entries that are already processed in the previous pool of 200
207 if old_item_id:
208 if item_id == old_item_id and item == old_item and task["created_at"] == old_created_at:
209 old_item_id = False # next one will be a new un-processed task
210 continue
211
212 action_key = item + item_id
213 if old_action_key != action_key:
214 if not action_completed and task_list:
215 # This will fill needed task parameters into memory, and insert the task if needed in
216 # self.pending_tasks or self.refresh_tasks
217 try:
218 self._insert_pending_tasks(task_list)
219 except Exception as e:
220 self.logger.critical(
221 "Unexpected exception at _reload_vim_actions:_insert_pending_tasks: " + str(e),
222 exc_info=True)
223 task_list = []
224 old_action_key = action_key
225 action_completed = False
226 elif action_completed:
227 continue
228
229 if task["status"] == "SCHEDULED" or task["action"] == "CREATE" or task["action"] == "FIND":
230 task_list.append(task)
231 elif task["action"] == "DELETE":
232 # action completed because deleted and status is not SCHEDULED. Not needed anything
233 action_completed = True
234 if len(vim_actions) == database_limit:
235 # update variables for get the next database iteration
236 old_item_id = item_id
237 old_item = item
238 old_created_at = task["created_at"]
239 else:
240 break
241 # Last actions group need to be inserted too
242 if not action_completed and task_list:
243 try:
244 self._insert_pending_tasks(task_list)
245 except Exception as e:
246 self.logger.critical("Unexpected exception at _reload_vim_actions:_insert_pending_tasks: " + str(e),
247 exc_info=True)
248 self.logger.debug("reloaded vim actions pending:{} refresh:{}".format(
249 len(self.pending_tasks), len(self.refresh_tasks)))
250 except Exception as e:
251 self.logger.critical("Unexpected exception at _reload_vim_actions: " + str(e), exc_info=True)
252
253 def _refres_elements(self):
254 """Call VIM to get VMs and networks status until 10 elements"""
255 now = time.time()
256 nb_processed = 0
257 vm_to_refresh_list = []
258 net_to_refresh_list = []
259 vm_to_refresh_dict = {}
260 net_to_refresh_dict = {}
261 items_to_refresh = 0
262 while self.refresh_tasks:
263 task = self.refresh_tasks[0]
264 with self.task_lock:
265 if task['status'] == 'SUPERSEDED':
266 self.refresh_tasks.pop(0)
267 continue
268 if task['modified_at'] > now:
269 break
270 # task["status"] = "processing"
271 nb_processed += 1
272 self.refresh_tasks.pop(0)
273 if task["item"] == 'instance_vms':
274 if task["vim_id"] not in vm_to_refresh_dict:
275 vm_to_refresh_dict[task["vim_id"]] = [task]
276 vm_to_refresh_list.append(task["vim_id"])
277 else:
278 vm_to_refresh_dict[task["vim_id"]].append(task)
279 elif task["item"] == 'instance_nets':
280 if task["vim_id"] not in net_to_refresh_dict:
281 net_to_refresh_dict[task["vim_id"]] = [task]
282 net_to_refresh_list.append(task["vim_id"])
283 else:
284 net_to_refresh_dict[task["vim_id"]].append(task)
285 else:
286 task_id = task["instance_action_id"] + "." + str(task["task_index"])
287 self.logger.critical("task={}: unknown task {}".format(task_id, task["item"]), exc_info=True)
288 items_to_refresh += 1
289 if items_to_refresh == 10:
290 break
291
292 if vm_to_refresh_list:
293 now = time.time()
294 try:
295 vim_dict = self.vim.refresh_vms_status(vm_to_refresh_list)
296 except vimconn.vimconnException as e:
297 # Mark all tasks at VIM_ERROR status
298 self.logger.error("task=several get-VM: vimconnException when trying to refresh vms " + str(e))
299 vim_dict = {}
300 for vim_id in vm_to_refresh_list:
301 vim_dict[vim_id] = {"status": "VIM_ERROR", "error_msg": str(e)}
302
303 for vim_id, vim_info in vim_dict.items():
304
305 # look for task
306 for task in vm_to_refresh_dict[vim_id]:
307 task_need_update = False
308 task_id = task["instance_action_id"] + "." + str(task["task_index"])
309 self.logger.debug("task={} get-VM: vim_vm_id={} result={}".format(task_id, task["vim_id"], vim_info))
310
311 # check and update interfaces
312 task_warning_msg = ""
313 for interface in vim_info.get("interfaces", ()):
314 vim_interface_id = interface["vim_interface_id"]
315 if vim_interface_id not in task["extra"]["interfaces"]:
316 self.logger.critical("task={} get-VM: Interface not found {} on task info {}".format(
317 task_id, vim_interface_id, task["extra"]["interfaces"]), exc_info=True)
318 continue
319 task_interface = task["extra"]["interfaces"][vim_interface_id]
320 task_vim_interface = task["vim_interfaces"].get(vim_interface_id)
321 if task_vim_interface != interface:
322 # delete old port
323 if task_interface.get("sdn_port_id"):
324 try:
325 with self.db_lock:
326 self.ovim.delete_port(task_interface["sdn_port_id"], idempotent=True)
327 task_interface["sdn_port_id"] = None
328 task_need_update = True
329 except ovimException as e:
330 error_text = "ovimException deleting external_port={}: {}".format(
331 task_interface["sdn_port_id"], e)
332 self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
333 task_warning_msg += error_text
334 # TODO Set error_msg at instance_nets instead of instance VMs
335
336 # Create SDN port
337 sdn_net_id = task_interface.get("sdn_net_id")
338 if sdn_net_id and interface.get("compute_node") and interface.get("pci"):
339 sdn_port_name = sdn_net_id + "." + task["vim_id"]
340 sdn_port_name = sdn_port_name[:63]
341 try:
342 with self.db_lock:
343 sdn_port_id = self.ovim.new_external_port(
344 {"compute_node": interface["compute_node"],
345 "pci": interface["pci"],
346 "vlan": interface.get("vlan"),
347 "net_id": sdn_net_id,
348 "region": self.vim["config"]["datacenter_id"],
349 "name": sdn_port_name,
350 "mac": interface.get("mac_address")})
351 task_interface["sdn_port_id"] = sdn_port_id
352 task_need_update = True
353 except (ovimException, Exception) as e:
354 error_text = "ovimException creating new_external_port compute_node={}" \
355 " pci={} vlan={} {}".format(
356 interface["compute_node"],
357 interface["pci"],
358 interface.get("vlan"), e)
359 self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
360 task_warning_msg += error_text
361 # TODO Set error_msg at instance_nets instead of instance VMs
362
363 with self.db_lock:
364 self.db.update_rows(
365 'instance_interfaces',
366 UPDATE={"mac_address": interface.get("mac_address"),
367 "ip_address": interface.get("ip_address"),
368 "vim_interface_id": interface.get("vim_interface_id"),
369 "vim_info": interface.get("vim_info"),
370 "sdn_port_id": task_interface.get("sdn_port_id"),
371 "compute_node": interface.get("compute_node"),
372 "pci": interface.get("pci"),
373 "vlan": interface.get("vlan")},
374 WHERE={'uuid': task_interface["iface_id"]})
375 task["vim_interfaces"][vim_interface_id] = interface
376
377 # check and update task and instance_vms database
378 vim_info_error_msg = None
379 if vim_info.get("error_msg"):
380 vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"] + task_warning_msg)
381 elif task_warning_msg:
382 vim_info_error_msg = self._format_vim_error_msg(task_warning_msg)
383 task_vim_info = task.get("vim_info")
384 task_error_msg = task.get("error_msg")
385 task_vim_status = task["extra"].get("vim_status")
386 if task_vim_status != vim_info["status"] or task_error_msg != vim_info_error_msg or \
387 (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
388 temp_dict = {"status": vim_info["status"], "error_msg": vim_info_error_msg}
389 if vim_info.get("vim_info"):
390 temp_dict["vim_info"] = vim_info["vim_info"]
391 with self.db_lock:
392 self.db.update_rows('instance_vms', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]})
393 task["extra"]["vim_status"] = vim_info["status"]
394 task["error_msg"] = vim_info_error_msg
395 if vim_info.get("vim_info"):
396 task["vim_info"] = vim_info["vim_info"]
397 task_need_update = True
398
399 if task_need_update:
400 with self.db_lock:
401 self.db.update_rows(
402 'vim_wim_actions',
403 UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256),
404 "error_msg": task.get("error_msg"), "modified_at": now},
405 WHERE={'instance_action_id': task['instance_action_id'],
406 'task_index': task['task_index']})
407 if task["extra"].get("vim_status") == "BUILD":
408 self._insert_refresh(task, now + self.REFRESH_BUILD)
409 else:
410 self._insert_refresh(task, now + self.REFRESH_ACTIVE)
411
412 if net_to_refresh_list:
413 now = time.time()
414 try:
415 vim_dict = self.vim.refresh_nets_status(net_to_refresh_list)
416 except vimconn.vimconnException as e:
417 # Mark all tasks at VIM_ERROR status
418 self.logger.error("task=several get-net: vimconnException when trying to refresh nets " + str(e))
419 vim_dict = {}
420 for vim_id in net_to_refresh_list:
421 vim_dict[vim_id] = {"status": "VIM_ERROR", "error_msg": str(e)}
422
423 for vim_id, vim_info in vim_dict.items():
424 # look for task
425 for task in net_to_refresh_dict[vim_id]:
426 task_id = task["instance_action_id"] + "." + str(task["task_index"])
427 self.logger.debug("task={} get-net: vim_net_id={} result={}".format(task_id, task["vim_id"], vim_info))
428
429 task_vim_info = task.get("vim_info")
430 task_vim_status = task["extra"].get("vim_status")
431 task_error_msg = task.get("error_msg")
432 task_sdn_net_id = task["extra"].get("sdn_net_id")
433
434 vim_info_status = vim_info["status"]
435 vim_info_error_msg = vim_info.get("error_msg")
436 # get ovim status
437 if task_sdn_net_id:
438 try:
439 with self.db_lock:
440 sdn_net = self.ovim.show_network(task_sdn_net_id)
441 except (ovimException, Exception) as e:
442 text_error = "ovimException getting network snd_net_id={}: {}".format(task_sdn_net_id, e)
443 self.logger.error("task={} get-net: {}".format(task_id, text_error), exc_info=True)
444 sdn_net = {"status": "ERROR", "last_error": text_error}
445 if sdn_net["status"] == "ERROR":
446 if not vim_info_error_msg:
447 vim_info_error_msg = str(sdn_net.get("last_error"))
448 else:
449 vim_info_error_msg = "VIM_ERROR: {} && SDN_ERROR: {}".format(
450 self._format_vim_error_msg(vim_info_error_msg, 1024 // 2 - 14),
451 self._format_vim_error_msg(sdn_net["last_error"], 1024 // 2 - 14))
452 vim_info_status = "ERROR"
453 elif sdn_net["status"] == "BUILD":
454 if vim_info_status == "ACTIVE":
455 vim_info_status = "BUILD"
456
457 # update database
458 if vim_info_error_msg:
459 vim_info_error_msg = self._format_vim_error_msg(vim_info_error_msg)
460 if task_vim_status != vim_info_status or task_error_msg != vim_info_error_msg or \
461 (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
462 task["extra"]["vim_status"] = vim_info_status
463 task["error_msg"] = vim_info_error_msg
464 if vim_info.get("vim_info"):
465 task["vim_info"] = vim_info["vim_info"]
466 temp_dict = {"status": vim_info_status, "error_msg": vim_info_error_msg}
467 if vim_info.get("vim_info"):
468 temp_dict["vim_info"] = vim_info["vim_info"]
469 with self.db_lock:
470 self.db.update_rows('instance_nets', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]})
471 self.db.update_rows(
472 'vim_wim_actions',
473 UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256),
474 "error_msg": task.get("error_msg"), "modified_at": now},
475 WHERE={'instance_action_id': task['instance_action_id'],
476 'task_index': task['task_index']})
477 if task["extra"].get("vim_status") == "BUILD":
478 self._insert_refresh(task, now + self.REFRESH_BUILD)
479 else:
480 self._insert_refresh(task, now + self.REFRESH_ACTIVE)
481
482 return nb_processed
483
484 def _insert_refresh(self, task, threshold_time=None):
485 """Insert a task at list of refreshing elements. The refreshing list is ordered by threshold_time (task['modified_at']
486 It is assumed that this is called inside this thread
487 """
488 if not self.vim:
489 return
490 if not threshold_time:
491 threshold_time = time.time()
492 task["modified_at"] = threshold_time
493 task_name = task["item"][9:] + "-" + task["action"]
494 task_id = task["instance_action_id"] + "." + str(task["task_index"])
495 for index in range(0, len(self.refresh_tasks)):
496 if self.refresh_tasks[index]["modified_at"] > threshold_time:
497 self.refresh_tasks.insert(index, task)
498 break
499 else:
500 index = len(self.refresh_tasks)
501 self.refresh_tasks.append(task)
502 self.logger.debug("task={} new refresh name={}, modified_at={} index={}".format(
503 task_id, task_name, task["modified_at"], index))
504
505 def _remove_refresh(self, task_name, vim_id):
506 """Remove a task with this name and vim_id from the list of refreshing elements.
507 It is assumed that this is called inside this thread outside _refres_elements method
508 Return True if self.refresh_list is modified, task is found
509 Return False if not found
510 """
511 index_to_delete = None
512 for index in range(0, len(self.refresh_tasks)):
513 if self.refresh_tasks[index]["name"] == task_name and self.refresh_tasks[index]["vim_id"] == vim_id:
514 index_to_delete = index
515 break
516 else:
517 return False
518 if not index_to_delete:
519 del self.refresh_tasks[index_to_delete]
520 return True
521
522 def _proccess_pending_tasks(self):
523 nb_created = 0
524 nb_processed = 0
525 while self.pending_tasks:
526 task = self.pending_tasks.pop(0)
527 nb_processed += 1
528 try:
529 # check if tasks that this depends on have been completed
530 dependency_not_completed = False
531 for task_index in task["extra"].get("depends_on", ()):
532 task_dependency = task["depends"].get("TASK-" + str(task_index))
533 if not task_dependency:
534 task_dependency = self._look_for_task(task["instance_action_id"], task_index)
535 if not task_dependency:
536 raise VimThreadException(
537 "Cannot get depending net task trying to get depending task {}.{}".format(
538 task["instance_action_id"], task_index))
539 # task["depends"]["TASK-" + str(task_index)] = task_dependency #it references another object,so database must be look again
540 if task_dependency["status"] == "SCHEDULED":
541 dependency_not_completed = True
542 break
543 elif task_dependency["status"] == "FAILED":
544 raise VimThreadException(
545 "Cannot {} {}, (task {}.{}) because depends on failed {}.{}, (task{}.{}): {}".format(
546 task["action"], task["item"],
547 task["instance_action_id"], task["task_index"],
548 task_dependency["instance_action_id"], task_dependency["task_index"],
549 task_dependency["action"], task_dependency["item"], task_dependency.get("error_msg")))
550 if dependency_not_completed:
551 # Move this task to the end.
552 task["extra"]["tries"] = task["extra"].get("tries", 0) + 1
553 if task["extra"]["tries"] <= 3:
554 self.pending_tasks.append(task)
555 continue
556 else:
557 raise VimThreadException(
558 "Cannot {} {}, (task {}.{}) because timeout waiting to complete {} {}, "
559 "(task {}.{})".format(task["action"], task["item"],
560 task["instance_action_id"], task["task_index"],
561 task_dependency["instance_action_id"], task_dependency["task_index"],
562 task_dependency["action"], task_dependency["item"]))
563
564 if task["status"] == "SUPERSEDED":
565 # not needed to do anything but update database with the new status
566 result = True
567 database_update = None
568 elif not self.vim:
569 task["status"] = "ERROR"
570 task["error_msg"] = self.error_status
571 result = False
572 database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
573 elif task["item"] == 'instance_vms':
574 if task["action"] == "CREATE":
575 result, database_update = self.new_vm(task)
576 nb_created += 1
577 elif task["action"] == "DELETE":
578 result, database_update = self.del_vm(task)
579 else:
580 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
581 elif task["item"] == 'instance_nets':
582 if task["action"] == "CREATE":
583 result, database_update = self.new_net(task)
584 nb_created += 1
585 elif task["action"] == "DELETE":
586 result, database_update = self.del_net(task)
587 elif task["action"] == "FIND":
588 result, database_update = self.get_net(task)
589 else:
590 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
591 elif task["item"] == 'instance_sfis':
592 if task["action"] == "CREATE":
593 result, database_update = self.new_sfi(task)
594 nb_created += 1
595 elif task["action"] == "DELETE":
596 result, database_update = self.del_sfi(task)
597 else:
598 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
599 elif task["item"] == 'instance_sfs':
600 if task["action"] == "CREATE":
601 result, database_update = self.new_sf(task)
602 nb_created += 1
603 elif task["action"] == "DELETE":
604 result, database_update = self.del_sf(task)
605 else:
606 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
607 elif task["item"] == 'instance_classifications':
608 if task["action"] == "CREATE":
609 result, database_update = self.new_classification(task)
610 nb_created += 1
611 elif task["action"] == "DELETE":
612 result, database_update = self.del_classification(task)
613 else:
614 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
615 elif task["item"] == 'instance_sfps':
616 if task["action"] == "CREATE":
617 result, database_update = self.new_sfp(task)
618 nb_created += 1
619 elif task["action"] == "DELETE":
620 result, database_update = self.del_sfp(task)
621 else:
622 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
623 else:
624 raise vimconn.vimconnException(self.name + "unknown task item {}".format(task["item"]))
625 # TODO
626 except VimThreadException as e:
627 result = False
628 task["error_msg"] = str(e)
629 task["status"] = "FAILED"
630 database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
631 if task["item"] == 'instance_vms':
632 database_update["vim_vm_id"] = None
633 elif task["item"] == 'instance_nets':
634 database_update["vim_net_id"] = None
635
636 no_refresh_tasks = ['instance_sfis', 'instance_sfs',
637 'instance_classifications', 'instance_sfps']
638 if task["action"] == "DELETE":
639 action_key = task["item"] + task["item_id"]
640 del self.grouped_tasks[action_key]
641 elif task["action"] in ("CREATE", "FIND") and task["status"] in ("DONE", "BUILD"):
642 if task["item"] not in no_refresh_tasks:
643 self._insert_refresh(task)
644
645 task_id = task["instance_action_id"] + "." + str(task["task_index"])
646 self.logger.debug("task={} item={} action={} result={}:'{}' params={}".format(
647 task_id, task["item"], task["action"], task["status"],
648 task["vim_id"] if task["status"] == "DONE" else task.get("error_msg"), task["params"]))
649 try:
650 now = time.time()
651 with self.db_lock:
652 self.db.update_rows(
653 table="vim_wim_actions",
654 UPDATE={"status": task["status"], "vim_id": task.get("vim_id"), "modified_at": now,
655 "error_msg": task["error_msg"],
656 "extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256)},
657 WHERE={"instance_action_id": task["instance_action_id"], "task_index": task["task_index"]})
658 if result is not None:
659 self.db.update_rows(
660 table="instance_actions",
661 UPDATE={("number_done" if result else "number_failed"): {"INCREMENT": 1},
662 "modified_at": now},
663 WHERE={"uuid": task["instance_action_id"]})
664 if database_update:
665 self.db.update_rows(table=task["item"],
666 UPDATE=database_update,
667 WHERE={"uuid": task["item_id"]})
668 except db_base_Exception as e:
669 self.logger.error("task={} Error updating database {}".format(task_id, e), exc_info=True)
670
671 if nb_created == 10:
672 break
673 return nb_processed
674
675 def _insert_pending_tasks(self, vim_actions_list):
676 for task in vim_actions_list:
677 if task["datacenter_vim_id"] != self.datacenter_tenant_id:
678 continue
679 item = task["item"]
680 item_id = task["item_id"]
681 action_key = item + item_id
682 if action_key not in self.grouped_tasks:
683 self.grouped_tasks[action_key] = []
684 task["params"] = None
685 task["depends"] = {}
686 if task["extra"]:
687 extra = yaml.load(task["extra"])
688 task["extra"] = extra
689 task["params"] = extra.get("params")
690 depends_on_list = extra.get("depends_on")
691 if depends_on_list:
692 for dependency_task in depends_on_list:
693 if isinstance(dependency_task, int):
694 index = dependency_task
695 else:
696 instance_action_id, _, task_id = dependency_task.rpartition(".")
697 if instance_action_id != task["instance_action_id"]:
698 continue
699 index = int(task_id)
700
701 if index < len(vim_actions_list) and vim_actions_list[index]["task_index"] == index and \
702 vim_actions_list[index]["instance_action_id"] == task["instance_action_id"]:
703 task["depends"]["TASK-" + str(index)] = vim_actions_list[index]
704 task["depends"]["TASK-{}.{}".format(task["instance_action_id"], index)] = vim_actions_list[index]
705 if extra.get("interfaces"):
706 task["vim_interfaces"] = {}
707 else:
708 task["extra"] = {}
709 if "error_msg" not in task:
710 task["error_msg"] = None
711 if "vim_id" not in task:
712 task["vim_id"] = None
713
714 if task["action"] == "DELETE":
715 need_delete_action = False
716 for to_supersede in self.grouped_tasks.get(action_key, ()):
717 if to_supersede["action"] == "FIND" and to_supersede.get("vim_id"):
718 task["vim_id"] = to_supersede["vim_id"]
719 if to_supersede["action"] == "CREATE" and to_supersede["extra"].get("created", True) and \
720 (to_supersede.get("vim_id") or to_supersede["extra"].get("sdn_net_id")):
721 need_delete_action = True
722 task["vim_id"] = to_supersede["vim_id"]
723 if to_supersede["extra"].get("sdn_net_id"):
724 task["extra"]["sdn_net_id"] = to_supersede["extra"]["sdn_net_id"]
725 if to_supersede["extra"].get("interfaces"):
726 task["extra"]["interfaces"] = to_supersede["extra"]["interfaces"]
727 if to_supersede["extra"].get("created_items"):
728 if not task["extra"].get("created_items"):
729 task["extra"]["created_items"] = {}
730 task["extra"]["created_items"].update(to_supersede["extra"]["created_items"])
731 # Mark task as SUPERSEDED.
732 # If task is in self.pending_tasks, it will be removed and database will be update
733 # If task is in self.refresh_tasks, it will be removed
734 to_supersede["status"] = "SUPERSEDED"
735 if not need_delete_action:
736 task["status"] = "SUPERSEDED"
737
738 self.grouped_tasks[action_key].append(task)
739 self.pending_tasks.append(task)
740 elif task["status"] == "SCHEDULED":
741 self.grouped_tasks[action_key].append(task)
742 self.pending_tasks.append(task)
743 elif task["action"] in ("CREATE", "FIND"):
744 self.grouped_tasks[action_key].append(task)
745 if task["status"] in ("DONE", "BUILD"):
746 self._insert_refresh(task)
747 # TODO add VM reset, get console, etc...
748 else:
749 raise vimconn.vimconnException(self.name + "unknown vim_action action {}".format(task["action"]))
750
751 def insert_task(self, task):
752 try:
753 self.task_queue.put(task, False)
754 return None
755 except Queue.Full:
756 raise vimconn.vimconnException(self.name + ": timeout inserting a task")
757
758 def del_task(self, task):
759 with self.task_lock:
760 if task["status"] == "SCHEDULED":
761 task["status"] = "SUPERSEDED"
762 return True
763 else: # task["status"] == "processing"
764 self.task_lock.release()
765 return False
766
767 def run(self):
768 self.logger.debug("Starting")
769 while True:
770 self.get_vimconnector()
771 self.logger.debug("Vimconnector loaded")
772 self._reload_vim_actions()
773 reload_thread = False
774
775 while True:
776 try:
777 while not self.task_queue.empty():
778 task = self.task_queue.get()
779 if isinstance(task, list):
780 self._insert_pending_tasks(task)
781 elif isinstance(task, str):
782 if task == 'exit':
783 return 0
784 elif task == 'reload':
785 reload_thread = True
786 break
787 self.task_queue.task_done()
788 if reload_thread:
789 break
790 nb_processed = self._proccess_pending_tasks()
791 nb_processed += self._refres_elements()
792 if not nb_processed:
793 time.sleep(1)
794
795 except Exception as e:
796 self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)
797
798 self.logger.debug("Finishing")
799
800 def _look_for_task(self, instance_action_id, task_id):
801 """
802 Look for a concrete task at vim_actions database table
803 :param instance_action_id: The instance_action_id
804 :param task_id: Can have several formats:
805 <task index>: integer
806 TASK-<task index> :backward compatibility,
807 [TASK-]<instance_action_id>.<task index>: this instance_action_id overrides the one in the parameter
808 :return: Task dictionary or None if not found
809 """
810 if isinstance(task_id, int):
811 task_index = task_id
812 else:
813 if task_id.startswith("TASK-"):
814 task_id = task_id[5:]
815 ins_action_id, _, task_index = task_id.rpartition(".")
816 if ins_action_id:
817 instance_action_id = ins_action_id
818
819 with self.db_lock:
820 tasks = self.db.get_rows(FROM="vim_wim_actions", WHERE={"instance_action_id": instance_action_id,
821 "task_index": task_index})
822 if not tasks:
823 return None
824 task = tasks[0]
825 task["params"] = None
826 task["depends"] = {}
827 if task["extra"]:
828 extra = yaml.load(task["extra"])
829 task["extra"] = extra
830 task["params"] = extra.get("params")
831 if extra.get("interfaces"):
832 task["vim_interfaces"] = {}
833 else:
834 task["extra"] = {}
835 return task
836
837 @staticmethod
838 def _format_vim_error_msg(error_text, max_length=1024):
839 if error_text and len(error_text) >= max_length:
840 return error_text[:max_length // 2 - 3] + " ... " + error_text[-max_length // 2 + 3:]
841 return error_text
842
843 def new_vm(self, task):
844 task_id = task["instance_action_id"] + "." + str(task["task_index"])
845 try:
846 params = task["params"]
847 depends = task.get("depends")
848 net_list = params[5]
849 for net in net_list:
850 if "net_id" in net and is_task_id(net["net_id"]): # change task_id into network_id
851 task_dependency = task["depends"].get(net["net_id"])
852 if not task_dependency:
853 task_dependency = self._look_for_task(task["instance_action_id"], net["net_id"])
854 if not task_dependency:
855 raise VimThreadException(
856 "Cannot get depending net task trying to get depending task {}.{}".format(
857 task["instance_action_id"], net["net_id"]))
858 network_id = task_dependency.get("vim_id")
859 if not network_id:
860 raise VimThreadException(
861 "Cannot create VM because depends on a network not created or found: " +
862 str(depends[net["net_id"]]["error_msg"]))
863 net["net_id"] = network_id
864 params_copy = deepcopy(params)
865 vim_vm_id, created_items = self.vim.new_vminstance(*params_copy)
866
867 # fill task_interfaces. Look for snd_net_id at database for each interface
868 task_interfaces = {}
869 for iface in params_copy[5]:
870 task_interfaces[iface["vim_id"]] = {"iface_id": iface["uuid"]}
871 with self.db_lock:
872 result = self.db.get_rows(
873 SELECT=('sdn_net_id', 'interface_id'),
874 FROM='instance_nets as ine join instance_interfaces as ii on ii.instance_net_id=ine.uuid',
875 WHERE={'ii.uuid': iface["uuid"]})
876 if result:
877 task_interfaces[iface["vim_id"]]["sdn_net_id"] = result[0]['sdn_net_id']
878 task_interfaces[iface["vim_id"]]["interface_id"] = result[0]['interface_id']
879 else:
880 self.logger.critical("task={} new-VM: instance_nets uuid={} not found at DB".format(task_id,
881 iface["uuid"]), exc_info=True)
882
883 task["vim_info"] = {}
884 task["vim_interfaces"] = {}
885 task["extra"]["interfaces"] = task_interfaces
886 task["extra"]["created"] = True
887 task["extra"]["created_items"] = created_items
888 task["error_msg"] = None
889 task["status"] = "DONE"
890 task["vim_id"] = vim_vm_id
891 instance_element_update = {"status": "BUILD", "vim_vm_id": vim_vm_id, "error_msg": None}
892 return True, instance_element_update
893
894 except (vimconn.vimconnException, VimThreadException) as e:
895 self.logger.error("task={} new-VM: {}".format(task_id, e))
896 error_text = self._format_vim_error_msg(str(e))
897 task["error_msg"] = error_text
898 task["status"] = "FAILED"
899 task["vim_id"] = None
900 instance_element_update = {"status": "VIM_ERROR", "vim_vm_id": None, "error_msg": error_text}
901 return False, instance_element_update
902
903 def del_vm(self, task):
904 task_id = task["instance_action_id"] + "." + str(task["task_index"])
905 vm_vim_id = task["vim_id"]
906 interfaces = task["extra"].get("interfaces", ())
907 try:
908 for iface in interfaces.values():
909 if iface.get("sdn_port_id"):
910 try:
911 with self.db_lock:
912 self.ovim.delete_port(iface["sdn_port_id"], idempotent=True)
913 except ovimException as e:
914 self.logger.error("task={} del-VM: ovimException when deleting external_port={}: {} ".format(
915 task_id, iface["sdn_port_id"], e), exc_info=True)
916 # TODO Set error_msg at instance_nets
917
918 self.vim.delete_vminstance(vm_vim_id, task["extra"].get("created_items"))
919 task["status"] = "DONE"
920 task["error_msg"] = None
921 return True, None
922
923 except vimconn.vimconnException as e:
924 task["error_msg"] = self._format_vim_error_msg(str(e))
925 if isinstance(e, vimconn.vimconnNotFoundException):
926 # If not found mark as Done and fill error_msg
927 task["status"] = "DONE"
928 return True, None
929 task["status"] = "FAILED"
930 return False, None
931
932 def _get_net_internal(self, task, filter_param):
933 """
934 Common code for get_net and new_net. It looks for a network on VIM with the filter_params
935 :param task: task for this find or find-or-create action
936 :param filter_param: parameters to send to the vimconnector
937 :return: a dict with the content to update the instance_nets database table. Raises an exception on error, or
938 when network is not found or found more than one
939 """
940 vim_nets = self.vim.get_network_list(filter_param)
941 if not vim_nets:
942 raise VimThreadExceptionNotFound("Network not found with this criteria: '{}'".format(filter_param))
943 elif len(vim_nets) > 1:
944 raise VimThreadException("More than one network found with this criteria: '{}'".format(filter_param))
945 vim_net_id = vim_nets[0]["id"]
946
947 # Discover if this network is managed by a sdn controller
948 sdn_net_id = None
949 with self.db_lock:
950 result = self.db.get_rows(SELECT=('sdn_net_id',), FROM='instance_nets',
951 WHERE={'vim_net_id': vim_net_id,
952 'datacenter_tenant_id': self.datacenter_tenant_id},
953 ORDER="instance_scenario_id")
954 if result:
955 sdn_net_id = result[0]['sdn_net_id']
956
957 task["status"] = "DONE"
958 task["extra"]["vim_info"] = {}
959 task["extra"]["created"] = False
960 task["extra"]["sdn_net_id"] = sdn_net_id
961 task["error_msg"] = None
962 task["vim_id"] = vim_net_id
963 instance_element_update = {"vim_net_id": vim_net_id, "created": False, "status": "BUILD",
964 "error_msg": None, "sdn_net_id": sdn_net_id}
965 return instance_element_update
966
967 def get_net(self, task):
968 task_id = task["instance_action_id"] + "." + str(task["task_index"])
969 try:
970 params = task["params"]
971 filter_param = params[0]
972 instance_element_update = self._get_net_internal(task, filter_param)
973 return True, instance_element_update
974
975 except (vimconn.vimconnException, VimThreadException) as e:
976 self.logger.error("task={} get-net: {}".format(task_id, e))
977 task["status"] = "FAILED"
978 task["vim_id"] = None
979 task["error_msg"] = self._format_vim_error_msg(str(e))
980 instance_element_update = {"vim_net_id": None, "status": "VIM_ERROR",
981 "error_msg": task["error_msg"]}
982 return False, instance_element_update
983
984 def new_net(self, task):
985 vim_net_id = None
986 sdn_net_id = None
987 task_id = task["instance_action_id"] + "." + str(task["task_index"])
988 action_text = ""
989 try:
990 # FIND
991 if task["extra"].get("find"):
992 action_text = "finding"
993 filter_param = task["extra"]["find"][0]
994 try:
995 instance_element_update = self._get_net_internal(task, filter_param)
996 return True, instance_element_update
997 except VimThreadExceptionNotFound:
998 pass
999 # CREATE
1000 params = task["params"]
1001 action_text = "creating VIM"
1002 vim_net_id = self.vim.new_network(*params[0:3])
1003
1004 net_name = params[0]
1005 net_type = params[1]
1006 wim_account_name = None
1007 if len(params) >= 4:
1008 wim_account_name = params[3]
1009
1010 sdn_controller = self.vim.config.get('sdn-controller')
1011 if sdn_controller and (net_type == "data" or net_type == "ptp"):
1012 network = {"name": net_name, "type": net_type, "region": self.vim["config"]["datacenter_id"]}
1013
1014 vim_net = self.vim.get_network(vim_net_id)
1015 if vim_net.get('encapsulation') != 'vlan':
1016 raise vimconn.vimconnException(
1017 "net '{}' defined as type '{}' has not vlan encapsulation '{}'".format(
1018 net_name, net_type, vim_net['encapsulation']))
1019 network["vlan"] = vim_net.get('segmentation_id')
1020 action_text = "creating SDN"
1021 with self.db_lock:
1022 sdn_net_id = self.ovim.new_network(network)
1023
1024 if wim_account_name and self.vim.config["wim_external_ports"]:
1025 # add external port to connect WIM. Try with compute node __WIM:wim_name and __WIM
1026 action_text = "attaching external port to ovim network"
1027 sdn_port_name = sdn_net_id + "." + task["vim_id"]
1028 sdn_port_name = sdn_port_name[:63]
1029 sdn_port_data = {
1030 "compute_node": "__WIM:" + wim_account_name[0:58],
1031 "pci": None,
1032 "vlan": network["vlan"],
1033 "net_id": sdn_net_id,
1034 "region": self.vim["config"]["datacenter_id"],
1035 "name": sdn_port_name,
1036 }
1037 try:
1038 sdn_external_port_id = self.ovim.new_external_port(sdn_port_data)
1039 except ovimException:
1040 sdn_port_data["compute_node"] = "__WIM"
1041 sdn_external_port_id = self.ovim.new_external_port(sdn_port_data)
1042 self.logger.debug("Added sdn_external_port {} to sdn_network {}".format(sdn_external_port_id,
1043 sdn_net_id))
1044
1045 task["status"] = "DONE"
1046 task["extra"]["vim_info"] = {}
1047 task["extra"]["sdn_net_id"] = sdn_net_id
1048 task["extra"]["created"] = True
1049 task["error_msg"] = None
1050 task["vim_id"] = vim_net_id
1051 instance_element_update = {"vim_net_id": vim_net_id, "sdn_net_id": sdn_net_id, "status": "BUILD",
1052 "created": True, "error_msg": None}
1053 return True, instance_element_update
1054 except (vimconn.vimconnException, ovimException) as e:
1055 self.logger.error("task={} new-net: Error {}: {}".format(task_id, action_text, e))
1056 task["status"] = "FAILED"
1057 task["vim_id"] = vim_net_id
1058 task["error_msg"] = self._format_vim_error_msg(str(e))
1059 task["extra"]["sdn_net_id"] = sdn_net_id
1060 instance_element_update = {"vim_net_id": vim_net_id, "sdn_net_id": sdn_net_id, "status": "VIM_ERROR",
1061 "error_msg": task["error_msg"]}
1062 return False, instance_element_update
1063
1064 def del_net(self, task):
1065 net_vim_id = task["vim_id"]
1066 sdn_net_id = task["extra"].get("sdn_net_id")
1067 try:
1068 if sdn_net_id:
1069 # Delete any attached port to this sdn network. There can be ports associated to this network in case
1070 # it was manually done using 'openmano vim-net-sdn-attach'
1071 with self.db_lock:
1072 port_list = self.ovim.get_ports(columns={'uuid'},
1073 filter={'name': 'external_port', 'net_id': sdn_net_id})
1074 for port in port_list:
1075 self.ovim.delete_port(port['uuid'], idempotent=True)
1076 self.ovim.delete_network(sdn_net_id, idempotent=True)
1077 if net_vim_id:
1078 self.vim.delete_network(net_vim_id)
1079 task["status"] = "DONE"
1080 task["error_msg"] = None
1081 return True, None
1082 except ovimException as e:
1083 task["error_msg"] = self._format_vim_error_msg("ovimException obtaining and deleting external "
1084 "ports for net {}: {}".format(sdn_net_id, str(e)))
1085 except vimconn.vimconnException as e:
1086 task["error_msg"] = self._format_vim_error_msg(str(e))
1087 if isinstance(e, vimconn.vimconnNotFoundException):
1088 # If not found mark as Done and fill error_msg
1089 task["status"] = "DONE"
1090 return True, None
1091 task["status"] = "FAILED"
1092 return False, None
1093
1094 ## Service Function Instances
1095
1096 def new_sfi(self, task):
1097 vim_sfi_id = None
1098 try:
1099 # Waits for interfaces to be ready (avoids failure)
1100 time.sleep(1)
1101 dep_id = "TASK-" + str(task["extra"]["depends_on"][0])
1102 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1103 error_text = ""
1104 interfaces = task.get("depends").get(dep_id).get("extra").get("interfaces")
1105 ingress_interface_id = task.get("extra").get("params").get("ingress_interface_id")
1106 egress_interface_id = task.get("extra").get("params").get("egress_interface_id")
1107 ingress_vim_interface_id = None
1108 egress_vim_interface_id = None
1109 for vim_interface, interface_data in interfaces.iteritems():
1110 if interface_data.get("interface_id") == ingress_interface_id:
1111 ingress_vim_interface_id = vim_interface
1112 break
1113 if ingress_interface_id != egress_interface_id:
1114 for vim_interface, interface_data in interfaces.iteritems():
1115 if interface_data.get("interface_id") == egress_interface_id:
1116 egress_vim_interface_id = vim_interface
1117 break
1118 else:
1119 egress_vim_interface_id = ingress_vim_interface_id
1120 if not ingress_vim_interface_id or not egress_vim_interface_id:
1121 self.logger.error("Error creating Service Function Instance, Ingress: %s, Egress: %s",
1122 ingress_vim_interface_id, egress_vim_interface_id)
1123 return False, None
1124 # At the moment, every port associated with the VM will be used both as ingress and egress ports.
1125 # Bear in mind that different VIM connectors might support SFI differently. In the case of OpenStack, only the
1126 # first ingress and first egress ports will be used to create the SFI (Port Pair).
1127 ingress_port_id_list = [ingress_vim_interface_id]
1128 egress_port_id_list = [egress_vim_interface_id]
1129 name = "sfi-%s" % task["item_id"][:8]
1130 # By default no form of IETF SFC Encapsulation will be used
1131 vim_sfi_id = self.vim.new_sfi(name, ingress_port_id_list, egress_port_id_list, sfc_encap=False)
1132
1133 task["extra"]["created"] = True
1134 task["error_msg"] = None
1135 task["status"] = "DONE"
1136 task["vim_id"] = vim_sfi_id
1137 instance_element_update = {"status": "ACTIVE", "vim_sfi_id": vim_sfi_id, "error_msg": None}
1138 return True, instance_element_update
1139
1140 except (vimconn.vimconnException, VimThreadException) as e:
1141 self.logger.error("Error creating Service Function Instance, task=%s: %s", task_id, str(e))
1142 error_text = self._format_vim_error_msg(str(e))
1143 task["error_msg"] = error_text
1144 task["status"] = "FAILED"
1145 task["vim_id"] = None
1146 instance_element_update = {"status": "VIM_ERROR", "vim_sfi_id": None, "error_msg": error_text}
1147 return False, instance_element_update
1148
1149 def del_sfi(self, task):
1150 sfi_vim_id = task["vim_id"]
1151 try:
1152 self.vim.delete_sfi(sfi_vim_id)
1153 task["status"] = "DONE"
1154 task["error_msg"] = None
1155 return True, None
1156
1157 except vimconn.vimconnException as e:
1158 task["error_msg"] = self._format_vim_error_msg(str(e))
1159 if isinstance(e, vimconn.vimconnNotFoundException):
1160 # If not found mark as Done and fill error_msg
1161 task["status"] = "DONE"
1162 return True, None
1163 task["status"] = "FAILED"
1164 return False, None
1165
1166 def new_sf(self, task):
1167 vim_sf_id = None
1168 try:
1169 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1170 error_text = ""
1171 depending_tasks = ["TASK-" + str(dep_id) for dep_id in task["extra"]["depends_on"]]
1172 # sfis = task.get("depends").values()[0].get("extra").get("params")[5]
1173 sfis = [task.get("depends").get(dep_task) for dep_task in depending_tasks]
1174 sfi_id_list = []
1175 for sfi in sfis:
1176 sfi_id_list.append(sfi.get("vim_id"))
1177 name = "sf-%s" % task["item_id"][:8]
1178 # By default no form of IETF SFC Encapsulation will be used
1179 vim_sf_id = self.vim.new_sf(name, sfi_id_list, sfc_encap=False)
1180
1181 task["extra"]["created"] = True
1182 task["error_msg"] = None
1183 task["status"] = "DONE"
1184 task["vim_id"] = vim_sf_id
1185 instance_element_update = {"status": "ACTIVE", "vim_sf_id": vim_sf_id, "error_msg": None}
1186 return True, instance_element_update
1187
1188 except (vimconn.vimconnException, VimThreadException) as e:
1189 self.logger.error("Error creating Service Function, task=%s: %s", task_id, str(e))
1190 error_text = self._format_vim_error_msg(str(e))
1191 task["error_msg"] = error_text
1192 task["status"] = "FAILED"
1193 task["vim_id"] = None
1194 instance_element_update = {"status": "VIM_ERROR", "vim_sf_id": None, "error_msg": error_text}
1195 return False, instance_element_update
1196
1197 def del_sf(self, task):
1198 sf_vim_id = task["vim_id"]
1199 try:
1200 self.vim.delete_sf(sf_vim_id)
1201 task["status"] = "DONE"
1202 task["error_msg"] = None
1203 return True, None
1204
1205 except vimconn.vimconnException as e:
1206 task["error_msg"] = self._format_vim_error_msg(str(e))
1207 if isinstance(e, vimconn.vimconnNotFoundException):
1208 # If not found mark as Done and fill error_msg
1209 task["status"] = "DONE"
1210 return True, None
1211 task["status"] = "FAILED"
1212 return False, None
1213
1214 def new_classification(self, task):
1215 vim_classification_id = None
1216 try:
1217 params = task["params"]
1218 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1219 dep_id = "TASK-" + str(task["extra"]["depends_on"][0])
1220 error_text = ""
1221 interfaces = task.get("depends").get(dep_id).get("extra").get("interfaces").keys()
1222 # Bear in mind that different VIM connectors might support Classifications differently.
1223 # In the case of OpenStack, only the first VNF attached to the classifier will be used
1224 # to create the Classification(s) (the "logical source port" of the "Flow Classifier").
1225 # Since the VNFFG classifier match lacks the ethertype, classification defaults to
1226 # using the IPv4 flow classifier.
1227 name = "c-%s" % task["item_id"][:8]
1228 # if not CIDR is given for the IP addresses, add /32:
1229 ip_proto = int(params.get("ip_proto"))
1230 source_ip = params.get("source_ip")
1231 destination_ip = params.get("destination_ip")
1232 if ip_proto == 1:
1233 ip_proto = 'icmp'
1234 elif ip_proto == 6:
1235 ip_proto = 'tcp'
1236 elif ip_proto == 17:
1237 ip_proto = 'udp'
1238 if '/' not in source_ip:
1239 source_ip += '/32'
1240 if '/' not in destination_ip:
1241 destination_ip += '/32'
1242 definition = {
1243 "logical_source_port": interfaces[0],
1244 "protocol": ip_proto,
1245 "source_ip_prefix": source_ip,
1246 "destination_ip_prefix": destination_ip,
1247 "source_port_range_min": params.get("source_port"),
1248 "source_port_range_max": params.get("source_port"),
1249 "destination_port_range_min": params.get("destination_port"),
1250 "destination_port_range_max": params.get("destination_port"),
1251 }
1252
1253 vim_classification_id = self.vim.new_classification(
1254 name, 'legacy_flow_classifier', definition)
1255
1256 task["extra"]["created"] = True
1257 task["error_msg"] = None
1258 task["status"] = "DONE"
1259 task["vim_id"] = vim_classification_id
1260 instance_element_update = {"status": "ACTIVE", "vim_classification_id": vim_classification_id, "error_msg": None}
1261 return True, instance_element_update
1262
1263 except (vimconn.vimconnException, VimThreadException) as e:
1264 self.logger.error("Error creating Classification, task=%s: %s", task_id, str(e))
1265 error_text = self._format_vim_error_msg(str(e))
1266 task["error_msg"] = error_text
1267 task["status"] = "FAILED"
1268 task["vim_id"] = None
1269 instance_element_update = {"status": "VIM_ERROR", "vim_classification_id": None, "error_msg": error_text}
1270 return False, instance_element_update
1271
1272 def del_classification(self, task):
1273 classification_vim_id = task["vim_id"]
1274 try:
1275 self.vim.delete_classification(classification_vim_id)
1276 task["status"] = "DONE"
1277 task["error_msg"] = None
1278 return True, None
1279
1280 except vimconn.vimconnException as e:
1281 task["error_msg"] = self._format_vim_error_msg(str(e))
1282 if isinstance(e, vimconn.vimconnNotFoundException):
1283 # If not found mark as Done and fill error_msg
1284 task["status"] = "DONE"
1285 return True, None
1286 task["status"] = "FAILED"
1287 return False, None
1288
1289 def new_sfp(self, task):
1290 vim_sfp_id = None
1291 try:
1292 params = task["params"]
1293 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1294 depending_tasks = [task.get("depends").get("TASK-" + str(tsk_id)) for tsk_id in task.get("extra").get("depends_on")]
1295 error_text = ""
1296 sf_id_list = []
1297 classification_id_list = []
1298 for dep in depending_tasks:
1299 vim_id = dep.get("vim_id")
1300 resource = dep.get("item")
1301 if resource == "instance_sfs":
1302 sf_id_list.append(vim_id)
1303 elif resource == "instance_classifications":
1304 classification_id_list.append(vim_id)
1305
1306 name = "sfp-%s" % task["item_id"][:8]
1307 # By default no form of IETF SFC Encapsulation will be used
1308 vim_sfp_id = self.vim.new_sfp(name, classification_id_list, sf_id_list, sfc_encap=False)
1309
1310 task["extra"]["created"] = True
1311 task["error_msg"] = None
1312 task["status"] = "DONE"
1313 task["vim_id"] = vim_sfp_id
1314 instance_element_update = {"status": "ACTIVE", "vim_sfp_id": vim_sfp_id, "error_msg": None}
1315 return True, instance_element_update
1316
1317 except (vimconn.vimconnException, VimThreadException) as e:
1318 self.logger.error("Error creating Service Function, task=%s: %s", task_id, str(e))
1319 error_text = self._format_vim_error_msg(str(e))
1320 task["error_msg"] = error_text
1321 task["status"] = "FAILED"
1322 task["vim_id"] = None
1323 instance_element_update = {"status": "VIM_ERROR", "vim_sfp_id": None, "error_msg": error_text}
1324 return False, instance_element_update
1325 return
1326
1327 def del_sfp(self, task):
1328 sfp_vim_id = task["vim_id"]
1329 try:
1330 self.vim.delete_sfp(sfp_vim_id)
1331 task["status"] = "DONE"
1332 task["error_msg"] = None
1333 return True, None
1334
1335 except vimconn.vimconnException as e:
1336 task["error_msg"] = self._format_vim_error_msg(str(e))
1337 if isinstance(e, vimconn.vimconnNotFoundException):
1338 # If not found mark as Done and fill error_msg
1339 task["status"] = "DONE"
1340 return True, None
1341 task["status"] = "FAILED"
1342 return False, None