80024365a3ebc18b37133def435d484cf8f10879
[osm/RO.git] / osm_ro / vim_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2015 Telefonica Investigacion y Desarrollo, S.A.U.
5 # This file is part of openvim
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
22 ##
23
24 """"
25 This is the thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
26 The tasks are stored at database in table vim_wim_actions
27 The task content is (M: stored at memory, D: stored at database):
28 MD instance_action_id: reference a global action over an instance-scenario: database instance_actions
29 MD task_index: index number of the task. This together with the previous forms a unique key identifier
30 MD datacenter_vim_id: should contain the uuid of the VIM managed by this thread
31 MD vim_id: id of the vm,net,etc at VIM
32 MD action: CREATE, DELETE, FIND
33 MD item: database table name, can be instance_vms, instance_nets, TODO: datacenter_flavors, datacenter_images
34 MD item_id: uuid of the referenced entry in the previous table
35 MD status: SCHEDULED,BUILD,DONE,FAILED,SUPERSEDED
36 MD extra: text with yaml format at database, dict at memory with:
37 params: list with the params to be sent to the VIM for CREATE or FIND. For DELETE the vim_id is taken from other related tasks
38 find: (only for CREATE tasks) if present it should FIND before creating and use if existing. Contains the FIND params
39 depends_on: list with the 'task_index'es of tasks that must be completed before. e.g. a vm creation depends on a net creation
40 can contain an int (single index on the same instance-action) or str (complete action ID)
41 sdn_net_id: used for net.
42 tries:
43 interfaces: used for VMs. Each key is the uuid of the instance_interfaces entry at database
44 iface_id: uuid of instance_interfaces
45 sdn_port_id:
46 sdn_net_id:
47 created_items: dictionary with extra elements created that need to be deleted. e.g. ports, volumes,...
48 created: False if the VIM element is not created by other actions, and it should not be deleted
49 vim_status: VIM status of the element. Stored also at database in the instance_XXX
50 M depends: dict with task_index(from depends_on) to task class
51 M params: same as extra[params] but with the resolved dependencies
52 M vim_interfaces: similar to extra[interfaces] but with VIM information. Stored at database in the instance_XXX but not at vim_wim_actions
53 M vim_info: Detailed information of a vm,net from the VIM. Stored at database in the instance_XXX but not at vim_wim_actions
54 MD error_msg: descriptive text upon an error.Stored also at database instance_XXX
55 MD created_at: task creation time
56 MD modified_at: last task update time. On refresh it contains when this task need to be refreshed
57
58 """
59
60 import threading
61 import time
62 import Queue
63 import logging
64 import vimconn
65 import vimconn_openvim
66 import vimconn_aws
67 import vimconn_opennebula
68 import vimconn_openstack
69 import vimconn_vmware
70 import yaml
71 from db_base import db_base_Exception
72 from lib_osm_openvim.ovim import ovimException
73 from copy import deepcopy
74
75 __author__ = "Alfonso Tierno, Pablo Montes"
76 __date__ = "$28-Sep-2017 12:07:15$"
77
# Connector module for each VIM type. get_vimconnector() indexes this table with the 'type'
# value read from the database and instantiates the module's `vimconnector` class.
vim_module = {
    "openvim": vimconn_openvim,
    "aws": vimconn_aws,
    "opennebula": vimconn_opennebula,
    "openstack": vimconn_openstack,
    "vmware": vimconn_vmware,
}
85
86
def is_task_id(task_id):
    """Tell whether task_id is a task reference, i.e. a text that begins with "TASK-"."""
    task_prefix = "TASK-"
    return task_id.startswith(task_prefix)
89
90
class VimThreadException(Exception):
    """Error raised by vim_thread while processing a task (e.g. a dependency that failed or timed out)."""
93
94
class VimThreadExceptionNotFound(VimThreadException):
    """VimThreadException specialization; by its name it signals an element that was not found."""
97
98
99 class vim_thread(threading.Thread):
100 REFRESH_BUILD = 5 # 5 seconds
101 REFRESH_ACTIVE = 60 # 1 minute
102
103 def __init__(self, task_lock, name=None, datacenter_name=None, datacenter_tenant_id=None,
104 db=None, db_lock=None, ovim=None):
105 """Init a thread.
106 Arguments:
107 'id' number of thead
108 'name' name of thread
109 'host','user': host ip or name to manage and user
110 'db', 'db_lock': database class and lock to use it in exclusion
111 """
112 threading.Thread.__init__(self)
113 self.vim = None
114 self.error_status = None
115 self.datacenter_name = datacenter_name
116 self.datacenter_tenant_id = datacenter_tenant_id
117 self.ovim = ovim
118 if not name:
119 self.name = vimconn["id"] + "." + vimconn["config"]["datacenter_tenant_id"]
120 else:
121 self.name = name
122 self.vim_persistent_info = {}
123
124 self.logger = logging.getLogger('openmano.vim.' + self.name)
125 self.db = db
126 self.db_lock = db_lock
127
128 self.task_lock = task_lock
129 self.task_queue = Queue.Queue(2000)
130
131 self.refresh_tasks = []
132 """Contains time ordered task list for refreshing the status of VIM VMs and nets"""
133
134 self.pending_tasks = []
135 """Contains time ordered task list for creation, deletion of VIM VMs and nets"""
136
137 self.grouped_tasks = {}
138 """ It contains all the creation/deletion pending tasks grouped by its concrete vm, net, etc
139 <item><item_id>:
140 - <task1> # e.g. CREATE task
141 <task2> # e.g. DELETE task
142 """
143
144 def get_vimconnector(self):
145 try:
146 from_ = "datacenter_tenants as dt join datacenters as d on dt.datacenter_id=d.uuid"
147 select_ = ('type', 'd.config as config', 'd.uuid as datacenter_id', 'vim_url', 'vim_url_admin',
148 'd.name as datacenter_name', 'dt.uuid as datacenter_tenant_id',
149 'dt.vim_tenant_name as vim_tenant_name', 'dt.vim_tenant_id as vim_tenant_id',
150 'user', 'passwd', 'dt.config as dt_config')
151 where_ = {"dt.uuid": self.datacenter_tenant_id}
152 vims = self.db.get_rows(FROM=from_, SELECT=select_, WHERE=where_)
153 vim = vims[0]
154 vim_config = {}
155 if vim["config"]:
156 vim_config.update(yaml.load(vim["config"]))
157 if vim["dt_config"]:
158 vim_config.update(yaml.load(vim["dt_config"]))
159 vim_config['datacenter_tenant_id'] = vim.get('datacenter_tenant_id')
160 vim_config['datacenter_id'] = vim.get('datacenter_id')
161
162 # get port_mapping
163 with self.db_lock:
164 vim_config["wim_external_ports"] = self.ovim.get_of_port_mappings(
165 db_filter={"region": vim_config['datacenter_id'], "pci": None})
166
167 self.vim = vim_module[vim["type"]].vimconnector(
168 uuid=vim['datacenter_id'], name=vim['datacenter_name'],
169 tenant_id=vim['vim_tenant_id'], tenant_name=vim['vim_tenant_name'],
170 url=vim['vim_url'], url_admin=vim['vim_url_admin'],
171 user=vim['user'], passwd=vim['passwd'],
172 config=vim_config, persistent_info=self.vim_persistent_info
173 )
174 self.error_status = None
175 except Exception as e:
176 self.logger.error("Cannot load vimconnector for vim_account {}: {}".format(self.datacenter_tenant_id, e))
177 self.vim = None
178 self.error_status = "Error loading vimconnector: {}".format(e)
179
def _reload_vim_actions(self):
    """
    Read actions from database and reload them at memory.
    The rows of vim_wim_actions of this datacenter_tenant_id are read in pages of 'database_limit'
    entries ordered by item_id, item, created_at. Consecutive rows of the same item+item_id form a
    group that is passed to _insert_pending_tasks, which is the one that fills self.pending_tasks,
    self.refresh_tasks and self.grouped_tasks.
    A group is discarded when it contains a DELETE action whose status is not SCHEDULED.
    Any exception is logged and not propagated.
    :return: None
    """
    try:
        action_completed = False
        task_list = []
        old_action_key = None

        # Position of the last row of the previous page; used to resume the reading
        old_item_id = ""
        old_item = ""
        old_created_at = 0.0
        database_limit = 200
        while True:
            # get 200 (database_limit) entries each time
            vim_actions = self.db.get_rows(FROM="vim_wim_actions",
                                           WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
                                                  "item_id>=": old_item_id},
                                           ORDER_BY=("item_id", "item", "created_at",),
                                           LIMIT=database_limit)
            for task in vim_actions:
                item = task["item"]
                item_id = task["item_id"]

                # skip the first entries that are already processed in the previous pool of 200
                if old_item_id:
                    if item_id == old_item_id and item == old_item and task["created_at"] == old_created_at:
                        old_item_id = False  # next one will be a new un-processed task
                    continue

                action_key = item + item_id
                if old_action_key != action_key:
                    # A new group starts: flush the previous one unless it was completed
                    if not action_completed and task_list:
                        # This will fill needed task parameters into memory, and insert the task if needed in
                        # self.pending_tasks or self.refresh_tasks
                        try:
                            self._insert_pending_tasks(task_list)
                        except Exception as e:
                            self.logger.critical(
                                "Unexpected exception at _reload_vim_actions:_insert_pending_tasks: " + str(e),
                                exc_info=True)
                    task_list = []
                    old_action_key = action_key
                    action_completed = False
                elif action_completed:
                    continue

                if task["status"] == "SCHEDULED" or task["action"] == "CREATE" or task["action"] == "FIND":
                    task_list.append(task)
                elif task["action"] == "DELETE":
                    # action completed because deleted and status is not SCHEDULED. Not needed anything
                    action_completed = True
            if len(vim_actions) == database_limit:
                # update variables for get the next database iteration
                old_item_id = item_id
                old_item = item
                old_created_at = task["created_at"]
            else:
                # a page that is not full is the last one
                break
        # Last actions group need to be inserted too
        if not action_completed and task_list:
            try:
                self._insert_pending_tasks(task_list)
            except Exception as e:
                self.logger.critical("Unexpected exception at _reload_vim_actions:_insert_pending_tasks: " + str(e),
                                     exc_info=True)
        self.logger.debug("reloaded vim actions pending:{} refresh:{}".format(
            len(self.pending_tasks), len(self.refresh_tasks)))
    except Exception as e:
        self.logger.critical("Unexpected exception at _reload_vim_actions: " + str(e), exc_info=True)
251
def _refres_elements(self):
    """Call VIM to get VMs and networks status until 10 elements.
    Tasks are taken from the head of self.refresh_tasks while their 'modified_at' time has been reached.
    SUPERSEDED tasks are dropped. VMs and nets are refreshed with one VIM call each; the database
    (instance_vms, instance_nets, instance_interfaces, vim_wim_actions) is updated when something
    changes, and every refreshed task is inserted again for a later refresh.
    :return: number of tasks taken for refreshing
    """
    now = time.time()
    nb_processed = 0
    # The lists keep the vim_ids to ask for; the dicts map each vim_id to the tasks that reference it
    vm_to_refresh_list = []
    net_to_refresh_list = []
    vm_to_refresh_dict = {}
    net_to_refresh_dict = {}
    items_to_refresh = 0
    while self.refresh_tasks:
        task = self.refresh_tasks[0]
        with self.task_lock:
            if task['status'] == 'SUPERSEDED':
                self.refresh_tasks.pop(0)
                continue
            if task['modified_at'] > now:
                # the list is time ordered, so nothing else is pending
                break
            # task["status"] = "processing"
            nb_processed += 1
        self.refresh_tasks.pop(0)
        if task["item"] == 'instance_vms':
            if task["vim_id"] not in vm_to_refresh_dict:
                vm_to_refresh_dict[task["vim_id"]] = [task]
                vm_to_refresh_list.append(task["vim_id"])
            else:
                vm_to_refresh_dict[task["vim_id"]].append(task)
        elif task["item"] == 'instance_nets':
            if task["vim_id"] not in net_to_refresh_dict:
                net_to_refresh_dict[task["vim_id"]] = [task]
                net_to_refresh_list.append(task["vim_id"])
            else:
                net_to_refresh_dict[task["vim_id"]].append(task)
        else:
            # NOTE(review): the task has already been removed from the list and it is not inserted again
            task_id = task["instance_action_id"] + "." + str(task["task_index"])
            self.logger.critical("task={}: unknown task {}".format(task_id, task["item"]), exc_info=True)
        items_to_refresh += 1
        if items_to_refresh == 10:
            break

    if vm_to_refresh_list:
        now = time.time()
        try:
            vim_dict = self.vim.refresh_vms_status(vm_to_refresh_list)
        except vimconn.vimconnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error("task=several get-VM: vimconnException when trying to refresh vms " + str(e))
            vim_dict = {}
            for vim_id in vm_to_refresh_list:
                vim_dict[vim_id] = {"status": "VIM_ERROR", "error_msg": str(e)}

        for vim_id, vim_info in vim_dict.items():

            # look for task
            for task in vm_to_refresh_dict[vim_id]:
                task_need_update = False
                task_id = task["instance_action_id"] + "." + str(task["task_index"])
                self.logger.debug("task={} get-VM: vim_vm_id={} result={}".format(task_id, task["vim_id"], vim_info))

                # check and update interfaces
                task_warning_msg = ""
                for interface in vim_info.get("interfaces", ()):
                    vim_interface_id = interface["vim_interface_id"]
                    if vim_interface_id not in task["extra"]["interfaces"]:
                        self.logger.critical("task={} get-VM: Interface not found {} on task info {}".format(
                            task_id, vim_interface_id, task["extra"]["interfaces"]), exc_info=True)
                        continue
                    task_interface = task["extra"]["interfaces"][vim_interface_id]
                    task_vim_interface = task["vim_interfaces"].get(vim_interface_id)
                    # Only when the VIM reports something different to what is stored at memory
                    if task_vim_interface != interface:
                        # delete old port
                        if task_interface.get("sdn_port_id"):
                            try:
                                with self.db_lock:
                                    self.ovim.delete_port(task_interface["sdn_port_id"], idempotent=True)
                                task_interface["sdn_port_id"] = None
                                task_need_update = True
                            except ovimException as e:
                                error_text = "ovimException deleting external_port={}: {}".format(
                                    task_interface["sdn_port_id"], e)
                                self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
                                task_warning_msg += error_text
                                # TODO Set error_msg at instance_nets instead of instance VMs

                        # Create SDN port
                        sdn_net_id = task_interface.get("sdn_net_id")
                        if sdn_net_id and interface.get("compute_node") and interface.get("pci"):
                            sdn_port_name = sdn_net_id + "." + task["vim_id"]
                            sdn_port_name = sdn_port_name[:63]
                            try:
                                with self.db_lock:
                                    sdn_port_id = self.ovim.new_external_port(
                                        {"compute_node": interface["compute_node"],
                                         "pci": interface["pci"],
                                         "vlan": interface.get("vlan"),
                                         "net_id": sdn_net_id,
                                         "region": self.vim["config"]["datacenter_id"],
                                         "name": sdn_port_name,
                                         "mac": interface.get("mac_address")})
                                task_interface["sdn_port_id"] = sdn_port_id
                                task_need_update = True
                            except (ovimException, Exception) as e:
                                error_text = "ovimException creating new_external_port compute_node={}" \
                                             " pci={} vlan={} {}".format(
                                                 interface["compute_node"],
                                                 interface["pci"],
                                                 interface.get("vlan"), e)
                                self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
                                task_warning_msg += error_text
                                # TODO Set error_msg at instance_nets instead of instance VMs

                        self.db.update_rows(
                            'instance_interfaces',
                            UPDATE={"mac_address": interface.get("mac_address"),
                                    "ip_address": interface.get("ip_address"),
                                    "vim_interface_id": interface.get("vim_interface_id"),
                                    "vim_info": interface.get("vim_info"),
                                    "sdn_port_id": task_interface.get("sdn_port_id"),
                                    "compute_node": interface.get("compute_node"),
                                    "pci": interface.get("pci"),
                                    "vlan": interface.get("vlan")},
                            WHERE={'uuid': task_interface["iface_id"]})
                        task["vim_interfaces"][vim_interface_id] = interface

                # check and update task and instance_vms database
                vim_info_error_msg = None
                if vim_info.get("error_msg"):
                    vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"] + task_warning_msg)
                elif task_warning_msg:
                    vim_info_error_msg = self._format_vim_error_msg(task_warning_msg)
                task_vim_info = task.get("vim_info")
                task_error_msg = task.get("error_msg")
                task_vim_status = task["extra"].get("vim_status")
                if task_vim_status != vim_info["status"] or task_error_msg != vim_info_error_msg or \
                        (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
                    temp_dict = {"status": vim_info["status"], "error_msg": vim_info_error_msg}
                    if vim_info.get("vim_info"):
                        temp_dict["vim_info"] = vim_info["vim_info"]
                    self.db.update_rows('instance_vms', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]})
                    task["extra"]["vim_status"] = vim_info["status"]
                    task["error_msg"] = vim_info_error_msg
                    if vim_info.get("vim_info"):
                        task["vim_info"] = vim_info["vim_info"]
                    task_need_update = True

                if task_need_update:
                    self.db.update_rows(
                        'vim_wim_actions',
                        UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256),
                                "error_msg": task.get("error_msg"), "modified_at": now},
                        WHERE={'instance_action_id': task['instance_action_id'],
                               'task_index': task['task_index']})
                # Elements at BUILD status are refreshed sooner than the rest
                if task["extra"].get("vim_status") == "BUILD":
                    self._insert_refresh(task, now + self.REFRESH_BUILD)
                else:
                    self._insert_refresh(task, now + self.REFRESH_ACTIVE)

    if net_to_refresh_list:
        now = time.time()
        try:
            vim_dict = self.vim.refresh_nets_status(net_to_refresh_list)
        except vimconn.vimconnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error("task=several get-net: vimconnException when trying to refresh nets " + str(e))
            vim_dict = {}
            for vim_id in net_to_refresh_list:
                vim_dict[vim_id] = {"status": "VIM_ERROR", "error_msg": str(e)}

        for vim_id, vim_info in vim_dict.items():
            # look for task
            for task in net_to_refresh_dict[vim_id]:
                task_id = task["instance_action_id"] + "." + str(task["task_index"])
                self.logger.debug("task={} get-net: vim_net_id={} result={}".format(task_id, task["vim_id"], vim_info))

                task_vim_info = task.get("vim_info")
                task_vim_status = task["extra"].get("vim_status")
                task_error_msg = task.get("error_msg")
                task_sdn_net_id = task["extra"].get("sdn_net_id")

                vim_info_status = vim_info["status"]
                vim_info_error_msg = vim_info.get("error_msg")
                # get ovim status
                if task_sdn_net_id:
                    try:
                        with self.db_lock:
                            sdn_net = self.ovim.show_network(task_sdn_net_id)
                    except (ovimException, Exception) as e:
                        text_error = "ovimException getting network snd_net_id={}: {}".format(task_sdn_net_id, e)
                        self.logger.error("task={} get-net: {}".format(task_id, text_error), exc_info=True)
                        sdn_net = {"status": "ERROR", "last_error": text_error}
                    # The SDN status is combined with the one reported by the VIM
                    if sdn_net["status"] == "ERROR":
                        if not vim_info_error_msg:
                            vim_info_error_msg = str(sdn_net.get("last_error"))
                        else:
                            vim_info_error_msg = "VIM_ERROR: {} && SDN_ERROR: {}".format(
                                self._format_vim_error_msg(vim_info_error_msg, 1024 // 2 - 14),
                                self._format_vim_error_msg(sdn_net["last_error"], 1024 // 2 - 14))
                        vim_info_status = "ERROR"
                    elif sdn_net["status"] == "BUILD":
                        if vim_info_status == "ACTIVE":
                            vim_info_status = "BUILD"

                # update database
                if vim_info_error_msg:
                    vim_info_error_msg = self._format_vim_error_msg(vim_info_error_msg)
                if task_vim_status != vim_info_status or task_error_msg != vim_info_error_msg or \
                        (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
                    task["extra"]["vim_status"] = vim_info_status
                    task["error_msg"] = vim_info_error_msg
                    if vim_info.get("vim_info"):
                        task["vim_info"] = vim_info["vim_info"]
                    temp_dict = {"status": vim_info_status, "error_msg": vim_info_error_msg}
                    if vim_info.get("vim_info"):
                        temp_dict["vim_info"] = vim_info["vim_info"]
                    self.db.update_rows('instance_nets', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]})
                    self.db.update_rows(
                        'vim_wim_actions',
                        UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256),
                                "error_msg": task.get("error_msg"), "modified_at": now},
                        WHERE={'instance_action_id': task['instance_action_id'],
                               'task_index': task['task_index']})
                # Elements at BUILD status are refreshed sooner than the rest
                if task["extra"].get("vim_status") == "BUILD":
                    self._insert_refresh(task, now + self.REFRESH_BUILD)
                else:
                    self._insert_refresh(task, now + self.REFRESH_ACTIVE)

    return nb_processed
478
479 def _insert_refresh(self, task, threshold_time=None):
480 """Insert a task at list of refreshing elements. The refreshing list is ordered by threshold_time (task['modified_at']
481 It is assumed that this is called inside this thread
482 """
483 if not self.vim:
484 return
485 if not threshold_time:
486 threshold_time = time.time()
487 task["modified_at"] = threshold_time
488 task_name = task["item"][9:] + "-" + task["action"]
489 task_id = task["instance_action_id"] + "." + str(task["task_index"])
490 for index in range(0, len(self.refresh_tasks)):
491 if self.refresh_tasks[index]["modified_at"] > threshold_time:
492 self.refresh_tasks.insert(index, task)
493 break
494 else:
495 index = len(self.refresh_tasks)
496 self.refresh_tasks.append(task)
497 self.logger.debug("task={} new refresh name={}, modified_at={} index={}".format(
498 task_id, task_name, task["modified_at"], index))
499
500 def _remove_refresh(self, task_name, vim_id):
501 """Remove a task with this name and vim_id from the list of refreshing elements.
502 It is assumed that this is called inside this thread outside _refres_elements method
503 Return True if self.refresh_list is modified, task is found
504 Return False if not found
505 """
506 index_to_delete = None
507 for index in range(0, len(self.refresh_tasks)):
508 if self.refresh_tasks[index]["name"] == task_name and self.refresh_tasks[index]["vim_id"] == vim_id:
509 index_to_delete = index
510 break
511 else:
512 return False
513 if not index_to_delete:
514 del self.refresh_tasks[index_to_delete]
515 return True
516
def _proccess_pending_tasks(self):
    """Process the tasks queued at self.pending_tasks against the VIM.
    For each task: its dependencies ('depends_on') are checked, the method of this class that matches
    its item and action is called, and the result is stored at database (vim_wim_actions,
    instance_actions and the table of the item).
    A task with a dependency still SCHEDULED is moved to the end of the list, up to 3 times.
    The loop finishes when the list gets empty or after 10 CREATE methods have been called.
    :return: number of tasks taken from self.pending_tasks
    """
    # item (database table name) -> action -> name of the method that carries it out
    task_handlers = {
        'instance_vms': {"CREATE": "new_vm", "DELETE": "del_vm"},
        'instance_nets': {"CREATE": "new_net", "DELETE": "del_net", "FIND": "get_net"},
        'instance_sfis': {"CREATE": "new_sfi", "DELETE": "del_sfi"},
        'instance_sfs': {"CREATE": "new_sf", "DELETE": "del_sf"},
        'instance_classifications': {"CREATE": "new_classification", "DELETE": "del_classification"},
        'instance_sfps': {"CREATE": "new_sfp", "DELETE": "del_sfp"},
    }
    # items whose status is not refreshed after being created
    no_refresh_tasks = ('instance_sfis', 'instance_sfs', 'instance_classifications', 'instance_sfps')
    nb_created = 0
    nb_processed = 0
    while self.pending_tasks:
        task = self.pending_tasks.pop(0)
        nb_processed += 1
        try:
            # check if tasks that this depends on have been completed
            dependency_not_completed = False
            for task_index in task["extra"].get("depends_on", ()):
                task_dependency = task["depends"].get("TASK-" + str(task_index))
                if not task_dependency:
                    # Not stored at task["depends"]: it references another object, so database is looked
                    # up every time
                    task_dependency = self._look_for_task(task["instance_action_id"], task_index)
                    if not task_dependency:
                        raise VimThreadException(
                            "Cannot get depending net task trying to get depending task {}.{}".format(
                                task["instance_action_id"], task_index))
                if task_dependency["status"] == "SCHEDULED":
                    dependency_not_completed = True
                    break
                elif task_dependency["status"] == "FAILED":
                    # arguments follow the placeholders: action, item first and then the task identifier
                    raise VimThreadException(
                        "Cannot {} {}, (task {}.{}) because depends on failed {} {}, (task {}.{}): {}".format(
                            task["action"], task["item"],
                            task["instance_action_id"], task["task_index"],
                            task_dependency["action"], task_dependency["item"],
                            task_dependency["instance_action_id"], task_dependency["task_index"],
                            task_dependency.get("error_msg")))
            if dependency_not_completed:
                # Move this task to the end.
                task["extra"]["tries"] = task["extra"].get("tries", 0) + 1
                if task["extra"]["tries"] <= 3:
                    self.pending_tasks.append(task)
                    continue
                raise VimThreadException(
                    "Cannot {} {}, (task {}.{}) because timeout waiting to complete {} {}, "
                    "(task {}.{})".format(task["action"], task["item"],
                                          task["instance_action_id"], task["task_index"],
                                          task_dependency["action"], task_dependency["item"],
                                          task_dependency["instance_action_id"], task_dependency["task_index"]))

            if task["status"] == "SUPERSEDED":
                # not needed to do anything but update database with the new status
                result = True
                database_update = None
            elif not self.vim:
                # "FAILED" is the task status used for errors; "ERROR" is not a task status
                task["status"] = "FAILED"
                task["error_msg"] = self.error_status
                result = False
                database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
            else:
                item_handlers = task_handlers.get(task["item"])
                if item_handlers is None:
                    raise vimconn.vimconnException(self.name + "unknown task item {}".format(task["item"]))
                handler_name = item_handlers.get(task["action"])
                if handler_name is None:
                    raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
                result, database_update = getattr(self, handler_name)(task)
                if task["action"] == "CREATE":
                    nb_created += 1
        except (VimThreadException, vimconn.vimconnException) as e:
            # vimconnException (e.g. unknown item or action) is handled here too, so that the task is
            # stored as FAILED instead of being lost with its SCHEDULED status at database
            result = False
            task["error_msg"] = str(e)
            task["status"] = "FAILED"
            database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
            if task["item"] == 'instance_vms':
                database_update["vim_vm_id"] = None
            elif task["item"] == 'instance_nets':
                database_update["vim_net_id"] = None

        if task["action"] == "DELETE":
            # pop instead of del: the group may have been removed by a previous DELETE of the same item
            self.grouped_tasks.pop(task["item"] + task["item_id"], None)
        elif task["action"] in ("CREATE", "FIND") and task["status"] in ("DONE", "BUILD"):
            if task["item"] not in no_refresh_tasks:
                self._insert_refresh(task)

        task_id = task["instance_action_id"] + "." + str(task["task_index"])
        self.logger.debug("task={} item={} action={} result={}:'{}' params={}".format(
            task_id, task["item"], task["action"], task["status"],
            task["vim_id"] if task["status"] == "DONE" else task.get("error_msg"), task["params"]))
        try:
            now = time.time()
            self.db.update_rows(
                table="vim_wim_actions",
                UPDATE={"status": task["status"], "vim_id": task.get("vim_id"), "modified_at": now,
                        "error_msg": task["error_msg"],
                        "extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256)},
                WHERE={"instance_action_id": task["instance_action_id"], "task_index": task["task_index"]})
            if result is not None:
                self.db.update_rows(
                    table="instance_actions",
                    UPDATE={("number_done" if result else "number_failed"): {"INCREMENT": 1},
                            "modified_at": now},
                    WHERE={"uuid": task["instance_action_id"]})
            if database_update:
                self.db.update_rows(table=task["item"],
                                    UPDATE=database_update,
                                    WHERE={"uuid": task["item_id"]})
        except db_base_Exception as e:
            self.logger.error("task={} Error updating database {}".format(task_id, e), exc_info=True)

        if nb_created == 10:
            break
    return nb_processed
668
def _insert_pending_tasks(self, vim_actions_list):
    """Complete the tasks of a list with the content needed at memory and queue them.
    Tasks of other datacenter_vim_id are ignored. For the rest: 'extra' is converted from yaml text
    into a dict, 'params' and 'depends' are filled, and the task is added to self.grouped_tasks and,
    depending on its action and status, to self.pending_tasks or to the refresh list.
    A DELETE task marks as SUPERSEDED the previous tasks of the same item, and it is itself marked as
    SUPERSEDED when there is nothing created to delete.
    :param vim_actions_list: list of tasks (dictionaries) with the content of the database entries
    :return: None
    :raise vimconn.vimconnException: if the action of a non SCHEDULED task is not DELETE, CREATE or FIND
    """
    for task in vim_actions_list:
        if task["datacenter_vim_id"] != self.datacenter_tenant_id:
            continue
        item = task["item"]
        item_id = task["item_id"]
        action_key = item + item_id
        if action_key not in self.grouped_tasks:
            self.grouped_tasks[action_key] = []
        task["params"] = None
        task["depends"] = {}
        if task["extra"]:
            # NOTE(review): yaml.load can build arbitrary objects; confirm 'extra' is always self generated
            extra = yaml.load(task["extra"])
            task["extra"] = extra
            task["params"] = extra.get("params")
            depends_on_list = extra.get("depends_on")
            if depends_on_list:
                for dependency_task in depends_on_list:
                    if isinstance(dependency_task, int):
                        index = dependency_task
                    else:
                        # "<instance_action_id>.<task_index>"; dependencies on other actions are skipped here
                        instance_action_id, _, task_id = dependency_task.rpartition(".")
                        if instance_action_id != task["instance_action_id"]:
                            continue
                        index = int(task_id)

                    # Only resolved when the list position matches the task_index of the same action
                    if index < len(vim_actions_list) and vim_actions_list[index]["task_index"] == index and \
                            vim_actions_list[index]["instance_action_id"] == task["instance_action_id"]:
                        task["depends"]["TASK-" + str(index)] = vim_actions_list[index]
                        task["depends"]["TASK-{}.{}".format(task["instance_action_id"], index)] = vim_actions_list[index]
            if extra.get("interfaces"):
                task["vim_interfaces"] = {}
        else:
            task["extra"] = {}
        if "error_msg" not in task:
            task["error_msg"] = None
        if "vim_id" not in task:
            task["vim_id"] = None

        if task["action"] == "DELETE":
            need_delete_action = False
            for to_supersede in self.grouped_tasks.get(action_key, ()):
                if to_supersede["action"] == "FIND" and to_supersede.get("vim_id"):
                    task["vim_id"] = to_supersede["vim_id"]
                if to_supersede["action"] == "CREATE" and to_supersede["extra"].get("created", True) and \
                        (to_supersede.get("vim_id") or to_supersede["extra"].get("sdn_net_id")):
                    # Something was created; the DELETE task inherits what is needed to remove it
                    need_delete_action = True
                    task["vim_id"] = to_supersede["vim_id"]
                    if to_supersede["extra"].get("sdn_net_id"):
                        task["extra"]["sdn_net_id"] = to_supersede["extra"]["sdn_net_id"]
                    if to_supersede["extra"].get("interfaces"):
                        task["extra"]["interfaces"] = to_supersede["extra"]["interfaces"]
                    if to_supersede["extra"].get("created_items"):
                        if not task["extra"].get("created_items"):
                            task["extra"]["created_items"] = {}
                        task["extra"]["created_items"].update(to_supersede["extra"]["created_items"])
                # Mark task as SUPERSEDED.
                #   If task is in self.pending_tasks, it will be removed and database will be update
                #   If task is in self.refresh_tasks, it will be removed
                to_supersede["status"] = "SUPERSEDED"
            if not need_delete_action:
                task["status"] = "SUPERSEDED"

            self.grouped_tasks[action_key].append(task)
            self.pending_tasks.append(task)
        elif task["status"] == "SCHEDULED":
            self.grouped_tasks[action_key].append(task)
            self.pending_tasks.append(task)
        elif task["action"] in ("CREATE", "FIND"):
            self.grouped_tasks[action_key].append(task)
            if task["status"] in ("DONE", "BUILD"):
                self._insert_refresh(task)
        # TODO add VM reset, get console, etc...
        else:
            raise vimconn.vimconnException(self.name + "unknown vim_action action {}".format(task["action"]))
744
745 def insert_task(self, task):
746 try:
747 self.task_queue.put(task, False)
748 return None
749 except Queue.Full:
750 raise vimconn.vimconnException(self.name + ": timeout inserting a task")
751
752 def del_task(self, task):
753 with self.task_lock:
754 if task["status"] == "SCHEDULED":
755 task["status"] = "SUPERSEDED"
756 return True
757 else: # task["status"] == "processing"
758 self.task_lock.release()
759 return False
760
def run(self):
    """Main loop of the thread.
    It loads the VIM connector and the actions stored at database, and then it loops reading the
    commands of self.task_queue, processing the pending tasks and refreshing the elements.
    Commands: a list (tasks to be inserted), 'exit' (finish the thread) and 'reload' (load again the
    VIM connector and the actions from database).
    :return: 0 when the 'exit' command is received
    """
    self.logger.debug("Starting")
    while True:
        self.get_vimconnector()
        self.logger.debug("Vimconnector loaded")
        self._reload_vim_actions()
        reload_thread = False

        while True:
            try:
                while not self.task_queue.empty():
                    task = self.task_queue.get()
                    if isinstance(task, list):
                        self._insert_pending_tasks(task)
                    elif isinstance(task, str):
                        if task == 'exit':
                            return 0
                        elif task == 'reload':
                            reload_thread = True
                            break
                    # NOTE(review): not reached for the 'exit' and 'reload' commands
                    self.task_queue.task_done()
                if reload_thread:
                    break
                nb_processed = self._proccess_pending_tasks()
                nb_processed += self._refres_elements()
                if not nb_processed:
                    # nothing to do; wait before polling again
                    time.sleep(1)

            except Exception as e:
                # the loop goes on after an unexpected error
                self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)

    # NOTE(review): the outer loop is only left with the 'return' above, so this looks unreachable
    self.logger.debug("Finishing")
793
def _look_for_task(self, instance_action_id, task_id):
    """
    Read one task from the vim_wim_actions database table and prepare it to be used at memory
    :param instance_action_id: instance_action_id to look for, unless task_id provides its own one
    :param task_id: reference of the task. Accepted formats:
        <task index>: integer
        TASK-<task index>: backward compatibility
        [TASK-]<instance_action_id>.<task index>: this instance_action_id overrides the one in the parameter
    :return: Task dictionary with "extra" decoded from yaml text into a dict, and with "params" and "depends"
        filled; or None if not found
    """
    task_index = task_id
    if not isinstance(task_id, int):
        reference = task_id[5:] if task_id.startswith("TASK-") else task_id
        own_action_id, _, task_index = reference.rpartition(".")
        instance_action_id = own_action_id or instance_action_id

    where_ = {"instance_action_id": instance_action_id, "task_index": task_index}
    rows = self.db.get_rows(FROM="vim_wim_actions", WHERE=where_)
    if not rows:
        return None

    found = rows[0]
    found["params"] = None
    found["depends"] = {}
    if not found["extra"]:
        found["extra"] = {}
        return found

    # "extra" is stored at database as yaml text
    decoded = yaml.load(found["extra"])
    found["extra"] = decoded
    found["params"] = decoded.get("params")
    if decoded.get("interfaces"):
        found["vim_interfaces"] = {}
    return found
829
@staticmethod
def _format_vim_error_msg(error_text, max_length=1024):
    """
    Shorten an error text keeping its beginning and its end, so that it is always shorter than max_length
    :param error_text: text to shorten. None or an empty text is returned unchanged
    :param max_length: texts with this length or longer are shortened
    :return: error_text unchanged if it is shorter than max_length; otherwise '<beginning> ... <end>'. When
        max_length is too small to hold the ' ... ' separator plus some text, the beginning of the text, up to
        max_length characters, is returned
    """
    if not error_text or len(error_text) < max_length:
        return error_text
    head_len = max_length // 2 - 3
    tail_len = (max_length + 1) // 2 - 3
    if tail_len <= 0:
        # A slice starting at -0 (or at a positive index) would return nearly the whole text, producing a result
        # longer than the limit
        return error_text[:max(max_length, 0)]
    return error_text[:head_len] + " ... " + error_text[-tail_len:]
835
def new_vm(self, task):
    """
    Create a VM at the VIM
    :param task: task dictionary. task["params"] are the positional arguments for the vimconnector
        'new_vminstance'; the one at index 5 is the list of networks/interfaces. A "net_id" of this list that is a
        task reference is replaced by the "vim_id" of the referenced task, taken from task["depends"] or, if not
        there, from the database
    :return: (True, dict with the content to update the instance element) on success;
        (False, dict with the error to update the instance element) on failure. The task is updated with the result
    """
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        params = task["params"]
        net_list = params[5]
        for net in net_list:
            if "net_id" in net and is_task_id(net["net_id"]):  # change task_id into network_id
                task_dependency = task["depends"].get(net["net_id"])
                if not task_dependency:
                    task_dependency = self._look_for_task(task["instance_action_id"], net["net_id"])
                    if not task_dependency:
                        raise VimThreadException(
                            "Cannot get depending net task trying to get depending task {}.{}".format(
                                task["instance_action_id"], net["net_id"]))
                network_id = task_dependency.get("vim_id")
                if not network_id:
                    # read the error from the dependency actually used: when it comes from the database it is
                    # not present at task["depends"]
                    raise VimThreadException(
                        "Cannot create VM because depends on a network not created or found: " +
                        str(task_dependency.get("error_msg")))
                net["net_id"] = network_id
        # the connector receives a copy; the "vim_id" of each interface is read from this copy afterwards
        # (presumably filled by the connector - TODO confirm)
        params_copy = deepcopy(params)
        vim_vm_id, created_items = self.vim.new_vminstance(*params_copy)

        # fill task_interfaces. Look for snd_net_id at database for each interface
        task_interfaces = {}
        for iface in params_copy[5]:
            task_interfaces[iface["vim_id"]] = {"iface_id": iface["uuid"]}
            result = self.db.get_rows(
                SELECT=('sdn_net_id', 'interface_id'),
                FROM='instance_nets as ine join instance_interfaces as ii on ii.instance_net_id=ine.uuid',
                WHERE={'ii.uuid': iface["uuid"]})
            if result:
                task_interfaces[iface["vim_id"]]["sdn_net_id"] = result[0]['sdn_net_id']
                task_interfaces[iface["vim_id"]]["interface_id"] = result[0]['interface_id']
            else:
                # no exception is being handled here, so no traceback is attached to the log
                self.logger.critical("task={} new-VM: instance_nets uuid={} not found at DB".format(task_id,
                                                                                                     iface["uuid"]))

        task["vim_info"] = {}
        task["vim_interfaces"] = {}
        task["extra"]["interfaces"] = task_interfaces
        task["extra"]["created"] = True
        task["extra"]["created_items"] = created_items
        task["error_msg"] = None
        task["status"] = "DONE"
        task["vim_id"] = vim_vm_id
        instance_element_update = {"status": "BUILD", "vim_vm_id": vim_vm_id, "error_msg": None}
        return True, instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("task={} new-VM: {}".format(task_id, e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_vm_id": None, "error_msg": error_text}
        return False, instance_element_update
894
def del_vm(self, task):
    """
    Delete a VM at the VIM, deleting before the SDN ports of its interfaces
    :param task: task dictionary. task["vim_id"] is the VM to delete; task["extra"]["interfaces"], if present, is
        a dict whose values can contain "sdn_port_id"; task["extra"]["created_items"], if present, is passed to
        the vimconnector
    :return: (True, None) if deleted or not found at VIM; (False, None) on error. The task is updated with the result
    """
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    vm_vim_id = task["vim_id"]
    # "interfaces" can be missing or empty; a dict is needed because '.values()' is used below
    interfaces = task["extra"].get("interfaces") or {}
    try:
        for iface in interfaces.values():
            if iface.get("sdn_port_id"):
                try:
                    with self.db_lock:
                        self.ovim.delete_port(iface["sdn_port_id"], idempotent=True)
                except ovimException as e:
                    # best effort: a failure deleting the SDN port does not stop the deletion of the VM
                    self.logger.error("task={} del-VM: ovimException when deleting external_port={}: {} ".format(
                        task_id, iface["sdn_port_id"], e), exc_info=True)
                    # TODO Set error_msg at instance_nets

        self.vim.delete_vminstance(vm_vim_id, task["extra"].get("created_items"))
        task["status"] = "DONE"
        task["error_msg"] = None
        return True, None

    except vimconn.vimconnException as e:
        task["error_msg"] = self._format_vim_error_msg(str(e))
        if isinstance(e, vimconn.vimconnNotFoundException):
            # If not found mark as Done and fill error_msg
            task["status"] = "DONE"
            return True, None
        task["status"] = "FAILED"
        return False, None
923
def _get_net_internal(self, task, filter_param):
    """
    Code shared by get_net and new_net. It looks at the VIM for the single network matching filter_param and
    updates the task as done with it
    :param task: task of the find or find-or-create action. It is modified with the result
    :param filter_param: filter sent to the vimconnector 'get_network_list'
    :return: a dict with the content to update the instance_nets database table. Raises
        VimThreadExceptionNotFound when no network matches and VimThreadException when more than one matches
    """
    candidates = self.vim.get_network_list(filter_param)
    if not candidates:
        raise VimThreadExceptionNotFound("Network not found with this criteria: '{}'".format(filter_param))
    if len(candidates) > 1:
        raise VimThreadException("More than one network found with this criteria: '{}'".format(filter_param))
    vim_net_id = candidates[0]["id"]

    # Discover if this network is managed by a sdn controller: use the sdn_net_id, if any, of the first
    # instance_nets entry that uses this same VIM network
    known_nets = self.db.get_rows(SELECT=('sdn_net_id',), FROM='instance_nets',
                                  WHERE={'vim_net_id': vim_net_id,
                                         'datacenter_tenant_id': self.datacenter_tenant_id},
                                  ORDER="instance_scenario_id")
    sdn_net_id = known_nets[0]['sdn_net_id'] if known_nets else None

    task["status"] = "DONE"
    extra = task["extra"]
    extra["vim_info"] = {}
    extra["created"] = False
    extra["sdn_net_id"] = sdn_net_id
    task["error_msg"] = None
    task["vim_id"] = vim_net_id
    return {"vim_net_id": vim_net_id, "created": False, "status": "BUILD",
            "error_msg": None, "sdn_net_id": sdn_net_id}
957
def get_net(self, task):
    """
    Find an existing network at the VIM using task["params"][0] as filter
    :param task: task dictionary. It is modified with the result
    :return: (True, dict with the content to update the instance element) when exactly one network is found;
        (False, dict with the error to update the instance element) otherwise
    """
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        return True, self._get_net_internal(task, task["params"][0])
    except (vimconn.vimconnException, VimThreadException) as exc:
        self.logger.error("task={} get-net: {}".format(task_id, exc))
        task["status"] = "FAILED"
        task["vim_id"] = None
        error_text = self._format_vim_error_msg(str(exc))
        task["error_msg"] = error_text
        return False, {"vim_net_id": None, "status": "VIM_ERROR", "error_msg": error_text}
974
def new_net(self, task):
    """
    Create a network at the VIM and, for 'data' or 'ptp' networks when the VIM is configured with a
    'sdn-controller', also at the SDN controller. If task["extra"]["find"] is present, it looks first for an
    existing network and uses it if found
    :param task: task dictionary. task["params"] contains: net name, net type, a third argument passed as is to
        the vimconnector 'new_network' and, optionally, the WIM account name. It is modified with the result
    :return: (True, dict with the content to update the instance element) on success;
        (False, dict with the error to update the instance element) on failure. On failure the ids of whatever was
        already created are kept at the task and at the returned dict
    """
    vim_net_id = None
    sdn_net_id = None
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    action_text = ""
    try:
        # FIND
        if task["extra"].get("find"):
            action_text = "finding"
            filter_param = task["extra"]["find"][0]
            try:
                instance_element_update = self._get_net_internal(task, filter_param)
                return True, instance_element_update
            except VimThreadExceptionNotFound:
                pass
        # CREATE
        params = task["params"]
        action_text = "creating VIM"
        vim_net_id = self.vim.new_network(*params[0:3])

        net_name = params[0]
        net_type = params[1]
        wim_account_name = None
        if len(params) >= 4:
            wim_account_name = params[3]

        sdn_controller = self.vim.config.get('sdn-controller')
        if sdn_controller and (net_type == "data" or net_type == "ptp"):
            network = {"name": net_name, "type": net_type, "region": self.vim["config"]["datacenter_id"]}

            vim_net = self.vim.get_network(vim_net_id)
            if vim_net.get('encapsulation') != 'vlan':
                # use get(): the key can be missing, which is one of the cases reported here
                raise vimconn.vimconnException(
                    "net '{}' defined as type '{}' has not vlan encapsulation '{}'".format(
                        net_name, net_type, vim_net.get('encapsulation')))
            network["vlan"] = vim_net.get('segmentation_id')
            action_text = "creating SDN"
            with self.db_lock:
                sdn_net_id = self.ovim.new_network(network)

            # 'wim_external_ports' is optional at the VIM config
            if wim_account_name and self.vim.config.get("wim_external_ports"):
                # add external port to connect WIM. Try with compute node __WIM:wim_name and __WIM
                action_text = "attaching external port to ovim network"
                # task["vim_id"] is not filled until the end of this method; use the id of the network just created
                sdn_port_name = sdn_net_id + "." + vim_net_id
                sdn_port_name = sdn_port_name[:63]
                sdn_port_data = {
                    "compute_node": "__WIM:" + wim_account_name[0:58],
                    "pci": None,
                    "vlan": network["vlan"],
                    "net_id": sdn_net_id,
                    "region": self.vim["config"]["datacenter_id"],
                    "name": sdn_port_name,
                }
                try:
                    with self.db_lock:
                        sdn_external_port_id = self.ovim.new_external_port(sdn_port_data)
                except ovimException:
                    sdn_port_data["compute_node"] = "__WIM"
                    with self.db_lock:
                        sdn_external_port_id = self.ovim.new_external_port(sdn_port_data)
                self.logger.debug("Added sdn_external_port {} to sdn_network {}".format(sdn_external_port_id,
                                                                                        sdn_net_id))

        task["status"] = "DONE"
        task["extra"]["vim_info"] = {}
        task["extra"]["sdn_net_id"] = sdn_net_id
        task["extra"]["created"] = True
        task["error_msg"] = None
        task["vim_id"] = vim_net_id
        instance_element_update = {"vim_net_id": vim_net_id, "sdn_net_id": sdn_net_id, "status": "BUILD",
                                   "created": True, "error_msg": None}
        return True, instance_element_update
    except (vimconn.vimconnException, ovimException, VimThreadException) as e:
        # VimThreadException is raised by the find step, e.g. when more than one network matches
        self.logger.error("task={} new-net: Error {}: {}".format(task_id, action_text, e))
        task["status"] = "FAILED"
        task["vim_id"] = vim_net_id
        task["error_msg"] = self._format_vim_error_msg(str(e))
        task["extra"]["sdn_net_id"] = sdn_net_id
        instance_element_update = {"vim_net_id": vim_net_id, "sdn_net_id": sdn_net_id, "status": "VIM_ERROR",
                                   "error_msg": task["error_msg"]}
        return False, instance_element_update
1056
def del_net(self, task):
    """
    Delete a network at the SDN controller (if task["extra"]["sdn_net_id"] is set) and at the VIM (if
    task["vim_id"] is set)
    :param task: task dictionary. It is modified with the result
    :return: (True, None) if deleted or not found at VIM; (False, None) on error
    """
    vim_net_id = task["vim_id"]
    sdn_net_id = task["extra"].get("sdn_net_id")
    try:
        if sdn_net_id:
            # Delete any attached port to this sdn network. There can be ports associated to this network in case
            # it was manually done using 'openmano vim-net-sdn-attach'
            with self.db_lock:
                attached_ports = self.ovim.get_ports(columns={'uuid'},
                                                     filter={'name': 'external_port', 'net_id': sdn_net_id})
                for attached_port in attached_ports:
                    self.ovim.delete_port(attached_port['uuid'], idempotent=True)
                self.ovim.delete_network(sdn_net_id, idempotent=True)
        if vim_net_id:
            self.vim.delete_network(vim_net_id)
    except ovimException as exc:
        task["error_msg"] = self._format_vim_error_msg("ovimException obtaining and deleting external "
                                                       "ports for net {}: {}".format(sdn_net_id, str(exc)))
    except vimconn.vimconnException as exc:
        task["error_msg"] = self._format_vim_error_msg(str(exc))
        if isinstance(exc, vimconn.vimconnNotFoundException):
            # If not found mark as Done and fill error_msg
            task["status"] = "DONE"
            return True, None
    else:
        task["status"] = "DONE"
        task["error_msg"] = None
        return True, None
    task["status"] = "FAILED"
    return False, None
1086
1087 ## Service Function Instances
1088
def new_sfi(self, task):
    """
    Create a Service Function Instance at the VIM
    :param task: task dictionary. The first task of task["extra"]["depends_on"] must be present at
        task["depends"] with its "interfaces" at "extra"; the VIM ports are those whose "interface_id" is the
        "ingress_interface_id" / "egress_interface_id" of task["extra"]["params"]. It is modified with the result
    :return: (True, dict with the content to update the instance element) on success;
        (False, dict with the error to update the instance element) on failure
    """
    vim_sfi_id = None
    # computed out of the 'try' so that it is always available for the error log
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        dep_id = "TASK-" + str(task["extra"]["depends_on"][0])
        interfaces = task.get("depends").get(dep_id).get("extra").get("interfaces")
        sfi_params = task.get("extra").get("params")
        ingress_interface_id = sfi_params.get("ingress_interface_id")
        egress_interface_id = sfi_params.get("egress_interface_id")
        ingress_vim_interface_id = None
        egress_vim_interface_id = None
        for vim_interface, interface_data in interfaces.items():
            if interface_data.get("interface_id") == ingress_interface_id:
                ingress_vim_interface_id = vim_interface
                break
        if ingress_interface_id != egress_interface_id:
            for vim_interface, interface_data in interfaces.items():
                if interface_data.get("interface_id") == egress_interface_id:
                    egress_vim_interface_id = vim_interface
                    break
        else:
            egress_vim_interface_id = ingress_vim_interface_id
        if not ingress_vim_interface_id or not egress_vim_interface_id:
            # mark the task as failed, as it is done for any other error, so that it is not left as pending
            error_text = "Error creating Service Function Instance, Ingress: {}, Egress: {}".format(
                ingress_vim_interface_id, egress_vim_interface_id)
            self.logger.error(error_text)
            task["error_msg"] = error_text
            task["status"] = "FAILED"
            task["vim_id"] = None
            instance_element_update = {"status": "VIM_ERROR", "vim_sfi_id": None, "error_msg": error_text}
            return False, instance_element_update
        # At the moment, every port associated with the VM will be used both as ingress and egress ports.
        # Bear in mind that different VIM connectors might support SFI differently. In the case of OpenStack, only the
        # first ingress and first egress ports will be used to create the SFI (Port Pair).
        ingress_port_id_list = [ingress_vim_interface_id]
        egress_port_id_list = [egress_vim_interface_id]
        name = "sfi-%s" % task["item_id"][:8]
        # Waits for interfaces to be ready (avoids failure). Only needed when the VIM is going to be called
        time.sleep(1)
        # By default no form of IETF SFC Encapsulation will be used
        vim_sfi_id = self.vim.new_sfi(name, ingress_port_id_list, egress_port_id_list, sfc_encap=False)

        task["extra"]["created"] = True
        task["error_msg"] = None
        task["status"] = "DONE"
        task["vim_id"] = vim_sfi_id
        instance_element_update = {"status": "ACTIVE", "vim_sfi_id": vim_sfi_id, "error_msg": None}
        return True, instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("Error creating Service Function Instance, task=%s: %s", task_id, str(e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_sfi_id": None, "error_msg": error_text}
        return False, instance_element_update
1141
def del_sfi(self, task):
    """
    Delete the Service Function Instance task["vim_id"] at the VIM
    :param task: task dictionary. Its "status" and "error_msg" are modified with the result
    :return: (True, None) if deleted or not found at VIM; (False, None) on any other VIM error
    """
    try:
        self.vim.delete_sfi(task["vim_id"])
    except vimconn.vimconnException as exc:
        task["error_msg"] = self._format_vim_error_msg(str(exc))
        # If not found mark as Done, keeping the error_msg
        not_found = isinstance(exc, vimconn.vimconnNotFoundException)
        task["status"] = "DONE" if not_found else "FAILED"
        return not_found, None
    task["status"] = "DONE"
    task["error_msg"] = None
    return True, None
1158
def new_sf(self, task):
    """
    Create a Service Function at the VIM, grouping the Service Function Instances created by the tasks listed at
    task["extra"]["depends_on"]
    :param task: task dictionary. Every depending task must be present at task["depends"] with key
        "TASK-<index>". It is modified with the result
    :return: (True, dict with the content to update the instance element) on success;
        (False, dict with the error to update the instance element) on VIM error
    """
    try:
        task_id = task["instance_action_id"] + "." + str(task["task_index"])
        depends = task.get("depends")
        # one VIM id per depending Service Function Instance task
        sfi_id_list = [depends.get("TASK-" + str(dep_index)).get("vim_id")
                       for dep_index in task["extra"]["depends_on"]]
        sf_name = "sf-%s" % task["item_id"][:8]
        # By default no form of IETF SFC Encapsulation will be used
        new_vim_id = self.vim.new_sf(sf_name, sfi_id_list, sfc_encap=False)

        task["extra"]["created"] = True
        task["error_msg"] = None
        task["status"] = "DONE"
        task["vim_id"] = new_vim_id
        return True, {"status": "ACTIVE", "vim_sf_id": new_vim_id, "error_msg": None}

    except (vimconn.vimconnException, VimThreadException) as exc:
        self.logger.error("Error creating Service Function, task=%s: %s", task_id, str(exc))
        failure_text = self._format_vim_error_msg(str(exc))
        task["error_msg"] = failure_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        return False, {"status": "VIM_ERROR", "vim_sf_id": None, "error_msg": failure_text}
1189
def del_sf(self, task):
    """
    Delete the Service Function task["vim_id"] at the VIM
    :param task: task dictionary. Its "status" and "error_msg" are modified with the result
    :return: (True, None) if deleted or not found at VIM; (False, None) on any other VIM error
    """
    try:
        self.vim.delete_sf(task["vim_id"])
    except vimconn.vimconnException as exc:
        task["error_msg"] = self._format_vim_error_msg(str(exc))
        # If not found mark as Done, keeping the error_msg
        not_found = isinstance(exc, vimconn.vimconnNotFoundException)
        task["status"] = "DONE" if not_found else "FAILED"
        return not_found, None
    task["status"] = "DONE"
    task["error_msg"] = None
    return True, None
1206
def new_classification(self, task):
    """
    Create a Classification ('legacy_flow_classifier') at the VIM
    :param task: task dictionary. task["params"] is a dict that can contain "ip_proto", "source_ip",
        "destination_ip", "source_port" and "destination_port". The first task of task["extra"]["depends_on"] must
        be present at task["depends"] with its "interfaces" at "extra"; one of them is used as the logical source
        port. It is modified with the result
    :return: (True, dict with the content to update the instance element) on success;
        (False, dict with the error to update the instance element) on failure
    """
    vim_classification_id = None
    # computed out of the 'try' so that it is always available for the error log
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        params = task["params"]
        dep_id = "TASK-" + str(task["extra"]["depends_on"][0])
        # a list is needed to take the first one: dictionary views cannot be indexed
        interfaces = list(task.get("depends").get(dep_id).get("extra").get("interfaces") or ())
        if not interfaces:
            raise VimThreadException("Cannot create classification because depending task {} has no "
                                     "interfaces".format(dep_id))
        # Bear in mind that different VIM connectors might support Classifications differently.
        # In the case of OpenStack, only the first VNF attached to the classifier will be used
        # to create the Classification(s) (the "logical source port" of the "Flow Classifier").
        # Since the VNFFG classifier match lacks the ethertype, classification defaults to
        # using the IPv4 flow classifier.
        name = "c-%s" % task["item_id"][:8]
        ip_proto = params.get("ip_proto")
        if ip_proto is not None:
            ip_proto = int(ip_proto)
            # well-known protocol numbers are sent by name; any other one is sent as a number
            ip_proto = {1: 'icmp', 6: 'tcp', 17: 'udp'}.get(ip_proto, ip_proto)
        # if not CIDR is given for the IP addresses, add /32:
        source_ip = params.get("source_ip")
        destination_ip = params.get("destination_ip")
        if source_ip and '/' not in source_ip:
            source_ip += '/32'
        if destination_ip and '/' not in destination_ip:
            destination_ip += '/32'
        definition = {
            "logical_source_port": interfaces[0],
            "protocol": ip_proto,
            "source_ip_prefix": source_ip,
            "destination_ip_prefix": destination_ip,
            "source_port_range_min": params.get("source_port"),
            "source_port_range_max": params.get("source_port"),
            "destination_port_range_min": params.get("destination_port"),
            "destination_port_range_max": params.get("destination_port"),
        }

        vim_classification_id = self.vim.new_classification(
            name, 'legacy_flow_classifier', definition)

        task["extra"]["created"] = True
        task["error_msg"] = None
        task["status"] = "DONE"
        task["vim_id"] = vim_classification_id
        instance_element_update = {"status": "ACTIVE", "vim_classification_id": vim_classification_id,
                                   "error_msg": None}
        return True, instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("Error creating Classification, task=%s: %s", task_id, str(e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_classification_id": None, "error_msg": error_text}
        return False, instance_element_update
1264
def del_classification(self, task):
    """
    Delete the Classification task["vim_id"] at the VIM
    :param task: task dictionary. Its "status" and "error_msg" are modified with the result
    :return: (True, None) if deleted or not found at VIM; (False, None) on any other VIM error
    """
    try:
        self.vim.delete_classification(task["vim_id"])
    except vimconn.vimconnException as exc:
        task["error_msg"] = self._format_vim_error_msg(str(exc))
        # If not found mark as Done, keeping the error_msg
        not_found = isinstance(exc, vimconn.vimconnNotFoundException)
        task["status"] = "DONE" if not_found else "FAILED"
        return not_found, None
    task["status"] = "DONE"
    task["error_msg"] = None
    return True, None
1281
def new_sfp(self, task):
    """
    Create a Service Function Path at the VIM with the Service Functions and Classifications created by the tasks
    listed at task["extra"]["depends_on"]
    :param task: task dictionary. Every depending task must be present at task["depends"] with key
        "TASK-<index>"; its "item" ("instance_sfs" or "instance_classifications") tells how its "vim_id" is used.
        It is modified with the result
    :return: (True, dict with the content to update the instance element) on success;
        (False, dict with the error to update the instance element) on VIM error
    """
    vim_sfp_id = None
    # computed out of the 'try' so that it is always available for the error log
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        depends = task.get("depends")
        depending_tasks = [depends.get("TASK-" + str(tsk_id)) for tsk_id in task.get("extra").get("depends_on")]
        sf_id_list = []
        classification_id_list = []
        for dep in depending_tasks:
            vim_id = dep.get("vim_id")
            resource = dep.get("item")
            if resource == "instance_sfs":
                sf_id_list.append(vim_id)
            elif resource == "instance_classifications":
                classification_id_list.append(vim_id)

        name = "sfp-%s" % task["item_id"][:8]
        # By default no form of IETF SFC Encapsulation will be used
        vim_sfp_id = self.vim.new_sfp(name, classification_id_list, sf_id_list, sfc_encap=False)

        task["extra"]["created"] = True
        task["error_msg"] = None
        task["status"] = "DONE"
        task["vim_id"] = vim_sfp_id
        instance_element_update = {"status": "ACTIVE", "vim_sfp_id": vim_sfp_id, "error_msg": None}
        return True, instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("Error creating Service Function Path, task=%s: %s", task_id, str(e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_sfp_id": None, "error_msg": error_text}
        return False, instance_element_update
1319
def del_sfp(self, task):
    """
    Delete the Service Function Path task["vim_id"] at the VIM
    :param task: task dictionary. Its "status" and "error_msg" are modified with the result
    :return: (True, None) if deleted or not found at VIM; (False, None) on any other VIM error
    """
    try:
        self.vim.delete_sfp(task["vim_id"])
    except vimconn.vimconnException as exc:
        task["error_msg"] = self._format_vim_error_msg(str(exc))
        # If not found mark as Done, keeping the error_msg
        not_found = isinstance(exc, vimconn.vimconnNotFoundException)
        task["status"] = "DONE" if not_found else "FAILED"
        return not_found, None
    task["status"] = "DONE"
    task["error_msg"] = None
    return True, None