f7bc40ba0943bb7f086e12eda16c6aaaea36fd91
[osm/RO.git] / osm_ro / vim_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2015 Telefonica Investigacion y Desarrollo, S.A.U.
5 # This file is part of openvim
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
22 ##
23
24 """"
25 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
26 The tasks are stored at database in table vim_wim_actions
27 The task content is (M: stored at memory, D: stored at database):
28 MD instance_action_id: reference a global action over an instance-scenario: database instance_actions
29 MD task_index: index number of the task. This together with the previous forms a unique key identifier
30 MD datacenter_vim_id: should contain the uuid of the VIM managed by this thread
31 MD vim_id: id of the vm,net,etc at VIM
32 MD action: CREATE, DELETE, FIND
33 MD item: database table name, can be instance_vms, instance_nets, TODO: datacenter_flavors, datacenter_images
34 MD item_id: uuid of the referenced entry in the previous table
35 MD status: SCHEDULED,BUILD,DONE,FAILED,SUPERSEDED
36 MD extra: text with yaml format at database, dict at memory with:
37 params: list with the params to be sent to the VIM for CREATE or FIND. For DELETE the vim_id is taken from other related tasks
38 find: (only for CREATE tasks) if present it should FIND before creating and use if existing. Contains the FIND params
39 depends_on: list with the 'task_index'es of tasks that must be completed before. e.g. a vm creation depends on a net creation
40 can contain an int (single index on the same instance-action) or str (compete action ID)
41 sdn_net_id: used for net.
42 tries:
43 interfaces: used for VMs. Each key is the uuid of the instance_interfaces entry at database
44 iface_id: uuid of intance_interfaces
45 sdn_port_id:
46 sdn_net_id:
47 created_items: dictionary with extra elements created that need to be deleted. e.g. ports, volumes,...
48 created: False if the VIM element is not created by other actions, and it should not be deleted
49 vim_status: VIM status of the element. Stored also at database in the instance_XXX
50 M depends: dict with task_index(from depends_on) to task class
51 M params: same as extra[params] but with the resolved dependencies
52 M vim_interfaces: similar to extra[interfaces] but with VIM information. Stored at database in the instance_XXX but not at vim_wim_actions
53 M vim_info: Detailed information of a vm,net from the VIM. Stored at database in the instance_XXX but not at vim_wim_actions
54 MD error_msg: descriptive text upon an error.Stored also at database instance_XXX
55 MD created_at: task creation time
56 MD modified_at: last task update time. On refresh it contains when this task need to be refreshed
57
58 """
59
60 import threading
61 import time
62 import Queue
63 import logging
64 import vimconn
65 import vimconn_openvim
66 import vimconn_aws
67 import vimconn_opennebula
68 import vimconn_openstack
69 import vimconn_vmware
70 import yaml
71 from db_base import db_base_Exception
72 from lib_osm_openvim.ovim import ovimException
73 from copy import deepcopy
74
75 __author__ = "Alfonso Tierno, Pablo Montes"
76 __date__ = "$28-Sep-2017 12:07:15$"
77
78 vim_module = {
79 "openvim": vimconn_openvim,
80 "aws": vimconn_aws,
81 "opennebula": vimconn_opennebula,
82 "openstack": vimconn_openstack,
83 "vmware": vimconn_vmware,
84 }
85
86
87 def is_task_id(task_id):
88 return task_id.startswith("TASK-")
89
90
91 class VimThreadException(Exception):
92 pass
93
94
95 class VimThreadExceptionNotFound(VimThreadException):
96 pass
97
98
99 class vim_thread(threading.Thread):
100 REFRESH_BUILD = 5 # 5 seconds
101 REFRESH_ACTIVE = 60 # 1 minute
102
103 def __init__(self, task_lock, name=None, datacenter_name=None, datacenter_tenant_id=None,
104 db=None, db_lock=None, ovim=None):
105 """Init a thread.
106 Arguments:
107 'id' number of thead
108 'name' name of thread
109 'host','user': host ip or name to manage and user
110 'db', 'db_lock': database class and lock to use it in exclusion
111 """
112 threading.Thread.__init__(self)
113 self.vim = None
114 self.error_status = None
115 self.datacenter_name = datacenter_name
116 self.datacenter_tenant_id = datacenter_tenant_id
117 self.ovim = ovim
118 if not name:
119 self.name = vimconn["id"] + "." + vimconn["config"]["datacenter_tenant_id"]
120 else:
121 self.name = name
122 self.vim_persistent_info = {}
123
124 self.logger = logging.getLogger('openmano.vim.' + self.name)
125 self.db = db
126 self.db_lock = db_lock
127
128 self.task_lock = task_lock
129 self.task_queue = Queue.Queue(2000)
130
131 self.refresh_tasks = []
132 """Contains time ordered task list for refreshing the status of VIM VMs and nets"""
133
134 self.pending_tasks = []
135 """Contains time ordered task list for creation, deletion of VIM VMs and nets"""
136
137 self.grouped_tasks = {}
138 """ It contains all the creation/deletion pending tasks grouped by its concrete vm, net, etc
139 <item><item_id>:
140 - <task1> # e.g. CREATE task
141 <task2> # e.g. DELETE task
142 """
143
144 def get_vimconnector(self):
145 try:
146 from_ = "datacenter_tenants as dt join datacenters as d on dt.datacenter_id=d.uuid"
147 select_ = ('type', 'd.config as config', 'd.uuid as datacenter_id', 'vim_url', 'vim_url_admin',
148 'd.name as datacenter_name', 'dt.uuid as datacenter_tenant_id',
149 'dt.vim_tenant_name as vim_tenant_name', 'dt.vim_tenant_id as vim_tenant_id',
150 'user', 'passwd', 'dt.config as dt_config')
151 where_ = {"dt.uuid": self.datacenter_tenant_id}
152 with self.db_lock:
153 vims = self.db.get_rows(FROM=from_, SELECT=select_, WHERE=where_)
154 vim = vims[0]
155 vim_config = {}
156 if vim["config"]:
157 vim_config.update(yaml.load(vim["config"]))
158 if vim["dt_config"]:
159 vim_config.update(yaml.load(vim["dt_config"]))
160 vim_config['datacenter_tenant_id'] = vim.get('datacenter_tenant_id')
161 vim_config['datacenter_id'] = vim.get('datacenter_id')
162
163 self.vim = vim_module[vim["type"]].vimconnector(
164 uuid=vim['datacenter_id'], name=vim['datacenter_name'],
165 tenant_id=vim['vim_tenant_id'], tenant_name=vim['vim_tenant_name'],
166 url=vim['vim_url'], url_admin=vim['vim_url_admin'],
167 user=vim['user'], passwd=vim['passwd'],
168 config=vim_config, persistent_info=self.vim_persistent_info
169 )
170 self.error_status = None
171 except Exception as e:
172 self.logger.error("Cannot load vimconnector for vim_account {}: {}".format(self.datacenter_tenant_id, e))
173 self.vim = None
174 self.error_status = "Error loading vimconnector: {}".format(e)
175
176 def _reload_vim_actions(self):
177 """
178 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
179 :return: None
180 """
181 try:
182 action_completed = False
183 task_list = []
184 old_action_key = None
185
186 old_item_id = ""
187 old_item = ""
188 old_created_at = 0.0
189 database_limit = 200
190 while True:
191 # get 200 (database_limit) entries each time
192 with self.db_lock:
193 vim_actions = self.db.get_rows(FROM="vim_wim_actions",
194 WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
195 "item_id>=": old_item_id},
196 ORDER_BY=("item_id", "item", "created_at",),
197 LIMIT=database_limit)
198 for task in vim_actions:
199 item = task["item"]
200 item_id = task["item_id"]
201
202 # skip the first entries that are already processed in the previous pool of 200
203 if old_item_id:
204 if item_id == old_item_id and item == old_item and task["created_at"] == old_created_at:
205 old_item_id = False # next one will be a new un-processed task
206 continue
207
208 action_key = item + item_id
209 if old_action_key != action_key:
210 if not action_completed and task_list:
211 # This will fill needed task parameters into memory, and insert the task if needed in
212 # self.pending_tasks or self.refresh_tasks
213 try:
214 self._insert_pending_tasks(task_list)
215 except Exception as e:
216 self.logger.critical(
217 "Unexpected exception at _reload_vim_actions:_insert_pending_tasks: " + str(e),
218 exc_info=True)
219 task_list = []
220 old_action_key = action_key
221 action_completed = False
222 elif action_completed:
223 continue
224
225 if task["status"] == "SCHEDULED" or task["action"] == "CREATE" or task["action"] == "FIND":
226 task_list.append(task)
227 elif task["action"] == "DELETE":
228 # action completed because deleted and status is not SCHEDULED. Not needed anything
229 action_completed = True
230 if len(vim_actions) == database_limit:
231 # update variables for get the next database iteration
232 old_item_id = item_id
233 old_item = item
234 old_created_at = task["created_at"]
235 else:
236 break
237 # Last actions group need to be inserted too
238 if not action_completed and task_list:
239 try:
240 self._insert_pending_tasks(task_list)
241 except Exception as e:
242 self.logger.critical("Unexpected exception at _reload_vim_actions:_insert_pending_tasks: " + str(e),
243 exc_info=True)
244 self.logger.debug("reloaded vim actions pending:{} refresh:{}".format(
245 len(self.pending_tasks), len(self.refresh_tasks)))
246 except Exception as e:
247 self.logger.critical("Unexpected exception at _reload_vim_actions: " + str(e), exc_info=True)
248
249 def _refres_elements(self):
250 """Call VIM to get VMs and networks status until 10 elements"""
251 now = time.time()
252 nb_processed = 0
253 vm_to_refresh_list = []
254 net_to_refresh_list = []
255 vm_to_refresh_dict = {}
256 net_to_refresh_dict = {}
257 items_to_refresh = 0
258 while self.refresh_tasks:
259 task = self.refresh_tasks[0]
260 with self.task_lock:
261 if task['status'] == 'SUPERSEDED':
262 self.refresh_tasks.pop(0)
263 continue
264 if task['modified_at'] > now:
265 break
266 # task["status"] = "processing"
267 nb_processed += 1
268 self.refresh_tasks.pop(0)
269 if task["item"] == 'instance_vms':
270 if task["vim_id"] not in vm_to_refresh_dict:
271 vm_to_refresh_dict[task["vim_id"]] = [task]
272 vm_to_refresh_list.append(task["vim_id"])
273 else:
274 vm_to_refresh_dict[task["vim_id"]].append(task)
275 elif task["item"] == 'instance_nets':
276 if task["vim_id"] not in net_to_refresh_dict:
277 net_to_refresh_dict[task["vim_id"]] = [task]
278 net_to_refresh_list.append(task["vim_id"])
279 else:
280 net_to_refresh_dict[task["vim_id"]].append(task)
281 else:
282 task_id = task["instance_action_id"] + "." + str(task["task_index"])
283 self.logger.critical("task={}: unknown task {}".format(task_id, task["item"]), exc_info=True)
284 items_to_refresh += 1
285 if items_to_refresh == 10:
286 break
287
288 if vm_to_refresh_list:
289 now = time.time()
290 try:
291 vim_dict = self.vim.refresh_vms_status(vm_to_refresh_list)
292 except vimconn.vimconnException as e:
293 # Mark all tasks at VIM_ERROR status
294 self.logger.error("task=several get-VM: vimconnException when trying to refresh vms " + str(e))
295 vim_dict = {}
296 for vim_id in vm_to_refresh_list:
297 vim_dict[vim_id] = {"status": "VIM_ERROR", "error_msg": str(e)}
298
299 for vim_id, vim_info in vim_dict.items():
300
301 # look for task
302 for task in vm_to_refresh_dict[vim_id]:
303 task_need_update = False
304 task_id = task["instance_action_id"] + "." + str(task["task_index"])
305 self.logger.debug("task={} get-VM: vim_vm_id={} result={}".format(task_id, task["vim_id"], vim_info))
306
307 # check and update interfaces
308 task_warning_msg = ""
309 for interface in vim_info.get("interfaces", ()):
310 vim_interface_id = interface["vim_interface_id"]
311 if vim_interface_id not in task["extra"]["interfaces"]:
312 self.logger.critical("task={} get-VM: Interface not found {} on task info {}".format(
313 task_id, vim_interface_id, task["extra"]["interfaces"]), exc_info=True)
314 continue
315 task_interface = task["extra"]["interfaces"][vim_interface_id]
316 task_vim_interface = task["vim_interfaces"].get(vim_interface_id)
317 if task_vim_interface != interface:
318 # delete old port
319 if task_interface.get("sdn_port_id"):
320 try:
321 with self.db_lock:
322 self.ovim.delete_port(task_interface["sdn_port_id"], idempotent=True)
323 task_interface["sdn_port_id"] = None
324 task_need_update = True
325 except ovimException as e:
326 error_text = "ovimException deleting external_port={}: {}".format(
327 task_interface["sdn_port_id"], e)
328 self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
329 task_warning_msg += error_text
330 # TODO Set error_msg at instance_nets instead of instance VMs
331
332 # Create SDN port
333 sdn_net_id = task_interface.get("sdn_net_id")
334 if sdn_net_id and interface.get("compute_node") and interface.get("pci"):
335 sdn_port_name = sdn_net_id + "." + task["vim_id"]
336 sdn_port_name = sdn_port_name[:63]
337 try:
338 with self.db_lock:
339 sdn_port_id = self.ovim.new_external_port(
340 {"compute_node": interface["compute_node"],
341 "pci": interface["pci"],
342 "vlan": interface.get("vlan"),
343 "net_id": sdn_net_id,
344 "region": self.vim["config"]["datacenter_id"],
345 "name": sdn_port_name,
346 "mac": interface.get("mac_address")})
347 task_interface["sdn_port_id"] = sdn_port_id
348 task_need_update = True
349 except (ovimException, Exception) as e:
350 error_text = "ovimException creating new_external_port compute_node={}" \
351 " pci={} vlan={} {}".format(
352 interface["compute_node"],
353 interface["pci"],
354 interface.get("vlan"), e)
355 self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
356 task_warning_msg += error_text
357 # TODO Set error_msg at instance_nets instead of instance VMs
358
359 with self.db_lock:
360 self.db.update_rows(
361 'instance_interfaces',
362 UPDATE={"mac_address": interface.get("mac_address"),
363 "ip_address": interface.get("ip_address"),
364 "vim_interface_id": interface.get("vim_interface_id"),
365 "vim_info": interface.get("vim_info"),
366 "sdn_port_id": task_interface.get("sdn_port_id"),
367 "compute_node": interface.get("compute_node"),
368 "pci": interface.get("pci"),
369 "vlan": interface.get("vlan")},
370 WHERE={'uuid': task_interface["iface_id"]})
371 task["vim_interfaces"][vim_interface_id] = interface
372
373 # check and update task and instance_vms database
374 vim_info_error_msg = None
375 if vim_info.get("error_msg"):
376 vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"] + task_warning_msg)
377 elif task_warning_msg:
378 vim_info_error_msg = self._format_vim_error_msg(task_warning_msg)
379 task_vim_info = task.get("vim_info")
380 task_error_msg = task.get("error_msg")
381 task_vim_status = task["extra"].get("vim_status")
382 if task_vim_status != vim_info["status"] or task_error_msg != vim_info_error_msg or \
383 (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
384 temp_dict = {"status": vim_info["status"], "error_msg": vim_info_error_msg}
385 if vim_info.get("vim_info"):
386 temp_dict["vim_info"] = vim_info["vim_info"]
387 with self.db_lock:
388 self.db.update_rows('instance_vms', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]})
389 task["extra"]["vim_status"] = vim_info["status"]
390 task["error_msg"] = vim_info_error_msg
391 if vim_info.get("vim_info"):
392 task["vim_info"] = vim_info["vim_info"]
393 task_need_update = True
394
395 if task_need_update:
396 with self.db_lock:
397 self.db.update_rows(
398 'vim_wim_actions',
399 UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256),
400 "error_msg": task.get("error_msg"), "modified_at": now},
401 WHERE={'instance_action_id': task['instance_action_id'],
402 'task_index': task['task_index']})
403 if task["extra"].get("vim_status") == "BUILD":
404 self._insert_refresh(task, now + self.REFRESH_BUILD)
405 else:
406 self._insert_refresh(task, now + self.REFRESH_ACTIVE)
407
408 if net_to_refresh_list:
409 now = time.time()
410 try:
411 vim_dict = self.vim.refresh_nets_status(net_to_refresh_list)
412 except vimconn.vimconnException as e:
413 # Mark all tasks at VIM_ERROR status
414 self.logger.error("task=several get-net: vimconnException when trying to refresh nets " + str(e))
415 vim_dict = {}
416 for vim_id in net_to_refresh_list:
417 vim_dict[vim_id] = {"status": "VIM_ERROR", "error_msg": str(e)}
418
419 for vim_id, vim_info in vim_dict.items():
420 # look for task
421 for task in net_to_refresh_dict[vim_id]:
422 task_id = task["instance_action_id"] + "." + str(task["task_index"])
423 self.logger.debug("task={} get-net: vim_net_id={} result={}".format(task_id, task["vim_id"], vim_info))
424
425 task_vim_info = task.get("vim_info")
426 task_vim_status = task["extra"].get("vim_status")
427 task_error_msg = task.get("error_msg")
428 task_sdn_net_id = task["extra"].get("sdn_net_id")
429
430 vim_info_status = vim_info["status"]
431 vim_info_error_msg = vim_info.get("error_msg")
432 # get ovim status
433 if task_sdn_net_id:
434 try:
435 with self.db_lock:
436 sdn_net = self.ovim.show_network(task_sdn_net_id)
437 except (ovimException, Exception) as e:
438 text_error = "ovimException getting network snd_net_id={}: {}".format(task_sdn_net_id, e)
439 self.logger.error("task={} get-net: {}".format(task_id, text_error), exc_info=True)
440 sdn_net = {"status": "ERROR", "last_error": text_error}
441 if sdn_net["status"] == "ERROR":
442 if not vim_info_error_msg:
443 vim_info_error_msg = str(sdn_net.get("last_error"))
444 else:
445 vim_info_error_msg = "VIM_ERROR: {} && SDN_ERROR: {}".format(
446 self._format_vim_error_msg(vim_info_error_msg, 1024 // 2 - 14),
447 self._format_vim_error_msg(sdn_net["last_error"], 1024 // 2 - 14))
448 vim_info_status = "ERROR"
449 elif sdn_net["status"] == "BUILD":
450 if vim_info_status == "ACTIVE":
451 vim_info_status = "BUILD"
452
453 # update database
454 if vim_info_error_msg:
455 vim_info_error_msg = self._format_vim_error_msg(vim_info_error_msg)
456 if task_vim_status != vim_info_status or task_error_msg != vim_info_error_msg or \
457 (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
458 task["extra"]["vim_status"] = vim_info_status
459 task["error_msg"] = vim_info_error_msg
460 if vim_info.get("vim_info"):
461 task["vim_info"] = vim_info["vim_info"]
462 temp_dict = {"status": vim_info_status, "error_msg": vim_info_error_msg}
463 if vim_info.get("vim_info"):
464 temp_dict["vim_info"] = vim_info["vim_info"]
465 with self.db_lock:
466 self.db.update_rows('instance_nets', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]})
467 self.db.update_rows(
468 'vim_wim_actions',
469 UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256),
470 "error_msg": task.get("error_msg"), "modified_at": now},
471 WHERE={'instance_action_id': task['instance_action_id'],
472 'task_index': task['task_index']})
473 if task["extra"].get("vim_status") == "BUILD":
474 self._insert_refresh(task, now + self.REFRESH_BUILD)
475 else:
476 self._insert_refresh(task, now + self.REFRESH_ACTIVE)
477
478 return nb_processed
479
480 def _insert_refresh(self, task, threshold_time=None):
481 """Insert a task at list of refreshing elements. The refreshing list is ordered by threshold_time (task['modified_at']
482 It is assumed that this is called inside this thread
483 """
484 if not self.vim:
485 return
486 if not threshold_time:
487 threshold_time = time.time()
488 task["modified_at"] = threshold_time
489 task_name = task["item"][9:] + "-" + task["action"]
490 task_id = task["instance_action_id"] + "." + str(task["task_index"])
491 for index in range(0, len(self.refresh_tasks)):
492 if self.refresh_tasks[index]["modified_at"] > threshold_time:
493 self.refresh_tasks.insert(index, task)
494 break
495 else:
496 index = len(self.refresh_tasks)
497 self.refresh_tasks.append(task)
498 self.logger.debug("task={} new refresh name={}, modified_at={} index={}".format(
499 task_id, task_name, task["modified_at"], index))
500
501 def _remove_refresh(self, task_name, vim_id):
502 """Remove a task with this name and vim_id from the list of refreshing elements.
503 It is assumed that this is called inside this thread outside _refres_elements method
504 Return True if self.refresh_list is modified, task is found
505 Return False if not found
506 """
507 index_to_delete = None
508 for index in range(0, len(self.refresh_tasks)):
509 if self.refresh_tasks[index]["name"] == task_name and self.refresh_tasks[index]["vim_id"] == vim_id:
510 index_to_delete = index
511 break
512 else:
513 return False
514 if not index_to_delete:
515 del self.refresh_tasks[index_to_delete]
516 return True
517
518 def _proccess_pending_tasks(self):
519 nb_created = 0
520 nb_processed = 0
521 while self.pending_tasks:
522 task = self.pending_tasks.pop(0)
523 nb_processed += 1
524 try:
525 # check if tasks that this depends on have been completed
526 dependency_not_completed = False
527 for task_index in task["extra"].get("depends_on", ()):
528 task_dependency = task["depends"].get("TASK-" + str(task_index))
529 if not task_dependency:
530 task_dependency = self._look_for_task(task["instance_action_id"], task_index)
531 if not task_dependency:
532 raise VimThreadException(
533 "Cannot get depending net task trying to get depending task {}.{}".format(
534 task["instance_action_id"], task_index))
535 # task["depends"]["TASK-" + str(task_index)] = task_dependency #it references another object,so database must be look again
536 if task_dependency["status"] == "SCHEDULED":
537 dependency_not_completed = True
538 break
539 elif task_dependency["status"] == "FAILED":
540 raise VimThreadException(
541 "Cannot {} {}, (task {}.{}) because depends on failed {}.{}, (task{}.{}): {}".format(
542 task["action"], task["item"],
543 task["instance_action_id"], task["task_index"],
544 task_dependency["instance_action_id"], task_dependency["task_index"],
545 task_dependency["action"], task_dependency["item"], task_dependency.get("error_msg")))
546 if dependency_not_completed:
547 # Move this task to the end.
548 task["extra"]["tries"] = task["extra"].get("tries", 0) + 1
549 if task["extra"]["tries"] <= 3:
550 self.pending_tasks.append(task)
551 continue
552 else:
553 raise VimThreadException(
554 "Cannot {} {}, (task {}.{}) because timeout waiting to complete {} {}, "
555 "(task {}.{})".format(task["action"], task["item"],
556 task["instance_action_id"], task["task_index"],
557 task_dependency["instance_action_id"], task_dependency["task_index"],
558 task_dependency["action"], task_dependency["item"]))
559
560 if task["status"] == "SUPERSEDED":
561 # not needed to do anything but update database with the new status
562 result = True
563 database_update = None
564 elif not self.vim:
565 task["status"] = "ERROR"
566 task["error_msg"] = self.error_status
567 result = False
568 database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
569 elif task["item"] == 'instance_vms':
570 if task["action"] == "CREATE":
571 result, database_update = self.new_vm(task)
572 nb_created += 1
573 elif task["action"] == "DELETE":
574 result, database_update = self.del_vm(task)
575 else:
576 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
577 elif task["item"] == 'instance_nets':
578 if task["action"] == "CREATE":
579 result, database_update = self.new_net(task)
580 nb_created += 1
581 elif task["action"] == "DELETE":
582 result, database_update = self.del_net(task)
583 elif task["action"] == "FIND":
584 result, database_update = self.get_net(task)
585 else:
586 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
587 elif task["item"] == 'instance_sfis':
588 if task["action"] == "CREATE":
589 result, database_update = self.new_sfi(task)
590 nb_created += 1
591 elif task["action"] == "DELETE":
592 result, database_update = self.del_sfi(task)
593 else:
594 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
595 elif task["item"] == 'instance_sfs':
596 if task["action"] == "CREATE":
597 result, database_update = self.new_sf(task)
598 nb_created += 1
599 elif task["action"] == "DELETE":
600 result, database_update = self.del_sf(task)
601 else:
602 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
603 elif task["item"] == 'instance_classifications':
604 if task["action"] == "CREATE":
605 result, database_update = self.new_classification(task)
606 nb_created += 1
607 elif task["action"] == "DELETE":
608 result, database_update = self.del_classification(task)
609 else:
610 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
611 elif task["item"] == 'instance_sfps':
612 if task["action"] == "CREATE":
613 result, database_update = self.new_sfp(task)
614 nb_created += 1
615 elif task["action"] == "DELETE":
616 result, database_update = self.del_sfp(task)
617 else:
618 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
619 else:
620 raise vimconn.vimconnException(self.name + "unknown task item {}".format(task["item"]))
621 # TODO
622 except VimThreadException as e:
623 result = False
624 task["error_msg"] = str(e)
625 task["status"] = "FAILED"
626 database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
627 if task["item"] == 'instance_vms':
628 database_update["vim_vm_id"] = None
629 elif task["item"] == 'instance_nets':
630 database_update["vim_net_id"] = None
631
632 no_refresh_tasks = ['instance_sfis', 'instance_sfs',
633 'instance_classifications', 'instance_sfps']
634 if task["action"] == "DELETE":
635 action_key = task["item"] + task["item_id"]
636 del self.grouped_tasks[action_key]
637 elif task["action"] in ("CREATE", "FIND") and task["status"] in ("DONE", "BUILD"):
638 if task["item"] not in no_refresh_tasks:
639 self._insert_refresh(task)
640
641 task_id = task["instance_action_id"] + "." + str(task["task_index"])
642 self.logger.debug("task={} item={} action={} result={}:'{}' params={}".format(
643 task_id, task["item"], task["action"], task["status"],
644 task["vim_id"] if task["status"] == "DONE" else task.get("error_msg"), task["params"]))
645 try:
646 now = time.time()
647 with self.db_lock:
648 self.db.update_rows(
649 table="vim_wim_actions",
650 UPDATE={"status": task["status"], "vim_id": task.get("vim_id"), "modified_at": now,
651 "error_msg": task["error_msg"],
652 "extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256)},
653 WHERE={"instance_action_id": task["instance_action_id"], "task_index": task["task_index"]})
654 if result is not None:
655 self.db.update_rows(
656 table="instance_actions",
657 UPDATE={("number_done" if result else "number_failed"): {"INCREMENT": 1},
658 "modified_at": now},
659 WHERE={"uuid": task["instance_action_id"]})
660 if database_update:
661 self.db.update_rows(table=task["item"],
662 UPDATE=database_update,
663 WHERE={"uuid": task["item_id"]})
664 except db_base_Exception as e:
665 self.logger.error("task={} Error updating database {}".format(task_id, e), exc_info=True)
666
667 if nb_created == 10:
668 break
669 return nb_processed
670
671 def _insert_pending_tasks(self, vim_actions_list):
672 for task in vim_actions_list:
673 if task["datacenter_vim_id"] != self.datacenter_tenant_id:
674 continue
675 item = task["item"]
676 item_id = task["item_id"]
677 action_key = item + item_id
678 if action_key not in self.grouped_tasks:
679 self.grouped_tasks[action_key] = []
680 task["params"] = None
681 task["depends"] = {}
682 if task["extra"]:
683 extra = yaml.load(task["extra"])
684 task["extra"] = extra
685 task["params"] = extra.get("params")
686 depends_on_list = extra.get("depends_on")
687 if depends_on_list:
688 for dependency_task in depends_on_list:
689 if isinstance(dependency_task, int):
690 index = dependency_task
691 else:
692 instance_action_id, _, task_id = dependency_task.rpartition(".")
693 if instance_action_id != task["instance_action_id"]:
694 continue
695 index = int(task_id)
696
697 if index < len(vim_actions_list) and vim_actions_list[index]["task_index"] == index and \
698 vim_actions_list[index]["instance_action_id"] == task["instance_action_id"]:
699 task["depends"]["TASK-" + str(index)] = vim_actions_list[index]
700 task["depends"]["TASK-{}.{}".format(task["instance_action_id"], index)] = vim_actions_list[index]
701 if extra.get("interfaces"):
702 task["vim_interfaces"] = {}
703 else:
704 task["extra"] = {}
705 if "error_msg" not in task:
706 task["error_msg"] = None
707 if "vim_id" not in task:
708 task["vim_id"] = None
709
710 if task["action"] == "DELETE":
711 need_delete_action = False
712 for to_supersede in self.grouped_tasks.get(action_key, ()):
713 if to_supersede["action"] == "FIND" and to_supersede.get("vim_id"):
714 task["vim_id"] = to_supersede["vim_id"]
715 if to_supersede["action"] == "CREATE" and to_supersede["extra"].get("created", True) and \
716 (to_supersede.get("vim_id") or to_supersede["extra"].get("sdn_net_id")):
717 need_delete_action = True
718 task["vim_id"] = to_supersede["vim_id"]
719 if to_supersede["extra"].get("sdn_net_id"):
720 task["extra"]["sdn_net_id"] = to_supersede["extra"]["sdn_net_id"]
721 if to_supersede["extra"].get("interfaces"):
722 task["extra"]["interfaces"] = to_supersede["extra"]["interfaces"]
723 if to_supersede["extra"].get("created_items"):
724 if not task["extra"].get("created_items"):
725 task["extra"]["created_items"] = {}
726 task["extra"]["created_items"].update(to_supersede["extra"]["created_items"])
727 # Mark task as SUPERSEDED.
728 # If task is in self.pending_tasks, it will be removed and database will be update
729 # If task is in self.refresh_tasks, it will be removed
730 to_supersede["status"] = "SUPERSEDED"
731 if not need_delete_action:
732 task["status"] = "SUPERSEDED"
733
734 self.grouped_tasks[action_key].append(task)
735 self.pending_tasks.append(task)
736 elif task["status"] == "SCHEDULED":
737 self.grouped_tasks[action_key].append(task)
738 self.pending_tasks.append(task)
739 elif task["action"] in ("CREATE", "FIND"):
740 self.grouped_tasks[action_key].append(task)
741 if task["status"] in ("DONE", "BUILD"):
742 self._insert_refresh(task)
743 # TODO add VM reset, get console, etc...
744 else:
745 raise vimconn.vimconnException(self.name + "unknown vim_action action {}".format(task["action"]))
746
747 def insert_task(self, task):
748 try:
749 self.task_queue.put(task, False)
750 return None
751 except Queue.Full:
752 raise vimconn.vimconnException(self.name + ": timeout inserting a task")
753
754 def del_task(self, task):
755 with self.task_lock:
756 if task["status"] == "SCHEDULED":
757 task["status"] = "SUPERSEDED"
758 return True
759 else: # task["status"] == "processing"
760 self.task_lock.release()
761 return False
762
763 def run(self):
764 self.logger.debug("Starting")
765 while True:
766 self.get_vimconnector()
767 self.logger.debug("Vimconnector loaded")
768 self._reload_vim_actions()
769 reload_thread = False
770
771 while True:
772 try:
773 while not self.task_queue.empty():
774 task = self.task_queue.get()
775 if isinstance(task, list):
776 self._insert_pending_tasks(task)
777 elif isinstance(task, str):
778 if task == 'exit':
779 return 0
780 elif task == 'reload':
781 reload_thread = True
782 break
783 self.task_queue.task_done()
784 if reload_thread:
785 break
786 nb_processed = self._proccess_pending_tasks()
787 nb_processed += self._refres_elements()
788 if not nb_processed:
789 time.sleep(1)
790
791 except Exception as e:
792 self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)
793
794 self.logger.debug("Finishing")
795
796 def _look_for_task(self, instance_action_id, task_id):
797 """
798 Look for a concrete task at vim_actions database table
799 :param instance_action_id: The instance_action_id
800 :param task_id: Can have several formats:
801 <task index>: integer
802 TASK-<task index> :backward compatibility,
803 [TASK-]<instance_action_id>.<task index>: this instance_action_id overrides the one in the parameter
804 :return: Task dictionary or None if not found
805 """
806 if isinstance(task_id, int):
807 task_index = task_id
808 else:
809 if task_id.startswith("TASK-"):
810 task_id = task_id[5:]
811 ins_action_id, _, task_index = task_id.rpartition(".")
812 if ins_action_id:
813 instance_action_id = ins_action_id
814
815 with self.db_lock:
816 tasks = self.db.get_rows(FROM="vim_wim_actions", WHERE={"instance_action_id": instance_action_id,
817 "task_index": task_index})
818 if not tasks:
819 return None
820 task = tasks[0]
821 task["params"] = None
822 task["depends"] = {}
823 if task["extra"]:
824 extra = yaml.load(task["extra"])
825 task["extra"] = extra
826 task["params"] = extra.get("params")
827 if extra.get("interfaces"):
828 task["vim_interfaces"] = {}
829 else:
830 task["extra"] = {}
831 return task
832
833 @staticmethod
834 def _format_vim_error_msg(error_text, max_length=1024):
835 if error_text and len(error_text) >= max_length:
836 return error_text[:max_length // 2 - 3] + " ... " + error_text[-max_length // 2 + 3:]
837 return error_text
838
839 def new_vm(self, task):
840 task_id = task["instance_action_id"] + "." + str(task["task_index"])
841 try:
842 params = task["params"]
843 depends = task.get("depends")
844 net_list = params[5]
845 for net in net_list:
846 if "net_id" in net and is_task_id(net["net_id"]): # change task_id into network_id
847 task_dependency = task["depends"].get(net["net_id"])
848 if not task_dependency:
849 task_dependency = self._look_for_task(task["instance_action_id"], net["net_id"])
850 if not task_dependency:
851 raise VimThreadException(
852 "Cannot get depending net task trying to get depending task {}.{}".format(
853 task["instance_action_id"], net["net_id"]))
854 network_id = task_dependency.get("vim_id")
855 if not network_id:
856 raise VimThreadException(
857 "Cannot create VM because depends on a network not created or found: " +
858 str(depends[net["net_id"]]["error_msg"]))
859 net["net_id"] = network_id
860 params_copy = deepcopy(params)
861 vim_vm_id, created_items = self.vim.new_vminstance(*params_copy)
862
863 # fill task_interfaces. Look for snd_net_id at database for each interface
864 task_interfaces = {}
865 for iface in params_copy[5]:
866 task_interfaces[iface["vim_id"]] = {"iface_id": iface["uuid"]}
867 with self.db_lock:
868 result = self.db.get_rows(
869 SELECT=('sdn_net_id', 'interface_id'),
870 FROM='instance_nets as ine join instance_interfaces as ii on ii.instance_net_id=ine.uuid',
871 WHERE={'ii.uuid': iface["uuid"]})
872 if result:
873 task_interfaces[iface["vim_id"]]["sdn_net_id"] = result[0]['sdn_net_id']
874 task_interfaces[iface["vim_id"]]["interface_id"] = result[0]['interface_id']
875 else:
876 self.logger.critical("task={} new-VM: instance_nets uuid={} not found at DB".format(task_id,
877 iface["uuid"]), exc_info=True)
878
879 task["vim_info"] = {}
880 task["vim_interfaces"] = {}
881 task["extra"]["interfaces"] = task_interfaces
882 task["extra"]["created"] = True
883 task["extra"]["created_items"] = created_items
884 task["error_msg"] = None
885 task["status"] = "DONE"
886 task["vim_id"] = vim_vm_id
887 instance_element_update = {"status": "BUILD", "vim_vm_id": vim_vm_id, "error_msg": None}
888 return True, instance_element_update
889
890 except (vimconn.vimconnException, VimThreadException) as e:
891 self.logger.error("task={} new-VM: {}".format(task_id, e))
892 error_text = self._format_vim_error_msg(str(e))
893 task["error_msg"] = error_text
894 task["status"] = "FAILED"
895 task["vim_id"] = None
896 instance_element_update = {"status": "VIM_ERROR", "vim_vm_id": None, "error_msg": error_text}
897 return False, instance_element_update
898
899 def del_vm(self, task):
900 task_id = task["instance_action_id"] + "." + str(task["task_index"])
901 vm_vim_id = task["vim_id"]
902 interfaces = task["extra"].get("interfaces", ())
903 try:
904 for iface in interfaces.values():
905 if iface.get("sdn_port_id"):
906 try:
907 with self.db_lock:
908 self.ovim.delete_port(iface["sdn_port_id"], idempotent=True)
909 except ovimException as e:
910 self.logger.error("task={} del-VM: ovimException when deleting external_port={}: {} ".format(
911 task_id, iface["sdn_port_id"], e), exc_info=True)
912 # TODO Set error_msg at instance_nets
913
914 self.vim.delete_vminstance(vm_vim_id, task["extra"].get("created_items"))
915 task["status"] = "DONE"
916 task["error_msg"] = None
917 return True, None
918
919 except vimconn.vimconnException as e:
920 task["error_msg"] = self._format_vim_error_msg(str(e))
921 if isinstance(e, vimconn.vimconnNotFoundException):
922 # If not found mark as Done and fill error_msg
923 task["status"] = "DONE"
924 return True, None
925 task["status"] = "FAILED"
926 return False, None
927
928 def _get_net_internal(self, task, filter_param):
929 """
930 Common code for get_net and new_net. It looks for a network on VIM with the filter_params
931 :param task: task for this find or find-or-create action
932 :param filter_param: parameters to send to the vimconnector
933 :return: a dict with the content to update the instance_nets database table. Raises an exception on error, or
934 when network is not found or found more than one
935 """
936 vim_nets = self.vim.get_network_list(filter_param)
937 if not vim_nets:
938 raise VimThreadExceptionNotFound("Network not found with this criteria: '{}'".format(filter_param))
939 elif len(vim_nets) > 1:
940 raise VimThreadException("More than one network found with this criteria: '{}'".format(filter_param))
941 vim_net_id = vim_nets[0]["id"]
942
943 # Discover if this network is managed by a sdn controller
944 sdn_net_id = None
945 with self.db_lock:
946 result = self.db.get_rows(SELECT=('sdn_net_id',), FROM='instance_nets',
947 WHERE={'vim_net_id': vim_net_id,
948 'datacenter_tenant_id': self.datacenter_tenant_id},
949 ORDER="instance_scenario_id")
950 if result:
951 sdn_net_id = result[0]['sdn_net_id']
952
953 task["status"] = "DONE"
954 task["extra"]["vim_info"] = {}
955 task["extra"]["created"] = False
956 task["extra"]["sdn_net_id"] = sdn_net_id
957 task["error_msg"] = None
958 task["vim_id"] = vim_net_id
959 instance_element_update = {"vim_net_id": vim_net_id, "created": False, "status": "BUILD",
960 "error_msg": None, "sdn_net_id": sdn_net_id}
961 return instance_element_update
962
963 def get_net(self, task):
964 task_id = task["instance_action_id"] + "." + str(task["task_index"])
965 try:
966 params = task["params"]
967 filter_param = params[0]
968 instance_element_update = self._get_net_internal(task, filter_param)
969 return True, instance_element_update
970
971 except (vimconn.vimconnException, VimThreadException) as e:
972 self.logger.error("task={} get-net: {}".format(task_id, e))
973 task["status"] = "FAILED"
974 task["vim_id"] = None
975 task["error_msg"] = self._format_vim_error_msg(str(e))
976 instance_element_update = {"vim_net_id": None, "status": "VIM_ERROR",
977 "error_msg": task["error_msg"]}
978 return False, instance_element_update
979
980 def new_net(self, task):
981 vim_net_id = None
982 sdn_net_id = None
983 task_id = task["instance_action_id"] + "." + str(task["task_index"])
984 action_text = ""
985 try:
986 # FIND
987 if task["extra"].get("find"):
988 action_text = "finding"
989 filter_param = task["extra"]["find"][0]
990 try:
991 instance_element_update = self._get_net_internal(task, filter_param)
992 return True, instance_element_update
993 except VimThreadExceptionNotFound:
994 pass
995 # CREATE
996 params = task["params"]
997 action_text = "creating VIM"
998 vim_net_id = self.vim.new_network(*params)
999
1000 net_name = params[0]
1001 net_type = params[1]
1002
1003 sdn_controller = self.vim.config.get('sdn-controller')
1004 if sdn_controller and (net_type == "data" or net_type == "ptp"):
1005 network = {"name": net_name, "type": net_type, "region": self.vim["config"]["datacenter_id"]}
1006
1007 vim_net = self.vim.get_network(vim_net_id)
1008 if vim_net.get('encapsulation') != 'vlan':
1009 raise vimconn.vimconnException(
1010 "net '{}' defined as type '{}' has not vlan encapsulation '{}'".format(
1011 net_name, net_type, vim_net['encapsulation']))
1012 network["vlan"] = vim_net.get('segmentation_id')
1013 action_text = "creating SDN"
1014 with self.db_lock:
1015 sdn_net_id = self.ovim.new_network(network)
1016 task["status"] = "DONE"
1017 task["extra"]["vim_info"] = {}
1018 task["extra"]["sdn_net_id"] = sdn_net_id
1019 task["extra"]["created"] = True
1020 task["error_msg"] = None
1021 task["vim_id"] = vim_net_id
1022 instance_element_update = {"vim_net_id": vim_net_id, "sdn_net_id": sdn_net_id, "status": "BUILD",
1023 "created": True, "error_msg": None}
1024 return True, instance_element_update
1025 except (vimconn.vimconnException, ovimException) as e:
1026 self.logger.error("task={} new-net: Error {}: {}".format(task_id, action_text, e))
1027 task["status"] = "FAILED"
1028 task["vim_id"] = vim_net_id
1029 task["error_msg"] = self._format_vim_error_msg(str(e))
1030 task["extra"]["sdn_net_id"] = sdn_net_id
1031 instance_element_update = {"vim_net_id": vim_net_id, "sdn_net_id": sdn_net_id, "status": "VIM_ERROR",
1032 "error_msg": task["error_msg"]}
1033 return False, instance_element_update
1034
1035 def del_net(self, task):
1036 net_vim_id = task["vim_id"]
1037 sdn_net_id = task["extra"].get("sdn_net_id")
1038 try:
1039 if sdn_net_id:
1040 # Delete any attached port to this sdn network. There can be ports associated to this network in case
1041 # it was manually done using 'openmano vim-net-sdn-attach'
1042 with self.db_lock:
1043 port_list = self.ovim.get_ports(columns={'uuid'},
1044 filter={'name': 'external_port', 'net_id': sdn_net_id})
1045 for port in port_list:
1046 self.ovim.delete_port(port['uuid'], idempotent=True)
1047 self.ovim.delete_network(sdn_net_id, idempotent=True)
1048 if net_vim_id:
1049 self.vim.delete_network(net_vim_id)
1050 task["status"] = "DONE"
1051 task["error_msg"] = None
1052 return True, None
1053 except ovimException as e:
1054 task["error_msg"] = self._format_vim_error_msg("ovimException obtaining and deleting external "
1055 "ports for net {}: {}".format(sdn_net_id, str(e)))
1056 except vimconn.vimconnException as e:
1057 task["error_msg"] = self._format_vim_error_msg(str(e))
1058 if isinstance(e, vimconn.vimconnNotFoundException):
1059 # If not found mark as Done and fill error_msg
1060 task["status"] = "DONE"
1061 return True, None
1062 task["status"] = "FAILED"
1063 return False, None
1064
1065 ## Service Function Instances
1066
1067 def new_sfi(self, task):
1068 vim_sfi_id = None
1069 try:
1070 # Waits for interfaces to be ready (avoids failure)
1071 time.sleep(1)
1072 dep_id = "TASK-" + str(task["extra"]["depends_on"][0])
1073 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1074 error_text = ""
1075 interfaces = task.get("depends").get(dep_id).get("extra").get("interfaces")
1076 ingress_interface_id = task.get("extra").get("params").get("ingress_interface_id")
1077 egress_interface_id = task.get("extra").get("params").get("egress_interface_id")
1078 ingress_vim_interface_id = None
1079 egress_vim_interface_id = None
1080 for vim_interface, interface_data in interfaces.iteritems():
1081 if interface_data.get("interface_id") == ingress_interface_id:
1082 ingress_vim_interface_id = vim_interface
1083 break
1084 if ingress_interface_id != egress_interface_id:
1085 for vim_interface, interface_data in interfaces.iteritems():
1086 if interface_data.get("interface_id") == egress_interface_id:
1087 egress_vim_interface_id = vim_interface
1088 break
1089 else:
1090 egress_vim_interface_id = ingress_vim_interface_id
1091 if not ingress_vim_interface_id or not egress_vim_interface_id:
1092 self.logger.error("Error creating Service Function Instance, Ingress: %s, Egress: %s",
1093 ingress_vim_interface_id, egress_vim_interface_id)
1094 return False, None
1095 # At the moment, every port associated with the VM will be used both as ingress and egress ports.
1096 # Bear in mind that different VIM connectors might support SFI differently. In the case of OpenStack, only the
1097 # first ingress and first egress ports will be used to create the SFI (Port Pair).
1098 ingress_port_id_list = [ingress_vim_interface_id]
1099 egress_port_id_list = [egress_vim_interface_id]
1100 name = "sfi-%s" % task["item_id"][:8]
1101 # By default no form of IETF SFC Encapsulation will be used
1102 vim_sfi_id = self.vim.new_sfi(name, ingress_port_id_list, egress_port_id_list, sfc_encap=False)
1103
1104 task["extra"]["created"] = True
1105 task["error_msg"] = None
1106 task["status"] = "DONE"
1107 task["vim_id"] = vim_sfi_id
1108 instance_element_update = {"status": "ACTIVE", "vim_sfi_id": vim_sfi_id, "error_msg": None}
1109 return True, instance_element_update
1110
1111 except (vimconn.vimconnException, VimThreadException) as e:
1112 self.logger.error("Error creating Service Function Instance, task=%s: %s", task_id, str(e))
1113 error_text = self._format_vim_error_msg(str(e))
1114 task["error_msg"] = error_text
1115 task["status"] = "FAILED"
1116 task["vim_id"] = None
1117 instance_element_update = {"status": "VIM_ERROR", "vim_sfi_id": None, "error_msg": error_text}
1118 return False, instance_element_update
1119
1120 def del_sfi(self, task):
1121 sfi_vim_id = task["vim_id"]
1122 try:
1123 self.vim.delete_sfi(sfi_vim_id)
1124 task["status"] = "DONE"
1125 task["error_msg"] = None
1126 return True, None
1127
1128 except vimconn.vimconnException as e:
1129 task["error_msg"] = self._format_vim_error_msg(str(e))
1130 if isinstance(e, vimconn.vimconnNotFoundException):
1131 # If not found mark as Done and fill error_msg
1132 task["status"] = "DONE"
1133 return True, None
1134 task["status"] = "FAILED"
1135 return False, None
1136
1137 def new_sf(self, task):
1138 vim_sf_id = None
1139 try:
1140 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1141 error_text = ""
1142 depending_tasks = ["TASK-" + str(dep_id) for dep_id in task["extra"]["depends_on"]]
1143 # sfis = task.get("depends").values()[0].get("extra").get("params")[5]
1144 sfis = [task.get("depends").get(dep_task) for dep_task in depending_tasks]
1145 sfi_id_list = []
1146 for sfi in sfis:
1147 sfi_id_list.append(sfi.get("vim_id"))
1148 name = "sf-%s" % task["item_id"][:8]
1149 # By default no form of IETF SFC Encapsulation will be used
1150 vim_sf_id = self.vim.new_sf(name, sfi_id_list, sfc_encap=False)
1151
1152 task["extra"]["created"] = True
1153 task["error_msg"] = None
1154 task["status"] = "DONE"
1155 task["vim_id"] = vim_sf_id
1156 instance_element_update = {"status": "ACTIVE", "vim_sf_id": vim_sf_id, "error_msg": None}
1157 return True, instance_element_update
1158
1159 except (vimconn.vimconnException, VimThreadException) as e:
1160 self.logger.error("Error creating Service Function, task=%s: %s", task_id, str(e))
1161 error_text = self._format_vim_error_msg(str(e))
1162 task["error_msg"] = error_text
1163 task["status"] = "FAILED"
1164 task["vim_id"] = None
1165 instance_element_update = {"status": "VIM_ERROR", "vim_sf_id": None, "error_msg": error_text}
1166 return False, instance_element_update
1167
1168 def del_sf(self, task):
1169 sf_vim_id = task["vim_id"]
1170 try:
1171 self.vim.delete_sf(sf_vim_id)
1172 task["status"] = "DONE"
1173 task["error_msg"] = None
1174 return True, None
1175
1176 except vimconn.vimconnException as e:
1177 task["error_msg"] = self._format_vim_error_msg(str(e))
1178 if isinstance(e, vimconn.vimconnNotFoundException):
1179 # If not found mark as Done and fill error_msg
1180 task["status"] = "DONE"
1181 return True, None
1182 task["status"] = "FAILED"
1183 return False, None
1184
1185 def new_classification(self, task):
1186 vim_classification_id = None
1187 try:
1188 params = task["params"]
1189 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1190 dep_id = "TASK-" + str(task["extra"]["depends_on"][0])
1191 error_text = ""
1192 interfaces = task.get("depends").get(dep_id).get("extra").get("interfaces").keys()
1193 # Bear in mind that different VIM connectors might support Classifications differently.
1194 # In the case of OpenStack, only the first VNF attached to the classifier will be used
1195 # to create the Classification(s) (the "logical source port" of the "Flow Classifier").
1196 # Since the VNFFG classifier match lacks the ethertype, classification defaults to
1197 # using the IPv4 flow classifier.
1198 name = "c-%s" % task["item_id"][:8]
1199 # if not CIDR is given for the IP addresses, add /32:
1200 ip_proto = int(params.get("ip_proto"))
1201 source_ip = params.get("source_ip")
1202 destination_ip = params.get("destination_ip")
1203 if ip_proto == 1:
1204 ip_proto = 'icmp'
1205 elif ip_proto == 6:
1206 ip_proto = 'tcp'
1207 elif ip_proto == 17:
1208 ip_proto = 'udp'
1209 if '/' not in source_ip:
1210 source_ip += '/32'
1211 if '/' not in destination_ip:
1212 destination_ip += '/32'
1213 definition = {
1214 "logical_source_port": interfaces[0],
1215 "protocol": ip_proto,
1216 "source_ip_prefix": source_ip,
1217 "destination_ip_prefix": destination_ip,
1218 "source_port_range_min": params.get("source_port"),
1219 "source_port_range_max": params.get("source_port"),
1220 "destination_port_range_min": params.get("destination_port"),
1221 "destination_port_range_max": params.get("destination_port"),
1222 }
1223
1224 vim_classification_id = self.vim.new_classification(
1225 name, 'legacy_flow_classifier', definition)
1226
1227 task["extra"]["created"] = True
1228 task["error_msg"] = None
1229 task["status"] = "DONE"
1230 task["vim_id"] = vim_classification_id
1231 instance_element_update = {"status": "ACTIVE", "vim_classification_id": vim_classification_id, "error_msg": None}
1232 return True, instance_element_update
1233
1234 except (vimconn.vimconnException, VimThreadException) as e:
1235 self.logger.error("Error creating Classification, task=%s: %s", task_id, str(e))
1236 error_text = self._format_vim_error_msg(str(e))
1237 task["error_msg"] = error_text
1238 task["status"] = "FAILED"
1239 task["vim_id"] = None
1240 instance_element_update = {"status": "VIM_ERROR", "vim_classification_id": None, "error_msg": error_text}
1241 return False, instance_element_update
1242
1243 def del_classification(self, task):
1244 classification_vim_id = task["vim_id"]
1245 try:
1246 self.vim.delete_classification(classification_vim_id)
1247 task["status"] = "DONE"
1248 task["error_msg"] = None
1249 return True, None
1250
1251 except vimconn.vimconnException as e:
1252 task["error_msg"] = self._format_vim_error_msg(str(e))
1253 if isinstance(e, vimconn.vimconnNotFoundException):
1254 # If not found mark as Done and fill error_msg
1255 task["status"] = "DONE"
1256 return True, None
1257 task["status"] = "FAILED"
1258 return False, None
1259
1260 def new_sfp(self, task):
1261 vim_sfp_id = None
1262 try:
1263 params = task["params"]
1264 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1265 depending_tasks = [task.get("depends").get("TASK-" + str(tsk_id)) for tsk_id in task.get("extra").get("depends_on")]
1266 error_text = ""
1267 sf_id_list = []
1268 classification_id_list = []
1269 for dep in depending_tasks:
1270 vim_id = dep.get("vim_id")
1271 resource = dep.get("item")
1272 if resource == "instance_sfs":
1273 sf_id_list.append(vim_id)
1274 elif resource == "instance_classifications":
1275 classification_id_list.append(vim_id)
1276
1277 name = "sfp-%s" % task["item_id"][:8]
1278 # By default no form of IETF SFC Encapsulation will be used
1279 vim_sfp_id = self.vim.new_sfp(name, classification_id_list, sf_id_list, sfc_encap=False)
1280
1281 task["extra"]["created"] = True
1282 task["error_msg"] = None
1283 task["status"] = "DONE"
1284 task["vim_id"] = vim_sfp_id
1285 instance_element_update = {"status": "ACTIVE", "vim_sfp_id": vim_sfp_id, "error_msg": None}
1286 return True, instance_element_update
1287
1288 except (vimconn.vimconnException, VimThreadException) as e:
1289 self.logger.error("Error creating Service Function, task=%s: %s", task_id, str(e))
1290 error_text = self._format_vim_error_msg(str(e))
1291 task["error_msg"] = error_text
1292 task["status"] = "FAILED"
1293 task["vim_id"] = None
1294 instance_element_update = {"status": "VIM_ERROR", "vim_sfp_id": None, "error_msg": error_text}
1295 return False, instance_element_update
1296 return
1297
1298 def del_sfp(self, task):
1299 sfp_vim_id = task["vim_id"]
1300 try:
1301 self.vim.delete_sfp(sfp_vim_id)
1302 task["status"] = "DONE"
1303 task["error_msg"] = None
1304 return True, None
1305
1306 except vimconn.vimconnException as e:
1307 task["error_msg"] = self._format_vim_error_msg(str(e))
1308 if isinstance(e, vimconn.vimconnNotFoundException):
1309 # If not found mark as Done and fill error_msg
1310 task["status"] = "DONE"
1311 return True, None
1312 task["status"] = "FAILED"
1313 return False, None