Fix bug, check VF is active before destroy
[osm/openvim.git] / osm_openvim / host_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
5 # This file is part of openvim
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
22 ##
23
'''
This is a thread that interacts with the host and with libvirt to manage VMs.
One thread is launched per host.
'''
28 __author__ = "Pablo Montes, Alfonso Tierno, Leonardo Mirabal"
29 __date__ = "$10-jul-2014 12:07:15$"
30
31 import json
32 import yaml
33 import threading
34 import time
35 import Queue
36 import paramiko
37 from jsonschema import validate as js_v, exceptions as js_e
38 #import libvirt
39 import imp
40 from vim_schema import localinfo_schema, hostinfo_schema
41 import random
42 import os
43 import logging
44
45
46 class host_thread(threading.Thread):
47 lvirt_module = None
48
    def __init__(self, name, host, user, db, db_lock, test, image_path, host_id, version, develop_mode,
                 develop_bridge_iface, logger_name=None, debug=None):
        '''Init a thread.

        Arguments:
            'name': name of thread
            'host', 'user': host ip or name to manage, and the ssh user
            'db', 'db_lock': database class and lock to use it in exclusion
            'test': when True run in test mode (no ssh/libvirt interaction)
            'image_path': remote directory where VM image files are stored
            'host_id': uuid of this host at the database
            'version': openvim version string
            'develop_mode', 'develop_bridge_iface': development mode flag and the
                linux bridge dataplane interfaces are mapped to in that mode
            'logger_name', 'debug': optional logger name and logging level name
        '''
        threading.Thread.__init__(self)
        self.name = name
        self.host = host
        self.user = user
        self.db = db
        self.db_lock = db_lock
        self.test = test
        self.localinfo_dirty = False  # True when self.localinfo must be written back to the host

        # libvirt is imported dynamically, once, and cached at class level so that
        # openvim can run in test mode on machines without python-libvirt installed
        if not test and not host_thread.lvirt_module:
            try:
                module_info = imp.find_module("libvirt")
                host_thread.lvirt_module = imp.load_module("libvirt", *module_info)
            except (IOError, ImportError) as e:
                raise ImportError("Cannot import python-libvirt. Openvim not properly installed" +str(e))
        if logger_name:
            self.logger_name = logger_name
        else:
            self.logger_name = "openvim.host."+name
        self.logger = logging.getLogger(self.logger_name)
        if debug:
            self.logger.setLevel(getattr(logging, debug))

        self.develop_mode = develop_mode
        self.develop_bridge_iface = develop_bridge_iface
        self.image_path = image_path
        self.host_id = host_id
        self.version = version

        self.xml_level = 0  # current indentation level used by tab()/inc_tab()/dec_tab()
        #self.pending ={}

        self.server_status = {}  # dictionary with pairs server_uuid:server_status
        self.pending_terminate_server = []  # list with pairs (time, server_uuid): time to send a terminate for a server being destroyed
        self.next_update_server_status = 0  # time when servers status must be checked

        self.hostinfo = None

        self.queueLock = threading.Lock()
        self.taskQueue = Queue.Queue(2000)
        self.ssh_conn = None
100
101 def ssh_connect(self):
102 try:
103 #Connect SSH
104 self.ssh_conn = paramiko.SSHClient()
105 self.ssh_conn.set_missing_host_key_policy(paramiko.AutoAddPolicy())
106 self.ssh_conn.load_system_host_keys()
107 self.ssh_conn.connect(self.host, username=self.user, timeout=10) #, None)
108 except paramiko.ssh_exception.SSHException as e:
109 text = e.args[0]
110 self.logger.error("ssh_connect ssh Exception: " + text)
111
    def load_localinfo(self):
        """Load openvim local info (.openvim.yaml) from the managed host via ssh.

        Sets self.localinfo (validated against localinfo_schema) and clears
        self.localinfo_dirty. On any failure, or in test mode, a default empty
        structure is used instead.
        """
        if not self.test:
            try:
                #Connect SSH
                self.ssh_connect()

                # make sure the remote image directory exists
                command = 'mkdir -p ' + self.image_path
                (_, stdout, stderr) = self.ssh_conn.exec_command(command)
                content = stderr.read()
                if len(content) > 0:
                    self.logger.error("command: '%s' stderr: '%s'", command, content)

                command = 'cat ' + self.image_path + '/.openvim.yaml'
                (_, stdout, stderr) = self.ssh_conn.exec_command(command)
                content = stdout.read()
                if len(content) == 0:
                    self.logger.error("command: '%s' stderr='%s'", command, stderr.read())
                    raise paramiko.ssh_exception.SSHException("Error empty file, command: '{}'".format(command))
                self.localinfo = yaml.load(content)
                js_v(self.localinfo, localinfo_schema)
                self.localinfo_dirty = False
                if 'server_files' not in self.localinfo:
                    self.localinfo['server_files'] = {}
                self.logger.debug("localinfo load from host")
                return

            except paramiko.ssh_exception.SSHException as e:
                text = e.args[0]
                self.logger.error("load_localinfo ssh Exception: " + text)
            except host_thread.lvirt_module.libvirtError as e:
                text = e.get_error_message()
                self.logger.error("load_localinfo libvirt Exception: " + text)
            except yaml.YAMLError as exc:
                text = ""
                if hasattr(exc, 'problem_mark'):
                    mark = exc.problem_mark
                    text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
                self.logger.error("load_localinfo yaml format Exception " + text)
            except js_e.ValidationError as e:
                text = ""
                if len(e.path)>0: text=" at '" + ":".join(map(str, e.path))+"'"
                self.logger.error("load_localinfo format Exception: %s %s", text, str(e))
            except Exception as e:
                text = str(e)
                self.logger.error("load_localinfo Exception: " + text)

        # not loaded (or test mode): insert a default data set; dirty flag kept
        # False so the default is not pushed back to the host
        self.localinfo = {'files':{}, 'server_files':{} }
        #self.localinfo_dirty=True
        self.localinfo_dirty=False
164
165 def load_hostinfo(self):
166 if self.test:
167 return;
168 try:
169 #Connect SSH
170 self.ssh_connect()
171
172
173 command = 'cat ' + self.image_path + '/hostinfo.yaml'
174 #print self.name, ': command:', command
175 (_, stdout, stderr) = self.ssh_conn.exec_command(command)
176 content = stdout.read()
177 if len(content) == 0:
178 self.logger.error("command: '%s' stderr: '%s'", command, stderr.read())
179 raise paramiko.ssh_exception.SSHException("Error empty file ")
180 self.hostinfo = yaml.load(content)
181 js_v(self.hostinfo, hostinfo_schema)
182 self.logger.debug("hostlinfo load from host " + str(self.hostinfo))
183 return
184
185 except paramiko.ssh_exception.SSHException as e:
186 text = e.args[0]
187 self.logger.error("load_hostinfo ssh Exception: " + text)
188 except host_thread.lvirt_module.libvirtError as e:
189 text = e.get_error_message()
190 self.logger.error("load_hostinfo libvirt Exception: " + text)
191 except yaml.YAMLError as exc:
192 text = ""
193 if hasattr(exc, 'problem_mark'):
194 mark = exc.problem_mark
195 text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
196 self.logger.error("load_hostinfo yaml format Exception " + text)
197 except js_e.ValidationError as e:
198 text = ""
199 if len(e.path)>0: text=" at '" + ":".join(map(str, e.path))+"'"
200 self.logger.error("load_hostinfo format Exception: %s %s", text, e.message)
201 except Exception as e:
202 text = str(e)
203 self.logger.error("load_hostinfo Exception: " + text)
204
205 #not loaded, insert a default data
206 self.hostinfo = None
207
    def save_localinfo(self, tries=3):
        """Write self.localinfo back to the host as .openvim.yaml via ssh.

        Retries up to 'tries' times, reconnecting the ssh session when it is
        reported inactive. Clears self.localinfo_dirty on success; in test mode
        only the flag is cleared.
        """
        if self.test:
            self.localinfo_dirty = False
            return

        while tries>=0:
            tries-=1

            try:
                # stream the yaml document directly into a remote 'cat >' stdin
                command = 'cat > ' + self.image_path + '/.openvim.yaml'
                self.logger.debug("command:" + command)
                (stdin, _, _) = self.ssh_conn.exec_command(command)
                yaml.safe_dump(self.localinfo, stdin, explicit_start=True, indent=4, default_flow_style=False, tags=False, encoding='utf-8', allow_unicode=True)
                self.localinfo_dirty = False
                break #while tries

            except paramiko.ssh_exception.SSHException as e:
                text = e.args[0]
                self.logger.error("save_localinfo ssh Exception: " + text)
                # reconnect and let the retry loop run again
                if "SSH session not active" in text:
                    self.ssh_connect()
            except host_thread.lvirt_module.libvirtError as e:
                text = e.get_error_message()
                self.logger.error("save_localinfo libvirt Exception: " + text)
            except yaml.YAMLError as exc:
                text = ""
                if hasattr(exc, 'problem_mark'):
                    mark = exc.problem_mark
                    text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
                self.logger.error("save_localinfo yaml format Exception " + text)
            except Exception as e:
                text = str(e)
                self.logger.error("save_localinfo Exception: " + text)
241
    def load_servers_from_db(self):
        """Load from database the servers (VM instances) assigned to this host.

        Fills self.server_status with uuid:status pairs and migrates the legacy
        localinfo 'inc_files' layout into the current 'server_files' one.
        """
        self.db_lock.acquire()
        r,c = self.db.get_table(SELECT=('uuid','status', 'image_id'), FROM='instances', WHERE={'host_id': self.host_id})
        self.db_lock.release()

        self.server_status = {}
        if r<0:
            self.logger.error("Error getting data from database: " + c)
            return
        for server in c:
            self.server_status[ server['uuid'] ] = server['status']

            #convert from old version to new one
            if 'inc_files' in self.localinfo and server['uuid'] in self.localinfo['inc_files']:
                server_files_dict = {'source file': self.localinfo['inc_files'][ server['uuid'] ] [0], 'file format':'raw' }
                # guess qcow2 format from the file extension
                if server_files_dict['source file'][-5:] == 'qcow2':
                    server_files_dict['file format'] = 'qcow2'

                self.localinfo['server_files'][ server['uuid'] ] = { server['image_id'] : server_files_dict }
        # drop the legacy key once migrated and force writing localinfo back
        if 'inc_files' in self.localinfo:
            del self.localinfo['inc_files']
            self.localinfo_dirty = True
264
265 def delete_unused_files(self):
266 '''Compares self.localinfo['server_files'] content with real servers running self.server_status obtained from database
267 Deletes unused entries at self.loacalinfo and the corresponding local files.
268 The only reason for this mismatch is the manual deletion of instances (VM) at database
269 '''
270 if self.test:
271 return
272 for uuid,images in self.localinfo['server_files'].items():
273 if uuid not in self.server_status:
274 for localfile in images.values():
275 try:
276 self.logger.debug("deleting file '%s' of unused server '%s'", localfile['source file'], uuid)
277 self.delete_file(localfile['source file'])
278 except paramiko.ssh_exception.SSHException as e:
279 self.logger.error("Exception deleting file '%s': %s", localfile['source file'], str(e))
280 del self.localinfo['server_files'][uuid]
281 self.localinfo_dirty = True
282
283 def insert_task(self, task, *aditional):
284 try:
285 self.queueLock.acquire()
286 task = self.taskQueue.put( (task,) + aditional, timeout=5)
287 self.queueLock.release()
288 return 1, None
289 except Queue.Full:
290 return -1, "timeout inserting a task over host " + self.name
291
    def run(self):
        """Main thread loop: load host state, then process queued tasks forever.

        The outer loop re-loads local/host info after a 'reload' task; the inner
        loop pops one task per iteration and, when idle, flushes dirty localinfo,
        refreshes servers status every 5 seconds, or handles pending force-off.
        Returns 0 when an 'exit' task is processed.
        """
        while True:
            self.load_localinfo()
            self.load_hostinfo()
            self.load_servers_from_db()
            self.delete_unused_files()
            while True:
                try:
                    # pop one task, if any, holding the queue lock
                    self.queueLock.acquire()
                    if not self.taskQueue.empty():
                        task = self.taskQueue.get()
                    else:
                        task = None
                    self.queueLock.release()

                    if task is None:
                        # idle housekeeping, in priority order
                        now=time.time()
                        if self.localinfo_dirty:
                            self.save_localinfo()
                        elif self.next_update_server_status < now:
                            self.update_servers_status()
                            self.next_update_server_status = now + 5
                        elif len(self.pending_terminate_server)>0 and self.pending_terminate_server[0][0]<now:
                            self.server_forceoff()
                        else:
                            time.sleep(1)
                        continue

                    if task[0] == 'instance':
                        self.logger.debug("processing task instance " + str(task[1]['action']))
                        retry = 0
                        while retry < 2:
                            retry += 1
                            # the second (last) try is flagged so the action can report definitive errors
                            r = self.action_on_server(task[1], retry==2)
                            if r >= 0:
                                break
                    elif task[0] == 'image':
                        pass
                    elif task[0] == 'exit':
                        self.logger.debug("processing task exit")
                        self.terminate()
                        return 0
                    elif task[0] == 'reload':
                        self.logger.debug("processing task reload terminating and relaunching")
                        self.terminate()
                        break
                    elif task[0] == 'edit-iface':
                        self.logger.debug("processing task edit-iface port={}, old_net={}, new_net={}".format(
                            task[1], task[2], task[3]))
                        self.edit_iface(task[1], task[2], task[3])
                    elif task[0] == 'restore-iface':
                        self.logger.debug("processing task restore-iface={} mac={}".format(task[1], task[2]))
                        self.restore_iface(task[1], task[2])
                    elif task[0] == 'new-ovsbridge':
                        self.logger.debug("Creating compute OVS bridge")
                        self.create_ovs_bridge()
                    elif task[0] == 'new-vxlan':
                        self.logger.debug("Creating vxlan tunnel='{}', remote ip='{}'".format(task[1], task[2]))
                        self.create_ovs_vxlan_tunnel(task[1], task[2])
                    elif task[0] == 'del-ovsbridge':
                        self.logger.debug("Deleting OVS bridge")
                        self.delete_ovs_bridge()
                    elif task[0] == 'del-vxlan':
                        self.logger.debug("Deleting vxlan {} tunnel".format(task[1]))
                        self.delete_ovs_vxlan_tunnel(task[1])
                    elif task[0] == 'create-ovs-bridge-port':
                        self.logger.debug("Adding port ovim-{} to OVS bridge".format(task[1]))
                        self.create_ovs_bridge_port(task[1])
                    elif task[0] == 'del-ovs-port':
                        self.logger.debug("Delete bridge attached to ovs port vlan {} net {}".format(task[1], task[2]))
                        self.delete_bridge_port_attached_to_ovs(task[1], task[2])
                    else:
                        self.logger.debug("unknown task " + str(task))

                except Exception as e:
                    # never let an unexpected error kill the host thread
                    self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)
368
369 def server_forceoff(self, wait_until_finished=False):
370 while len(self.pending_terminate_server)>0:
371 now = time.time()
372 if self.pending_terminate_server[0][0]>now:
373 if wait_until_finished:
374 time.sleep(1)
375 continue
376 else:
377 return
378 req={'uuid':self.pending_terminate_server[0][1],
379 'action':{'terminate':'force'},
380 'status': None
381 }
382 self.action_on_server(req)
383 self.pending_terminate_server.pop(0)
384
385 def terminate(self):
386 try:
387 self.server_forceoff(True)
388 if self.localinfo_dirty:
389 self.save_localinfo()
390 if not self.test:
391 self.ssh_conn.close()
392 except Exception as e:
393 text = str(e)
394 self.logger.error("terminate Exception: " + text)
395 self.logger.debug("exit from host_thread")
396
397 def get_local_iface_name(self, generic_name):
398 if self.hostinfo != None and "iface_names" in self.hostinfo and generic_name in self.hostinfo["iface_names"]:
399 return self.hostinfo["iface_names"][generic_name]
400 return generic_name
401
402 def create_xml_server(self, server, dev_list, server_metadata={}):
403 """Function that implements the generation of the VM XML definition.
404 Additional devices are in dev_list list
405 The main disk is upon dev_list[0]"""
406
407 #get if operating system is Windows
408 windows_os = False
409 os_type = server_metadata.get('os_type', None)
410 if os_type == None and 'metadata' in dev_list[0]:
411 os_type = dev_list[0]['metadata'].get('os_type', None)
412 if os_type != None and os_type.lower() == "windows":
413 windows_os = True
414 #get type of hard disk bus
415 bus_ide = True if windows_os else False
416 bus = server_metadata.get('bus', None)
417 if bus == None and 'metadata' in dev_list[0]:
418 bus = dev_list[0]['metadata'].get('bus', None)
419 if bus != None:
420 bus_ide = True if bus=='ide' else False
421
422 self.xml_level = 0
423
424 text = "<domain type='kvm'>"
425 #get topology
426 topo = server_metadata.get('topology', None)
427 if topo == None and 'metadata' in dev_list[0]:
428 topo = dev_list[0]['metadata'].get('topology', None)
429 #name
430 name = server.get('name','') + "_" + server['uuid']
431 name = name[:58] #qemu impose a length limit of 59 chars or not start. Using 58
432 text += self.inc_tab() + "<name>" + name+ "</name>"
433 #uuid
434 text += self.tab() + "<uuid>" + server['uuid'] + "</uuid>"
435
436 numa={}
437 if 'extended' in server and server['extended']!=None and 'numas' in server['extended']:
438 numa = server['extended']['numas'][0]
439 #memory
440 use_huge = False
441 memory = int(numa.get('memory',0))*1024*1024 #in KiB
442 if memory==0:
443 memory = int(server['ram'])*1024;
444 else:
445 if not self.develop_mode:
446 use_huge = True
447 if memory==0:
448 return -1, 'No memory assigned to instance'
449 memory = str(memory)
450 text += self.tab() + "<memory unit='KiB'>" +memory+"</memory>"
451 text += self.tab() + "<currentMemory unit='KiB'>" +memory+ "</currentMemory>"
452 if use_huge:
453 text += self.tab()+'<memoryBacking>'+ \
454 self.inc_tab() + '<hugepages/>'+ \
455 self.dec_tab()+ '</memoryBacking>'
456
457 #cpu
458 use_cpu_pinning=False
459 vcpus = int(server.get("vcpus",0))
460 cpu_pinning = []
461 if 'cores-source' in numa:
462 use_cpu_pinning=True
463 for index in range(0, len(numa['cores-source'])):
464 cpu_pinning.append( [ numa['cores-id'][index], numa['cores-source'][index] ] )
465 vcpus += 1
466 if 'threads-source' in numa:
467 use_cpu_pinning=True
468 for index in range(0, len(numa['threads-source'])):
469 cpu_pinning.append( [ numa['threads-id'][index], numa['threads-source'][index] ] )
470 vcpus += 1
471 if 'paired-threads-source' in numa:
472 use_cpu_pinning=True
473 for index in range(0, len(numa['paired-threads-source'])):
474 cpu_pinning.append( [numa['paired-threads-id'][index][0], numa['paired-threads-source'][index][0] ] )
475 cpu_pinning.append( [numa['paired-threads-id'][index][1], numa['paired-threads-source'][index][1] ] )
476 vcpus += 2
477
478 if use_cpu_pinning and not self.develop_mode:
479 text += self.tab()+"<vcpu placement='static'>" +str(len(cpu_pinning)) +"</vcpu>" + \
480 self.tab()+'<cputune>'
481 self.xml_level += 1
482 for i in range(0, len(cpu_pinning)):
483 text += self.tab() + "<vcpupin vcpu='" +str(cpu_pinning[i][0])+ "' cpuset='" +str(cpu_pinning[i][1]) +"'/>"
484 text += self.dec_tab()+'</cputune>'+ \
485 self.tab() + '<numatune>' +\
486 self.inc_tab() + "<memory mode='strict' nodeset='" +str(numa['source'])+ "'/>" +\
487 self.dec_tab() + '</numatune>'
488 else:
489 if vcpus==0:
490 return -1, "Instance without number of cpus"
491 text += self.tab()+"<vcpu>" + str(vcpus) + "</vcpu>"
492
493 #boot
494 boot_cdrom = False
495 for dev in dev_list:
496 if dev['type']=='cdrom' :
497 boot_cdrom = True
498 break
499 text += self.tab()+ '<os>' + \
500 self.inc_tab() + "<type arch='x86_64' machine='pc'>hvm</type>"
501 if boot_cdrom:
502 text += self.tab() + "<boot dev='cdrom'/>"
503 text += self.tab() + "<boot dev='hd'/>" + \
504 self.dec_tab()+'</os>'
505 #features
506 text += self.tab()+'<features>'+\
507 self.inc_tab()+'<acpi/>' +\
508 self.tab()+'<apic/>' +\
509 self.tab()+'<pae/>'+ \
510 self.dec_tab() +'</features>'
511 if topo == "oneSocket:hyperthreading":
512 if vcpus % 2 != 0:
513 return -1, 'Cannot expose hyperthreading with an odd number of vcpus'
514 text += self.tab() + "<cpu mode='host-model'> <topology sockets='1' cores='%d' threads='2' /> </cpu>" % vcpus/2
515 elif windows_os or topo == "oneSocket":
516 text += self.tab() + "<cpu mode='host-model'> <topology sockets='1' cores='%d' threads='1' /> </cpu>" % vcpus
517 else:
518 text += self.tab() + "<cpu mode='host-model'></cpu>"
519 text += self.tab() + "<clock offset='utc'/>" +\
520 self.tab() + "<on_poweroff>preserve</on_poweroff>" + \
521 self.tab() + "<on_reboot>restart</on_reboot>" + \
522 self.tab() + "<on_crash>restart</on_crash>"
523 text += self.tab() + "<devices>" + \
524 self.inc_tab() + "<emulator>/usr/libexec/qemu-kvm</emulator>" + \
525 self.tab() + "<serial type='pty'>" +\
526 self.inc_tab() + "<target port='0'/>" + \
527 self.dec_tab() + "</serial>" +\
528 self.tab() + "<console type='pty'>" + \
529 self.inc_tab()+ "<target type='serial' port='0'/>" + \
530 self.dec_tab()+'</console>'
531 if windows_os:
532 text += self.tab() + "<controller type='usb' index='0'/>" + \
533 self.tab() + "<controller type='ide' index='0'/>" + \
534 self.tab() + "<input type='mouse' bus='ps2'/>" + \
535 self.tab() + "<sound model='ich6'/>" + \
536 self.tab() + "<video>" + \
537 self.inc_tab() + "<model type='cirrus' vram='9216' heads='1'/>" + \
538 self.dec_tab() + "</video>" + \
539 self.tab() + "<memballoon model='virtio'/>" + \
540 self.tab() + "<input type='tablet' bus='usb'/>" #TODO revisar
541
542 #> self.tab()+'<alias name=\'hostdev0\'/>\n' +\
543 #> self.dec_tab()+'</hostdev>\n' +\
544 #> self.tab()+'<input type=\'tablet\' bus=\'usb\'/>\n'
545 if windows_os:
546 text += self.tab() + "<graphics type='vnc' port='-1' autoport='yes'/>"
547 else:
548 #If image contains 'GRAPH' include graphics
549 #if 'GRAPH' in image:
550 text += self.tab() + "<graphics type='vnc' port='-1' autoport='yes' listen='0.0.0.0'>" +\
551 self.inc_tab() + "<listen type='address' address='0.0.0.0'/>" +\
552 self.dec_tab() + "</graphics>"
553
554 vd_index = 'a'
555 for dev in dev_list:
556 bus_ide_dev = bus_ide
557 if dev['type']=='cdrom' or dev['type']=='disk':
558 if dev['type']=='cdrom':
559 bus_ide_dev = True
560 text += self.tab() + "<disk type='file' device='"+dev['type']+"'>"
561 if 'file format' in dev:
562 text += self.inc_tab() + "<driver name='qemu' type='" +dev['file format']+ "' cache='writethrough'/>"
563 if 'source file' in dev:
564 text += self.tab() + "<source file='" +dev['source file']+ "'/>"
565 #elif v['type'] == 'block':
566 # text += self.tab() + "<source dev='" + v['source'] + "'/>"
567 #else:
568 # return -1, 'Unknown disk type ' + v['type']
569 vpci = dev.get('vpci',None)
570 if vpci == None:
571 vpci = dev['metadata'].get('vpci',None)
572 text += self.pci2xml(vpci)
573
574 if bus_ide_dev:
575 text += self.tab() + "<target dev='hd" +vd_index+ "' bus='ide'/>" #TODO allows several type of disks
576 else:
577 text += self.tab() + "<target dev='vd" +vd_index+ "' bus='virtio'/>"
578 text += self.dec_tab() + '</disk>'
579 vd_index = chr(ord(vd_index)+1)
580 elif dev['type']=='xml':
581 dev_text = dev['xml']
582 if 'vpci' in dev:
583 dev_text = dev_text.replace('__vpci__', dev['vpci'])
584 if 'source file' in dev:
585 dev_text = dev_text.replace('__file__', dev['source file'])
586 if 'file format' in dev:
587 dev_text = dev_text.replace('__format__', dev['source file'])
588 if '__dev__' in dev_text:
589 dev_text = dev_text.replace('__dev__', vd_index)
590 vd_index = chr(ord(vd_index)+1)
591 text += dev_text
592 else:
593 return -1, 'Unknown device type ' + dev['type']
594
595 net_nb=0
596 bridge_interfaces = server.get('networks', [])
597 for v in bridge_interfaces:
598 #Get the brifge name
599 self.db_lock.acquire()
600 result, content = self.db.get_table(FROM='nets', SELECT=('provider',),WHERE={'uuid':v['net_id']} )
601 self.db_lock.release()
602 if result <= 0:
603 self.logger.error("create_xml_server ERROR %d getting nets %s", result, content)
604 return -1, content
605 #ALF: Allow by the moment the 'default' bridge net because is confortable for provide internet to VM
606 #I know it is not secure
607 #for v in sorted(desc['network interfaces'].itervalues()):
608 model = v.get("model", None)
609 if content[0]['provider']=='default':
610 text += self.tab() + "<interface type='network'>" + \
611 self.inc_tab() + "<source network='" +content[0]['provider']+ "'/>"
612 elif content[0]['provider'][0:7]=='macvtap':
613 text += self.tab()+"<interface type='direct'>" + \
614 self.inc_tab() + "<source dev='" + self.get_local_iface_name(content[0]['provider'][8:]) + "' mode='bridge'/>" + \
615 self.tab() + "<target dev='macvtap0'/>"
616 if windows_os:
617 text += self.tab() + "<alias name='net" + str(net_nb) + "'/>"
618 elif model==None:
619 model = "virtio"
620 elif content[0]['provider'][0:6]=='bridge':
621 text += self.tab() + "<interface type='bridge'>" + \
622 self.inc_tab()+"<source bridge='" +self.get_local_iface_name(content[0]['provider'][7:])+ "'/>"
623 if windows_os:
624 text += self.tab() + "<target dev='vnet" + str(net_nb)+ "'/>" +\
625 self.tab() + "<alias name='net" + str(net_nb)+ "'/>"
626 elif model==None:
627 model = "virtio"
628 elif content[0]['provider'][0:3] == "OVS":
629 vlan = content[0]['provider'].replace('OVS:', '')
630 text += self.tab() + "<interface type='bridge'>" + \
631 self.inc_tab() + "<source bridge='ovim-" + str(vlan) + "'/>"
632 else:
633 return -1, 'Unknown Bridge net provider ' + content[0]['provider']
634 if model!=None:
635 text += self.tab() + "<model type='" +model+ "'/>"
636 if v.get('mac_address', None) != None:
637 text+= self.tab() +"<mac address='" +v['mac_address']+ "'/>"
638 text += self.pci2xml(v.get('vpci',None))
639 text += self.dec_tab()+'</interface>'
640
641 net_nb += 1
642
643 interfaces = numa.get('interfaces', [])
644
645 net_nb=0
646 for v in interfaces:
647 if self.develop_mode: #map these interfaces to bridges
648 text += self.tab() + "<interface type='bridge'>" + \
649 self.inc_tab()+"<source bridge='" +self.develop_bridge_iface+ "'/>"
650 if windows_os:
651 text += self.tab() + "<target dev='vnet" + str(net_nb)+ "'/>" +\
652 self.tab() + "<alias name='net" + str(net_nb)+ "'/>"
653 else:
654 text += self.tab() + "<model type='e1000'/>" #e1000 is more probable to be supported than 'virtio'
655 if v.get('mac_address', None) != None:
656 text+= self.tab() +"<mac address='" +v['mac_address']+ "'/>"
657 text += self.pci2xml(v.get('vpci',None))
658 text += self.dec_tab()+'</interface>'
659 continue
660
661 if v['dedicated'] == 'yes': #passthrought
662 text += self.tab() + "<hostdev mode='subsystem' type='pci' managed='yes'>" + \
663 self.inc_tab() + "<source>"
664 self.inc_tab()
665 text += self.pci2xml(v['source'])
666 text += self.dec_tab()+'</source>'
667 text += self.pci2xml(v.get('vpci',None))
668 if windows_os:
669 text += self.tab() + "<alias name='hostdev" + str(net_nb) + "'/>"
670 text += self.dec_tab()+'</hostdev>'
671 net_nb += 1
672 else: #sriov_interfaces
673 #skip not connected interfaces
674 if v.get("net_id") == None:
675 continue
676 text += self.tab() + "<interface type='hostdev' managed='yes'>"
677 self.inc_tab()
678 if v.get('mac_address', None) != None:
679 text+= self.tab() + "<mac address='" +v['mac_address']+ "'/>"
680 text+= self.tab()+'<source>'
681 self.inc_tab()
682 text += self.pci2xml(v['source'])
683 text += self.dec_tab()+'</source>'
684 if v.get('vlan',None) != None:
685 text += self.tab() + "<vlan> <tag id='" + str(v['vlan']) + "'/> </vlan>"
686 text += self.pci2xml(v.get('vpci',None))
687 if windows_os:
688 text += self.tab() + "<alias name='hostdev" + str(net_nb) + "'/>"
689 text += self.dec_tab()+'</interface>'
690
691
692 text += self.dec_tab()+'</devices>'+\
693 self.dec_tab()+'</domain>'
694 return 0, text
695
696 def pci2xml(self, pci):
697 '''from a pci format text XXXX:XX:XX.X generates the xml content of <address>
698 alows an empty pci text'''
699 if pci is None:
700 return ""
701 first_part = pci.split(':')
702 second_part = first_part[2].split('.')
703 return self.tab() + "<address type='pci' domain='0x" + first_part[0] + \
704 "' bus='0x" + first_part[1] + "' slot='0x" + second_part[0] + \
705 "' function='0x" + second_part[1] + "'/>"
706
707 def tab(self):
708 """Return indentation according to xml_level"""
709 return "\n" + (' '*self.xml_level)
710
711 def inc_tab(self):
712 """Increment and return indentation according to xml_level"""
713 self.xml_level += 1
714 return self.tab()
715
716 def dec_tab(self):
717 """Decrement and return indentation according to xml_level"""
718 self.xml_level -= 1
719 return self.tab()
720
721 def create_ovs_bridge(self):
722 """
723 Create a bridge in compute OVS to allocate VMs
724 :return: True if success
725 """
726 if self.test:
727 return True
728 try:
729 command = 'sudo ovs-vsctl --may-exist add-br br-int -- set Bridge br-int stp_enable=true'
730 self.logger.debug("command: " + command)
731 (_, stdout, _) = self.ssh_conn.exec_command(command)
732 content = stdout.read()
733 if len(content) == 0:
734 return True
735 else:
736 return False
737 except paramiko.ssh_exception.SSHException as e:
738 self.logger.error("create_ovs_bridge ssh Exception: " + str(e))
739 if "SSH session not active" in str(e):
740 self.ssh_connect()
741 return False
742
743 def delete_port_to_ovs_bridge(self, vlan, net_uuid):
744 """
745 Delete linux bridge port attched to a OVS bridge, if port is not free the port is not removed
746 :param vlan: vlan port id
747 :param net_uuid: network id
748 :return:
749 """
750
751 if self.test:
752 return True
753 try:
754 port_name = 'ovim-' + str(vlan)
755 command = 'sudo ovs-vsctl del-port br-int ' + port_name
756 self.logger.debug("command: " + command)
757 (_, stdout, _) = self.ssh_conn.exec_command(command)
758 content = stdout.read()
759 if len(content) == 0:
760 return True
761 else:
762 return False
763 except paramiko.ssh_exception.SSHException as e:
764 self.logger.error("delete_port_to_ovs_bridge ssh Exception: " + str(e))
765 if "SSH session not active" in str(e):
766 self.ssh_connect()
767 return False
768
    def delete_dhcp_server(self, vlan, net_uuid, dhcp_path):
        """
        Kill the dnsmasq dhcp server living inside the net namespace of a vlan.
        :param vlan: segmentation id
        :param net_uuid: network uuid
        :param dhcp_path: base configuration path at the namespace side
        :return: True when nothing must be done (test mode or ports still attached);
                 False on ssh error; otherwise falls through returning None
                 (NOTE(review): success is not reported as True — confirm callers
                 do not rely on the result before changing this)
        """
        if self.test:
            return True
        # keep the dhcp server alive while any ovs port of the net still exists
        if not self.is_dhcp_port_free(vlan, net_uuid):
            return True
        try:
            net_namespace = 'ovim-' + str(vlan)
            dhcp_path = os.path.join(dhcp_path, net_namespace)
            pid_file = os.path.join(dhcp_path, 'dnsmasq.pid')

            # read the dnsmasq pid recorded inside the namespace...
            command = 'sudo ip netns exec ' + net_namespace + ' cat ' + pid_file
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            # ...and kill that process
            command = 'sudo ip netns exec ' + net_namespace + ' kill -9 ' + content
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            # if len(content) == 0:
            #     return True
            # else:
            #     return False
        except paramiko.ssh_exception.SSHException as e:
            self.logger.error("delete_dhcp_server ssh Exception: " + str(e))
            if "SSH session not active" in str(e):
                self.ssh_connect()
            return False
805
806 def is_dhcp_port_free(self, host_id, net_uuid):
807 """
808 Check if any port attached to the a net in a vxlan mesh across computes nodes
809 :param host_id: host id
810 :param net_uuid: network id
811 :return: True if is not free
812 """
813 self.db_lock.acquire()
814 result, content = self.db.get_table(
815 FROM='ports',
816 WHERE={'p.type': 'instance:ovs', 'p.net_id': net_uuid}
817 )
818 self.db_lock.release()
819
820 if len(content) > 0:
821 return False
822 else:
823 return True
824
825 def is_port_free(self, host_id, net_uuid):
826 """
827 Check if there not ovs ports of a network in a compute host.
828 :param host_id: host id
829 :param net_uuid: network id
830 :return: True if is not free
831 """
832
833 self.db_lock.acquire()
834 result, content = self.db.get_table(
835 FROM='ports as p join instances as i on p.instance_id=i.uuid',
836 WHERE={"i.host_id": self.host_id, 'p.type': 'instance:ovs', 'p.net_id': net_uuid}
837 )
838 self.db_lock.release()
839
840 if len(content) > 0:
841 return False
842 else:
843 return True
844
845 def add_port_to_ovs_bridge(self, vlan):
846 """
847 Add a bridge linux as a port to a OVS bridge and set a vlan for an specific linux bridge
848 :param vlan: vlan port id
849 :return: True if success
850 """
851
852 if self.test:
853 return True
854 try:
855 port_name = 'ovim-' + str(vlan)
856 command = 'sudo ovs-vsctl add-port br-int ' + port_name + ' tag=' + str(vlan)
857 self.logger.debug("command: " + command)
858 (_, stdout, _) = self.ssh_conn.exec_command(command)
859 content = stdout.read()
860 if len(content) == 0:
861 return True
862 else:
863 return False
864 except paramiko.ssh_exception.SSHException as e:
865 self.logger.error("add_port_to_ovs_bridge ssh Exception: " + str(e))
866 if "SSH session not active" in str(e):
867 self.ssh_connect()
868 return False
869
870 def delete_dhcp_port(self, vlan, net_uuid):
871 """
872 Delete from an existing OVS bridge a linux bridge port attached and the linux bridge itself.
873 :param vlan: segmentation id
874 :param net_uuid: network id
875 :return: True if success
876 """
877
878 if self.test:
879 return True
880
881 if not self.is_dhcp_port_free(vlan, net_uuid):
882 return True
883 self.delete_dhcp_interfaces(vlan)
884 return True
885
886 def delete_bridge_port_attached_to_ovs(self, vlan, net_uuid):
887 """
888 Delete from an existing OVS bridge a linux bridge port attached and the linux bridge itself.
889 :param vlan:
890 :param net_uuid:
891 :return: True if success
892 """
893 if self.test:
894 return
895
896 if not self.is_port_free(vlan, net_uuid):
897 return True
898 self.delete_port_to_ovs_bridge(vlan, net_uuid)
899 self.delete_linux_bridge(vlan)
900 return True
901
902 def delete_linux_bridge(self, vlan):
903 """
904 Delete a linux bridge in a scpecific compute.
905 :param vlan: vlan port id
906 :return: True if success
907 """
908
909 if self.test:
910 return True
911 try:
912 port_name = 'ovim-' + str(vlan)
913 command = 'sudo ip link set dev veth0-' + str(vlan) + ' down'
914 self.logger.debug("command: " + command)
915 (_, stdout, _) = self.ssh_conn.exec_command(command)
916 # content = stdout.read()
917 #
918 # if len(content) != 0:
919 # return False
920 command = 'sudo ifconfig ' + port_name + ' down && sudo brctl delbr ' + port_name
921 self.logger.debug("command: " + command)
922 (_, stdout, _) = self.ssh_conn.exec_command(command)
923 content = stdout.read()
924 if len(content) == 0:
925 return True
926 else:
927 return False
928 except paramiko.ssh_exception.SSHException as e:
929 self.logger.error("delete_linux_bridge ssh Exception: " + str(e))
930 if "SSH session not active" in str(e):
931 self.ssh_connect()
932 return False
933
934 def create_ovs_bridge_port(self, vlan):
935 """
936 Generate a linux bridge and attache the port to a OVS bridge
937 :param vlan: vlan port id
938 :return:
939 """
940 if self.test:
941 return
942 self.create_linux_bridge(vlan)
943 self.add_port_to_ovs_bridge(vlan)
944
945 def create_linux_bridge(self, vlan):
946 """
947 Create a linux bridge with STP active
948 :param vlan: netowrk vlan id
949 :return:
950 """
951
952 if self.test:
953 return True
954 try:
955 port_name = 'ovim-' + str(vlan)
956 command = 'sudo brctl show | grep ' + port_name
957 self.logger.debug("command: " + command)
958 (_, stdout, _) = self.ssh_conn.exec_command(command)
959 content = stdout.read()
960
961 # if exist nothing to create
962 # if len(content) == 0:
963 # return False
964
965 command = 'sudo brctl addbr ' + port_name
966 self.logger.debug("command: " + command)
967 (_, stdout, _) = self.ssh_conn.exec_command(command)
968 content = stdout.read()
969
970 # if len(content) == 0:
971 # return True
972 # else:
973 # return False
974
975 command = 'sudo brctl stp ' + port_name + ' on'
976 self.logger.debug("command: " + command)
977 (_, stdout, _) = self.ssh_conn.exec_command(command)
978 content = stdout.read()
979
980 # if len(content) == 0:
981 # return True
982 # else:
983 # return False
984 command = 'sudo ip link set dev ' + port_name + ' up'
985 self.logger.debug("command: " + command)
986 (_, stdout, _) = self.ssh_conn.exec_command(command)
987 content = stdout.read()
988
989 if len(content) == 0:
990 return True
991 else:
992 return False
993 except paramiko.ssh_exception.SSHException as e:
994 self.logger.error("create_linux_bridge ssh Exception: " + str(e))
995 if "SSH session not active" in str(e):
996 self.ssh_connect()
997 return False
998
999 def set_mac_dhcp_server(self, ip, mac, vlan, netmask, dhcp_path):
1000 """
1001 Write into dhcp conf file a rule to assigned a fixed ip given to an specific MAC address
1002 :param ip: IP address asigned to a VM
1003 :param mac: VM vnic mac to be macthed with the IP received
1004 :param vlan: Segmentation id
1005 :param netmask: netmask value
1006 :param path: dhcp conf file path that live in namespace side
1007 :return: True if success
1008 """
1009
1010 if self.test:
1011 return True
1012
1013 net_namespace = 'ovim-' + str(vlan)
1014 dhcp_path = os.path.join(dhcp_path, net_namespace)
1015 dhcp_hostsdir = os.path.join(dhcp_path, net_namespace)
1016
1017 if not ip:
1018 return False
1019 try:
1020 ip_data = mac.upper() + ',' + ip
1021
1022 command = 'sudo ip netns exec ' + net_namespace + ' touch ' + dhcp_hostsdir
1023 self.logger.debug("command: " + command)
1024 (_, stdout, _) = self.ssh_conn.exec_command(command)
1025 content = stdout.read()
1026
1027 command = 'sudo ip netns exec ' + net_namespace + ' sudo bash -ec "echo ' + ip_data + ' >> ' + dhcp_hostsdir + '"'
1028
1029 self.logger.debug("command: " + command)
1030 (_, stdout, _) = self.ssh_conn.exec_command(command)
1031 content = stdout.read()
1032
1033 if len(content) == 0:
1034 return True
1035 else:
1036 return False
1037 except paramiko.ssh_exception.SSHException as e:
1038 self.logger.error("set_mac_dhcp_server ssh Exception: " + str(e))
1039 if "SSH session not active" in str(e):
1040 self.ssh_connect()
1041 return False
1042
1043 def delete_mac_dhcp_server(self, ip, mac, vlan, dhcp_path):
1044 """
1045 Delete into dhcp conf file the ip assigned to a specific MAC address
1046
1047 :param ip: IP address asigned to a VM
1048 :param mac: VM vnic mac to be macthed with the IP received
1049 :param vlan: Segmentation id
1050 :param dhcp_path: dhcp conf file path that live in namespace side
1051 :return:
1052 """
1053
1054 if self.test:
1055 return False
1056 try:
1057 net_namespace = 'ovim-' + str(vlan)
1058 dhcp_path = os.path.join(dhcp_path, net_namespace)
1059 dhcp_hostsdir = os.path.join(dhcp_path, net_namespace)
1060
1061 if not ip:
1062 return False
1063
1064 ip_data = mac.upper() + ',' + ip
1065
1066 command = 'sudo ip netns exec ' + net_namespace + ' sudo sed -i \'/' + ip_data + '/d\' ' + dhcp_hostsdir
1067 self.logger.debug("command: " + command)
1068 (_, stdout, _) = self.ssh_conn.exec_command(command)
1069 content = stdout.read()
1070
1071 if len(content) == 0:
1072 return True
1073 else:
1074 return False
1075
1076 except paramiko.ssh_exception.SSHException as e:
1077 self.logger.error("set_mac_dhcp_server ssh Exception: " + str(e))
1078 if "SSH session not active" in str(e):
1079 self.ssh_connect()
1080 return False
1081
    def launch_dhcp_server(self, vlan, ip_range, netmask, dhcp_path, gateway):
        """
        Start a dnsmasq dhcp server inside the net namespace of a vlan, unless one is already running.
        :param vlan: Segmentation id
        :param ip_range: IP dhcp range (sequence with first and last address)
        :param netmask: network netmask
        :param dhcp_path: dhcp conf file path that live in namespace side
        :param gateway: Gateway address for dhcp net
        :return: True if success
        """

        if self.test:
            return True
        try:
            interface = 'tap-' + str(vlan)
            net_namespace = 'ovim-' + str(vlan)
            dhcp_path = os.path.join(dhcp_path, net_namespace)
            leases_path = os.path.join(dhcp_path, "dnsmasq.leases")
            pid_file = os.path.join(dhcp_path, 'dnsmasq.pid')

            dhcp_range = ip_range[0] + ',' + ip_range[1] + ',' + netmask

            # make sure the per-namespace working directory exists
            command = 'sudo ip netns exec ' + net_namespace + ' mkdir -p ' + dhcp_path
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            # read the pid recorded by a previous dnsmasq run, if any
            pid_path = os.path.join(dhcp_path, 'dnsmasq.pid')  # same value as pid_file above
            command = 'sudo ip netns exec ' + net_namespace + ' cat ' + pid_path
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()
            # check if pid is runing
            pid_status_path = content
            if content:
                command = "ps aux | awk '{print $2 }' | grep " + pid_status_path
                self.logger.debug("command: " + command)
                (_, stdout, _) = self.ssh_conn.exec_command(command)
                content = stdout.read()
            # empty content here means no live dnsmasq process: build the launch command
            if not content:
                command = 'sudo ip netns exec ' + net_namespace + ' /usr/sbin/dnsmasq --strict-order --except-interface=lo ' \
                          '--interface=' + interface + ' --bind-interfaces --dhcp-hostsdir=' + dhcp_path + \
                          ' --dhcp-range ' + dhcp_range + ' --pid-file=' + pid_file + ' --dhcp-leasefile=' + leases_path + \
                          ' --listen-address ' + gateway

            # NOTE(review): when a live dnsmasq WAS found, 'command' still holds the
            # ps/grep pipeline and is executed again here — looks unintended, kept as-is
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.readline()

            if len(content) == 0:
                return True
            else:
                return False
        except paramiko.ssh_exception.SSHException as e:
            self.logger.error("launch_dhcp_server ssh Exception: " + str(e))
            if "SSH session not active" in str(e):
                self.ssh_connect()
            return False
1141
1142 def delete_dhcp_interfaces(self, vlan):
1143 """
1144 Create a linux bridge with STP active
1145 :param vlan: netowrk vlan id
1146 :return:
1147 """
1148
1149 if self.test:
1150 return True
1151 try:
1152 net_namespace = 'ovim-' + str(vlan)
1153 command = 'sudo ovs-vsctl del-port br-int ovs-tap-' + str(vlan)
1154 self.logger.debug("command: " + command)
1155 (_, stdout, _) = self.ssh_conn.exec_command(command)
1156 content = stdout.read()
1157
1158 command = 'sudo ip netns exec ' + net_namespace + ' ip link set dev tap-' + str(vlan) + ' down'
1159 self.logger.debug("command: " + command)
1160 (_, stdout, _) = self.ssh_conn.exec_command(command)
1161 content = stdout.read()
1162
1163 command = 'sudo ip link set dev ovs-tap-' + str(vlan) + ' down'
1164 self.logger.debug("command: " + command)
1165 (_, stdout, _) = self.ssh_conn.exec_command(command)
1166 content = stdout.read()
1167 except paramiko.ssh_exception.SSHException as e:
1168 self.logger.error("delete_dhcp_interfaces ssh Exception: " + str(e))
1169 if "SSH session not active" in str(e):
1170 self.ssh_connect()
1171 return False
1172
    def create_dhcp_interfaces(self, vlan, ip_listen_address, netmask):
        """
        Create the net namespace of a vlan with a veth pair: one end is attached to the
        OVS integration bridge, the other is moved into the namespace and configured
        with the dhcp listen address. (Docstring fixed: it previously claimed to create
        a linux bridge.)
        :param vlan: segmentation id
        :param ip_listen_address: Listen Ip address for the dhcp service, the tap interface living in namesapce side
        :param netmask: dhcp net CIDR
        :return: True if success
        """

        if self.test:
            return True
        try:
            net_namespace = 'ovim-' + str(vlan)
            namespace_interface = 'tap-' + str(vlan)

            # create the namespace for this vlan
            command = 'sudo ip netns add ' + net_namespace
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            # veth pair: tap-<vlan> (namespace side) <-> ovs-tap-<vlan> (bridge side)
            command = 'sudo ip link add tap-' + str(vlan) + ' type veth peer name ovs-tap-' + str(vlan)
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            # attach the bridge-side end to br-int, tagged with the vlan
            command = 'sudo ovs-vsctl add-port br-int ovs-tap-' + str(vlan) + ' tag=' + str(vlan)
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            # move the namespace-side end into the namespace
            command = 'sudo ip link set tap-' + str(vlan) + ' netns ' + net_namespace
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            command = 'sudo ip netns exec ' + net_namespace + ' ip link set dev tap-' + str(vlan) + ' up'
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            command = 'sudo ip link set dev ovs-tap-' + str(vlan) + ' up'
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            # configure the dhcp listen address on the namespace-side interface;
            # only this final command's output decides success
            command = 'sudo ip netns exec ' + net_namespace + ' ' + ' ifconfig ' + namespace_interface \
                      + ' ' + ip_listen_address + ' netmask ' + netmask
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            if len(content) == 0:
                return True
            else:
                return False
        except paramiko.ssh_exception.SSHException as e:
            self.logger.error("create_dhcp_interfaces ssh Exception: " + str(e))
            if "SSH session not active" in str(e):
                self.ssh_connect()
            return False
1233
1234
1235 def create_ovs_vxlan_tunnel(self, vxlan_interface, remote_ip):
1236 """
1237 Create a vlxn tunnel between to computes with an OVS installed. STP is also active at port level
1238 :param vxlan_interface: vlxan inteface name.
1239 :param remote_ip: tunnel endpoint remote compute ip.
1240 :return:
1241 """
1242 if self.test:
1243 return True
1244 try:
1245 command = 'sudo ovs-vsctl add-port br-int ' + vxlan_interface + \
1246 ' -- set Interface ' + vxlan_interface + ' type=vxlan options:remote_ip=' + remote_ip + \
1247 ' -- set Port ' + vxlan_interface + ' other_config:stp-path-cost=10'
1248 self.logger.debug("command: " + command)
1249 (_, stdout, _) = self.ssh_conn.exec_command(command)
1250 content = stdout.read()
1251 # print content
1252 if len(content) == 0:
1253 return True
1254 else:
1255 return False
1256 except paramiko.ssh_exception.SSHException as e:
1257 self.logger.error("create_ovs_vxlan_tunnel ssh Exception: " + str(e))
1258 if "SSH session not active" in str(e):
1259 self.ssh_connect()
1260 return False
1261
1262 def delete_ovs_vxlan_tunnel(self, vxlan_interface):
1263 """
1264 Delete a vlxan tunnel port from a OVS brdige.
1265 :param vxlan_interface: vlxan name to be delete it.
1266 :return: True if success.
1267 """
1268 if self.test:
1269 return True
1270 try:
1271 command = 'sudo ovs-vsctl del-port br-int ' + vxlan_interface
1272 self.logger.debug("command: " + command)
1273 (_, stdout, _) = self.ssh_conn.exec_command(command)
1274 content = stdout.read()
1275 # print content
1276 if len(content) == 0:
1277 return True
1278 else:
1279 return False
1280 except paramiko.ssh_exception.SSHException as e:
1281 self.logger.error("delete_ovs_vxlan_tunnel ssh Exception: " + str(e))
1282 if "SSH session not active" in str(e):
1283 self.ssh_connect()
1284 return False
1285
1286 def delete_ovs_bridge(self):
1287 """
1288 Delete a OVS bridge from a compute.
1289 :return: True if success
1290 """
1291 if self.test:
1292 return True
1293 try:
1294 command = 'sudo ovs-vsctl del-br br-int'
1295 self.logger.debug("command: " + command)
1296 (_, stdout, _) = self.ssh_conn.exec_command(command)
1297 content = stdout.read()
1298 if len(content) == 0:
1299 return True
1300 else:
1301 return False
1302 except paramiko.ssh_exception.SSHException as e:
1303 self.logger.error("delete_ovs_bridge ssh Exception: " + str(e))
1304 if "SSH session not active" in str(e):
1305 self.ssh_connect()
1306 return False
1307
1308 def get_file_info(self, path):
1309 command = 'ls -lL --time-style=+%Y-%m-%dT%H:%M:%S ' + path
1310 self.logger.debug("command: " + command)
1311 (_, stdout, _) = self.ssh_conn.exec_command(command)
1312 content = stdout.read()
1313 if len(content) == 0:
1314 return None # file does not exist
1315 else:
1316 return content.split(" ") # (permission, 1, owner, group, size, date, file)
1317
1318 def qemu_get_info(self, path):
1319 command = 'qemu-img info ' + path
1320 self.logger.debug("command: " + command)
1321 (_, stdout, stderr) = self.ssh_conn.exec_command(command)
1322 content = stdout.read()
1323 if len(content) == 0:
1324 error = stderr.read()
1325 self.logger.error("get_qemu_info error " + error)
1326 raise paramiko.ssh_exception.SSHException("Error getting qemu_info: " + error)
1327 else:
1328 try:
1329 return yaml.load(content)
1330 except yaml.YAMLError as exc:
1331 text = ""
1332 if hasattr(exc, 'problem_mark'):
1333 mark = exc.problem_mark
1334 text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
1335 self.logger.error("get_qemu_info yaml format Exception " + text)
1336 raise paramiko.ssh_exception.SSHException("Error getting qemu_info yaml format" + text)
1337
1338 def qemu_change_backing(self, inc_file, new_backing_file):
1339 command = 'qemu-img rebase -u -b ' + new_backing_file + ' ' + inc_file
1340 self.logger.debug("command: " + command)
1341 (_, _, stderr) = self.ssh_conn.exec_command(command)
1342 content = stderr.read()
1343 if len(content) == 0:
1344 return 0
1345 else:
1346 self.logger.error("qemu_change_backing error: " + content)
1347 return -1
1348
1349 def get_notused_filename(self, proposed_name, suffix=''):
1350 '''Look for a non existing file_name in the host
1351 proposed_name: proposed file name, includes path
1352 suffix: suffix to be added to the name, before the extention
1353 '''
1354 extension = proposed_name.rfind(".")
1355 slash = proposed_name.rfind("/")
1356 if extension < 0 or extension < slash: # no extension
1357 extension = len(proposed_name)
1358 target_name = proposed_name[:extension] + suffix + proposed_name[extension:]
1359 info = self.get_file_info(target_name)
1360 if info is None:
1361 return target_name
1362
1363 index=0
1364 while info is not None:
1365 target_name = proposed_name[:extension] + suffix + "-" + str(index) + proposed_name[extension:]
1366 index+=1
1367 info = self.get_file_info(target_name)
1368 return target_name
1369
1370 def get_notused_path(self, proposed_path, suffix=''):
1371 '''Look for a non existing path at database for images
1372 proposed_path: proposed file name, includes path
1373 suffix: suffix to be added to the name, before the extention
1374 '''
1375 extension = proposed_path.rfind(".")
1376 if extension < 0:
1377 extension = len(proposed_path)
1378 if suffix != None:
1379 target_path = proposed_path[:extension] + suffix + proposed_path[extension:]
1380 index=0
1381 while True:
1382 r,_=self.db.get_table(FROM="images",WHERE={"path":target_path})
1383 if r<=0:
1384 return target_path
1385 target_path = proposed_path[:extension] + suffix + "-" + str(index) + proposed_path[extension:]
1386 index+=1
1387
1388
1389 def delete_file(self, file_name):
1390 command = 'rm -f '+file_name
1391 self.logger.debug("command: " + command)
1392 (_, _, stderr) = self.ssh_conn.exec_command(command)
1393 error_msg = stderr.read()
1394 if len(error_msg) > 0:
1395 raise paramiko.ssh_exception.SSHException("Error deleting file: " + error_msg)
1396
1397 def copy_file(self, source, destination, perserve_time=True):
1398 if source[0:4]=="http":
1399 command = "wget --no-verbose -O '{dst}' '{src}' 2>'{dst_result}' || cat '{dst_result}' >&2 && rm '{dst_result}'".format(
1400 dst=destination, src=source, dst_result=destination + ".result" )
1401 else:
1402 command = 'cp --no-preserve=mode'
1403 if perserve_time:
1404 command += ' --preserve=timestamps'
1405 command += " '{}' '{}'".format(source, destination)
1406 self.logger.debug("command: " + command)
1407 (_, _, stderr) = self.ssh_conn.exec_command(command)
1408 error_msg = stderr.read()
1409 if len(error_msg) > 0:
1410 raise paramiko.ssh_exception.SSHException("Error copying image to local host: " + error_msg)
1411
    def copy_remote_file(self, remote_file, use_incremental):
        ''' Copy a file from the repository to local folder and recursively
            copy the backing files in case the remote file is incremental.
            Reads and/or modifies self.localinfo['files'], the cache that maps
            repository paths to the unmodified local copies of images.
            params:
                remote_file: path of remote file (http url or host-local path)
                use_incremental: None (leave the decision to this function), True, False
            return:
                local_file: name of local file
                qemu_info: dict with qemu-img information of local file
                use_incremental_out: True, False; same as use_incremental, but if None a decision is taken
        '''

        use_incremental_out = use_incremental
        new_backing_file = None
        local_file = None
        file_from_local = True

        #in case incremental use is not decided, take the decision depending on the image
        #avoid the use of incremental if this image is already incremental
        if remote_file[0:4] == "http":
            file_from_local = False
        if file_from_local:
            qemu_remote_info = self.qemu_get_info(remote_file)
        if use_incremental_out==None:
            use_incremental_out = not ( file_from_local and 'backing file' in qemu_remote_info)
        #copy recursivelly the backing files
        if file_from_local and 'backing file' in qemu_remote_info:
            new_backing_file, _, _ = self.copy_remote_file(qemu_remote_info['backing file'], True)

        #check if remote file is present locally
        if use_incremental_out and remote_file in self.localinfo['files']:
            local_file = self.localinfo['files'][remote_file]
            local_file_info = self.get_file_info(local_file)
            if file_from_local:
                remote_file_info = self.get_file_info(remote_file)
            if local_file_info == None:
                local_file = None
            elif file_from_local and (local_file_info[4]!=remote_file_info[4] or local_file_info[5]!=remote_file_info[5]):
                #local copy of file not valid because date or size are different.
                #TODO DELETE local file if this file is not used by any active virtual machine
                try:
                    self.delete_file(local_file)
                    del self.localinfo['files'][remote_file]
                except Exception:
                    pass
                local_file = None
            else: #check that the local file has the same backing file, or there are not backing at all
                qemu_info = self.qemu_get_info(local_file)
                if new_backing_file != qemu_info.get('backing file'):
                    local_file = None


        if local_file == None: #copy the file
            img_name= remote_file.split('/') [-1]
            img_local = self.image_path + '/' + img_name
            local_file = self.get_notused_filename(img_local)
            self.copy_file(remote_file, local_file, use_incremental_out)

            # register the fresh copy in the cache and re-point its backing chain
            if use_incremental_out:
                self.localinfo['files'][remote_file] = local_file
            if new_backing_file:
                self.qemu_change_backing(local_file, new_backing_file)
            qemu_info = self.qemu_get_info(local_file)

        return local_file, qemu_info, use_incremental_out
1479
1480 def launch_server(self, conn, server, rebuild=False, domain=None):
1481 if self.test:
1482 time.sleep(random.randint(20,150)) #sleep random timeto be make it a bit more real
1483 return 0, 'Success'
1484
1485 server_id = server['uuid']
1486 paused = server.get('paused','no')
1487 try:
1488 if domain!=None and rebuild==False:
1489 domain.resume()
1490 #self.server_status[server_id] = 'ACTIVE'
1491 return 0, 'Success'
1492
1493 self.db_lock.acquire()
1494 result, server_data = self.db.get_instance(server_id)
1495 self.db_lock.release()
1496 if result <= 0:
1497 self.logger.error("launch_server ERROR getting server from DB %d %s", result, server_data)
1498 return result, server_data
1499
1500 #0: get image metadata
1501 server_metadata = server.get('metadata', {})
1502 use_incremental = None
1503
1504 if "use_incremental" in server_metadata:
1505 use_incremental = False if server_metadata["use_incremental"] == "no" else True
1506
1507 server_host_files = self.localinfo['server_files'].get( server['uuid'], {})
1508 if rebuild:
1509 #delete previous incremental files
1510 for file_ in server_host_files.values():
1511 self.delete_file(file_['source file'] )
1512 server_host_files={}
1513
1514 #1: obtain aditional devices (disks)
1515 #Put as first device the main disk
1516 devices = [ {"type":"disk", "image_id":server['image_id'], "vpci":server_metadata.get('vpci', None) } ]
1517 if 'extended' in server_data and server_data['extended']!=None and "devices" in server_data['extended']:
1518 devices += server_data['extended']['devices']
1519
1520 for dev in devices:
1521 if dev['image_id'] == None:
1522 continue
1523
1524 self.db_lock.acquire()
1525 result, content = self.db.get_table(FROM='images', SELECT=('path', 'metadata'),
1526 WHERE={'uuid': dev['image_id']})
1527 self.db_lock.release()
1528 if result <= 0:
1529 error_text = "ERROR", result, content, "when getting image", dev['image_id']
1530 self.logger.error("launch_server " + error_text)
1531 return -1, error_text
1532 if content[0]['metadata'] is not None:
1533 dev['metadata'] = json.loads(content[0]['metadata'])
1534 else:
1535 dev['metadata'] = {}
1536
1537 if dev['image_id'] in server_host_files:
1538 dev['source file'] = server_host_files[ dev['image_id'] ] ['source file'] #local path
1539 dev['file format'] = server_host_files[ dev['image_id'] ] ['file format'] # raw or qcow2
1540 continue
1541
1542 #2: copy image to host
1543 remote_file = content[0]['path']
1544 use_incremental_image = use_incremental
1545 if dev['metadata'].get("use_incremental") == "no":
1546 use_incremental_image = False
1547 local_file, qemu_info, use_incremental_image = self.copy_remote_file(remote_file, use_incremental_image)
1548
1549 #create incremental image
1550 if use_incremental_image:
1551 local_file_inc = self.get_notused_filename(local_file, '.inc')
1552 command = 'qemu-img create -f qcow2 '+local_file_inc+ ' -o backing_file='+ local_file
1553 self.logger.debug("command: " + command)
1554 (_, _, stderr) = self.ssh_conn.exec_command(command)
1555 error_msg = stderr.read()
1556 if len(error_msg) > 0:
1557 raise paramiko.ssh_exception.SSHException("Error creating incremental file: " + error_msg)
1558 local_file = local_file_inc
1559 qemu_info = {'file format':'qcow2'}
1560
1561 server_host_files[ dev['image_id'] ] = {'source file': local_file, 'file format': qemu_info['file format']}
1562
1563 dev['source file'] = local_file
1564 dev['file format'] = qemu_info['file format']
1565
1566 self.localinfo['server_files'][ server['uuid'] ] = server_host_files
1567 self.localinfo_dirty = True
1568
1569 #3 Create XML
1570 result, xml = self.create_xml_server(server_data, devices, server_metadata) #local_file
1571 if result <0:
1572 self.logger.error("create xml server error: " + xml)
1573 return -2, xml
1574 self.logger.debug("create xml: " + xml)
1575 atribute = host_thread.lvirt_module.VIR_DOMAIN_START_PAUSED if paused == "yes" else 0
1576 #4 Start the domain
1577 if not rebuild: #ensures that any pending destroying server is done
1578 self.server_forceoff(True)
1579 #self.logger.debug("launching instance " + xml)
1580 conn.createXML(xml, atribute)
1581 #self.server_status[server_id] = 'PAUSED' if paused == "yes" else 'ACTIVE'
1582
1583 return 0, 'Success'
1584
1585 except paramiko.ssh_exception.SSHException as e:
1586 text = e.args[0]
1587 self.logger.error("launch_server id='%s' ssh Exception: %s", server_id, text)
1588 if "SSH session not active" in text:
1589 self.ssh_connect()
1590 except host_thread.lvirt_module.libvirtError as e:
1591 text = e.get_error_message()
1592 self.logger.error("launch_server id='%s' libvirt Exception: %s", server_id, text)
1593 except Exception as e:
1594 text = str(e)
1595 self.logger.error("launch_server id='%s' Exception: %s", server_id, text)
1596 return -1, text
1597
    def update_servers_status(self):
        """
        Poll libvirt for the state of every domain on the host and sync any
        change into the 'instances' DB table and the self.server_status cache.
        Does nothing in test mode or when no server status is cached.
        """
        # # virDomainState
        # VIR_DOMAIN_NOSTATE = 0
        # VIR_DOMAIN_RUNNING = 1
        # VIR_DOMAIN_BLOCKED = 2
        # VIR_DOMAIN_PAUSED = 3
        # VIR_DOMAIN_SHUTDOWN = 4
        # VIR_DOMAIN_SHUTOFF = 5
        # VIR_DOMAIN_CRASHED = 6
        # VIR_DOMAIN_PMSUSPENDED = 7 #TODO suspended

        if self.test or len(self.server_status)==0:
            return

        try:
            # map each libvirt domain uuid to an openvim status string
            conn = host_thread.lvirt_module.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
            domains= conn.listAllDomains()
            domain_dict={}
            for domain in domains:
                uuid = domain.UUIDString() ;
                libvirt_status = domain.state()
                #print libvirt_status
                if libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_RUNNING or libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_SHUTDOWN:
                    new_status = "ACTIVE"
                elif libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_PAUSED:
                    new_status = "PAUSED"
                elif libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_SHUTOFF:
                    new_status = "INACTIVE"
                elif libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_CRASHED:
                    new_status = "ERROR"
                else:
                    new_status = None
                domain_dict[uuid] = new_status
            conn.close()
        except host_thread.lvirt_module.libvirtError as e:
            self.logger.error("get_state() Exception " + e.get_error_message())
            return

        for server_id, current_status in self.server_status.iteritems():
            new_status = None
            if server_id in domain_dict:
                new_status = domain_dict[server_id]
            else:
                # a cached server with no libvirt domain is considered stopped
                new_status = "INACTIVE"

            if new_status == None or new_status == current_status:
                continue
            if new_status == 'INACTIVE' and current_status == 'ERROR':
                continue #keep ERROR status, because obviously this machine is not running
            #change status
            self.logger.debug("server id='%s' status change from '%s' to '%s'", server_id, current_status, new_status)
            STATUS={'progress':100, 'status':new_status}
            if new_status == 'ERROR':
                STATUS['last_error'] = 'machine has crashed'
            self.db_lock.acquire()
            r,_ = self.db.update_rows('instances', STATUS, {'uuid':server_id}, log=False)
            self.db_lock.release()
            if r>=0:
                # only update the cache when the DB row was actually written
                self.server_status[server_id] = new_status
1657
1658 def action_on_server(self, req, last_retry=True):
1659 '''Perform an action on a req
1660 Attributes:
1661 req: dictionary that contain:
1662 server properties: 'uuid','name','tenant_id','status'
1663 action: 'action'
1664 host properties: 'user', 'ip_name'
1665 return (error, text)
1666 0: No error. VM is updated to new state,
1667 -1: Invalid action, as trying to pause a PAUSED VM
1668 -2: Error accessing host
1669 -3: VM nor present
1670 -4: Error at DB access
1671 -5: Error while trying to perform action. VM is updated to ERROR
1672 '''
1673 server_id = req['uuid']
1674 conn = None
1675 new_status = None
1676 old_status = req['status']
1677 last_error = None
1678
1679 if self.test:
1680 if 'terminate' in req['action']:
1681 new_status = 'deleted'
1682 elif 'shutoff' in req['action'] or 'shutdown' in req['action'] or 'forceOff' in req['action']:
1683 if req['status']!='ERROR':
1684 time.sleep(5)
1685 new_status = 'INACTIVE'
1686 elif 'start' in req['action'] and req['status']!='ERROR':
1687 new_status = 'ACTIVE'
1688 elif 'resume' in req['action'] and req['status']!='ERROR' and req['status']!='INACTIVE':
1689 new_status = 'ACTIVE'
1690 elif 'pause' in req['action'] and req['status']!='ERROR':
1691 new_status = 'PAUSED'
1692 elif 'reboot' in req['action'] and req['status']!='ERROR':
1693 new_status = 'ACTIVE'
1694 elif 'rebuild' in req['action']:
1695 time.sleep(random.randint(20,150))
1696 new_status = 'ACTIVE'
1697 elif 'createImage' in req['action']:
1698 time.sleep(5)
1699 self.create_image(None, req)
1700 else:
1701 try:
1702 conn = host_thread.lvirt_module.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
1703 try:
1704 dom = conn.lookupByUUIDString(server_id)
1705 except host_thread.lvirt_module.libvirtError as e:
1706 text = e.get_error_message()
1707 if 'LookupByUUIDString' in text or 'Domain not found' in text or 'No existe un dominio coincidente' in text:
1708 dom = None
1709 else:
1710 self.logger.error("action_on_server id='%s' libvirt exception: %s", server_id, text)
1711 raise e
1712
1713 if 'forceOff' in req['action']:
1714 if dom == None:
1715 self.logger.debug("action_on_server id='%s' domain not running", server_id)
1716 else:
1717 try:
1718 self.logger.debug("sending DESTROY to server id='%s'", server_id)
1719 dom.destroy()
1720 except Exception as e:
1721 if "domain is not running" not in e.get_error_message():
1722 self.logger.error("action_on_server id='%s' Exception while sending force off: %s",
1723 server_id, e.get_error_message())
1724 last_error = 'action_on_server Exception while destroy: ' + e.get_error_message()
1725 new_status = 'ERROR'
1726
1727 elif 'terminate' in req['action']:
1728 if dom == None:
1729 self.logger.debug("action_on_server id='%s' domain not running", server_id)
1730 new_status = 'deleted'
1731 else:
1732 try:
1733 if req['action']['terminate'] == 'force':
1734 self.logger.debug("sending DESTROY to server id='%s'", server_id)
1735 dom.destroy()
1736 new_status = 'deleted'
1737 else:
1738 self.logger.debug("sending SHUTDOWN to server id='%s'", server_id)
1739 dom.shutdown()
1740 self.pending_terminate_server.append( (time.time()+10,server_id) )
1741 except Exception as e:
1742 self.logger.error("action_on_server id='%s' Exception while destroy: %s",
1743 server_id, e.get_error_message())
1744 last_error = 'action_on_server Exception while destroy: ' + e.get_error_message()
1745 new_status = 'ERROR'
1746 if "domain is not running" in e.get_error_message():
1747 try:
1748 dom.undefine()
1749 new_status = 'deleted'
1750 except Exception:
1751 self.logger.error("action_on_server id='%s' Exception while undefine: %s",
1752 server_id, e.get_error_message())
1753 last_error = 'action_on_server Exception2 while undefine:', e.get_error_message()
1754 #Exception: 'virDomainDetachDevice() failed'
1755 if new_status=='deleted':
1756 if server_id in self.server_status:
1757 del self.server_status[server_id]
1758 if req['uuid'] in self.localinfo['server_files']:
1759 for file_ in self.localinfo['server_files'][ req['uuid'] ].values():
1760 try:
1761 self.delete_file(file_['source file'])
1762 except Exception:
1763 pass
1764 del self.localinfo['server_files'][ req['uuid'] ]
1765 self.localinfo_dirty = True
1766
1767 elif 'shutoff' in req['action'] or 'shutdown' in req['action']:
1768 try:
1769 if dom == None:
1770 self.logger.debug("action_on_server id='%s' domain not running", server_id)
1771 else:
1772 dom.shutdown()
1773 # new_status = 'INACTIVE'
1774 #TODO: check status for changing at database
1775 except Exception as e:
1776 new_status = 'ERROR'
1777 self.logger.error("action_on_server id='%s' Exception while shutdown: %s",
1778 server_id, e.get_error_message())
1779 last_error = 'action_on_server Exception while shutdown: ' + e.get_error_message()
1780
1781 elif 'rebuild' in req['action']:
1782 if dom != None:
1783 dom.destroy()
1784 r = self.launch_server(conn, req, True, None)
1785 if r[0] <0:
1786 new_status = 'ERROR'
1787 last_error = r[1]
1788 else:
1789 new_status = 'ACTIVE'
1790 elif 'start' in req['action']:
1791 # The instance is only create in DB but not yet at libvirt domain, needs to be create
1792 rebuild = True if req['action']['start'] == 'rebuild' else False
1793 r = self.launch_server(conn, req, rebuild, dom)
1794 if r[0] <0:
1795 new_status = 'ERROR'
1796 last_error = r[1]
1797 else:
1798 new_status = 'ACTIVE'
1799
1800 elif 'resume' in req['action']:
1801 try:
1802 if dom == None:
1803 pass
1804 else:
1805 dom.resume()
1806 # new_status = 'ACTIVE'
1807 except Exception as e:
1808 self.logger.error("action_on_server id='%s' Exception while resume: %s",
1809 server_id, e.get_error_message())
1810
1811 elif 'pause' in req['action']:
1812 try:
1813 if dom == None:
1814 pass
1815 else:
1816 dom.suspend()
1817 # new_status = 'PAUSED'
1818 except Exception as e:
1819 self.logger.error("action_on_server id='%s' Exception while pause: %s",
1820 server_id, e.get_error_message())
1821
1822 elif 'reboot' in req['action']:
1823 try:
1824 if dom == None:
1825 pass
1826 else:
1827 dom.reboot()
1828 self.logger.debug("action_on_server id='%s' reboot:", server_id)
1829 #new_status = 'ACTIVE'
1830 except Exception as e:
1831 self.logger.error("action_on_server id='%s' Exception while reboot: %s",
1832 server_id, e.get_error_message())
1833 elif 'createImage' in req['action']:
1834 self.create_image(dom, req)
1835
1836
1837 conn.close()
1838 except host_thread.lvirt_module.libvirtError as e:
1839 if conn is not None: conn.close()
1840 text = e.get_error_message()
1841 new_status = "ERROR"
1842 last_error = text
1843 if 'LookupByUUIDString' in text or 'Domain not found' in text or 'No existe un dominio coincidente' in text:
1844 self.logger.debug("action_on_server id='%s' Exception removed from host", server_id)
1845 else:
1846 self.logger.error("action_on_server id='%s' Exception %s", server_id, text)
1847 #end of if self.test
1848 if new_status == None:
1849 return 1
1850
1851 self.logger.debug("action_on_server id='%s' new status=%s %s",server_id, new_status, last_error)
1852 UPDATE = {'progress':100, 'status':new_status}
1853
1854 if new_status=='ERROR':
1855 if not last_retry: #if there will be another retry do not update database
1856 return -1
1857 elif 'terminate' in req['action']:
1858 #PUT a log in the database
1859 self.logger.error("PANIC deleting server id='%s' %s", server_id, last_error)
1860 self.db_lock.acquire()
1861 self.db.new_row('logs',
1862 {'uuid':server_id, 'tenant_id':req['tenant_id'], 'related':'instances','level':'panic',
1863 'description':'PANIC deleting server from host '+self.name+': '+last_error}
1864 )
1865 self.db_lock.release()
1866 if server_id in self.server_status:
1867 del self.server_status[server_id]
1868 return -1
1869 else:
1870 UPDATE['last_error'] = last_error
1871 if new_status != 'deleted' and (new_status != old_status or new_status == 'ERROR') :
1872 self.db_lock.acquire()
1873 self.db.update_rows('instances', UPDATE, {'uuid':server_id}, log=True)
1874 self.server_status[server_id] = new_status
1875 self.db_lock.release()
1876 if new_status == 'ERROR':
1877 return -1
1878 return 1
1879
1880
1881 def restore_iface(self, name, mac, lib_conn=None):
1882 ''' make an ifdown, ifup to restore default parameter of na interface
1883 Params:
1884 mac: mac address of the interface
1885 lib_conn: connection to the libvirt, if None a new connection is created
1886 Return 0,None if ok, -1,text if fails
1887 '''
1888 conn=None
1889 ret = 0
1890 error_text=None
1891 if self.test:
1892 self.logger.debug("restore_iface '%s' %s", name, mac)
1893 return 0, None
1894 try:
1895 if not lib_conn:
1896 conn = host_thread.lvirt_module.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
1897 else:
1898 conn = lib_conn
1899
1900 #wait to the pending VM deletion
1901 #TODO.Revise self.server_forceoff(True)
1902
1903 iface = conn.interfaceLookupByMACString(mac)
1904 if iface.isActive():
1905 iface.destroy()
1906 iface.create()
1907 self.logger.debug("restore_iface '%s' %s", name, mac)
1908 except host_thread.lvirt_module.libvirtError as e:
1909 error_text = e.get_error_message()
1910 self.logger.error("restore_iface '%s' '%s' libvirt exception: %s", name, mac, error_text)
1911 ret=-1
1912 finally:
1913 if lib_conn is None and conn is not None:
1914 conn.close()
1915 return ret, error_text
1916
1917
1918 def create_image(self,dom, req):
1919 if self.test:
1920 if 'path' in req['action']['createImage']:
1921 file_dst = req['action']['createImage']['path']
1922 else:
1923 createImage=req['action']['createImage']
1924 img_name= createImage['source']['path']
1925 index=img_name.rfind('/')
1926 file_dst = self.get_notused_path(img_name[:index+1] + createImage['name'] + '.qcow2')
1927 image_status='ACTIVE'
1928 else:
1929 for retry in (0,1):
1930 try:
1931 server_id = req['uuid']
1932 createImage=req['action']['createImage']
1933 file_orig = self.localinfo['server_files'][server_id] [ createImage['source']['image_id'] ] ['source file']
1934 if 'path' in req['action']['createImage']:
1935 file_dst = req['action']['createImage']['path']
1936 else:
1937 img_name= createImage['source']['path']
1938 index=img_name.rfind('/')
1939 file_dst = self.get_notused_filename(img_name[:index+1] + createImage['name'] + '.qcow2')
1940
1941 self.copy_file(file_orig, file_dst)
1942 qemu_info = self.qemu_get_info(file_orig)
1943 if 'backing file' in qemu_info:
1944 for k,v in self.localinfo['files'].items():
1945 if v==qemu_info['backing file']:
1946 self.qemu_change_backing(file_dst, k)
1947 break
1948 image_status='ACTIVE'
1949 break
1950 except paramiko.ssh_exception.SSHException as e:
1951 image_status='ERROR'
1952 error_text = e.args[0]
1953 self.logger.error("create_image id='%s' ssh Exception: %s", server_id, error_text)
1954 if "SSH session not active" in error_text and retry==0:
1955 self.ssh_connect()
1956 except Exception as e:
1957 image_status='ERROR'
1958 error_text = str(e)
1959 self.logger.error("create_image id='%s' Exception: %s", server_id, error_text)
1960
1961 #TODO insert a last_error at database
1962 self.db_lock.acquire()
1963 self.db.update_rows('images', {'status':image_status, 'progress': 100, 'path':file_dst},
1964 {'uuid':req['new_image']['uuid']}, log=True)
1965 self.db_lock.release()
1966
def edit_iface(self, port_id, old_net, new_net):
    """Move a dataplane SR-IOV (VF) interface to another network.

    A VF cannot be re-parameterized in place, so the change is implemented as
    a libvirt detach (when leaving old_net) followed by an attach with the
    new vlan tag (when joining new_net).

    Params:
        port_id: uuid of the port at the database
        old_net, new_net: previous / new network; either may be None
    """
    if self.test:
        time.sleep(1)
        return
    # load the port details from the database
    self.db_lock.acquire()
    nb_rows, rows = self.db.get_table(FROM='ports as p join resources_port as rp on p.uuid=rp.port_id',
                                      WHERE={'port_id': port_id})
    self.db_lock.release()
    if nb_rows < 0:
        self.logger.error("edit_iface %s DDBB error: %s", port_id, rows)
        return
    if nb_rows == 0:
        self.logger.error("edit_iface %s port not found", port_id)
        return
    port = rows[0]
    if port["model"] != "VF":
        self.logger.error("edit_iface %s ERROR model must be VF", port_id)
        return
    # xml used for the detach; it is later extended for the attach
    iface_xml = []
    self.xml_level = 2
    iface_xml.append("<interface type='hostdev' managed='yes'>")
    iface_xml.append(" <mac address='" + port['mac'] + "'/>")
    iface_xml.append(" <source>" + self.pci2xml(port['pci']) + "\n </source>")
    iface_xml.append('</interface>')

    conn = None
    try:
        conn = host_thread.lvirt_module.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
        dom = conn.lookupByUUIDString(port["instance_id"])
        if old_net:
            detach_text = "\n".join(iface_xml)
            self.logger.debug("edit_iface detaching SRIOV interface " + detach_text)
            dom.detachDeviceFlags(detach_text, flags=host_thread.lvirt_module.VIR_DOMAIN_AFFECT_LIVE)
        if new_net:
            # replace the closing tag with the vlan tag, add vpci, close again
            iface_xml[-1] = " <vlan> <tag id='" + str(port['vlan']) + "'/> </vlan>"
            self.xml_level = 1
            iface_xml.append(self.pci2xml(port.get('vpci', None)))
            iface_xml.append('</interface>')
            attach_text = "\n".join(iface_xml)
            self.logger.debug("edit_iface attaching SRIOV interface " + attach_text)
            dom.attachDeviceFlags(attach_text, flags=host_thread.lvirt_module.VIR_DOMAIN_AFFECT_LIVE)

    except host_thread.lvirt_module.libvirtError as e:
        error_text = e.get_error_message()
        self.logger.error("edit_iface %s libvirt exception: %s", port["instance_id"], error_text)

    finally:
        if conn is not None:
            conn.close()
2019
2020
def create_server(server, db, db_lock, only_of_ports):
    """Select and reserve host resources for a new server (instance).

    Builds a requirements dictionary from the flavor (or its 'extended'
    field), asks the database for a suitable numa, computes the cpu pinning
    and assigns dataplane (passthrough / SR-IOV) and bridge interfaces.

    Changes vs. previous version: removed an unreachable bare 'return' after
    the paired-threads-id error return; 'itervalues()' -> 'values()' and
    'range()' -> 'list(range())' (identical behavior on python2, also valid
    on python3, and consistent with the '.items()' usage elsewhere in the
    file); 'is None' comparisons.

    Params:
        server: instance description (flavor, image_id, networks, ...)
        db, db_lock: database class and lock to use it in exclusion
        only_of_ports: restrict dataplane connectivity to openflow ports
    Return:
        (0, resources) where resources is the dictionary to insert at the
        'instances' table, or (<0, error_text) on failure
    """
    extended = server.get('extended', None)
    requirements = {}
    requirements['numa'] = {'memory': 0, 'proc_req_type': 'threads', 'proc_req_nb': 0, 'port_list': [],
                            'sriov_list': []}
    requirements['ram'] = server['flavor'].get('ram', 0)
    if requirements['ram'] is None:
        requirements['ram'] = 0
    requirements['vcpus'] = server['flavor'].get('vcpus', 0)
    if requirements['vcpus'] is None:
        requirements['vcpus'] = 0
    #If extended is not defined get requirements from flavor
    if extended is None:
        #If extended is defined in flavor convert to dictionary and use it
        if 'extended' in server['flavor'] and server['flavor']['extended'] is not None:
            json_acceptable_string = server['flavor']['extended'].replace("'", "\"")
            extended = json.loads(json_acceptable_string)
        else:
            extended = None

    #For simplicity only one numa VM are supported in the initial implementation
    if extended is not None:
        numas = extended.get('numas', [])
        if len(numas) > 1:
            return (-2, "Multi-NUMA VMs are not supported yet")

        #a for loop is used in order to be ready to multi-NUMA VMs
        request = []
        for numa in numas:
            numa_req = {}
            numa_req['memory'] = numa.get('memory', 0)
            if 'cores' in numa:
                numa_req['proc_req_nb'] = numa['cores']  # number of cores or threads to be reserved
                numa_req['proc_req_type'] = 'cores'  # indicates whether cores or threads must be reserved
                numa_req['proc_req_list'] = numa.get('cores-id', None)  # list of ids to be assigned to the cores or threads
            elif 'paired-threads' in numa:
                numa_req['proc_req_nb'] = numa['paired-threads']
                numa_req['proc_req_type'] = 'paired-threads'
                numa_req['proc_req_list'] = numa.get('paired-threads-id', None)
            elif 'threads' in numa:
                numa_req['proc_req_nb'] = numa['threads']
                numa_req['proc_req_type'] = 'threads'
                numa_req['proc_req_list'] = numa.get('threads-id', None)
            else:
                numa_req['proc_req_nb'] = 0  # by default
                numa_req['proc_req_type'] = 'threads'

            #Generate a list of sriov and another for physical interfaces
            interfaces = numa.get('interfaces', [])
            sriov_list = []
            port_list = []
            for iface in interfaces:
                iface['bandwidth'] = int(iface['bandwidth'])
                if iface['dedicated'][:3] == 'yes':
                    port_list.append(iface)
                else:
                    sriov_list.append(iface)

            #Save lists ordered from more restrictive to less bw requirements
            numa_req['sriov_list'] = sorted(sriov_list, key=lambda k: k['bandwidth'], reverse=True)
            numa_req['port_list'] = sorted(port_list, key=lambda k: k['bandwidth'], reverse=True)

            request.append(numa_req)

        #Search in db for an appropriate numa for each requested numa
        #at the moment multi-NUMA VMs are not supported
        if len(request) > 0:
            requirements['numa'].update(request[0])
    if requirements['numa']['memory'] > 0:
        requirements['ram'] = 0  #By the moment I make incompatible ask for both Huge and non huge pages memory
    elif requirements['ram'] == 0:
        return (-1, "Memory information not set neither at extended field not at ram")
    if requirements['numa']['proc_req_nb'] > 0:
        requirements['vcpus'] = 0  #By the moment I make incompatible ask for both Isolated and non isolated cpus
    elif requirements['vcpus'] == 0:
        return (-1, "Processor information not set neither at extended field not at vcpus")

    db_lock.acquire()
    result, content = db.get_numas(requirements, server.get('host_id', None), only_of_ports)
    db_lock.release()

    if result == -1:
        return (-1, content)

    numa_id = content['numa_id']
    host_id = content['host_id']

    #obtain threads_id and calculate pinning
    cpu_pinning = []
    reserved_threads = []
    if requirements['numa']['proc_req_nb'] > 0:
        db_lock.acquire()
        result, content = db.get_table(FROM='resources_core',
                                       SELECT=('id', 'core_id', 'thread_id'),
                                       WHERE={'numa_id': numa_id, 'instance_id': None, 'status': 'ok'})
        db_lock.release()
        if result <= 0:
            #print content
            return -1, content

        #convert rows to a dictionary indexed by core_id
        cores_dict = {}
        for row in content:
            if not row['core_id'] in cores_dict:
                cores_dict[row['core_id']] = []
            cores_dict[row['core_id']].append([row['thread_id'], row['id']])

        #In case full cores are requested
        paired = 'N'
        if requirements['numa']['proc_req_type'] == 'cores':
            #Get/create the list of the vcpu_ids
            vcpu_id_list = requirements['numa']['proc_req_list']
            if vcpu_id_list is None:
                vcpu_id_list = list(range(0, int(requirements['numa']['proc_req_nb'])))

            for threads in cores_dict.values():
                #we need full cores
                if len(threads) != 2:
                    continue

                #set pinning for the first thread
                cpu_pinning.append([vcpu_id_list.pop(0), threads[0][0], threads[0][1]])

                #reserve so it is not used the second thread
                reserved_threads.append(threads[1][1])

                if len(vcpu_id_list) == 0:
                    break

        #In case paired threads are requested
        elif requirements['numa']['proc_req_type'] == 'paired-threads':
            paired = 'Y'
            #Get/create the list of the vcpu_ids
            if requirements['numa']['proc_req_list'] is not None:
                vcpu_id_list = []
                for pair in requirements['numa']['proc_req_list']:
                    if len(pair) != 2:
                        # removed a dead bare 'return' that followed this line
                        return -1, "Field paired-threads-id not properly specified"
                    vcpu_id_list.append(pair[0])
                    vcpu_id_list.append(pair[1])
            else:
                vcpu_id_list = list(range(0, 2 * int(requirements['numa']['proc_req_nb'])))

            for threads in cores_dict.values():
                #we need full cores
                if len(threads) != 2:
                    continue
                #set pinning for the first thread
                cpu_pinning.append([vcpu_id_list.pop(0), threads[0][0], threads[0][1]])

                #set pinning for the second thread
                cpu_pinning.append([vcpu_id_list.pop(0), threads[1][0], threads[1][1]])

                if len(vcpu_id_list) == 0:
                    break

        #In case normal threads are requested
        elif requirements['numa']['proc_req_type'] == 'threads':
            #Get/create the list of the vcpu_ids
            vcpu_id_list = requirements['numa']['proc_req_list']
            if vcpu_id_list is None:
                vcpu_id_list = list(range(0, int(requirements['numa']['proc_req_nb'])))

            # visit cores with fewer free threads first
            for threads_index in sorted(cores_dict, key=lambda k: len(cores_dict[k])):
                threads = cores_dict[threads_index]
                #set pinning for the first thread
                cpu_pinning.append([vcpu_id_list.pop(0), threads[0][0], threads[0][1]])

                #if exists, set pinning for the second thread
                if len(threads) == 2 and len(vcpu_id_list) != 0:
                    cpu_pinning.append([vcpu_id_list.pop(0), threads[1][0], threads[1][1]])

                if len(vcpu_id_list) == 0:
                    break

    #Get the source pci addresses for the selected numa
    used_sriov_ports = []
    for port in requirements['numa']['sriov_list']:
        db_lock.acquire()
        result, content = db.get_table(FROM='resources_port', SELECT=('id', 'pci', 'mac'),
                                       WHERE={'numa_id': numa_id, 'root_id': port['port_id'], 'port_id': None,
                                              'Mbps_used': 0})
        db_lock.release()
        if result <= 0:
            #print content
            return -1, content
        for row in content:
            if row['id'] in used_sriov_ports or row['id'] == port['port_id']:
                continue
            port['pci'] = row['pci']
            if 'mac_address' not in port:
                port['mac_address'] = row['mac']
            del port['mac']
            port['port_id'] = row['id']
            port['Mbps_used'] = port['bandwidth']
            used_sriov_ports.append(row['id'])
            break

    for port in requirements['numa']['port_list']:
        port['Mbps_used'] = None
        if port['dedicated'] != "yes:sriov":
            port['mac_address'] = port['mac']
            del port['mac']
            continue
        db_lock.acquire()
        result, content = db.get_table(FROM='resources_port', SELECT=('id', 'pci', 'mac', 'Mbps'),
                                       WHERE={'numa_id': numa_id, 'root_id': port['port_id'], 'port_id': None,
                                              'Mbps_used': 0})
        db_lock.release()
        if result <= 0:
            #print content
            return -1, content
        # NOTE(review): Mbps_used taken from the first row even if the loop
        # below picks another row; candidate VFs of the same PF presumably
        # share the same Mbps — confirm before changing
        port['Mbps_used'] = content[0]['Mbps']
        for row in content:
            if row['id'] in used_sriov_ports or row['id'] == port['port_id']:
                continue
            port['pci'] = row['pci']
            if 'mac_address' not in port:
                port['mac_address'] = row['mac']  # mac cannot be set to passthrough ports
            del port['mac']
            port['port_id'] = row['id']
            used_sriov_ports.append(row['id'])
            break

    server['host_id'] = host_id

    #Generate dictionary for saving in db the instance resources
    resources = {}
    resources['bridged-ifaces'] = []

    numa_dict = {}
    numa_dict['interfaces'] = []

    numa_dict['interfaces'] += requirements['numa']['port_list']
    numa_dict['interfaces'] += requirements['numa']['sriov_list']

    #Check bridge information
    unified_dataplane_iface = []
    unified_dataplane_iface += requirements['numa']['port_list']
    unified_dataplane_iface += requirements['numa']['sriov_list']

    for control_iface in server.get('networks', []):
        control_iface['net_id'] = control_iface.pop('uuid')
        #Get the brifge name
        db_lock.acquire()
        result, content = db.get_table(FROM='nets',
                                       SELECT=('name', 'type', 'vlan', 'provider', 'enable_dhcp',
                                               'dhcp_first_ip', 'dhcp_last_ip', 'cidr'),
                                       WHERE={'uuid': control_iface['net_id']})
        db_lock.release()
        if result < 0:
            pass
        elif result == 0:
            return -1, "Error at field netwoks: Not found any network wit uuid %s" % control_iface['net_id']
        else:
            network = content[0]
            if control_iface.get("type", 'virtual') == 'virtual':
                if network['type'] != 'bridge_data' and network['type'] != 'bridge_man':
                    return -1, "Error at field netwoks: network uuid %s for control interface is not of type bridge_man or bridge_data" % control_iface['net_id']
                resources['bridged-ifaces'].append(control_iface)
                if network.get("provider") and network["provider"][0:3] == "OVS":
                    control_iface["type"] = "instance:ovs"
                else:
                    control_iface["type"] = "instance:bridge"
                if network.get("vlan"):
                    control_iface["vlan"] = network["vlan"]

                if network.get("enable_dhcp") == 'true':
                    control_iface["enable_dhcp"] = network.get("enable_dhcp")
                    control_iface["dhcp_first_ip"] = network["dhcp_first_ip"]
                    control_iface["dhcp_last_ip"] = network["dhcp_last_ip"]
                    control_iface["cidr"] = network["cidr"]
            else:
                if network['type'] != 'data' and network['type'] != 'ptp':
                    return -1, "Error at field netwoks: network uuid %s for dataplane interface is not of type data or ptp" % control_iface['net_id']
                #dataplane interface, look for it in the numa tree and asign this network
                iface_found = False
                for dataplane_iface in numa_dict['interfaces']:
                    if dataplane_iface['name'] == control_iface.get("name"):
                        if (dataplane_iface['dedicated'] == "yes" and control_iface["type"] != "PF") or \
                                (dataplane_iface['dedicated'] == "no" and control_iface["type"] != "VF") or \
                                (dataplane_iface['dedicated'] == "yes:sriov" and control_iface["type"] != "VFnotShared"):
                            return -1, "Error at field netwoks: mismatch at interface '%s' from flavor 'dedicated=%s' and networks 'type=%s'" % \
                                (control_iface.get("name"), dataplane_iface['dedicated'], control_iface["type"])
                        dataplane_iface['uuid'] = control_iface['net_id']
                        if dataplane_iface['dedicated'] == "no":
                            dataplane_iface['vlan'] = network['vlan']
                        if dataplane_iface['dedicated'] != "yes" and control_iface.get("mac_address"):
                            dataplane_iface['mac_address'] = control_iface.get("mac_address")
                        if control_iface.get("vpci"):
                            dataplane_iface['vpci'] = control_iface.get("vpci")
                        iface_found = True
                        break
                if not iface_found:
                    return -1, "Error at field netwoks: interface name %s from network not found at flavor" % control_iface.get("name")

    resources['host_id'] = host_id
    resources['image_id'] = server['image_id']
    resources['flavor_id'] = server['flavor_id']
    resources['tenant_id'] = server['tenant_id']
    resources['ram'] = requirements['ram']
    resources['vcpus'] = requirements['vcpus']
    resources['status'] = 'CREATING'

    if 'description' in server:
        resources['description'] = server['description']
    if 'name' in server:
        resources['name'] = server['name']

    resources['extended'] = {}  #optional
    resources['extended']['numas'] = []
    numa_dict['numa_id'] = numa_id
    numa_dict['memory'] = requirements['numa']['memory']
    numa_dict['cores'] = []

    for core in cpu_pinning:
        numa_dict['cores'].append({'id': core[2], 'vthread': core[0], 'paired': paired})
    for core in reserved_threads:
        numa_dict['cores'].append({'id': core})
    resources['extended']['numas'].append(numa_dict)
    if extended is not None and 'devices' in extended:  #TODO allow extra devices without numa
        resources['extended']['devices'] = extended['devices']

    return 0, resources
2357