1 # -*- coding: utf-8 -*-
4 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
5 # This file is part of openvim
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
12 # http://www.apache.org/licenses/LICENSE-2.0
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
This is a thread that interacts with the host and with libvirt to manage VMs
26 One thread will be launched per host
# Module authorship metadata (kept as in the original openvim source)
__author__ = "Pablo Montes, Alfonso Tierno, Leonardo Mirabal"
__date__ = "$10-jul-2014 12:07:15$"
37 from jsonschema
import validate
as js_v
, exceptions
as js_e
40 from vim_schema
import localinfo_schema
, hostinfo_schema
42 #from logging import Logger
43 #import auxiliary_functions as af
45 #TODO: insert a logging system
49 #lvirt_module=None #libvirt module is charged only if not in test mode
51 class host_thread(threading
.Thread
):
53 def __init__(self
, name
, host
, user
, db
, db_lock
, test
, image_path
, host_id
, version
, develop_mode
, develop_bridge_iface
):
58 'host','user': host ip or name to manage and user
59 'db', 'db_lock': database class and lock to use it in exclusion
61 threading
.Thread
.__init
__(self
)
66 self
.db_lock
= db_lock
69 if not test
and not host_thread
.lvirt_module
:
71 module_info
= imp
.find_module("libvirt")
72 host_thread
.lvirt_module
= imp
.load_module("libvirt", *module_info
)
73 except (IOError, ImportError) as e
:
74 raise ImportError("Cannot import python-libvirt. Openvim not properly installed" +str(e
))
77 self
.develop_mode
= develop_mode
78 self
.develop_bridge_iface
= develop_bridge_iface
79 self
.image_path
= image_path
80 self
.host_id
= host_id
81 self
.version
= version
86 self
.server_status
= {} #dictionary with pairs server_uuid:server_status
87 self
.pending_terminate_server
=[] #list with pairs (time,server_uuid) time to send a terminate for a server being destroyed
88 self
.next_update_server_status
= 0 #time when must be check servers status
92 self
.queueLock
= threading
.Lock()
93 self
.taskQueue
= Queue
.Queue(2000)
96 def ssh_connect(self
):
99 self
.ssh_conn
= paramiko
.SSHClient()
100 self
.ssh_conn
.set_missing_host_key_policy(paramiko
.AutoAddPolicy())
101 self
.ssh_conn
.load_system_host_keys()
102 self
.ssh_conn
.connect(self
.host
, username
=self
.user
, timeout
=10) #, None)
103 except paramiko
.ssh_exception
.SSHException
as e
:
105 print self
.name
, ": ssh_connect ssh Exception:", text
107 def load_localinfo(self
):
113 command
= 'mkdir -p ' + self
.image_path
114 #print self.name, ': command:', command
115 (_
, stdout
, stderr
) = self
.ssh_conn
.exec_command(command
)
116 content
= stderr
.read()
118 print self
.name
, ': command:', command
, "stderr:", content
120 command
= 'cat ' + self
.image_path
+ '/.openvim.yaml'
121 #print self.name, ': command:', command
122 (_
, stdout
, stderr
) = self
.ssh_conn
.exec_command(command
)
123 content
= stdout
.read()
124 if len(content
) == 0:
125 print self
.name
, ': command:', command
, "stderr:", stderr
.read()
126 raise paramiko
.ssh_exception
.SSHException("Error empty file ")
127 self
.localinfo
= yaml
.load(content
)
128 js_v(self
.localinfo
, localinfo_schema
)
129 self
.localinfo_dirty
=False
130 if 'server_files' not in self
.localinfo
:
131 self
.localinfo
['server_files'] = {}
132 print self
.name
, ': localinfo load from host'
135 except paramiko
.ssh_exception
.SSHException
as e
:
137 print self
.name
, ": load_localinfo ssh Exception:", text
138 except host_thread
.lvirt_module
.libvirtError
as e
:
139 text
= e
.get_error_message()
140 print self
.name
, ": load_localinfo libvirt Exception:", text
141 except yaml
.YAMLError
as exc
:
143 if hasattr(exc
, 'problem_mark'):
144 mark
= exc
.problem_mark
145 text
= " at position: (%s:%s)" % (mark
.line
+1, mark
.column
+1)
146 print self
.name
, ": load_localinfo yaml format Exception", text
147 except js_e
.ValidationError
as e
:
149 if len(e
.path
)>0: text
=" at '" + ":".join(map(str, e
.path
))+"'"
150 print self
.name
, ": load_localinfo format Exception:", text
, e
.message
151 except Exception as e
:
153 print self
.name
, ": load_localinfo Exception:", text
155 #not loaded, insert a default data and force saving by activating dirty flag
156 self
.localinfo
= {'files':{}, 'server_files':{} }
157 #self.localinfo_dirty=True
158 self
.localinfo_dirty
=False
160 def load_hostinfo(self
):
168 command
= 'cat ' + self
.image_path
+ '/hostinfo.yaml'
169 #print self.name, ': command:', command
170 (_
, stdout
, stderr
) = self
.ssh_conn
.exec_command(command
)
171 content
= stdout
.read()
172 if len(content
) == 0:
173 print self
.name
, ': command:', command
, "stderr:", stderr
.read()
174 raise paramiko
.ssh_exception
.SSHException("Error empty file ")
175 self
.hostinfo
= yaml
.load(content
)
176 js_v(self
.hostinfo
, hostinfo_schema
)
177 print self
.name
, ': hostlinfo load from host', self
.hostinfo
180 except paramiko
.ssh_exception
.SSHException
as e
:
182 print self
.name
, ": load_hostinfo ssh Exception:", text
183 except host_thread
.lvirt_module
.libvirtError
as e
:
184 text
= e
.get_error_message()
185 print self
.name
, ": load_hostinfo libvirt Exception:", text
186 except yaml
.YAMLError
as exc
:
188 if hasattr(exc
, 'problem_mark'):
189 mark
= exc
.problem_mark
190 text
= " at position: (%s:%s)" % (mark
.line
+1, mark
.column
+1)
191 print self
.name
, ": load_hostinfo yaml format Exception", text
192 except js_e
.ValidationError
as e
:
194 if len(e
.path
)>0: text
=" at '" + ":".join(map(str, e
.path
))+"'"
195 print self
.name
, ": load_hostinfo format Exception:", text
, e
.message
196 except Exception as e
:
198 print self
.name
, ": load_hostinfo Exception:", text
200 #not loaded, insert a default data
203 def save_localinfo(self
, tries
=3):
205 self
.localinfo_dirty
= False
212 command
= 'cat > ' + self
.image_path
+ '/.openvim.yaml'
213 print self
.name
, ': command:', command
214 (stdin
, _
, _
) = self
.ssh_conn
.exec_command(command
)
215 yaml
.safe_dump(self
.localinfo
, stdin
, explicit_start
=True, indent
=4, default_flow_style
=False, tags
=False, encoding
='utf-8', allow_unicode
=True)
216 self
.localinfo_dirty
= False
219 except paramiko
.ssh_exception
.SSHException
as e
:
221 print self
.name
, ": save_localinfo ssh Exception:", text
222 if "SSH session not active" in text
:
224 except host_thread
.lvirt_module
.libvirtError
as e
:
225 text
= e
.get_error_message()
226 print self
.name
, ": save_localinfo libvirt Exception:", text
227 except yaml
.YAMLError
as exc
:
229 if hasattr(exc
, 'problem_mark'):
230 mark
= exc
.problem_mark
231 text
= " at position: (%s:%s)" % (mark
.line
+1, mark
.column
+1)
232 print self
.name
, ": save_localinfo yaml format Exception", text
233 except Exception as e
:
235 print self
.name
, ": save_localinfo Exception:", text
237 def load_servers_from_db(self
):
238 self
.db_lock
.acquire()
239 r
,c
= self
.db
.get_table(SELECT
=('uuid','status', 'image_id'), FROM
='instances', WHERE
={'host_id': self
.host_id
})
240 self
.db_lock
.release()
242 self
.server_status
= {}
244 print self
.name
, ": Error getting data from database:", c
247 self
.server_status
[ server
['uuid'] ] = server
['status']
249 #convert from old version to new one
250 if 'inc_files' in self
.localinfo
and server
['uuid'] in self
.localinfo
['inc_files']:
251 server_files_dict
= {'source file': self
.localinfo
['inc_files'][ server
['uuid'] ] [0], 'file format':'raw' }
252 if server_files_dict
['source file'][-5:] == 'qcow2':
253 server_files_dict
['file format'] = 'qcow2'
255 self
.localinfo
['server_files'][ server
['uuid'] ] = { server
['image_id'] : server_files_dict
}
256 if 'inc_files' in self
.localinfo
:
257 del self
.localinfo
['inc_files']
258 self
.localinfo_dirty
= True
260 def delete_unused_files(self
):
261 '''Compares self.localinfo['server_files'] content with real servers running self.server_status obtained from database
262 Deletes unused entries at self.loacalinfo and the corresponding local files.
263 The only reason for this mismatch is the manual deletion of instances (VM) at database
267 for uuid
,images
in self
.localinfo
['server_files'].items():
268 if uuid
not in self
.server_status
:
269 for localfile
in images
.values():
271 print self
.name
, ": deleting file '%s' of unused server '%s'" %(localfile
['source file'], uuid
)
272 self
.delete_file(localfile
['source file'])
273 except paramiko
.ssh_exception
.SSHException
as e
:
274 print self
.name
, ": Exception deleting file '%s': %s" %(localfile
['source file'], str(e
))
275 del self
.localinfo
['server_files'][uuid
]
276 self
.localinfo_dirty
= True
278 def insert_task(self
, task
, *aditional
):
280 self
.queueLock
.acquire()
281 task
= self
.taskQueue
.put( (task
,) + aditional
, timeout
=5)
282 self
.queueLock
.release()
285 return -1, "timeout inserting a task over host " + self
.name
289 self
.load_localinfo()
291 self
.load_servers_from_db()
292 self
.delete_unused_files()
294 self
.queueLock
.acquire()
295 if not self
.taskQueue
.empty():
296 task
= self
.taskQueue
.get()
299 self
.queueLock
.release()
303 if self
.localinfo_dirty
:
304 self
.save_localinfo()
305 elif self
.next_update_server_status
< now
:
306 self
.update_servers_status()
307 self
.next_update_server_status
= now
+ 5
308 elif len(self
.pending_terminate_server
)>0 and self
.pending_terminate_server
[0][0]<now
:
309 self
.server_forceoff()
314 if task
[0] == 'instance':
315 print self
.name
, ": processing task instance", task
[1]['action']
319 r
=self
.action_on_server(task
[1], retry
==2)
322 elif task
[0] == 'image':
324 elif task
[0] == 'exit':
325 print self
.name
, ": processing task exit"
328 elif task
[0] == 'reload':
329 print self
.name
, ": processing task reload terminating and relaunching"
332 elif task
[0] == 'edit-iface':
333 print self
.name
, ": processing task edit-iface port=%s, old_net=%s, new_net=%s" % (task
[1], task
[2], task
[3])
334 self
.edit_iface(task
[1], task
[2], task
[3])
335 elif task
[0] == 'restore-iface':
336 print self
.name
, ": processing task restore-iface %s mac=%s" % (task
[1], task
[2])
337 self
.restore_iface(task
[1], task
[2])
338 elif task
[0] == 'new-ovsbridge':
339 print self
.name
, ": Creating compute OVS bridge"
340 self
.create_ovs_bridge()
342 elif task
[0] == 'new-vxlan':
343 print self
.name
, ": Creating vxlan tunnel=" + task
[1] + ", remote ip=" + task
[2]
344 self
.create_ovs_vxlan_tunnel(task
[1], task
[2])
346 elif task
[0] == 'del-ovsbridge':
347 print self
.name
, ": Deleting OVS bridge"
348 self
.delete_ovs_bridge()
350 elif task
[0] == 'del-vxlan':
351 print self
.name
, ": Deleting vxlan " + task
[1] + " tunnel"
352 self
.delete_ovs_vxlan_tunnel(task
[1])
354 elif task
[0] == 'create-ovs-bridge-port':
355 print self
.name
, ": Adding port ovim-" + task
[1] + " to OVS bridge"
356 self
.create_ovs_bridge_port(task
[1])
357 elif task
[0] == 'del-ovs-port':
358 self
.delete_bridge_port_attached_to_ovs(task
[1], task
[2])
360 print self
.name
, ": unknown task", task
362 def server_forceoff(self
, wait_until_finished
=False):
363 while len(self
.pending_terminate_server
)>0:
365 if self
.pending_terminate_server
[0][0]>now
:
366 if wait_until_finished
:
371 req
={'uuid':self
.pending_terminate_server
[0][1],
372 'action':{'terminate':'force'},
375 self
.action_on_server(req
)
376 self
.pending_terminate_server
.pop(0)
380 self
.server_forceoff(True)
381 if self
.localinfo_dirty
:
382 self
.save_localinfo()
384 self
.ssh_conn
.close()
385 except Exception as e
:
387 print self
.name
, ": terminate Exception:", text
388 print self
.name
, ": exit from host_thread"
390 def get_local_iface_name(self
, generic_name
):
391 if self
.hostinfo
!= None and "iface_names" in self
.hostinfo
and generic_name
in self
.hostinfo
["iface_names"]:
392 return self
.hostinfo
["iface_names"][generic_name
]
395 def create_xml_server(self
, server
, dev_list
, server_metadata
={}):
396 """Function that implements the generation of the VM XML definition.
397 Additional devices are in dev_list list
398 The main disk is upon dev_list[0]"""
400 #get if operating system is Windows
402 os_type
= server_metadata
.get('os_type', None)
403 if os_type
== None and 'metadata' in dev_list
[0]:
404 os_type
= dev_list
[0]['metadata'].get('os_type', None)
405 if os_type
!= None and os_type
.lower() == "windows":
407 #get type of hard disk bus
408 bus_ide
= True if windows_os
else False
409 bus
= server_metadata
.get('bus', None)
410 if bus
== None and 'metadata' in dev_list
[0]:
411 bus
= dev_list
[0]['metadata'].get('bus', None)
413 bus_ide
= True if bus
=='ide' else False
417 text
= "<domain type='kvm'>"
419 topo
= server_metadata
.get('topology', None)
420 if topo
== None and 'metadata' in dev_list
[0]:
421 topo
= dev_list
[0]['metadata'].get('topology', None)
423 name
= server
.get('name','') + "_" + server
['uuid']
424 name
= name
[:58] #qemu impose a length limit of 59 chars or not start. Using 58
425 text
+= self
.inc_tab() + "<name>" + name
+ "</name>"
427 text
+= self
.tab() + "<uuid>" + server
['uuid'] + "</uuid>"
430 if 'extended' in server
and server
['extended']!=None and 'numas' in server
['extended']:
431 numa
= server
['extended']['numas'][0]
434 memory
= int(numa
.get('memory',0))*1024*1024 #in KiB
436 memory
= int(server
['ram'])*1024;
438 if not self
.develop_mode
:
441 return -1, 'No memory assigned to instance'
443 text
+= self
.tab() + "<memory unit='KiB'>" +memory
+"</memory>"
444 text
+= self
.tab() + "<currentMemory unit='KiB'>" +memory
+ "</currentMemory>"
446 text
+= self
.tab()+'<memoryBacking>'+ \
447 self
.inc_tab() + '<hugepages/>'+ \
448 self
.dec_tab()+ '</memoryBacking>'
451 use_cpu_pinning
=False
452 vcpus
= int(server
.get("vcpus",0))
454 if 'cores-source' in numa
:
456 for index
in range(0, len(numa
['cores-source'])):
457 cpu_pinning
.append( [ numa
['cores-id'][index
], numa
['cores-source'][index
] ] )
459 if 'threads-source' in numa
:
461 for index
in range(0, len(numa
['threads-source'])):
462 cpu_pinning
.append( [ numa
['threads-id'][index
], numa
['threads-source'][index
] ] )
464 if 'paired-threads-source' in numa
:
466 for index
in range(0, len(numa
['paired-threads-source'])):
467 cpu_pinning
.append( [numa
['paired-threads-id'][index
][0], numa
['paired-threads-source'][index
][0] ] )
468 cpu_pinning
.append( [numa
['paired-threads-id'][index
][1], numa
['paired-threads-source'][index
][1] ] )
471 if use_cpu_pinning
and not self
.develop_mode
:
472 text
+= self
.tab()+"<vcpu placement='static'>" +str(len(cpu_pinning
)) +"</vcpu>" + \
473 self
.tab()+'<cputune>'
475 for i
in range(0, len(cpu_pinning
)):
476 text
+= self
.tab() + "<vcpupin vcpu='" +str(cpu_pinning
[i
][0])+ "' cpuset='" +str(cpu_pinning
[i
][1]) +"'/>"
477 text
+= self
.dec_tab()+'</cputune>'+ \
478 self
.tab() + '<numatune>' +\
479 self
.inc_tab() + "<memory mode='strict' nodeset='" +str(numa
['source'])+ "'/>" +\
480 self
.dec_tab() + '</numatune>'
483 return -1, "Instance without number of cpus"
484 text
+= self
.tab()+"<vcpu>" + str(vcpus
) + "</vcpu>"
489 if dev
['type']=='cdrom' :
492 text
+= self
.tab()+ '<os>' + \
493 self
.inc_tab() + "<type arch='x86_64' machine='pc'>hvm</type>"
495 text
+= self
.tab() + "<boot dev='cdrom'/>"
496 text
+= self
.tab() + "<boot dev='hd'/>" + \
497 self
.dec_tab()+'</os>'
499 text
+= self
.tab()+'<features>'+\
500 self
.inc_tab()+'<acpi/>' +\
501 self
.tab()+'<apic/>' +\
502 self
.tab()+'<pae/>'+ \
503 self
.dec_tab() +'</features>'
504 if windows_os
or topo
=="oneSocket":
505 text
+= self
.tab() + "<cpu mode='host-model'> <topology sockets='1' cores='%d' threads='1' /> </cpu>"% vcpus
507 text
+= self
.tab() + "<cpu mode='host-model'></cpu>"
508 text
+= self
.tab() + "<clock offset='utc'/>" +\
509 self
.tab() + "<on_poweroff>preserve</on_poweroff>" + \
510 self
.tab() + "<on_reboot>restart</on_reboot>" + \
511 self
.tab() + "<on_crash>restart</on_crash>"
512 text
+= self
.tab() + "<devices>" + \
513 self
.inc_tab() + "<emulator>/usr/libexec/qemu-kvm</emulator>" + \
514 self
.tab() + "<serial type='pty'>" +\
515 self
.inc_tab() + "<target port='0'/>" + \
516 self
.dec_tab() + "</serial>" +\
517 self
.tab() + "<console type='pty'>" + \
518 self
.inc_tab()+ "<target type='serial' port='0'/>" + \
519 self
.dec_tab()+'</console>'
521 text
+= self
.tab() + "<controller type='usb' index='0'/>" + \
522 self
.tab() + "<controller type='ide' index='0'/>" + \
523 self
.tab() + "<input type='mouse' bus='ps2'/>" + \
524 self
.tab() + "<sound model='ich6'/>" + \
525 self
.tab() + "<video>" + \
526 self
.inc_tab() + "<model type='cirrus' vram='9216' heads='1'/>" + \
527 self
.dec_tab() + "</video>" + \
528 self
.tab() + "<memballoon model='virtio'/>" + \
529 self
.tab() + "<input type='tablet' bus='usb'/>" #TODO revisar
531 #> self.tab()+'<alias name=\'hostdev0\'/>\n' +\
532 #> self.dec_tab()+'</hostdev>\n' +\
533 #> self.tab()+'<input type=\'tablet\' bus=\'usb\'/>\n'
535 text
+= self
.tab() + "<graphics type='vnc' port='-1' autoport='yes'/>"
537 #If image contains 'GRAPH' include graphics
538 #if 'GRAPH' in image:
539 text
+= self
.tab() + "<graphics type='vnc' port='-1' autoport='yes' listen='0.0.0.0'>" +\
540 self
.inc_tab() + "<listen type='address' address='0.0.0.0'/>" +\
541 self
.dec_tab() + "</graphics>"
545 bus_ide_dev
= bus_ide
546 if dev
['type']=='cdrom' or dev
['type']=='disk':
547 if dev
['type']=='cdrom':
549 text
+= self
.tab() + "<disk type='file' device='"+dev
['type']+"'>"
550 if 'file format' in dev
:
551 text
+= self
.inc_tab() + "<driver name='qemu' type='" +dev
['file format']+ "' cache='writethrough'/>"
552 if 'source file' in dev
:
553 text
+= self
.tab() + "<source file='" +dev
['source file']+ "'/>"
554 #elif v['type'] == 'block':
555 # text += self.tab() + "<source dev='" + v['source'] + "'/>"
557 # return -1, 'Unknown disk type ' + v['type']
558 vpci
= dev
.get('vpci',None)
560 vpci
= dev
['metadata'].get('vpci',None)
561 text
+= self
.pci2xml(vpci
)
564 text
+= self
.tab() + "<target dev='hd" +vd_index
+ "' bus='ide'/>" #TODO allows several type of disks
566 text
+= self
.tab() + "<target dev='vd" +vd_index
+ "' bus='virtio'/>"
567 text
+= self
.dec_tab() + '</disk>'
568 vd_index
= chr(ord(vd_index
)+1)
569 elif dev
['type']=='xml':
570 dev_text
= dev
['xml']
572 dev_text
= dev_text
.replace('__vpci__', dev
['vpci'])
573 if 'source file' in dev
:
574 dev_text
= dev_text
.replace('__file__', dev
['source file'])
575 if 'file format' in dev
:
576 dev_text
= dev_text
.replace('__format__', dev
['source file'])
577 if '__dev__' in dev_text
:
578 dev_text
= dev_text
.replace('__dev__', vd_index
)
579 vd_index
= chr(ord(vd_index
)+1)
582 return -1, 'Unknown device type ' + dev
['type']
585 bridge_interfaces
= server
.get('networks', [])
586 for v
in bridge_interfaces
:
588 self
.db_lock
.acquire()
589 result
, content
= self
.db
.get_table(FROM
='nets', SELECT
=('provider',),WHERE
={'uuid':v
['net_id']} )
590 self
.db_lock
.release()
592 print "create_xml_server ERROR getting nets",result
, content
594 #ALF: Allow by the moment the 'default' bridge net because is confortable for provide internet to VM
595 #I know it is not secure
596 #for v in sorted(desc['network interfaces'].itervalues()):
597 model
= v
.get("model", None)
598 if content
[0]['provider']=='default':
599 text
+= self
.tab() + "<interface type='network'>" + \
600 self
.inc_tab() + "<source network='" +content
[0]['provider']+ "'/>"
601 elif content
[0]['provider'][0:7]=='macvtap':
602 text
+= self
.tab()+"<interface type='direct'>" + \
603 self
.inc_tab() + "<source dev='" + self
.get_local_iface_name(content
[0]['provider'][8:]) + "' mode='bridge'/>" + \
604 self
.tab() + "<target dev='macvtap0'/>"
606 text
+= self
.tab() + "<alias name='net" + str(net_nb
) + "'/>"
609 elif content
[0]['provider'][0:6]=='bridge':
610 text
+= self
.tab() + "<interface type='bridge'>" + \
611 self
.inc_tab()+"<source bridge='" +self
.get_local_iface_name(content
[0]['provider'][7:])+ "'/>"
613 text
+= self
.tab() + "<target dev='vnet" + str(net_nb
)+ "'/>" +\
614 self
.tab() + "<alias name='net" + str(net_nb
)+ "'/>"
617 elif content
[0]['provider'][0:3] == "OVS":
618 vlan
= content
[0]['provider'].replace('OVS:', '')
619 text
+= self
.tab() + "<interface type='bridge'>" + \
620 self
.inc_tab() + "<source bridge='ovim-" + vlan
+ "'/>"
622 return -1, 'Unknown Bridge net provider ' + content
[0]['provider']
624 text
+= self
.tab() + "<model type='" +model
+ "'/>"
625 if v
.get('mac_address', None) != None:
626 text
+= self
.tab() +"<mac address='" +v
['mac_address']+ "'/>"
627 text
+= self
.pci2xml(v
.get('vpci',None))
628 text
+= self
.dec_tab()+'</interface>'
632 interfaces
= numa
.get('interfaces', [])
636 if self
.develop_mode
: #map these interfaces to bridges
637 text
+= self
.tab() + "<interface type='bridge'>" + \
638 self
.inc_tab()+"<source bridge='" +self
.develop_bridge_iface
+ "'/>"
640 text
+= self
.tab() + "<target dev='vnet" + str(net_nb
)+ "'/>" +\
641 self
.tab() + "<alias name='net" + str(net_nb
)+ "'/>"
643 text
+= self
.tab() + "<model type='e1000'/>" #e1000 is more probable to be supported than 'virtio'
644 if v
.get('mac_address', None) != None:
645 text
+= self
.tab() +"<mac address='" +v
['mac_address']+ "'/>"
646 text
+= self
.pci2xml(v
.get('vpci',None))
647 text
+= self
.dec_tab()+'</interface>'
650 if v
['dedicated'] == 'yes': #passthrought
651 text
+= self
.tab() + "<hostdev mode='subsystem' type='pci' managed='yes'>" + \
652 self
.inc_tab() + "<source>"
654 text
+= self
.pci2xml(v
['source'])
655 text
+= self
.dec_tab()+'</source>'
656 text
+= self
.pci2xml(v
.get('vpci',None))
658 text
+= self
.tab() + "<alias name='hostdev" + str(net_nb
) + "'/>"
659 text
+= self
.dec_tab()+'</hostdev>'
661 else: #sriov_interfaces
662 #skip not connected interfaces
663 if v
.get("net_id") == None:
665 text
+= self
.tab() + "<interface type='hostdev' managed='yes'>"
667 if v
.get('mac_address', None) != None:
668 text
+= self
.tab() + "<mac address='" +v
['mac_address']+ "'/>"
669 text
+= self
.tab()+'<source>'
671 text
+= self
.pci2xml(v
['source'])
672 text
+= self
.dec_tab()+'</source>'
673 if v
.get('vlan',None) != None:
674 text
+= self
.tab() + "<vlan> <tag id='" + str(v
['vlan']) + "'/> </vlan>"
675 text
+= self
.pci2xml(v
.get('vpci',None))
677 text
+= self
.tab() + "<alias name='hostdev" + str(net_nb
) + "'/>"
678 text
+= self
.dec_tab()+'</interface>'
681 text
+= self
.dec_tab()+'</devices>'+\
682 self
.dec_tab()+'</domain>'
685 def pci2xml(self
, pci
):
686 '''from a pci format text XXXX:XX:XX.X generates the xml content of <address>
687 alows an empty pci text'''
690 first_part
= pci
.split(':')
691 second_part
= first_part
[2].split('.')
692 return self
.tab() + "<address type='pci' domain='0x" + first_part
[0] + \
693 "' bus='0x" + first_part
[1] + "' slot='0x" + second_part
[0] + \
694 "' function='0x" + second_part
[1] + "'/>"
697 """Return indentation according to xml_level"""
698 return "\n" + (' '*self
.xml_level
)
701 """Increment and return indentation according to xml_level"""
706 """Decrement and return indentation according to xml_level"""
710 def create_ovs_bridge(self
):
712 Create a bridge in compute OVS to allocate VMs
713 :return: True if success
717 command
= 'sudo ovs-vsctl --may-exist add-br br-int -- set Bridge br-int stp_enable=true'
718 print self
.name
, ': command:', command
719 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
720 content
= stdout
.read()
721 if len(content
) == 0:
726 def delete_port_to_ovs_bridge(self
, vlan
, net_uuid
):
Delete a linux bridge port attached to an OVS bridge; if the port is not free it is not removed
729 :param vlan: vlan port id
730 :param net_uuid: network id
734 command
= 'sudo ovs-vsctl del-port br-int ovim-' + vlan
735 print self
.name
, ': command:', command
736 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
737 content
= stdout
.read()
738 if len(content
) == 0:
743 def is_port_free(self
, vlan
, net_uuid
):
745 Check if there not ovs ports of a network in a compute host.
746 :param vlan: vlan port id
747 :param net_uuid: network id
748 :return: True if is not free
750 self
.db_lock
.acquire()
751 result
, content
= self
.db
.get_table(
752 FROM
='ports as p join instances as i on p.instance_id=i.uuid',
753 WHERE
={"i.host_id": self
.host_id
, 'p.type': 'instance:ovs', 'p.net_id': net_uuid
}
755 self
.db_lock
.release()
762 def add_port_to_ovs_bridge(self
, vlan
):
764 Add a bridge linux as a port to a OVS bridge and set a vlan for an specific linux bridge
765 :param vlan: vlan port id
768 command
= 'sudo ovs-vsctl add-port br-int ovim-' + vlan
+ ' tag=' + vlan
769 print self
.name
, ': command:', command
770 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
771 content
= stdout
.read()
772 if len(content
) == 0:
777 def delete_bridge_port_attached_to_ovs(self
, vlan
, net_uuid
):
779 Delete from an existing OVS bridge a linux bridge port attached and the linux bridge itself.
782 :return: True if success
786 if not self
.is_port_free(vlan
, net_uuid
):
788 self
.delete_port_to_ovs_bridge(vlan
, net_uuid
)
789 self
.delete_linux_bridge(vlan
)
792 def delete_linux_bridge(self
, vlan
):
Delete a linux bridge in a specific compute.
795 :param vlan: vlan port id
796 :return: True if success
798 command
= 'sudo ifconfig ovim-' + vlan
+ ' down && sudo brctl delbr ovim-' + vlan
799 print self
.name
, ': command:', command
800 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
801 content
= stdout
.read()
802 if len(content
) == 0:
807 def create_ovs_bridge_port(self
, vlan
):
Generate a linux bridge and attach the port to an OVS bridge
810 :param vlan: vlan port id
815 self
.create_linux_bridge(vlan
)
816 self
.add_port_to_ovs_bridge(vlan
)
818 def create_linux_bridge(self
, vlan
):
820 Create a linux bridge with STP active
:param vlan: network vlan id
824 command
= 'sudo brctl addbr ovim-' + vlan
+ ' && sudo ifconfig ovim-' + vlan
+ ' up'
825 print self
.name
, ': command:', command
826 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
827 content
= stdout
.read()
829 if len(content
) != 0:
832 command
= 'sudo brctl stp ovim-' + vlan
+ ' on'
833 print self
.name
, ': command:', command
834 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
835 content
= stdout
.read()
837 if len(content
) == 0:
842 def create_ovs_vxlan_tunnel(self
, vxlan_interface
, remote_ip
):
Create a vxlan tunnel between two computes with an OVS installed. STP is also active at port level
:param vxlan_interface: vxlan interface name.
846 :param remote_ip: tunnel endpoint remote compute ip.
851 command
= 'sudo ovs-vsctl add-port br-int ' + vxlan_interface
+ \
852 ' -- set Interface ' + vxlan_interface
+ ' type=vxlan options:remote_ip=' + remote_ip
+ \
853 ' -- set Port ' + vxlan_interface
+ ' other_config:stp-path-cost=10'
854 print self
.name
, ': command:', command
855 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
856 content
= stdout
.read()
858 if len(content
) == 0:
863 def delete_ovs_vxlan_tunnel(self
, vxlan_interface
):
Delete a vxlan tunnel port from an OVS bridge.
:param vxlan_interface: vxlan interface name to be deleted.
867 :return: True if success.
871 command
= 'sudo ovs-vsctl del-port br-int ' + vxlan_interface
872 print self
.name
, ': command:', command
873 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
874 content
= stdout
.read()
876 if len(content
) == 0:
881 def delete_ovs_bridge(self
):
883 Delete a OVS bridge from a compute.
884 :return: True if success
888 command
= 'sudo ovs-vsctl del-br br-int'
889 print self
.name
, ': command:', command
890 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
891 content
= stdout
.read()
892 if len(content
) == 0:
897 def get_file_info(self
, path
):
898 command
= 'ls -lL --time-style=+%Y-%m-%dT%H:%M:%S ' + path
899 print self
.name
, ': command:', command
900 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
901 content
= stdout
.read()
902 if len(content
) == 0:
903 return None # file does not exist
905 return content
.split(" ") #(permission, 1, owner, group, size, date, file)
907 def qemu_get_info(self
, path
):
908 command
= 'qemu-img info ' + path
909 print self
.name
, ': command:', command
910 (_
, stdout
, stderr
) = self
.ssh_conn
.exec_command(command
)
911 content
= stdout
.read()
912 if len(content
) == 0:
913 error
= stderr
.read()
914 print self
.name
, ": get_qemu_info error ", error
915 raise paramiko
.ssh_exception
.SSHException("Error getting qemu_info: " + error
)
918 return yaml
.load(content
)
919 except yaml
.YAMLError
as exc
:
921 if hasattr(exc
, 'problem_mark'):
922 mark
= exc
.problem_mark
923 text
= " at position: (%s:%s)" % (mark
.line
+1, mark
.column
+1)
924 print self
.name
, ": get_qemu_info yaml format Exception", text
925 raise paramiko
.ssh_exception
.SSHException("Error getting qemu_info yaml format" + text
)
927 def qemu_change_backing(self
, inc_file
, new_backing_file
):
928 command
= 'qemu-img rebase -u -b ' + new_backing_file
+ ' ' + inc_file
929 print self
.name
, ': command:', command
930 (_
, _
, stderr
) = self
.ssh_conn
.exec_command(command
)
931 content
= stderr
.read()
932 if len(content
) == 0:
935 print self
.name
, ": qemu_change_backing error: ", content
938 def get_notused_filename(self
, proposed_name
, suffix
=''):
939 '''Look for a non existing file_name in the host
940 proposed_name: proposed file name, includes path
suffix: suffix to be added to the name, before the extension
943 extension
= proposed_name
.rfind(".")
944 slash
= proposed_name
.rfind("/")
945 if extension
< 0 or extension
< slash
: # no extension
946 extension
= len(proposed_name
)
947 target_name
= proposed_name
[:extension
] + suffix
+ proposed_name
[extension
:]
948 info
= self
.get_file_info(target_name
)
953 while info
is not None:
954 target_name
= proposed_name
[:extension
] + suffix
+ "-" + str(index
) + proposed_name
[extension
:]
956 info
= self
.get_file_info(target_name
)
959 def get_notused_path(self
, proposed_path
, suffix
=''):
960 '''Look for a non existing path at database for images
961 proposed_path: proposed file name, includes path
suffix: suffix to be added to the name, before the extension
964 extension
= proposed_path
.rfind(".")
966 extension
= len(proposed_path
)
968 target_path
= proposed_path
[:extension
] + suffix
+ proposed_path
[extension
:]
971 r
,_
=self
.db
.get_table(FROM
="images",WHERE
={"path":target_path
})
974 target_path
= proposed_path
[:extension
] + suffix
+ "-" + str(index
) + proposed_path
[extension
:]
978 def delete_file(self
, file_name
):
979 command
= 'rm -f '+file_name
980 print self
.name
, ': command:', command
981 (_
, _
, stderr
) = self
.ssh_conn
.exec_command(command
)
982 error_msg
= stderr
.read()
983 if len(error_msg
) > 0:
984 raise paramiko
.ssh_exception
.SSHException("Error deleting file: " + error_msg
)
986 def copy_file(self
, source
, destination
, perserve_time
=True):
987 if source
[0:4]=="http":
988 command
= "wget --no-verbose -O '{dst}' '{src}' 2>'{dst_result}' || cat '{dst_result}' >&2 && rm '{dst_result}'".format(
989 dst
=destination
, src
=source
, dst_result
=destination
+ ".result" )
991 command
= 'cp --no-preserve=mode'
993 command
+= ' --preserve=timestamps'
994 command
+= " '{}' '{}'".format(source
, destination
)
995 print self
.name
, ': command:', command
996 (_
, _
, stderr
) = self
.ssh_conn
.exec_command(command
)
997 error_msg
= stderr
.read()
998 if len(error_msg
) > 0:
999 raise paramiko
.ssh_exception
.SSHException("Error copying image to local host: " + error_msg
)
    def copy_remote_file(self, remote_file, use_incremental):
        ''' Copy a file from the repository to local folder and recursively
        copy the backing files in case the remote file is incremental.
        Read and/or modified self.localinfo['files'] that contain the
        unmodified copies of images in the local path.
        Params:
            remote_file: path of remote file
            use_incremental: None (leave the decision to this function), True, False
        Returns (local_file, qemu_info, use_incremental_out):
            local_file: name of local file
            qemu_info: dict with qemu information of local file
            use_incremental_out: True, False; same as use_incremental, but if None a decision is taken

        NOTE(review): this copy of the file is missing some physical lines
        (notably a 'local_file = None' initialization and the bodies that
        invalidate a stale cache entry); each elision is marked inline —
        recover the exact lines from version control before editing behavior.
        '''
        use_incremental_out = use_incremental
        new_backing_file = None
        # NOTE(review): 'local_file = None' initialization elided in this copy
        file_from_local = True

        #in case incremental use is not decided, take the decision depending on the image
        #avoid the use of incremental if this image is already incremental
        if remote_file[0:4] == "http":
            file_from_local = False
        qemu_remote_info = self.qemu_get_info(remote_file)
        if use_incremental_out==None:
            use_incremental_out = not ( file_from_local and 'backing file' in qemu_remote_info)
        #copy recursivelly the backing files
        if file_from_local and 'backing file' in qemu_remote_info:
            new_backing_file, _, _ = self.copy_remote_file(qemu_remote_info['backing file'], True)

        #check if remote file is present locally
        if use_incremental_out and remote_file in self.localinfo['files']:
            local_file = self.localinfo['files'][remote_file]
            local_file_info = self.get_file_info(local_file)
            remote_file_info = self.get_file_info(remote_file)
            if local_file_info == None:
                # NOTE(review): branch body elided — presumably discards the
                # stale cache entry (local_file = None)
            elif file_from_local and (local_file_info[4]!=remote_file_info[4] or local_file_info[5]!=remote_file_info[5]):
                #local copy of file not valid because date or size are different.
                #TODO DELETE local file if this file is not used by any active virtual machine
                self.delete_file(local_file)
                del self.localinfo['files'][remote_file]
                # NOTE(review): line(s) elided here (likely 'local_file = None')
            else: #check that the local file has the same backing file, or there are not backing at all
                qemu_info = self.qemu_get_info(local_file)
                if new_backing_file != qemu_info.get('backing file'):
                    # NOTE(review): branch body elided — cache invalidation
                    # expected here

        if local_file == None: #copy the file
            img_name= remote_file.split('/') [-1]
            img_local = self.image_path + '/' + img_name
            local_file = self.get_notused_filename(img_local)
            self.copy_file(remote_file, local_file, use_incremental_out)

        if use_incremental_out:
            # remember the unmodified local copy for reuse by later servers
            self.localinfo['files'][remote_file] = local_file
        if new_backing_file:
            self.qemu_change_backing(local_file, new_backing_file)
        qemu_info = self.qemu_get_info(local_file)

        return local_file, qemu_info, use_incremental_out
    def launch_server(self, conn, server, rebuild=False, domain=None):
        """Copy the needed images to this host and create/start the libvirt
        domain described by 'server'.

        conn:    open libvirt connection to this host
        server:  instance dict; uses 'uuid', 'image_id', optional 'paused'
                 and 'metadata'; extra disks come from the DB row 'extended'
        rebuild: when True, re-create the domain even if it already exists
        domain:  already existing libvirt domain object, or None
        Returns (0, 'Success') on success or (negative, error-text).

        NOTE(review): this copy of the file is missing several physical lines
        (the test-mode guard/return, the 'try:' header matched by the
        'except' clauses at the bottom, various guards/else/return lines).
        Elisions are marked inline; restore them from version control before
        changing behavior.
        """
        # --- test mode (guarding 'if self.test:' elided) ---
        time.sleep(random.randint(20,150)) #sleep random timeto be make it a bit more real
        # NOTE(review): test-mode 'return 0, ...' elided

        server_id = server['uuid']
        paused = server.get('paused','no')
        # NOTE(review): 'try:' header elided; the code below up to the
        # 'except' clauses runs inside it
        # domain already exists and no rebuild requested -> nothing to do
        if domain!=None and rebuild==False:
            #self.server_status[server_id] = 'ACTIVE'
            # NOTE(review): early success return elided

        self.db_lock.acquire()
        result, server_data = self.db.get_instance(server_id)
        self.db_lock.release()
        # NOTE(review): 'if result <= 0:' guard elided around the two lines below
        print self.name, ": launch_server ERROR getting server from DB",result, server_data
        return result, server_data

        #0: get image metadata
        server_metadata = server.get('metadata', {})
        use_incremental = None
        if "use_incremental" in server_metadata:
            use_incremental = False if server_metadata["use_incremental"]=="no" else True

        server_host_files = self.localinfo['server_files'].get( server['uuid'], {})
        # NOTE(review): guard (likely 'if rebuild:') elided around the cleanup below
        #delete previous incremental files
        for file_ in server_host_files.values():
            self.delete_file(file_['source file'] )
        server_host_files={}

        #1: obtain aditional devices (disks)
        #Put as first device the main disk
        devices = [ {"type":"disk", "image_id":server['image_id'], "vpci":server_metadata.get('vpci', None) } ]
        if 'extended' in server_data and server_data['extended']!=None and "devices" in server_data['extended']:
            devices += server_data['extended']['devices']

        # NOTE(review): 'for dev in devices:' loop header elided; the block
        # below is its body
            if dev['image_id'] == None:
                # NOTE(review): 'continue' elided
            self.db_lock.acquire()
            result, content = self.db.get_table(FROM='images', SELECT=('path','metadata'),WHERE={'uuid':dev['image_id']} )
            self.db_lock.release()
            # NOTE(review): 'if result <= 0:' guard elided around the three lines below
            error_text = "ERROR", result, content, "when getting image", dev['image_id']
            print self.name, ": launch_server", error_text
            return -1, error_text
            if content[0]['metadata'] is not None:
                dev['metadata'] = json.loads(content[0]['metadata'])
            # NOTE(review): 'else:' elided
                dev['metadata'] = {}

            # image already copied for this server: reuse the local file
            if dev['image_id'] in server_host_files:
                dev['source file'] = server_host_files[ dev['image_id'] ] ['source file'] #local path
                dev['file format'] = server_host_files[ dev['image_id'] ] ['file format'] # raw or qcow2
                # NOTE(review): 'continue' elided

            #2: copy image to host
            remote_file = content[0]['path']
            use_incremental_image = use_incremental
            if dev['metadata'].get("use_incremental") == "no":
                use_incremental_image = False
            local_file, qemu_info, use_incremental_image = self.copy_remote_file(remote_file, use_incremental_image)

            #create incremental image
            if use_incremental_image:
                local_file_inc = self.get_notused_filename(local_file, '.inc')
                command = 'qemu-img create -f qcow2 '+local_file_inc+ ' -o backing_file='+ local_file
                print 'command:', command
                (_, _, stderr) = self.ssh_conn.exec_command(command)
                error_msg = stderr.read()
                if len(error_msg) > 0:
                    raise paramiko.ssh_exception.SSHException("Error creating incremental file: " + error_msg)
                local_file = local_file_inc
                qemu_info = {'file format':'qcow2'}

            # remember the per-server copy so a later rebuild can clean it up
            server_host_files[ dev['image_id'] ] = {'source file': local_file, 'file format': qemu_info['file format']}

            dev['source file'] = local_file
            dev['file format'] = qemu_info['file format']

        self.localinfo['server_files'][ server['uuid'] ] = server_host_files
        self.localinfo_dirty = True

        # generate the libvirt XML and launch the domain
        result, xml = self.create_xml_server(server_data, devices, server_metadata) #local_file
        # NOTE(review): 'if result < 0:' guard elided around the line below
        print self.name, ": create xml server error:", xml
        # NOTE(review): error return elided
        print self.name, ": create xml:", xml
        atribute = host_thread.lvirt_module.VIR_DOMAIN_START_PAUSED if paused == "yes" else 0
        if not rebuild: #ensures that any pending destroying server is done
            self.server_forceoff(True)
        #print self.name, ": launching instance" #, xml
        conn.createXML(xml, atribute)
        #self.server_status[server_id] = 'PAUSED' if paused == "yes" else 'ACTIVE'
        # NOTE(review): success return elided

        except paramiko.ssh_exception.SSHException as e:
            # NOTE(review): assignment of 'text' from the exception elided
            print self.name, ": launch_server(%s) ssh Exception: %s" %(server_id, text)
            if "SSH session not active" in text:
                # NOTE(review): reconnect call elided
        except host_thread.lvirt_module.libvirtError as e:
            text = e.get_error_message()
            print self.name, ": launch_server(%s) libvirt Exception: %s" %(server_id, text)
        except Exception as e:
            # NOTE(review): assignment of 'text' elided
            print self.name, ": launch_server(%s) Exception: %s" %(server_id, text)
        # NOTE(review): final error return elided
    def update_servers_status(self):
        """Poll libvirt for the state of every domain at this host and
        propagate state changes into self.server_status and the 'instances'
        DB table.

        NOTE(review): some physical lines are elided in this copy of the file
        ('return', 'try:', 'else:' branches, 'domain_dict' initialization,
        conn.close()); elisions are marked inline.
        """
        # libvirt domain state codes, for reference:
        # VIR_DOMAIN_NOSTATE = 0
        # VIR_DOMAIN_RUNNING = 1
        # VIR_DOMAIN_BLOCKED = 2
        # VIR_DOMAIN_PAUSED = 3
        # VIR_DOMAIN_SHUTDOWN = 4
        # VIR_DOMAIN_SHUTOFF = 5
        # VIR_DOMAIN_CRASHED = 6
        # VIR_DOMAIN_PMSUSPENDED = 7 #TODO suspended

        # nothing to do in test mode or with no tracked servers
        if self.test or len(self.server_status)==0:
            # NOTE(review): 'return' elided

        # NOTE(review): 'domain_dict' initialization and 'try:' header elided
        conn = host_thread.lvirt_module.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
        domains = conn.listAllDomains()
        # map each libvirt domain uuid to the openvim status string
        for domain in domains:
            uuid = domain.UUIDString() ;
            libvirt_status = domain.state()
            #print libvirt_status
            if libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_RUNNING or libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_SHUTDOWN:
                new_status = "ACTIVE"
            elif libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_PAUSED:
                new_status = "PAUSED"
            elif libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_SHUTOFF:
                new_status = "INACTIVE"
            elif libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_CRASHED:
                new_status = "ERROR"
            # NOTE(review): fallback 'else:' branch elided
            domain_dict[uuid] = new_status
        # NOTE(review): conn.close() elided
        except host_thread.lvirt_module.libvirtError as e:
            print self.name, ": get_state() Exception '", e.get_error_message()
            # NOTE(review): 'return' elided

        for server_id, current_status in self.server_status.iteritems():
            # a tracked server missing from libvirt counts as INACTIVE
            if server_id in domain_dict:
                new_status = domain_dict[server_id]
            # NOTE(review): 'else:' elided
                new_status = "INACTIVE"

            if new_status == None or new_status == current_status:
                # NOTE(review): 'continue' elided
            if new_status == 'INACTIVE' and current_status == 'ERROR':
                continue #keep ERROR status, because obviously this machine is not running

            print self.name, ": server ", server_id, "status change from ", current_status, "to", new_status
            STATUS={'progress':100, 'status':new_status}
            if new_status == 'ERROR':
                STATUS['last_error'] = 'machine has crashed'
            self.db_lock.acquire()
            r,_ = self.db.update_rows('instances', STATUS, {'uuid':server_id}, log=False)
            self.db_lock.release()
            # NOTE(review): guard on the DB result elided
            self.server_status[server_id] = new_status
    def action_on_server(self, req, last_retry=True):
        '''Perform an action on a req
        req: dictionary that contain:
            server properties: 'uuid','name','tenant_id','status'
            host properties: 'user', 'ip_name'
        last_retry: when False and the action fails, the DB is not updated
            (another retry will follow)
        return (error, text)
            0: No error. VM is updated to new state,
            -1: Invalid action, as trying to pause a PAUSED VM
            -2: Error accessing host
            -4: Error at DB access
            -5: Error while trying to perform action. VM is updated to ERROR

        NOTE(review): many physical lines are elided in this copy of the file
        ('try:' headers, 'else:' branches, dom.destroy()/shutdown()/resume()
        calls, several returns).  Each elision is marked inline; restore the
        exact lines from version control before changing behavior.
        '''
        server_id = req['uuid']
        # NOTE(review): initializations (conn / new_status / last_error) elided
        old_status = req['status']
        # --- test-mode branch (guard 'if self.test:' elided): only compute
        # the resulting status without touching libvirt ---
        if 'terminate' in req['action']:
            new_status = 'deleted'
        elif 'shutoff' in req['action'] or 'shutdown' in req['action'] or 'forceOff' in req['action']:
            if req['status']!='ERROR':
                new_status = 'INACTIVE'
        elif 'start' in req['action'] and req['status']!='ERROR': new_status = 'ACTIVE'
        elif 'resume' in req['action'] and req['status']!='ERROR' and req['status']!='INACTIVE' : new_status = 'ACTIVE'
        elif 'pause' in req['action'] and req['status']!='ERROR': new_status = 'PAUSED'
        elif 'reboot' in req['action'] and req['status']!='ERROR': new_status = 'ACTIVE'
        elif 'rebuild' in req['action']:
            time.sleep(random.randint(20,150))
            new_status = 'ACTIVE'
        elif 'createImage' in req['action']:
            self.create_image(None, req)
        # --- real mode ('else:' and outer 'try:' elided): look up the domain ---
        conn = host_thread.lvirt_module.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
        # NOTE(review): inner 'try:' elided
        dom = conn.lookupByUUIDString(server_id)
        except host_thread.lvirt_module.libvirtError as e:
            text = e.get_error_message()
            if 'LookupByUUIDString' in text or 'Domain not found' in text or 'No existe un dominio coincidente' in text:
                # NOTE(review): 'dom = None' and 'else:' elided
            print self.name, ": action_on_server(",server_id,") libvirt exception:", text
            # NOTE(review): re-raise elided

        if 'forceOff' in req['action']:
            # NOTE(review): 'if dom == None:' guard elided
            print self.name, ": action_on_server(",server_id,") domain not running"
            # NOTE(review): 'else:' / 'try:' elided
            print self.name, ": sending DESTROY to server", server_id
            # NOTE(review): dom.destroy() call elided
            except Exception as e:
                if "domain is not running" not in e.get_error_message():
                    print self.name, ": action_on_server(",server_id,") Exception while sending force off:", e.get_error_message()
                    last_error = 'action_on_server Exception while destroy: ' + e.get_error_message()
                    new_status = 'ERROR'

        elif 'terminate' in req['action']:
            # NOTE(review): 'if dom == None:' guard elided
            print self.name, ": action_on_server(",server_id,") domain not running"
            new_status = 'deleted'
            # NOTE(review): 'else:' / 'try:' elided
                if req['action']['terminate'] == 'force':
                    print self.name, ": sending DESTROY to server", server_id
                    # NOTE(review): dom.destroy() call elided
                    new_status = 'deleted'
                # NOTE(review): 'else:' (graceful shutdown) elided
                    print self.name, ": sending SHUTDOWN to server", server_id
                    # NOTE(review): dom.shutdown() call elided
                    # give the guest 10s before force-off is applied elsewhere
                    self.pending_terminate_server.append( (time.time()+10,server_id) )
            except Exception as e:
                print self.name, ": action_on_server(",server_id,") Exception while destroy:", e.get_error_message()
                last_error = 'action_on_server Exception while destroy: ' + e.get_error_message()
                new_status = 'ERROR'
                if "domain is not running" in e.get_error_message():
                    # NOTE(review): 'try: dom.undefine()' elided
                    new_status = 'deleted'
                    # NOTE(review): inner 'except' header elided
                    print self.name, ": action_on_server(",server_id,") Exception while undefine:", e.get_error_message()
                    # NOTE(review): the line below builds a TUPLE, not a string
                    # (comma instead of '+') — likely a bug to be confirmed
                    last_error = 'action_on_server Exception2 while undefine:', e.get_error_message()
                    #Exception: 'virDomainDetachDevice() failed'
            if new_status=='deleted':
                if server_id in self.server_status:
                    del self.server_status[server_id]
                if req['uuid'] in self.localinfo['server_files']:
                    for file_ in self.localinfo['server_files'][ req['uuid'] ].values():
                        # NOTE(review): best-effort 'try:'/'except' elided
                        self.delete_file(file_['source file'])
                    del self.localinfo['server_files'][ req['uuid'] ]
                    self.localinfo_dirty = True

        elif 'shutoff' in req['action'] or 'shutdown' in req['action']:
            # NOTE(review): 'try:' and 'if dom == None:' elided
            print self.name, ": action_on_server(",server_id,") domain not running"
            # NOTE(review): 'else: dom.shutdown()' elided
            # new_status = 'INACTIVE'
            #TODO: check status for changing at database
            except Exception as e:
                new_status = 'ERROR'
                print self.name, ": action_on_server(",server_id,") Exception while shutdown:", e.get_error_message()
                last_error = 'action_on_server Exception while shutdown: ' + e.get_error_message()

        elif 'rebuild' in req['action']:
            # NOTE(review): destroy of the pre-existing domain elided
            r = self.launch_server(conn, req, True, None)
            # NOTE(review): 'if r[0] < 0:' guard elided
            new_status = 'ERROR'
            # NOTE(review): 'else:' elided
            new_status = 'ACTIVE'
        elif 'start' in req['action']:
            # The instance is only create in DB but not yet at libvirt domain, needs to be create
            rebuild = True if req['action']['start'] == 'rebuild' else False
            r = self.launch_server(conn, req, rebuild, dom)
            # NOTE(review): 'if r[0] < 0:' guard elided
            new_status = 'ERROR'
            # NOTE(review): 'else:' elided
            new_status = 'ACTIVE'

        elif 'resume' in req['action']:
            # NOTE(review): 'try:', dom checks and dom.resume() call elided
            # new_status = 'ACTIVE'
            except Exception as e:
                print self.name, ": action_on_server(",server_id,") Exception while resume:", e.get_error_message()

        elif 'pause' in req['action']:
            # NOTE(review): 'try:', dom checks and dom.suspend() call elided
            # new_status = 'PAUSED'
            except Exception as e:
                print self.name, ": action_on_server(",server_id,") Exception while pause:", e.get_error_message()

        elif 'reboot' in req['action']:
            # NOTE(review): 'try:', dom checks and dom.reboot() call elided
            print self.name, ": action_on_server(",server_id,") reboot:"
            #new_status = 'ACTIVE'
            except Exception as e:
                print self.name, ": action_on_server(",server_id,") Exception while reboot:", e.get_error_message()
        elif 'createImage' in req['action']:
            self.create_image(dom, req)

        except host_thread.lvirt_module.libvirtError as e:
            if conn is not None: conn.close()
            text = e.get_error_message()
            new_status = "ERROR"
            # NOTE(review): 'last_error = text' elided
            print self.name, ": action_on_server(",server_id,") Exception '", text
            if 'LookupByUUIDString' in text or 'Domain not found' in text or 'No existe un dominio coincidente' in text:
                print self.name, ": action_on_server(",server_id,") Exception removed from host"
        #end of if self.test

        if new_status == None:
            # NOTE(review): early return elided

        print self.name, ": action_on_server(",server_id,") new status", new_status, last_error
        UPDATE = {'progress':100, 'status':new_status}

        if new_status=='ERROR':
            if not last_retry: #if there will be another retry do not update database
                # NOTE(review): return elided
            elif 'terminate' in req['action']:
                #PUT a log in the database
                print self.name, ": PANIC deleting server", server_id, last_error
                self.db_lock.acquire()
                self.db.new_row('logs',
                        {'uuid':server_id, 'tenant_id':req['tenant_id'], 'related':'instances','level':'panic',
                         'description':'PANIC deleting server from host '+self.name+': '+last_error}
                # NOTE(review): closing ')' of the new_row(...) call elided
                self.db_lock.release()
                if server_id in self.server_status:
                    del self.server_status[server_id]
                # NOTE(review): return elided
            # NOTE(review): 'else:' elided
            UPDATE['last_error'] = last_error
        if new_status != 'deleted' and (new_status != old_status or new_status == 'ERROR') :
            self.db_lock.acquire()
            self.db.update_rows('instances', UPDATE, {'uuid':server_id}, log=True)
            self.server_status[server_id] = new_status
            self.db_lock.release()
        if new_status == 'ERROR':
            # NOTE(review): error return elided; trailing success return elided
    def restore_iface(self, name, mac, lib_conn=None):
        ''' make an ifdown, ifup to restore default parameter of na interface
        name: interface name (used here for logging only)
        mac: mac address of the interface
        lib_conn: connection to the libvirt, if None a new connection is created
        Return 0,None if ok, -1,text if fails

        NOTE(review): several physical lines are elided in this copy of the
        file (conn/ret/error_text initializations, 'try:' header, the actual
        iface destroy/create calls, conn.close()); elisions marked inline.
        '''
        # NOTE(review): conn/ret/error_text initializations and the
        # 'if self.test:' guard elided
        print self.name, ": restore_iface '%s' %s" % (name, mac)
        # NOTE(review): 'try:' and 'if not lib_conn:' elided — a fresh libvirt
        # connection is only opened when the caller did not pass one
        conn = host_thread.lvirt_module.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
        # NOTE(review): 'else: conn = lib_conn' elided

        #wait to the pending VM deletion
        #TODO.Revise self.server_forceoff(True)

        iface = conn.interfaceLookupByMACString(mac)
        # NOTE(review): the iface down/up calls are elided in this copy —
        # presumably iface.destroy() followed by iface.create(); confirm
        print self.name, ": restore_iface '%s' %s" % (name, mac)
        except host_thread.lvirt_module.libvirtError as e:
            error_text = e.get_error_message()
            print self.name, ": restore_iface '%s' '%s' libvirt exception: %s" %(name, mac, error_text)
            # NOTE(review): 'ret = -1' elided
        # only close a connection this function opened itself
        if lib_conn is None and conn is not None:
            # NOTE(review): conn.close() elided
        return ret, error_text
    def create_image(self,dom, req):
        """Snapshot a server disk into a new image file and record the result
        in the 'images' DB table (row req['new_image']['uuid']).

        dom: libvirt domain of the server (None when called in test mode)
        req: action request; uses req['action']['createImage'] with keys
             'source' ('path'/'image_id'), 'name' and optional 'path'.

        NOTE(review): several physical lines are elided in this copy of the
        file (the 'if self.test:'/'else:' split, a retry loop with its
        'try:' header, 'break's, an error_text assignment); marked inline.
        """
        # --- test mode (guard 'if self.test:' elided): only choose the path
        if 'path' in req['action']['createImage']:
            file_dst = req['action']['createImage']['path']
        # NOTE(review): 'else:' elided
            createImage=req['action']['createImage']
            img_name= createImage['source']['path']
            index=img_name.rfind('/')
            # place the new image next to the source one
            file_dst = self.get_notused_path(img_name[:index+1] + createImage['name'] + '.qcow2')
        image_status='ACTIVE'
        # --- real mode ('else:', the retry loop and 'try:' elided) ---
        server_id = req['uuid']
        createImage=req['action']['createImage']
        file_orig = self.localinfo['server_files'][server_id] [ createImage['source']['image_id'] ] ['source file']
        if 'path' in req['action']['createImage']:
            file_dst = req['action']['createImage']['path']
        # NOTE(review): 'else:' elided
            img_name= createImage['source']['path']
            index=img_name.rfind('/')
            file_dst = self.get_notused_filename(img_name[:index+1] + createImage['name'] + '.qcow2')

        self.copy_file(file_orig, file_dst)
        qemu_info = self.qemu_get_info(file_orig)
        # retarget the copy's backing file to the repository path instead of
        # the locally-cached copy path
        if 'backing file' in qemu_info:
            for k,v in self.localinfo['files'].items():
                if v==qemu_info['backing file']:
                    self.qemu_change_backing(file_dst, k)
                    # NOTE(review): 'break' elided
        image_status='ACTIVE'
        # NOTE(review): 'break' out of the retry loop elided
        except paramiko.ssh_exception.SSHException as e:
            image_status='ERROR'
            error_text = e.args[0]
            print self.name, "': create_image(",server_id,") ssh Exception:", error_text
            if "SSH session not active" in error_text and retry==0:
                # NOTE(review): ssh reconnection call elided
        except Exception as e:
            image_status='ERROR'
            # NOTE(review): 'error_text = str(e)' elided
            print self.name, "': create_image(",server_id,") Exception:", error_text

        #TODO insert a last_error at database
        self.db_lock.acquire()
        self.db.update_rows('images', {'status':image_status, 'progress': 100, 'path':file_dst},
                {'uuid':req['new_image']['uuid']}, log=True)
        self.db_lock.release()
    def edit_iface(self, port_id, old_net, new_net):
        """Re-plug an SR-IOV (VF) interface of a running instance so it picks
        up new network parameters (vlan tag): the hostdev interface is
        detached and re-attached live through libvirt.

        port_id: uuid of the port (joined against resources_port in the DB)
        old_net, new_net: network uuids (not used directly here; the new
            vlan comes from the DB row)

        NOTE(review): several physical lines are elided in this copy of the
        file (test guard, DB result checks and returns, 'xml = []'
        initialization, 'try:', the text = join(xml) lines, 'finally:');
        marked inline.
        """
        #This action imply remove and insert interface to put proper parameters
        # NOTE(review): test-mode guard elided
        #get the port data joined with its physical resource
        self.db_lock.acquire()
        r,c = self.db.get_table(FROM='ports as p join resources_port as rp on p.uuid=rp.port_id',
                                WHERE={'port_id': port_id})
        self.db_lock.release()
        # NOTE(review): 'if r<0:' guard elided around the line below
        print self.name, ": edit_iface(",port_id,") DDBB error:", c
        # NOTE(review): return and 'elif r==0:' elided
        print self.name, ": edit_iface(",port_id,") por not found"
        # NOTE(review): return and 'port = c[0]' elided
        if port["model"]!="VF":
            print self.name, ": edit_iface(",port_id,") ERROR model must be VF"
            # NOTE(review): return elided
        #create xml detach file
        # NOTE(review): 'xml = []' initialization elided
        xml.append("<interface type='hostdev' managed='yes'>")
        xml.append(" <mac address='" +port['mac']+ "'/>")
        xml.append(" <source>"+ self.pci2xml(port['pci'])+"\n </source>")
        xml.append('</interface>')

        # NOTE(review): 'try:' elided
        conn = host_thread.lvirt_module.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
        dom = conn.lookupByUUIDString(port["instance_id"])
        # NOTE(review): "text = '\n'.join(xml)" elided
        print self.name, ": edit_iface detaching SRIOV interface", text
        dom.detachDeviceFlags(text, flags=host_thread.lvirt_module.VIR_DOMAIN_AFFECT_LIVE)
        # rebuild the xml with the new vlan tag and re-attach the device
        xml[-1] =" <vlan> <tag id='" + str(port['vlan']) + "'/> </vlan>"
        xml.append(self.pci2xml(port.get('vpci',None)) )
        xml.append('</interface>')
        # NOTE(review): "text = '\n'.join(xml)" elided
        print self.name, ": edit_iface attaching SRIOV interface", text
        dom.attachDeviceFlags(text, flags=host_thread.lvirt_module.VIR_DOMAIN_AFFECT_LIVE)
        except host_thread.lvirt_module.libvirtError as e:
            text = e.get_error_message()
            print self.name, ": edit_iface(",port["instance_id"],") libvirt exception:", text
        # NOTE(review): 'finally:' elided
        if conn is not None: conn.close()
1596 def create_server(server
, db
, db_lock
, only_of_ports
):
1603 # host_id = server.get('host_id', None)
1604 extended
= server
.get('extended', None)
1606 # print '----------------------'
1607 # print json.dumps(extended, indent=4)
1610 requirements
['numa']={'memory':0, 'proc_req_type': 'threads', 'proc_req_nb':0, 'port_list':[], 'sriov_list':[]}
1611 requirements
['ram'] = server
['flavor'].get('ram', 0)
1612 if requirements
['ram']== None:
1613 requirements
['ram'] = 0
1614 requirements
['vcpus'] = server
['flavor'].get('vcpus', 0)
1615 if requirements
['vcpus']== None:
1616 requirements
['vcpus'] = 0
1617 #If extended is not defined get requirements from flavor
1618 if extended
is None:
1619 #If extended is defined in flavor convert to dictionary and use it
1620 if 'extended' in server
['flavor'] and server
['flavor']['extended'] != None:
1621 json_acceptable_string
= server
['flavor']['extended'].replace("'", "\"")
1622 extended
= json
.loads(json_acceptable_string
)
1625 #print json.dumps(extended, indent=4)
1627 #For simplicity only one numa VM are supported in the initial implementation
1628 if extended
!= None:
1629 numas
= extended
.get('numas', [])
1631 return (-2, "Multi-NUMA VMs are not supported yet")
1633 # return (-1, "At least one numa must be specified")
1635 #a for loop is used in order to be ready to multi-NUMA VMs
1639 numa_req
['memory'] = numa
.get('memory', 0)
1641 numa_req
['proc_req_nb'] = numa
['cores'] #number of cores or threads to be reserved
1642 numa_req
['proc_req_type'] = 'cores' #indicates whether cores or threads must be reserved
1643 numa_req
['proc_req_list'] = numa
.get('cores-id', None) #list of ids to be assigned to the cores or threads
1644 elif 'paired-threads' in numa
:
1645 numa_req
['proc_req_nb'] = numa
['paired-threads']
1646 numa_req
['proc_req_type'] = 'paired-threads'
1647 numa_req
['proc_req_list'] = numa
.get('paired-threads-id', None)
1648 elif 'threads' in numa
:
1649 numa_req
['proc_req_nb'] = numa
['threads']
1650 numa_req
['proc_req_type'] = 'threads'
1651 numa_req
['proc_req_list'] = numa
.get('threads-id', None)
1653 numa_req
['proc_req_nb'] = 0 # by default
1654 numa_req
['proc_req_type'] = 'threads'
1658 #Generate a list of sriov and another for physical interfaces
1659 interfaces
= numa
.get('interfaces', [])
1662 for iface
in interfaces
:
1663 iface
['bandwidth'] = int(iface
['bandwidth'])
1664 if iface
['dedicated'][:3]=='yes':
1665 port_list
.append(iface
)
1667 sriov_list
.append(iface
)
1669 #Save lists ordered from more restrictive to less bw requirements
1670 numa_req
['sriov_list'] = sorted(sriov_list
, key
=lambda k
: k
['bandwidth'], reverse
=True)
1671 numa_req
['port_list'] = sorted(port_list
, key
=lambda k
: k
['bandwidth'], reverse
=True)
1674 request
.append(numa_req
)
1676 # print "----------\n"+json.dumps(request[0], indent=4)
1677 # print '----------\n\n'
1679 #Search in db for an appropriate numa for each requested numa
1680 #at the moment multi-NUMA VMs are not supported
1682 requirements
['numa'].update(request
[0])
1683 if requirements
['numa']['memory']>0:
1684 requirements
['ram']=0 #By the moment I make incompatible ask for both Huge and non huge pages memory
1685 elif requirements
['ram']==0:
1686 return (-1, "Memory information not set neither at extended field not at ram")
1687 if requirements
['numa']['proc_req_nb']>0:
1688 requirements
['vcpus']=0 #By the moment I make incompatible ask for both Isolated and non isolated cpus
1689 elif requirements
['vcpus']==0:
1690 return (-1, "Processor information not set neither at extended field not at vcpus")
1694 result
, content
= db
.get_numas(requirements
, server
.get('host_id', None), only_of_ports
)
1698 return (-1, content
)
1700 numa_id
= content
['numa_id']
1701 host_id
= content
['host_id']
1703 #obtain threads_id and calculate pinning
1706 if requirements
['numa']['proc_req_nb']>0:
1708 result
, content
= db
.get_table(FROM
='resources_core',
1709 SELECT
=('id','core_id','thread_id'),
1710 WHERE
={'numa_id':numa_id
,'instance_id': None, 'status':'ok'} )
1716 #convert rows to a dictionary indexed by core_id
1719 if not row
['core_id'] in cores_dict
:
1720 cores_dict
[row
['core_id']] = []
1721 cores_dict
[row
['core_id']].append([row
['thread_id'],row
['id']])
1723 #In case full cores are requested
1725 if requirements
['numa']['proc_req_type'] == 'cores':
1726 #Get/create the list of the vcpu_ids
1727 vcpu_id_list
= requirements
['numa']['proc_req_list']
1728 if vcpu_id_list
== None:
1729 vcpu_id_list
= range(0,int(requirements
['numa']['proc_req_nb']))
1731 for threads
in cores_dict
.itervalues():
1733 if len(threads
) != 2:
1736 #set pinning for the first thread
1737 cpu_pinning
.append( [ vcpu_id_list
.pop(0), threads
[0][0], threads
[0][1] ] )
1739 #reserve so it is not used the second thread
1740 reserved_threads
.append(threads
[1][1])
1742 if len(vcpu_id_list
) == 0:
1745 #In case paired threads are requested
1746 elif requirements
['numa']['proc_req_type'] == 'paired-threads':
1748 #Get/create the list of the vcpu_ids
1749 if requirements
['numa']['proc_req_list'] != None:
1751 for pair
in requirements
['numa']['proc_req_list']:
1753 return -1, "Field paired-threads-id not properly specified"
1755 vcpu_id_list
.append(pair
[0])
1756 vcpu_id_list
.append(pair
[1])
1758 vcpu_id_list
= range(0,2*int(requirements
['numa']['proc_req_nb']))
1760 for threads
in cores_dict
.itervalues():
1762 if len(threads
) != 2:
1764 #set pinning for the first thread
1765 cpu_pinning
.append([vcpu_id_list
.pop(0), threads
[0][0], threads
[0][1]])
1767 #set pinning for the second thread
1768 cpu_pinning
.append([vcpu_id_list
.pop(0), threads
[1][0], threads
[1][1]])
1770 if len(vcpu_id_list
) == 0:
1773 #In case normal threads are requested
1774 elif requirements
['numa']['proc_req_type'] == 'threads':
1775 #Get/create the list of the vcpu_ids
1776 vcpu_id_list
= requirements
['numa']['proc_req_list']
1777 if vcpu_id_list
== None:
1778 vcpu_id_list
= range(0,int(requirements
['numa']['proc_req_nb']))
1780 for threads_index
in sorted(cores_dict
, key
=lambda k
: len(cores_dict
[k
])):
1781 threads
= cores_dict
[threads_index
]
1782 #set pinning for the first thread
1783 cpu_pinning
.append([vcpu_id_list
.pop(0), threads
[0][0], threads
[0][1]])
1785 #if exists, set pinning for the second thread
1786 if len(threads
) == 2 and len(vcpu_id_list
) != 0:
1787 cpu_pinning
.append([vcpu_id_list
.pop(0), threads
[1][0], threads
[1][1]])
1789 if len(vcpu_id_list
) == 0:
1792 #Get the source pci addresses for the selected numa
1793 used_sriov_ports
= []
1794 for port
in requirements
['numa']['sriov_list']:
1796 result
, content
= db
.get_table(FROM
='resources_port', SELECT
=('id', 'pci', 'mac'),WHERE
={'numa_id':numa_id
,'root_id': port
['port_id'], 'port_id': None, 'Mbps_used': 0} )
1802 if row
['id'] in used_sriov_ports
or row
['id']==port
['port_id']:
1804 port
['pci'] = row
['pci']
1805 if 'mac_address' not in port
:
1806 port
['mac_address'] = row
['mac']
1808 port
['port_id']=row
['id']
1809 port
['Mbps_used'] = port
['bandwidth']
1810 used_sriov_ports
.append(row
['id'])
1813 for port
in requirements
['numa']['port_list']:
1814 port
['Mbps_used'] = None
1815 if port
['dedicated'] != "yes:sriov":
1816 port
['mac_address'] = port
['mac']
1820 result
, content
= db
.get_table(FROM
='resources_port', SELECT
=('id', 'pci', 'mac', 'Mbps'),WHERE
={'numa_id':numa_id
,'root_id': port
['port_id'], 'port_id': None, 'Mbps_used': 0} )
1825 port
['Mbps_used'] = content
[0]['Mbps']
1827 if row
['id'] in used_sriov_ports
or row
['id']==port
['port_id']:
1829 port
['pci'] = row
['pci']
1830 if 'mac_address' not in port
:
1831 port
['mac_address'] = row
['mac'] # mac cannot be set to passthrough ports
1833 port
['port_id']=row
['id']
1834 used_sriov_ports
.append(row
['id'])
1837 # print '2. Physical ports assignation:'+json.dumps(requirements['port_list'], indent=4)
1838 # print '2. SR-IOV assignation:'+json.dumps(requirements['sriov_list'], indent=4)
1840 server
['host_id'] = host_id
1843 #Generate dictionary for saving in db the instance resources
1845 resources
['bridged-ifaces'] = []
1848 numa_dict
['interfaces'] = []
1850 numa_dict
['interfaces'] += requirements
['numa']['port_list']
1851 numa_dict
['interfaces'] += requirements
['numa']['sriov_list']
1853 #Check bridge information
1854 unified_dataplane_iface
=[]
1855 unified_dataplane_iface
+= requirements
['numa']['port_list']
1856 unified_dataplane_iface
+= requirements
['numa']['sriov_list']
1858 for control_iface
in server
.get('networks', []):
1859 control_iface
['net_id']=control_iface
.pop('uuid')
1860 #Get the brifge name
1862 result
, content
= db
.get_table(FROM
= 'nets',
1863 SELECT
= ('name','type', 'vlan', 'provider'),
1864 WHERE
= {'uuid':control_iface
['net_id']})
1869 return -1, "Error at field netwoks: Not found any network wit uuid %s" % control_iface
['net_id']
1872 if control_iface
.get("type", 'virtual') == 'virtual':
1873 if network
['type']!='bridge_data' and network
['type']!='bridge_man':
1874 return -1, "Error at field netwoks: network uuid %s for control interface is not of type bridge_man or bridge_data" % control_iface
['net_id']
1875 resources
['bridged-ifaces'].append(control_iface
)
1876 if network
.get("provider") and network
["provider"][0:3] == "OVS":
1877 control_iface
["type"] = "instance:ovs"
1879 control_iface
["type"] = "instance:bridge"
1880 if network
.get("vlan"):
1881 control_iface
["vlan"] = network
["vlan"]
1883 if network
['type']!='data' and network
['type']!='ptp':
1884 return -1, "Error at field netwoks: network uuid %s for dataplane interface is not of type data or ptp" % control_iface
['net_id']
1885 #dataplane interface, look for it in the numa tree and asign this network
1887 for dataplane_iface
in numa_dict
['interfaces']:
1888 if dataplane_iface
['name'] == control_iface
.get("name"):
1889 if (dataplane_iface
['dedicated'] == "yes" and control_iface
["type"] != "PF") or \
1890 (dataplane_iface
['dedicated'] == "no" and control_iface
["type"] != "VF") or \
1891 (dataplane_iface
['dedicated'] == "yes:sriov" and control_iface
["type"] != "VFnotShared") :
1892 return -1, "Error at field netwoks: mismatch at interface '%s' from flavor 'dedicated=%s' and networks 'type=%s'" % \
1893 (control_iface
.get("name"), dataplane_iface
['dedicated'], control_iface
["type"])
1894 dataplane_iface
['uuid'] = control_iface
['net_id']
1895 if dataplane_iface
['dedicated'] == "no":
1896 dataplane_iface
['vlan'] = network
['vlan']
1897 if dataplane_iface
['dedicated'] != "yes" and control_iface
.get("mac_address"):
1898 dataplane_iface
['mac_address'] = control_iface
.get("mac_address")
1899 if control_iface
.get("vpci"):
1900 dataplane_iface
['vpci'] = control_iface
.get("vpci")
1904 return -1, "Error at field netwoks: interface name %s from network not found at flavor" % control_iface
.get("name")
1906 resources
['host_id'] = host_id
1907 resources
['image_id'] = server
['image_id']
1908 resources
['flavor_id'] = server
['flavor_id']
1909 resources
['tenant_id'] = server
['tenant_id']
1910 resources
['ram'] = requirements
['ram']
1911 resources
['vcpus'] = requirements
['vcpus']
1912 resources
['status'] = 'CREATING'
1914 if 'description' in server
: resources
['description'] = server
['description']
1915 if 'name' in server
: resources
['name'] = server
['name']
1917 resources
['extended'] = {} #optional
1918 resources
['extended']['numas'] = []
1919 numa_dict
['numa_id'] = numa_id
1920 numa_dict
['memory'] = requirements
['numa']['memory']
1921 numa_dict
['cores'] = []
1923 for core
in cpu_pinning
:
1924 numa_dict
['cores'].append({'id': core
[2], 'vthread': core
[0], 'paired': paired
})
1925 for core
in reserved_threads
:
1926 numa_dict
['cores'].append({'id': core
})
1927 resources
['extended']['numas'].append(numa_dict
)
1928 if extended
!=None and 'devices' in extended
: #TODO allow extra devices without numa
1929 resources
['extended']['devices'] = extended
['devices']
1932 print '===================================={'
1933 print json
.dumps(resources
, indent
=4)
1934 print '====================================}'